#!/usr/bin/env python3
"""
Nginx Log Processor

This script processes Nginx access logs from stdin and generates statistics
for GET requests that returned 200 status codes.

Generated with assistance from Claude Sonnet 4 AI agent.

Usage:
    cat access.log | ./process-nginx-logs.py

For more details see https://eax.me/2025/2025-11-14-static-blog.html
"""

import sys
import re
import argparse
from collections import defaultdict
from urllib.parse import urlparse

# Configuration constants
TOP_PAGES_LIMIT = 100  # Number of top pages to display in statistics
TOP_REFERRERS_LIMIT = 100  # Number of top referrers to display in statistics
TOP_BROWSERS_LIMIT = 50  # Number of top browsers to display in statistics
TOP_OS_LIMIT = 50  # Number of top operating systems to display in statistics
TOP_DEBUG_USER_AGENTS_LIMIT = 50  # Number of top unrecognized user agents to display in debug mode
EXCLUDE_REFERRERS_PATTERN = r"^(https?://)?(www\.)?eax\.me(/.*)?$|^https?://185\.14\.186\.115(/.*)?$"  # Pattern to exclude internal referrers

# Set of IP addresses to ignore completely, e.g. {"203.0.113.7"}.
# NOTE: an empty ``{}`` literal would be a dict, hence the explicit set().
IGNORED_IPS = set()

# Search engine grouping patterns
SEARCH_ENGINE_PATTERNS = {
    "Yandex": [
        r"^https?://(www\.)?yandex\.[a-z]{2,}",
        r"^https?://(www\.)?ya\.[a-z]{2,}",
    ],
    "Google": [
        r"^https?://(www\.)?google\.[a-z]{2,}",
        r"^android-app://com\.google\.android\.googlequicksearchbox",
    ],
    "Bing": [
        r"^https?://(www\.)?(bing|cn\.bing)\.com",
    ],
    "DuckDuckGo": [
        r"^https?://(www\.)?duckduckgo\.com",
    ],
    "Baidu": [
        r"^https?://(www\.)?baidu\.com",
    ],
}

# Patterns below are compiled once at import time because they are applied
# to every log line.

