#!/usr/bin/env python3
"""
Traffic Monitor Log Analysis Script

This script analyzes traffic logs generated by the traffic monitor eBPF
program. It supports multiple input formats (JSON, CSV, JSONL) and provides
summary statistics, threat heuristics, flow analysis, and report export.
"""

import argparse
import csv
import json
import os
import statistics
import sys
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List


class TrafficLogAnalyzer:
    """Main analyzer class for traffic monitor logs."""

    def __init__(self, log_file: str, log_format: str = "auto"):
        self.log_file = log_file
        self.log_format = log_format
        self.events = []
        self.stats = {}

    def load_logs(self) -> None:
        """Load log entries from file."""
        if not os.path.exists(self.log_file):
            raise FileNotFoundError(f"Log file not found: {self.log_file}")

        format_type = self._detect_format() if self.log_format == "auto" else self.log_format

        if format_type == "json":
            self._load_json()
        elif format_type == "jsonl":
            self._load_jsonl()
        elif format_type == "csv":
            self._load_csv()
        else:
            raise ValueError(f"Unsupported format: {format_type}")

        print(f"Loaded {len(self.events)} log entries")

    def _detect_format(self) -> str:
        """Auto-detect log file format from its first line."""
        with open(self.log_file, 'r') as f:
            first_line = f.readline().strip()

        if first_line.startswith('['):
            return "json"
        elif first_line.startswith('{'):
            return "jsonl"
        elif ',' in first_line and 'timestamp' in first_line:
            return "csv"
        else:
            raise ValueError("Cannot detect log format")

    def _load_json(self) -> None:
        """Load JSON array format."""
        with open(self.log_file, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            self.events = data
        else:
            self.events = [data]

    def _load_jsonl(self) -> None:
        """Load JSON Lines format."""
        with open(self.log_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line:
                    self.events.append(json.loads(line))

    def _load_csv(self) -> None:
        """Load CSV format."""
        with open(self.log_file, 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # Convert numeric fields; everything else stays a string.
                event = {
                    'timestamp': int(row['timestamp']),
                    'timestamp_iso': row['timestamp_iso'],
                    'src_ip': row['src_ip'],
                    'dst_ip': row['dst_ip'],
                    'src_port': int(row['src_port']),
                    'dst_port': int(row['dst_port']),
                    'protocol': row['protocol'],
                    'protocol_num': int(row['protocol_num']),
                    'packet_size': int(row['packet_size']),
                    'action': row['action'],
                    'interface': row['interface'],
                    'flow_hash': row['flow_hash']
                }
                self.events.append(event)
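    # For reference, a single record in each accepted input format. The field
    # names match the loaders above; the values shown are purely illustrative.
    #
    #   JSON:  [{"timestamp": 1700000000, "src_ip": "10.0.0.1", ...}, ...]
    #   JSONL: {"timestamp": 1700000000, "src_ip": "10.0.0.1", ...}
    #   CSV:   timestamp,timestamp_iso,src_ip,dst_ip,src_port,dst_port,
    #          protocol,protocol_num,packet_size,action,interface,flow_hash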
    def analyze(self) -> Dict[str, Any]:
        """Perform comprehensive analysis of traffic logs."""
        if not self.events:
            return {}

        analysis = {
            'summary': self._analyze_summary(),
            'top_sources': self._analyze_top_sources(),
            'top_destinations': self._analyze_top_destinations(),
            'protocol_distribution': self._analyze_protocols(),
            'temporal_analysis': self._analyze_temporal_patterns(),
            'action_distribution': self._analyze_actions(),
            'interface_distribution': self._analyze_interfaces(),
            'packet_sizes': self._analyze_packet_sizes(),
            'threat_analysis': self._analyze_threats(),
            'flow_analysis': self._analyze_flows()
        }

        self.stats = analysis
        return analysis

    def _analyze_summary(self) -> Dict[str, Any]:
        """Generate summary statistics."""
        total_events = len(self.events)
        total_bytes = sum(event['packet_size'] for event in self.events)

        timestamps = [event['timestamp'] for event in self.events]
        time_range = max(timestamps) - min(timestamps) if timestamps else 0

        unique_sources = len(set(event['src_ip'] for event in self.events))
        unique_destinations = len(set(event['dst_ip'] for event in self.events))

        return {
            'total_events': total_events,
            'total_bytes': total_bytes,
            'time_range_seconds': time_range,
            'unique_sources': unique_sources,
            'unique_destinations': unique_destinations,
            'avg_packet_size': total_bytes / total_events if total_events > 0 else 0,
            'events_per_second': total_events / time_range if time_range > 0 else 0
        }

    def _analyze_top_sources(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Analyze top source IPs."""
        source_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})

        for event in self.events:
            src_ip = event['src_ip']
            source_stats[src_ip]['count'] += 1
            source_stats[src_ip]['bytes'] += event['packet_size']
            source_stats[src_ip]['protocols'].add(event['protocol'])

        # Convert to list and sort by count
        sources = []
        for ip, stats in source_stats.items():
            sources.append({
                'ip': ip,
                'count': stats['count'],
                'bytes': stats['bytes'],
                'protocols': list(stats['protocols']),
                'percentage': (stats['count'] / len(self.events)) * 100
            })

        return sorted(sources, key=lambda x: x['count'], reverse=True)[:limit]

    def _analyze_top_destinations(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Analyze top destination IPs."""
        dest_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})

        for event in self.events:
            dst_ip = event['dst_ip']
            dest_stats[dst_ip]['count'] += 1
            dest_stats[dst_ip]['bytes'] += event['packet_size']
            dest_stats[dst_ip]['protocols'].add(event['protocol'])

        destinations = []
        for ip, stats in dest_stats.items():
            destinations.append({
                'ip': ip,
                'count': stats['count'],
                'bytes': stats['bytes'],
                'protocols': list(stats['protocols']),
                'percentage': (stats['count'] / len(self.events)) * 100
            })

        return sorted(destinations, key=lambda x: x['count'], reverse=True)[:limit]

    def _analyze_protocols(self) -> Dict[str, Any]:
        """Analyze protocol distribution."""
        protocols = Counter(event['protocol'] for event in self.events)
        total = sum(protocols.values())

        return {
            'distribution': {
                protocol: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for protocol, count in protocols.most_common()
            }
        }

    def _analyze_temporal_patterns(self) -> Dict[str, Any]:
        """Analyze temporal patterns in traffic."""
        if not self.events:
            return {}

        # Group events by hour of day and by calendar day
        hourly_counts = defaultdict(int)
        daily_counts = defaultdict(int)

        for event in self.events:
            dt = datetime.fromtimestamp(event['timestamp'])
            hourly_counts[dt.hour] += 1
            daily_counts[dt.strftime('%Y-%m-%d')] += 1

        # Calculate peak hour
        peak_hour = max(hourly_counts.items(), key=lambda x: x[1]) if hourly_counts else (0, 0)

        return {
            'hourly_distribution': dict(hourly_counts),
            'daily_distribution': dict(daily_counts),
            'peak_hour': {'hour': peak_hour[0], 'count': peak_hour[1]},
            'total_days': len(daily_counts)
        }

    def _analyze_actions(self) -> Dict[str, Any]:
        """Analyze action distribution (LOG vs DROP)."""
        actions = Counter(event['action'] for event in self.events)
        total = sum(actions.values())

        return {
            'distribution': {
                action: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for action, count in actions.items()
            }
        }

    def _analyze_interfaces(self) -> Dict[str, Any]:
        """Analyze interface distribution."""
        interfaces = Counter(event['interface'] for event in self.events)
        total = sum(interfaces.values())

        return {
            'distribution': {
                interface: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for interface, count in interfaces.items()
            }
        }
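    # Note on the percentile math below: statistics.quantiles(data, n=k)
    # returns k-1 cut points, so with n=20 the value at index 18 is the 95th
    # percentile and with n=100 index 98 is the 99th. The len() guards make
    # sure there are enough samples for the requested granularity.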
    def _analyze_packet_sizes(self) -> Dict[str, Any]:
        """Analyze packet size distribution."""
        sizes = [event['packet_size'] for event in self.events]
        if not sizes:
            return {}

        return {
            'min': min(sizes),
            'max': max(sizes),
            'mean': statistics.mean(sizes),
            'median': statistics.median(sizes),
            'std_dev': statistics.stdev(sizes) if len(sizes) > 1 else 0,
            'percentiles': {
                '25th': statistics.quantiles(sizes, n=4)[0] if len(sizes) > 3 else 0,
                '75th': statistics.quantiles(sizes, n=4)[2] if len(sizes) > 3 else 0,
                '95th': statistics.quantiles(sizes, n=20)[18] if len(sizes) > 19 else 0,
                '99th': statistics.quantiles(sizes, n=100)[98] if len(sizes) > 99 else 0
            }
        }

    def _analyze_threats(self) -> Dict[str, Any]:
        """Analyze potential security threats."""
        return {
            'port_scanners': self._detect_port_scanners(),
            'high_volume_sources': self._detect_high_volume_sources(),
            'unusual_protocols': self._detect_unusual_protocols(),
            'suspicious_patterns': self._detect_suspicious_patterns()
        }

    def _detect_port_scanners(self) -> List[Dict[str, Any]]:
        """Detect potential port scanning activity."""
        src_port_counts = defaultdict(lambda: defaultdict(int))

        for event in self.events:
            src_port_counts[event['src_ip']][event['dst_port']] += 1

        scanners = []
        for src_ip, ports in src_port_counts.items():
            unique_ports = len(ports)
            if unique_ports >= 10:  # Threshold for port scanning
                scanners.append({
                    'ip': src_ip,
                    'unique_ports_accessed': unique_ports,
                    'total_attempts': sum(ports.values()),
                    'ports': list(ports.keys())[:20]  # Show first 20 ports
                })

        return sorted(scanners, key=lambda x: x['unique_ports_accessed'], reverse=True)

    def _detect_high_volume_sources(self) -> List[Dict[str, Any]]:
        """Detect sources with unusually high traffic volume."""
        source_bytes = defaultdict(int)
        for event in self.events:
            source_bytes[event['src_ip']] += event['packet_size']

        if not source_bytes:
            return []

        # Flag sources above the 95th percentile of per-source byte counts;
        # skip the check when there are too few sources for a meaningful
        # percentile.
        byte_counts = list(source_bytes.values())
        if len(byte_counts) < 20:
            return []
        threshold = statistics.quantiles(byte_counts, n=20)[18]  # 95th percentile

        high_volume = []
        for ip, bytes_sent in source_bytes.items():
            if bytes_sent > threshold:
                high_volume.append({
                    'ip': ip,
                    'bytes_sent': bytes_sent,
                    'threshold_ratio': bytes_sent / threshold
                })

        return sorted(high_volume, key=lambda x: x['bytes_sent'], reverse=True)

    def _detect_unusual_protocols(self) -> List[Dict[str, Any]]:
        """Detect unusual or rare protocols."""
        protocol_counts = Counter(event['protocol'] for event in self.events)
        total_events = len(self.events)

        unusual = []
        for protocol, count in protocol_counts.items():
            percentage = (count / total_events) * 100
            # Rare (< 1% of traffic) and not one of the common protocols
            if percentage < 1.0 and protocol not in ['TCP', 'UDP', 'ICMP']:
                unusual.append({
                    'protocol': protocol,
                    'count': count,
                    'percentage': percentage
                })

        return sorted(unusual, key=lambda x: x['count'], reverse=True)

    def _detect_suspicious_patterns(self) -> List[Dict[str, Any]]:
        """Detect other suspicious patterns."""
        patterns = []

        # Check for repeated identical flows
        flow_patterns = defaultdict(int)
        for event in self.events:
            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
            flow_patterns[flow_key] += 1

        for flow, count in flow_patterns.items():
            if count >= 100:  # Threshold for suspicious repetition
                patterns.append({
                    'type': 'repeated_flow',
                    'src_ip': flow[0],
                    'dst_ip': flow[1],
                    'dst_port': flow[2],
                    'protocol': flow[3],
                    'count': count
                })

        return patterns
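    # A note on flow identity: flows below are keyed by the 4-tuple
    # (src_ip, dst_ip, dst_port, protocol), matching the repeated-flow
    # detector above. The ephemeral src_port is left out of the key, so
    # retries from different client ports collapse into a single flow
    # (a classic 5-tuple would keep them distinct).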
    def _analyze_flows(self) -> Dict[str, Any]:
        """Analyze network flows."""
        flows = defaultdict(lambda: {'count': 0, 'bytes': 0, 'first_seen': None, 'last_seen': None})

        for event in self.events:
            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
            flows[flow_key]['count'] += 1
            flows[flow_key]['bytes'] += event['packet_size']

            # Track first/last timestamps without assuming events are sorted.
            timestamp = event['timestamp']
            if flows[flow_key]['first_seen'] is None:
                flows[flow_key]['first_seen'] = timestamp
                flows[flow_key]['last_seen'] = timestamp
            else:
                flows[flow_key]['first_seen'] = min(flows[flow_key]['first_seen'], timestamp)
                flows[flow_key]['last_seen'] = max(flows[flow_key]['last_seen'], timestamp)

        # Convert to list for analysis
        flow_list = []
        for flow_key, stats in flows.items():
            duration = stats['last_seen'] - stats['first_seen']
            flow_list.append({
                'src_ip': flow_key[0],
                'dst_ip': flow_key[1],
                'dst_port': flow_key[2],
                'protocol': flow_key[3],
                'packet_count': stats['count'],
                'total_bytes': stats['bytes'],
                'duration_seconds': duration,
                'avg_packet_size': stats['bytes'] / stats['count'] if stats['count'] > 0 else 0
            })

        # Sort by packet count and keep the top flows
        top_flows = sorted(flow_list, key=lambda x: x['packet_count'], reverse=True)[:20]

        return {
            'total_flows': len(flows),
            'top_flows': top_flows,
            'avg_packets_per_flow': statistics.mean([f['packet_count'] for f in flow_list]) if flow_list else 0,
            'avg_bytes_per_flow': statistics.mean([f['total_bytes'] for f in flow_list]) if flow_list else 0
        }

    def print_analysis(self) -> None:
        """Print comprehensive analysis report."""
        if not self.stats:
            print("No analysis data available. Run analyze() first.")
            return

        print("\n" + "=" * 80)
        print("TRAFFIC MONITOR LOG ANALYSIS REPORT")
        print("=" * 80)

        # Summary
        summary = self.stats['summary']
        print("\nšŸ“Š SUMMARY:")
        print(f"   Total Events: {summary['total_events']:,}")
        print(f"   Total Bytes: {summary['total_bytes']:,}")
        print(f"   Time Range: {summary['time_range_seconds']:,} seconds")
        print(f"   Unique Sources: {summary['unique_sources']:,}")
        print(f"   Unique Destinations: {summary['unique_destinations']:,}")
        print(f"   Avg Packet Size: {summary['avg_packet_size']:.1f} bytes")
        print(f"   Events/Second: {summary['events_per_second']:.2f}")

        # Top Sources
        print("\nšŸ” TOP SOURCE IPs:")
        for src in self.stats['top_sources'][:10]:
            print(f"   {src['ip']:15} - {src['count']:6,} packets ({src['percentage']:.1f}%) - {src['bytes']:,} bytes")

        # Protocol Distribution
        print("\nšŸ“” PROTOCOL DISTRIBUTION:")
        for protocol, data in self.stats['protocol_distribution']['distribution'].items():
            print(f"   {protocol:10} - {data['count']:6,} packets ({data['percentage']:.1f}%)")

        # Temporal Analysis
        temporal = self.stats['temporal_analysis']
        if temporal:
            print("\nā° TEMPORAL ANALYSIS:")
            print(f"   Peak Hour: {temporal['peak_hour']['hour']}:00 ({temporal['peak_hour']['count']:,} events)")
            print(f"   Days in Dataset: {temporal['total_days']}")

        # Threat Analysis
        threats = self.stats['threat_analysis']
        if threats['port_scanners']:
            print("\n🚨 POTENTIAL PORT SCANNERS:")
            for scanner in threats['port_scanners'][:5]:
                print(f"   {scanner['ip']:15} - {scanner['unique_ports_accessed']} unique ports, {scanner['total_attempts']} attempts")

        if threats['high_volume_sources']:
            print("\nšŸ“ˆ HIGH VOLUME SOURCES:")
            for source in threats['high_volume_sources'][:5]:
                print(f"   {source['ip']:15} - {source['bytes_sent']:,} bytes ({source['threshold_ratio']:.1f}x threshold)")

        # Packet Size Analysis
        sizes = self.stats['packet_sizes']
        if sizes:
            print("\nšŸ“¦ PACKET SIZE ANALYSIS:")
            print(f"   Min: {sizes['min']} bytes")
            print(f"   Max: {sizes['max']} bytes")
            print(f"   Mean: {sizes['mean']:.1f} bytes")
            print(f"   Median: {sizes['median']:.1f} bytes")
            print(f"   95th Percentile: {sizes['percentiles']['95th']:.1f} bytes")

        # Flow Analysis
        flows = self.stats['flow_analysis']
        if flows:
            print("\n🌊 FLOW ANALYSIS:")
            print(f"   Total Flows: {flows['total_flows']:,}")
            print(f"   Avg Packets per Flow: {flows['avg_packets_per_flow']:.1f}")
            print(f"   Avg Bytes per Flow: {flows['avg_bytes_per_flow']:.1f}")

        print("\n" + "=" * 80)

    def export_report(self, output_file: str, format_type: str = "json") -> None:
        """Export analysis report to file."""
        if not self.stats:
            raise ValueError("No analysis data available. Run analyze() first.")

        if format_type == "json":
            with open(output_file, 'w') as f:
                json.dump(self.stats, f, indent=2, default=str)
        elif format_type == "csv":
            # Export summary statistics as CSV
            with open(output_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['Metric', 'Value'])
                for key, value in self.stats['summary'].items():
                    writer.writerow([key, value])
        else:
            raise ValueError(f"Unsupported export format: {format_type}")

        print(f"Report exported to: {output_file}")
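# A minimal programmatic-usage sketch, kept out of the CLI path: it writes a
# tiny synthetic JSONL log and runs the analyzer over it. The file name and
# sample field values are illustrative assumptions, not real capture data.
def example_usage(tmp_path: str = "example_traffic.jsonl") -> None:
    """Demonstrate driving TrafficLogAnalyzer from code rather than the CLI."""
    sample = {
        'timestamp': 1700000000, 'timestamp_iso': '2023-11-14T22:13:20',
        'src_ip': '10.0.0.1', 'dst_ip': '10.0.0.2',
        'src_port': 51234, 'dst_port': 443,
        'protocol': 'TCP', 'protocol_num': 6,
        'packet_size': 1500, 'action': 'LOG',
        'interface': 'eth0', 'flow_hash': 'deadbeef',
    }
    with open(tmp_path, 'w') as f:
        f.write(json.dumps(sample) + '\n')

    analyzer = TrafficLogAnalyzer(tmp_path, 'jsonl')
    analyzer.load_logs()
    analyzer.analyze()
    analyzer.print_analysis()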
def main():
    """Main CLI function."""
    parser = argparse.ArgumentParser(
        description="Analyze traffic monitor logs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  analyze_logs.py traffic.jsonl
  analyze_logs.py traffic.csv --format csv
  analyze_logs.py traffic.json --export-report analysis.json
"""
    )

    parser.add_argument('log_file', help='Path to log file')
    parser.add_argument('--format', choices=['auto', 'json', 'jsonl', 'csv'],
                        default='auto', help='Log file format')
    parser.add_argument('--export-report', help='Export analysis report to file')
    parser.add_argument('--export-format', choices=['json', 'csv'],
                        default='json', help='Export format')

    args = parser.parse_args()

    try:
        analyzer = TrafficLogAnalyzer(args.log_file, args.format)
        analyzer.load_logs()
        analyzer.analyze()
        analyzer.print_analysis()

        if args.export_report:
            analyzer.export_report(args.export_report, args.export_format)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()