diff --git a/traffic-monitor/scripts/analyze_logs.py b/traffic-monitor/scripts/analyze_logs.py
new file mode 100755
index 00000000..abad5b33
--- /dev/null
+++ b/traffic-monitor/scripts/analyze_logs.py
@@ -0,0 +1,552 @@
+#!/usr/bin/env python3
+"""
+Traffic Monitor Log Analysis Script
+
+This script analyzes traffic logs generated by the traffic monitor eBPF program.
+It supports multiple input formats (JSON, CSV, JSONL) and provides various
+analytics and visualizations.
+"""
+
+import argparse
+import json
+import csv
+import sys
+import os
+from datetime import datetime, timedelta
+from collections import defaultdict, Counter
+import ipaddress
+from typing import Dict, List, Any, Optional, Tuple
+import statistics
+
+
+class TrafficLogAnalyzer:
+    """Main analyzer class for traffic monitor logs."""
+
+    def __init__(self, log_file: str, log_format: str = "auto"):
+        self.log_file = log_file
+        self.log_format = log_format
+        self.events = []
+        self.stats = {}
+
+    def load_logs(self) -> None:
+        """Load log entries from file."""
+        if not os.path.exists(self.log_file):
+            raise FileNotFoundError(f"Log file not found: {self.log_file}")
+
+        format_type = self._detect_format() if self.log_format == "auto" else self.log_format
+
+        if format_type == "json":
+            self._load_json()
+        elif format_type == "jsonl":
+            self._load_jsonl()
+        elif format_type == "csv":
+            self._load_csv()
+        else:
+            raise ValueError(f"Unsupported format: {format_type}")
+
+        print(f"Loaded {len(self.events)} log entries")
+
+    def _detect_format(self) -> str:
+        """Auto-detect log file format."""
+        with open(self.log_file, 'r') as f:
+            first_line = f.readline().strip()
+
+        if first_line.startswith('['):
+            return "json"
+        elif first_line.startswith('{'):
+            return "jsonl"
+        elif ',' in first_line and 'timestamp' in first_line:
+            return "csv"
+        else:
+            raise ValueError("Cannot detect log format")
+
+    def _load_json(self) -> None:
+        """Load JSON array format."""
+        with open(self.log_file, 'r') as f:
+            data = json.load(f)
+            if isinstance(data, list):
+                self.events = data
+            else:
+                self.events = [data]
+
+    def _load_jsonl(self) -> None:
+        """Load JSON Lines format."""
+        with open(self.log_file, 'r') as f:
+            for line in f:
+                line = line.strip()
+                if line:
+                    self.events.append(json.loads(line))
+
+    def _load_csv(self) -> None:
+        """Load CSV format."""
+        with open(self.log_file, 'r') as f:
+            reader = csv.DictReader(f)
+            for row in reader:
+                # Convert numeric fields
+                event = {
+                    'timestamp': int(row['timestamp']),
+                    'timestamp_iso': row['timestamp_iso'],
+                    'src_ip': row['src_ip'],
+                    'dst_ip': row['dst_ip'],
+                    'src_port': int(row['src_port']),
+                    'dst_port': int(row['dst_port']),
+                    'protocol': row['protocol'],
+                    'protocol_num': int(row['protocol_num']),
+                    'packet_size': int(row['packet_size']),
+                    'action': row['action'],
+                    'interface': row['interface'],
+                    'flow_hash': row['flow_hash']
+                }
+                self.events.append(event)
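Editor's note: the loaders above expect each event record to carry the same fields the CSV reader converts (src_ip, dst_port, packet_size, and so on). A minimal usage sketch, not part of the patch, that writes two synthetic JSONL events and loads them through the class; it assumes analyze_logs.py is importable as a module from the working directory, and all values are made up.

import json
import tempfile

# Assumes analyze_logs.py sits in the current directory / on sys.path.
from analyze_logs import TrafficLogAnalyzer

# Two synthetic events whose field names mirror the CSV loader above.
sample_events = [
    {"timestamp": 1700000000 + i, "timestamp_iso": "2023-11-14T22:13:20Z",
     "src_ip": f"10.0.0.{i}", "dst_ip": "192.168.1.10",
     "src_port": 44321 + i, "dst_port": 443,
     "protocol": "TCP", "protocol_num": 6,
     "packet_size": 1500, "action": "LOG",
     "interface": "eth0", "flow_hash": f"{i:08x}"}
    for i in range(2)
]

with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    for event in sample_events:
        tmp.write(json.dumps(event) + "\n")

analyzer = TrafficLogAnalyzer(tmp.name, log_format="jsonl")
analyzer.load_logs()   # prints "Loaded 2 log entries"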
+    def analyze(self) -> Dict[str, Any]:
+        """Perform comprehensive analysis of traffic logs."""
+        if not self.events:
+            return {}
+
+        analysis = {
+            'summary': self._analyze_summary(),
+            'top_sources': self._analyze_top_sources(),
+            'top_destinations': self._analyze_top_destinations(),
+            'protocol_distribution': self._analyze_protocols(),
+            'temporal_analysis': self._analyze_temporal_patterns(),
+            'action_distribution': self._analyze_actions(),
+            'interface_distribution': self._analyze_interfaces(),
+            'packet_sizes': self._analyze_packet_sizes(),
+            'threat_analysis': self._analyze_threats(),
+            'flow_analysis': self._analyze_flows()
+        }
+
+        self.stats = analysis
+        return analysis
+
+    def _analyze_summary(self) -> Dict[str, Any]:
+        """Generate summary statistics."""
+        total_events = len(self.events)
+        total_bytes = sum(event['packet_size'] for event in self.events)
+
+        timestamps = [event['timestamp'] for event in self.events]
+        time_range = max(timestamps) - min(timestamps) if timestamps else 0
+
+        unique_sources = len(set(event['src_ip'] for event in self.events))
+        unique_destinations = len(set(event['dst_ip'] for event in self.events))
+
+        return {
+            'total_events': total_events,
+            'total_bytes': total_bytes,
+            'time_range_seconds': time_range,
+            'unique_sources': unique_sources,
+            'unique_destinations': unique_destinations,
+            'avg_packet_size': total_bytes / total_events if total_events > 0 else 0,
+            'events_per_second': total_events / time_range if time_range > 0 else 0
+        }
+
+    def _analyze_top_sources(self, limit: int = 10) -> List[Dict[str, Any]]:
+        """Analyze top source IPs."""
+        source_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})
+
+        for event in self.events:
+            src_ip = event['src_ip']
+            source_stats[src_ip]['count'] += 1
+            source_stats[src_ip]['bytes'] += event['packet_size']
+            source_stats[src_ip]['protocols'].add(event['protocol'])
+
+        # Convert to list and sort by count
+        sources = []
+        for ip, stats in source_stats.items():
+            sources.append({
+                'ip': ip,
+                'count': stats['count'],
+                'bytes': stats['bytes'],
+                'protocols': list(stats['protocols']),
+                'percentage': (stats['count'] / len(self.events)) * 100
+            })
+
+        return sorted(sources, key=lambda x: x['count'], reverse=True)[:limit]
+
+    def _analyze_top_destinations(self, limit: int = 10) -> List[Dict[str, Any]]:
+        """Analyze top destination IPs."""
+        dest_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})
+
+        for event in self.events:
+            dst_ip = event['dst_ip']
+            dest_stats[dst_ip]['count'] += 1
+            dest_stats[dst_ip]['bytes'] += event['packet_size']
+            dest_stats[dst_ip]['protocols'].add(event['protocol'])
+
+        destinations = []
+        for ip, stats in dest_stats.items():
+            destinations.append({
+                'ip': ip,
+                'count': stats['count'],
+                'bytes': stats['bytes'],
+                'protocols': list(stats['protocols']),
+                'percentage': (stats['count'] / len(self.events)) * 100
+            })
+
+        return sorted(destinations, key=lambda x: x['count'], reverse=True)[:limit]
+
+    def _analyze_protocols(self) -> Dict[str, Any]:
+        """Analyze protocol distribution."""
+        protocols = Counter(event['protocol'] for event in self.events)
+        total = sum(protocols.values())
+
+        return {
+            'distribution': {
+                protocol: {
+                    'count': count,
+                    'percentage': (count / total) * 100
+                }
+                for protocol, count in protocols.most_common()
+            }
+        }
+
+    def _analyze_temporal_patterns(self) -> Dict[str, Any]:
+        """Analyze temporal patterns in traffic."""
+        if not self.events:
+            return {}
+
+        # Group events by hour
+        hourly_counts = defaultdict(int)
+        daily_counts = defaultdict(int)
+
+        for event in self.events:
+            dt = datetime.fromtimestamp(event['timestamp'])
+            hour_key = dt.hour
+            day_key = dt.strftime('%Y-%m-%d')
+
+            hourly_counts[hour_key] += 1
+            daily_counts[day_key] += 1
+
+        # Calculate peak hours
+        peak_hour = max(hourly_counts.items(), key=lambda x: x[1]) if hourly_counts else (0, 0)
+
+        return {
+            'hourly_distribution': dict(hourly_counts),
+            'daily_distribution': dict(daily_counts),
+            'peak_hour': {'hour': peak_hour[0], 'count': peak_hour[1]},
+            'total_days': len(daily_counts)
+        }
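Editor's note: the per-IP aggregations above all follow the same defaultdict pattern. A standalone sketch of that pattern on a hand-made three-event list, with illustrative values only:

from collections import defaultdict

events = [
    {"src_ip": "10.0.0.5", "packet_size": 1500, "protocol": "TCP"},
    {"src_ip": "10.0.0.5", "packet_size": 60,   "protocol": "UDP"},
    {"src_ip": "10.0.0.9", "packet_size": 800,  "protocol": "TCP"},
]

# One stats bucket per source IP, created on first access.
stats = defaultdict(lambda: {"count": 0, "bytes": 0, "protocols": set()})
for e in events:
    s = stats[e["src_ip"]]
    s["count"] += 1
    s["bytes"] += e["packet_size"]
    s["protocols"].add(e["protocol"])

# Sort by packet count, highest first, exactly like _analyze_top_sources().
top = sorted(stats.items(), key=lambda kv: kv[1]["count"], reverse=True)
for ip, s in top:
    share = 100 * s["count"] / len(events)
    print(f"{ip}: {s['count']} pkts, {s['bytes']} bytes, {share:.1f}%")
# -> 10.0.0.5: 2 pkts, 1560 bytes, 66.7%
#    10.0.0.9: 1 pkts, 800 bytes, 33.3%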
+    def _analyze_actions(self) -> Dict[str, Any]:
+        """Analyze action distribution (LOG vs DROP)."""
+        actions = Counter(event['action'] for event in self.events)
+        total = sum(actions.values())
+
+        return {
+            'distribution': {
+                action: {
+                    'count': count,
+                    'percentage': (count / total) * 100
+                }
+                for action, count in actions.items()
+            }
+        }
+
+    def _analyze_interfaces(self) -> Dict[str, Any]:
+        """Analyze interface distribution."""
+        interfaces = Counter(event['interface'] for event in self.events)
+        total = sum(interfaces.values())
+
+        return {
+            'distribution': {
+                interface: {
+                    'count': count,
+                    'percentage': (count / total) * 100
+                }
+                for interface, count in interfaces.items()
+            }
+        }
+
+    def _analyze_packet_sizes(self) -> Dict[str, Any]:
+        """Analyze packet size distribution."""
+        sizes = [event['packet_size'] for event in self.events]
+
+        if not sizes:
+            return {}
+
+        return {
+            'min': min(sizes),
+            'max': max(sizes),
+            'mean': statistics.mean(sizes),
+            'median': statistics.median(sizes),
+            'std_dev': statistics.stdev(sizes) if len(sizes) > 1 else 0,
+            'percentiles': {
+                '25th': statistics.quantiles(sizes, n=4)[0] if len(sizes) > 3 else 0,
+                '75th': statistics.quantiles(sizes, n=4)[2] if len(sizes) > 3 else 0,
+                '95th': statistics.quantiles(sizes, n=20)[18] if len(sizes) > 19 else 0,
+                '99th': statistics.quantiles(sizes, n=100)[98] if len(sizes) > 99 else 0
+            }
+        }
+
+    def _analyze_threats(self) -> Dict[str, Any]:
+        """Analyze potential security threats."""
+        threats = {
+            'port_scanners': self._detect_port_scanners(),
+            'high_volume_sources': self._detect_high_volume_sources(),
+            'unusual_protocols': self._detect_unusual_protocols(),
+            'suspicious_patterns': self._detect_suspicious_patterns()
+        }
+
+        return threats
+
+    def _detect_port_scanners(self) -> List[Dict[str, Any]]:
+        """Detect potential port scanning activity."""
+        src_port_counts = defaultdict(lambda: defaultdict(int))
+
+        for event in self.events:
+            src_ip = event['src_ip']
+            dst_port = event['dst_port']
+            src_port_counts[src_ip][dst_port] += 1
+
+        scanners = []
+        for src_ip, ports in src_port_counts.items():
+            unique_ports = len(ports)
+            if unique_ports >= 10:  # Threshold for port scanning
+                scanners.append({
+                    'ip': src_ip,
+                    'unique_ports_accessed': unique_ports,
+                    'total_attempts': sum(ports.values()),
+                    'ports': list(ports.keys())[:20]  # Show first 20 ports
+                })
+
+        return sorted(scanners, key=lambda x: x['unique_ports_accessed'], reverse=True)
+
+    def _detect_high_volume_sources(self) -> List[Dict[str, Any]]:
+        """Detect sources with unusually high traffic volume."""
+        source_bytes = defaultdict(int)
+
+        for event in self.events:
+            source_bytes[event['src_ip']] += event['packet_size']
+
+        if not source_bytes:
+            return []
+
+        # Calculate threshold (95th percentile)
+        byte_counts = list(source_bytes.values())
+        if len(byte_counts) < 20:
+            return []
+
+        threshold = statistics.quantiles(byte_counts, n=20)[18]  # 95th percentile
+
+        high_volume = []
+        for ip, bytes_sent in source_bytes.items():
+            if bytes_sent > threshold:
+                high_volume.append({
+                    'ip': ip,
+                    'bytes_sent': bytes_sent,
+                    'threshold_ratio': bytes_sent / threshold
+                })
+
+        return sorted(high_volume, key=lambda x: x['bytes_sent'], reverse=True)
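Editor's note: both the packet-size percentiles and the high-volume threshold rely on the indexing convention of statistics.quantiles(), which is easy to misread. A small check of that convention on synthetic byte counts: with n=20 the function returns 19 cut points, so index 18 is the 95th percentile used as the threshold above.

import statistics

byte_counts = list(range(100, 2100, 100))  # 20 synthetic samples: 100, 200, ..., 2000

cuts = statistics.quantiles(byte_counts, n=20)  # default 'exclusive' method
print(len(cuts))    # 19 cut points
print(cuts[18])     # 95th percentile, the high-volume threshold
print(cuts[9])      # 50th percentile: 1050.0, same as statistics.median(byte_counts)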
+    def _detect_unusual_protocols(self) -> List[Dict[str, Any]]:
+        """Detect unusual or rare protocols."""
+        protocol_counts = Counter(event['protocol'] for event in self.events)
+        total_events = len(self.events)
+
+        unusual = []
+        for protocol, count in protocol_counts.items():
+            percentage = (count / total_events) * 100
+            if percentage < 1.0 and protocol not in ['TCP', 'UDP', 'ICMP']:  # Less than 1%
+                unusual.append({
+                    'protocol': protocol,
+                    'count': count,
+                    'percentage': percentage
+                })
+
+        return sorted(unusual, key=lambda x: x['count'], reverse=True)
+
+    def _detect_suspicious_patterns(self) -> List[Dict[str, Any]]:
+        """Detect other suspicious patterns."""
+        patterns = []
+
+        # Check for repeated identical flows
+        flow_patterns = defaultdict(int)
+        for event in self.events:
+            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
+            flow_patterns[flow_key] += 1
+
+        for flow, count in flow_patterns.items():
+            if count >= 100:  # Threshold for suspicious repetition
+                patterns.append({
+                    'type': 'repeated_flow',
+                    'src_ip': flow[0],
+                    'dst_ip': flow[1],
+                    'dst_port': flow[2],
+                    'protocol': flow[3],
+                    'count': count
+                })
+
+        return patterns
+
+    def _analyze_flows(self) -> Dict[str, Any]:
+        """Analyze network flows."""
+        flows = defaultdict(lambda: {'count': 0, 'bytes': 0, 'first_seen': None, 'last_seen': None})
+
+        for event in self.events:
+            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
+            flows[flow_key]['count'] += 1
+            flows[flow_key]['bytes'] += event['packet_size']
+
+            timestamp = event['timestamp']
+            if flows[flow_key]['first_seen'] is None:
+                flows[flow_key]['first_seen'] = timestamp
+                flows[flow_key]['last_seen'] = timestamp
+            else:
+                flows[flow_key]['last_seen'] = max(flows[flow_key]['last_seen'], timestamp)
+
+        # Convert to list for analysis
+        flow_list = []
+        for flow_key, stats in flows.items():
+            duration = stats['last_seen'] - stats['first_seen']
+            flow_list.append({
+                'src_ip': flow_key[0],
+                'dst_ip': flow_key[1],
+                'dst_port': flow_key[2],
+                'protocol': flow_key[3],
+                'packet_count': stats['count'],
+                'total_bytes': stats['bytes'],
+                'duration_seconds': duration,
+                'avg_packet_size': stats['bytes'] / stats['count'] if stats['count'] > 0 else 0
+            })
+
+        # Sort by packet count and get top flows
+        top_flows = sorted(flow_list, key=lambda x: x['packet_count'], reverse=True)[:20]
+
+        return {
+            'total_flows': len(flows),
+            'top_flows': top_flows,
+            'avg_packets_per_flow': statistics.mean([f['packet_count'] for f in flow_list]) if flow_list else 0,
+            'avg_bytes_per_flow': statistics.mean([f['total_bytes'] for f in flow_list]) if flow_list else 0
+        }
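Editor's note: flows are keyed on (src_ip, dst_ip, dst_port, protocol), so events from different source ports collapse into a single flow. A standalone sketch of the aggregation and duration computation above, using made-up epoch-second timestamps:

from collections import defaultdict

events = [
    {"src_ip": "10.0.0.5", "dst_ip": "1.1.1.1", "dst_port": 53, "protocol": "UDP",
     "packet_size": 64, "timestamp": 1700000000},
    {"src_ip": "10.0.0.5", "dst_ip": "1.1.1.1", "dst_port": 53, "protocol": "UDP",
     "packet_size": 96, "timestamp": 1700000042},
]

flows = defaultdict(lambda: {"count": 0, "bytes": 0, "first_seen": None, "last_seen": None})
for e in events:
    key = (e["src_ip"], e["dst_ip"], e["dst_port"], e["protocol"])
    f = flows[key]
    f["count"] += 1
    f["bytes"] += e["packet_size"]
    # Track first/last timestamp so duration = last_seen - first_seen.
    f["first_seen"] = e["timestamp"] if f["first_seen"] is None else f["first_seen"]
    f["last_seen"] = e["timestamp"] if f["last_seen"] is None else max(f["last_seen"], e["timestamp"])

for key, f in flows.items():
    print(key, f["count"], f["bytes"], f["last_seen"] - f["first_seen"])
# -> ('10.0.0.5', '1.1.1.1', 53, 'UDP') 2 160 42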
+    def print_analysis(self) -> None:
+        """Print comprehensive analysis report."""
+        if not self.stats:
+            print("No analysis data available. Run analyze() first.")
+            return
+
+        print("\n" + "="*80)
+        print("TRAFFIC MONITOR LOG ANALYSIS REPORT")
+        print("="*80)
+
+        # Summary
+        summary = self.stats['summary']
+        print(f"\nšŸ“Š SUMMARY:")
+        print(f"   Total Events: {summary['total_events']:,}")
+        print(f"   Total Bytes: {summary['total_bytes']:,}")
+        print(f"   Time Range: {summary['time_range_seconds']:,} seconds")
+        print(f"   Unique Sources: {summary['unique_sources']:,}")
+        print(f"   Unique Destinations: {summary['unique_destinations']:,}")
+        print(f"   Avg Packet Size: {summary['avg_packet_size']:.1f} bytes")
+        print(f"   Events/Second: {summary['events_per_second']:.2f}")
+
+        # Top Sources
+        print(f"\nšŸ” TOP SOURCE IPs:")
+        for src in self.stats['top_sources'][:10]:
+            print(f"   {src['ip']:15} - {src['count']:6,} packets ({src['percentage']:.1f}%) - {src['bytes']:,} bytes")
+
+        # Protocol Distribution
+        print(f"\nšŸ“” PROTOCOL DISTRIBUTION:")
+        for protocol, data in self.stats['protocol_distribution']['distribution'].items():
+            print(f"   {protocol:10} - {data['count']:6,} packets ({data['percentage']:.1f}%)")
+
+        # Temporal Analysis
+        temporal = self.stats['temporal_analysis']
+        if temporal:
+            print(f"\nā° TEMPORAL ANALYSIS:")
+            print(f"   Peak Hour: {temporal['peak_hour']['hour']}:00 ({temporal['peak_hour']['count']:,} events)")
+            print(f"   Days in Dataset: {temporal['total_days']}")
+
+        # Threat Analysis
+        threats = self.stats['threat_analysis']
+        if threats['port_scanners']:
+            print(f"\n🚨 POTENTIAL PORT SCANNERS:")
+            for scanner in threats['port_scanners'][:5]:
+                print(f"   {scanner['ip']:15} - {scanner['unique_ports_accessed']} unique ports, {scanner['total_attempts']} attempts")
+
+        if threats['high_volume_sources']:
+            print(f"\nšŸ“ˆ HIGH VOLUME SOURCES:")
+            for source in threats['high_volume_sources'][:5]:
+                print(f"   {source['ip']:15} - {source['bytes_sent']:,} bytes ({source['threshold_ratio']:.1f}x threshold)")
+
+        # Packet Size Analysis
+        sizes = self.stats['packet_sizes']
+        if sizes:
+            print(f"\nšŸ“¦ PACKET SIZE ANALYSIS:")
+            print(f"   Min: {sizes['min']} bytes")
+            print(f"   Max: {sizes['max']} bytes")
+            print(f"   Mean: {sizes['mean']:.1f} bytes")
+            print(f"   Median: {sizes['median']:.1f} bytes")
+            print(f"   95th Percentile: {sizes['percentiles']['95th']:.1f} bytes")
+
+        # Flow Analysis
+        flows = self.stats['flow_analysis']
+        if flows:
+            print(f"\n🌊 FLOW ANALYSIS:")
+            print(f"   Total Flows: {flows['total_flows']:,}")
+            print(f"   Avg Packets per Flow: {flows['avg_packets_per_flow']:.1f}")
+            print(f"   Avg Bytes per Flow: {flows['avg_bytes_per_flow']:.1f}")
+
+        print("\n" + "="*80)
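Editor's note: the JSON file written by export_report() mirrors the keys returned by analyze(), so downstream tooling can consume it directly. A hedged sketch of reading such a report back; the file name analysis.json matches the CLI example in the epilog below and is otherwise arbitrary.

import json

with open("analysis.json") as f:
    report = json.load(f)

# Pull a few headline numbers from the exported summary block.
summary = report["summary"]
print(f"{summary['total_events']} events, "
      f"{summary['unique_sources']} sources, "
      f"{summary['events_per_second']:.2f} events/s")

# Threat findings are nested under threat_analysis, as built by analyze().
for scanner in report["threat_analysis"]["port_scanners"][:3]:
    print(scanner["ip"], scanner["unique_ports_accessed"])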
+    def export_report(self, output_file: str, format_type: str = "json") -> None:
+        """Export analysis report to file."""
+        if not self.stats:
+            raise ValueError("No analysis data available. Run analyze() first.")
+
+        if format_type == "json":
+            with open(output_file, 'w') as f:
+                json.dump(self.stats, f, indent=2, default=str)
+        elif format_type == "csv":
+            # Export summary statistics as CSV
+            with open(output_file, 'w', newline='') as f:
+                writer = csv.writer(f)
+                writer.writerow(['Metric', 'Value'])
+                for key, value in self.stats['summary'].items():
+                    writer.writerow([key, value])
+        else:
+            raise ValueError(f"Unsupported export format: {format_type}")
+
+        print(f"Report exported to: {output_file}")
+
+
+def main():
+    """Main CLI function."""
+    parser = argparse.ArgumentParser(
+        description="Analyze traffic monitor logs",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  analyze_logs.py traffic.jsonl
+  analyze_logs.py traffic.csv --format csv
+  analyze_logs.py traffic.json --export-report analysis.json
+        """
+    )
+
+    parser.add_argument('log_file', help='Path to log file')
+    parser.add_argument('--format', choices=['auto', 'json', 'jsonl', 'csv'],
+                        default='auto', help='Log file format')
+    parser.add_argument('--export-report', help='Export analysis report to file')
+    parser.add_argument('--export-format', choices=['json', 'csv'],
+                        default='json', help='Export format')
+
+    args = parser.parse_args()
+
+    try:
+        analyzer = TrafficLogAnalyzer(args.log_file, args.format)
+        analyzer.load_logs()
+        analyzer.analyze()
+        analyzer.print_analysis()
+
+        if args.export_report:
+            analyzer.export_report(args.export_report, args.export_format)
+
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
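Editor's note: a hypothetical pytest-style smoke test for the whole pipeline, not included in this patch. It builds a tiny JSONL log in pytest's tmp_path fixture, runs load_logs() and analyze(), and checks the summary numbers; it assumes analyze_logs.py is importable from the test's directory, and the test file name is illustrative (e.g. traffic-monitor/scripts/test_analyze_logs.py).

import json

from analyze_logs import TrafficLogAnalyzer


def test_summary_counts(tmp_path):
    # Three synthetic events with distinct source IPs and sizes 100/200/300.
    log = tmp_path / "traffic.jsonl"
    events = [
        {"timestamp": 1700000000 + i, "timestamp_iso": "", "src_ip": f"10.0.0.{i}",
         "dst_ip": "192.168.1.1", "src_port": 40000 + i, "dst_port": 80,
         "protocol": "TCP", "protocol_num": 6, "packet_size": 100 * (i + 1),
         "action": "LOG", "interface": "eth0", "flow_hash": str(i)}
        for i in range(3)
    ]
    log.write_text("\n".join(json.dumps(e) for e in events) + "\n")

    analyzer = TrafficLogAnalyzer(str(log), "jsonl")
    analyzer.load_logs()
    stats = analyzer.analyze()

    assert stats["summary"]["total_events"] == 3
    assert stats["summary"]["total_bytes"] == 600   # 100 + 200 + 300
    assert stats["summary"]["unique_sources"] == 3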