# Nginx Combined Log Format:
# IP - - [timestamp] "METHOD /path HTTP/version" status size "referer" "user-agent"
_LOG_LINE_RE = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "([A-Z]+) ([^"]*) HTTP/[^"]*" (\d+) (\d+) "([^"]*)" "([^"]*)"'
)
_FEED_PATH_RE = re.compile(r"^/feed/?$")
_EXCLUDE_REFERRERS_RE = re.compile(EXCLUDE_REFERRERS_PATTERN)
_SEARCH_ENGINE_REGEXES = {
    engine_name: tuple(re.compile(pattern) for pattern in patterns)
    for engine_name, patterns in SEARCH_ENGINE_PATTERNS.items()
}

# Lowercase substrings that mark a user agent as a bot, crawler, RSS reader
# or other non-browser HTTP client.
_BOT_SUBSTRINGS = (
    "bot",
    "crawler",
    "spider",
    "curl/",
    "wget/",
    "wordpress",
    "go-http-client",
    "nextcloud-news",
    "newsraft",
    "netnewswire",
    "feedly",
    "feedbin",
    "rss2email",
    "freshrss",
    "newsboat",
    "rss reader",
    "commafeed",
    "yarr",
    "liferea",
    "thunderbird",
    "bazqux",
    "reeder",
    "feedburner",
    "simplepie",
    "feedflow",
    "podcastaddict",
    "antennapod",
    "feedparser",
    "perplexity",
    "facebook.com",
    "pocketcasts",
    "instapaper",
    "whatsapp",
    "python",
    "apache-httpclient",
    "java",
)

# URL-shaped markers that some feed readers embed in their user agent.
_BOT_REGEXES = tuple(
    re.compile(pattern)
    for pattern in (
        r"https?://tt-rss\.org/",
        r"\+?https?://git\.io/rsstt",
        r"\+?https?://github\.com/tt-rss",
        r"\+?https?://www\.inoreader\.com",
        r"\+?https?://miniflux\.app",
    )
)

# Ordered (OS name, lowercase markers) pairs; the first match wins.
# Android precedes Linux and iOS precedes macOS because Android user agents
# also contain "linux" and iPhone/iPad user agents contain "like Mac OS X".
_OS_MARKERS = (
    ("Windows", ("windows nt",)),
    ("Android", ("android",)),
    ("iOS", ("iphone", "ipad")),
    ("macOS", ("mac os x", "macos", "macintosh")),
    ("Linux", ("linux",)),
    ("FreeBSD", ("freebsd",)),
    ("OpenBSD", ("openbsd",)),
    ("NetBSD", ("netbsd",)),
)


def parse_log_line(line):
    """
    Parse a single Nginx log line in Combined Log Format.

    Format: IP - - [timestamp] "METHOD /path HTTP/version" status size "referer" "user-agent"

    Returns a dict with the keys "ip", "timestamp", "method", "path",
    "status" (int), "size" (str), "referer" and "user_agent", or None when
    the line does not match the expected format.
    """
    match = _LOG_LINE_RE.match(line.strip())
    if not match:
        return None

    ip_address, timestamp, method, path, status, size, referer, user_agent = match.groups()
    return {
        "ip": ip_address,
        "timestamp": timestamp,
        "method": method,
        "path": path,
        "status": int(status),
        "size": size,
        "referer": referer,
        "user_agent": user_agent,
    }


def clean_path(path):
    """
    Clean and normalize the URL path.

    Removes query parameters and fragments. If the path cannot be parsed as
    a URL, it is returned unchanged.
    """
    try:
        return urlparse(path).path
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[" in
        # the network location); keep such paths verbatim instead of failing.
        return path


def clean_referrer(referrer):
    """
    Clean referrer URL by removing query parameters and converting to lowercase.

    Keeps everything before the first '?' character.
    """
    # Lowercase first so that equal referrers compare equal regardless of case.
    return referrer.lower().partition("?")[0]


def get_search_engine_name(referrer):
    """
    Check if referrer belongs to a known search engine and return its name.

    Returns the search engine name if found, otherwise returns the original
    referrer.
    """
    for engine_name, regexes in _SEARCH_ENGINE_REGEXES.items():
        if any(regex.match(referrer) for regex in regexes):
            return engine_name
    return referrer


def is_bot_or_crawler(user_agent):
    """
    Check if user agent belongs to a bot, crawler, or RSS reader.

    Returns True if it's a bot/crawler/RSS reader, False otherwise. An empty
    (or None) user agent is not treated as a bot, while the placeholder "-"
    is.
    """
    if not user_agent:
        return False

    # User agent is exactly "-"
    if user_agent == "-":
        return True

    user_agent_lower = user_agent.lower()
    if any(marker in user_agent_lower for marker in _BOT_SUBSTRINGS):
        return True
    return any(regex.search(user_agent_lower) for regex in _BOT_REGEXES)


def parse_browser_from_user_agent(user_agent):
    """
    Extract browser name from user agent string.

    Returns browser name without version or None if not detected.
    """
    if not user_agent or user_agent == "-":
        return None

    ua = user_agent.lower()

    # Order matters: Chromium-based browsers (Edge, Opera, Yandex Browser)
    # also advertise "Chrome/" and "Safari/", so the more specific tokens
    # must be tested before the generic ones.
    if "edg/" in ua or "edge/" in ua:
        return "Edge"
    if "opera/" in ua or "opr/" in ua:
        return "Opera"
    if "yabrowser/" in ua:
        return "Yandex Browser"
    if "chrome/" in ua and "chromium/" not in ua:
        return "Chrome"
    if "firefox/" in ua:
        return "Firefox"
    if "safari/" in ua and "chrome/" not in ua:
        return "Safari"
    if "chromium/" in ua:
        return "Chromium"
    if "webkit/" in ua and "macintosh" in ua:
        return "Safari"
    if "mozilla/" in ua and "gecko" in ua:
        return "Firefox"

    return None


def parse_os_from_user_agent(user_agent):
    """
    Extract operating system from user agent string.

    Returns OS name or None if not detected.
    """
    if not user_agent or user_agent == "-":
        return None

    ua = user_agent.lower()
    for os_name, markers in _OS_MARKERS:
        if any(marker in ua for marker in markers):
            return os_name

    return None


def _new_stats():
    """Create a mapping of key -> {"views": int, "unique_ips": set}."""
    return defaultdict(lambda: {"views": 0, "unique_ips": set()})


def _record_hit(stats, key, ip):
    """Count one view of `key` coming from `ip` in a _new_stats() mapping."""
    entry = stats[key]
    entry["views"] += 1
    entry["unique_ips"].add(ip)


def _collect_statistics(lines, collect_debug):
    """
    Aggregate statistics over an iterable of raw log lines.

    Only GET requests answered with status 200 are counted; ignored IPs,
    bots/crawlers and feed requests are skipped. When `collect_debug` is
    true, user agents with no recognized browser are counted as well.

    Returns a dict with the keys "total_requests", "unique_ips", "pages",
    "referrers", "browsers", "os" and "debug_user_agents".
    """
    result = {
        "total_requests": 0,
        "unique_ips": set(),
        "pages": _new_stats(),
        "referrers": _new_stats(),
        "browsers": _new_stats(),
        "os": _new_stats(),
        "debug_user_agents": defaultdict(int),
    }

    for raw_line in lines:
        # Blank and malformed lines both fail to parse and are skipped.
        parsed = parse_log_line(raw_line)
        if not parsed:
            continue

        # Filter only GET requests with 200 status
        if parsed["method"] != "GET" or parsed["status"] != 200:
            continue

        # Skip requests from ignored IP addresses
        ip = parsed["ip"]
        if ip in IGNORED_IPS:
            continue

        # Skip bots and crawlers from all statistics
        user_agent = parsed["user_agent"]
        if is_bot_or_crawler(user_agent):
            continue

        # Clean and normalize the path
        path = clean_path(parsed["path"])

        # Skip all RSS/feed requests regardless of user agent
        if _FEED_PATH_RE.match(path) or path == "/rss.xml":
            continue

        result["total_requests"] += 1
        result["unique_ips"].add(ip)
        _record_hit(result["pages"], path, ip)

        # Skip empty referrers (marked with "-") and internal referrers.
        referrer = parsed["referer"]
        if referrer and referrer != "-" and not _EXCLUDE_REFERRERS_RE.match(referrer):
            # Strip the query string, then group known search engines.
            final_ref = get_search_engine_name(clean_referrer(referrer))
            _record_hit(result["referrers"], final_ref, ip)

        browser = parse_browser_from_user_agent(user_agent)
        if browser:
            _record_hit(result["browsers"], browser, ip)

        os_name = parse_os_from_user_agent(user_agent)
        if os_name:
            _record_hit(result["os"], os_name, ip)

        # Debug: collect unrecognized user agents
        if collect_debug and not browser:
            result["debug_user_agents"][user_agent] += 1

    return result


def _truncate(text, width):
    """Return `text` limited to `width` characters, ending in "..." if cut."""
    return text if len(text) <= width else text[: width - 3] + "..."


def _top_by_views(stats, limit):
    """Return up to `limit` (key, entry) pairs sorted by views, descending."""
    return sorted(stats.items(), key=lambda item: item[1]["views"], reverse=True)[:limit]


def _print_views_table(stats, limit, title, column, empty_message):
    """Print a wide "name / views / unique IPs" table, or `empty_message`."""
    if not stats:
        print(empty_message)
        return

    row = "{:<100} {:>12} {:>15}"
    print(title.format(limit))
    print("-" * 130)
    print(row.format(column, "Просмотры", "Уникальные IP"))
    print("-" * 130)
    for name, entry in _top_by_views(stats, limit):
        print(row.format(_truncate(name, 99), entry["views"], len(entry["unique_ips"])))


def _print_share_table(stats, limit, title, column, empty_message, total_requests, total_ips):
    """Print a table with views, unique IPs and their share of the totals."""
    if not stats:
        print(empty_message)
        return

    print(title.format(limit))
    print("-" * 85)
    print(
        "{:<33} {:>12} {:>10} {:>12} {:>10}".format(
            column, "Просмотры", "% просм.", "Уник. IP", "% IP"
        )
    )
    print("-" * 85)
    for name, entry in _top_by_views(stats, limit):
        views = entry["views"]
        unique_ip_count = len(entry["unique_ips"])
        # Guard against division by zero when nothing was counted.
        views_percent = (views / total_requests) * 100 if total_requests > 0 else 0
        ip_percent = (unique_ip_count / total_ips) * 100 if total_ips > 0 else 0
        print(
            "{:<33} {:>12} {:>10.1f} {:>12} {:>10.1f}".format(
                _truncate(name, 32), views, views_percent, unique_ip_count, ip_percent
            )
        )


def _print_debug_user_agents(debug_user_agents):
    """Print the most frequent user agents with no recognized browser."""
    if not debug_user_agents:
        print("Нет нераспознанных User-Agent для отображения")
        return

    row = "{:<100} {:>12}"
    print("Топ {} нераспознанных User-Agent:".format(TOP_DEBUG_USER_AGENTS_LIMIT))
    print("-" * 115)
    print(row.format("User-Agent", "Количество"))
    print("-" * 115)
    most_frequent = sorted(
        debug_user_agents.items(), key=lambda item: item[1], reverse=True
    )[:TOP_DEBUG_USER_AGENTS_LIMIT]
    for user_agent, count in most_frequent:
        print(row.format(_truncate(user_agent, 99), count))


def main():
    """
    Main function to process logs from stdin.

    Reads access-log lines from stdin, aggregates page, referrer, browser
    and OS statistics and prints them to stdout. With --debug, the most
    frequent user agents with no recognized browser are printed as well.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Process Nginx access logs")
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Show debug information including unrecognized user agents",
    )
    args = parser.parse_args()

    stats = _collect_statistics(sys.stdin, args.debug)
    total_requests = stats["total_requests"]
    total_ips = len(stats["unique_ips"])

    # Output results
    print("Общее количество запросов: {}".format(total_requests))
    print("Количество уникальных IP: {}".format(total_ips))

    print()
    _print_views_table(
        stats["pages"],
        TOP_PAGES_LIMIT,
        "Топ {} страниц по просмотрам:",
        "Страница",
        "Нет данных для отображения (не найдено GET-запросов с кодом 200)",
    )

    # Output referrer statistics
    print()
    _print_views_table(
        stats["referrers"],
        TOP_REFERRERS_LIMIT,
        "Топ {} referrers по просмотрам:",
        "Referrer",
        "Нет данных по referrers для отображения",
    )

    # Output browser statistics
    print()
    _print_share_table(
        stats["browsers"],
        TOP_BROWSERS_LIMIT,
        "Топ {} браузеров по просмотрам:",
        "Браузер",
        "Нет данных по браузерам для отображения",
        total_requests,
        total_ips,
    )

    # Output OS statistics
    print()
    _print_share_table(
        stats["os"],
        TOP_OS_LIMIT,
        "Топ {} операционных систем по просмотрам:",
        "Операционная система",
        "Нет данных по операционным системам для отображения",
        total_requests,
        total_ips,
    )

    # Output debug information if requested
    if args.debug:
        print()
        _print_debug_user_agents(stats["debug_user_agents"])


if __name__ == "__main__":
    main()