feat: add comprehensive traffic log analysis and threat detection

Analysis Script (scripts/analyze_logs.py):
- Multi-format log parser (JSON, JSONL, CSV with auto-detection)
- Traffic analytics and statistical summaries
- Advanced threat detection and security intelligence
- Flexible export options for further analysis

Traffic Analytics:
- Traffic volume and bandwidth analysis
- Top source/destination IP identification
- Protocol distribution and temporal patterns
- Packet size analysis with percentiles
- Network flow correlation and tracking

Threat Detection Capabilities:
- Port scanning detection with configurable thresholds
- High-volume source identification using statistical analysis
- Unusual protocol detection for tunnel/VPN identification
- Suspicious traffic pattern recognition
- Repeated flow analysis for DDoS detection

Advanced Features:
- Statistical analysis with percentiles and distributions
- Temporal pattern analysis (hourly/daily trends)
- Aggregated threat summaries for risk assessment
- Flow-based analysis with duration tracking
- Comprehensive reporting with multiple output formats

Export and Reporting:
- Human-readable console reports with emoji indicators
- JSON export for programmatic analysis (see the usage sketch below)
- CSV export for spreadsheet integration
- Detailed threat intelligence summaries
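
For reference, a minimal programmatic usage sketch (hypothetical: assumes
scripts/ is on sys.path so analyze_logs imports as a module; report.json is
an illustrative output path):

    from analyze_logs import TrafficLogAnalyzer

    analyzer = TrafficLogAnalyzer("traffic.jsonl")  # format auto-detected
    analyzer.load_logs()
    stats = analyzer.analyze()                      # full analysis dict
    analyzer.print_analysis()                       # console report
    analyzer.export_report("report.json", "json")   # hypothetical output path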

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

@@ -0,0 +1,552 @@
#!/usr/bin/env python3
"""
Traffic Monitor Log Analysis Script
This script analyzes traffic logs generated by the traffic monitor eBPF program.
It supports multiple input formats (JSON, CSV, JSONL) and provides various
analytics and threat-detection reports.
"""
import argparse
import csv
import json
import os
import statistics
import sys
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List


class TrafficLogAnalyzer:
    """Main analyzer class for traffic monitor logs."""

    def __init__(self, log_file: str, log_format: str = "auto"):
        self.log_file = log_file
        self.log_format = log_format
        self.events = []
        self.stats = {}

    def load_logs(self) -> None:
        """Load log entries from file."""
        if not os.path.exists(self.log_file):
            raise FileNotFoundError(f"Log file not found: {self.log_file}")
        format_type = self._detect_format() if self.log_format == "auto" else self.log_format
        if format_type == "json":
            self._load_json()
        elif format_type == "jsonl":
            self._load_jsonl()
        elif format_type == "csv":
            self._load_csv()
        else:
            raise ValueError(f"Unsupported format: {format_type}")
        print(f"Loaded {len(self.events)} log entries")

    def _detect_format(self) -> str:
        """Auto-detect log file format."""
        with open(self.log_file, 'r') as f:
            first_line = f.readline().strip()
        if first_line.startswith('['):
            return "json"
        elif first_line.startswith('{'):
            return "jsonl"
        elif ',' in first_line and 'timestamp' in first_line:
            return "csv"
        else:
            raise ValueError("Cannot detect log format")

    def _load_json(self) -> None:
        """Load JSON array format."""
        with open(self.log_file, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            self.events = data
        else:
            self.events = [data]

    def _load_jsonl(self) -> None:
        """Load JSON Lines format."""
        with open(self.log_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line:
                    self.events.append(json.loads(line))

    def _load_csv(self) -> None:
        """Load CSV format."""
        with open(self.log_file, 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # Convert numeric fields; everything else stays a string
                event = {
                    'timestamp': int(row['timestamp']),
                    'timestamp_iso': row['timestamp_iso'],
                    'src_ip': row['src_ip'],
                    'dst_ip': row['dst_ip'],
                    'src_port': int(row['src_port']),
                    'dst_port': int(row['dst_port']),
                    'protocol': row['protocol'],
                    'protocol_num': int(row['protocol_num']),
                    'packet_size': int(row['packet_size']),
                    'action': row['action'],
                    'interface': row['interface'],
                    'flow_hash': row['flow_hash']
                }
                self.events.append(event)

    def analyze(self) -> Dict[str, Any]:
        """Perform comprehensive analysis of traffic logs."""
        if not self.events:
            return {}
        analysis = {
            'summary': self._analyze_summary(),
            'top_sources': self._analyze_top_sources(),
            'top_destinations': self._analyze_top_destinations(),
            'protocol_distribution': self._analyze_protocols(),
            'temporal_analysis': self._analyze_temporal_patterns(),
            'action_distribution': self._analyze_actions(),
            'interface_distribution': self._analyze_interfaces(),
            'packet_sizes': self._analyze_packet_sizes(),
            'threat_analysis': self._analyze_threats(),
            'flow_analysis': self._analyze_flows()
        }
        self.stats = analysis
        return analysis

    def _analyze_summary(self) -> Dict[str, Any]:
        """Generate summary statistics."""
        total_events = len(self.events)
        total_bytes = sum(event['packet_size'] for event in self.events)
        timestamps = [event['timestamp'] for event in self.events]
        time_range = max(timestamps) - min(timestamps) if timestamps else 0
        unique_sources = len(set(event['src_ip'] for event in self.events))
        unique_destinations = len(set(event['dst_ip'] for event in self.events))
        return {
            'total_events': total_events,
            'total_bytes': total_bytes,
            'time_range_seconds': time_range,
            'unique_sources': unique_sources,
            'unique_destinations': unique_destinations,
            'avg_packet_size': total_bytes / total_events if total_events > 0 else 0,
            'events_per_second': total_events / time_range if time_range > 0 else 0
        }

    def _analyze_top_sources(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Analyze top source IPs."""
        source_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})
        for event in self.events:
            src_ip = event['src_ip']
            source_stats[src_ip]['count'] += 1
            source_stats[src_ip]['bytes'] += event['packet_size']
            source_stats[src_ip]['protocols'].add(event['protocol'])
        # Convert to list and sort by count
        sources = []
        for ip, stats in source_stats.items():
            sources.append({
                'ip': ip,
                'count': stats['count'],
                'bytes': stats['bytes'],
                'protocols': list(stats['protocols']),
                'percentage': (stats['count'] / len(self.events)) * 100
            })
        return sorted(sources, key=lambda x: x['count'], reverse=True)[:limit]

    def _analyze_top_destinations(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Analyze top destination IPs."""
        dest_stats = defaultdict(lambda: {'count': 0, 'bytes': 0, 'protocols': set()})
        for event in self.events:
            dst_ip = event['dst_ip']
            dest_stats[dst_ip]['count'] += 1
            dest_stats[dst_ip]['bytes'] += event['packet_size']
            dest_stats[dst_ip]['protocols'].add(event['protocol'])
        destinations = []
        for ip, stats in dest_stats.items():
            destinations.append({
                'ip': ip,
                'count': stats['count'],
                'bytes': stats['bytes'],
                'protocols': list(stats['protocols']),
                'percentage': (stats['count'] / len(self.events)) * 100
            })
        return sorted(destinations, key=lambda x: x['count'], reverse=True)[:limit]

    def _analyze_protocols(self) -> Dict[str, Any]:
        """Analyze protocol distribution."""
        protocols = Counter(event['protocol'] for event in self.events)
        total = sum(protocols.values())
        return {
            'distribution': {
                protocol: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for protocol, count in protocols.most_common()
            }
        }

    def _analyze_temporal_patterns(self) -> Dict[str, Any]:
        """Analyze temporal patterns in traffic."""
        if not self.events:
            return {}
        # Group events by hour and by day
        hourly_counts = defaultdict(int)
        daily_counts = defaultdict(int)
        for event in self.events:
            dt = datetime.fromtimestamp(event['timestamp'])
            hour_key = dt.hour
            day_key = dt.strftime('%Y-%m-%d')
            hourly_counts[hour_key] += 1
            daily_counts[day_key] += 1
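        # Note: hourly_counts is keyed by hour-of-day (0-23) aggregated across
        # all days in the dataset, not by absolute hour on a timeline.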
        # Calculate peak hours
        peak_hour = max(hourly_counts.items(), key=lambda x: x[1]) if hourly_counts else (0, 0)
        return {
            'hourly_distribution': dict(hourly_counts),
            'daily_distribution': dict(daily_counts),
            'peak_hour': {'hour': peak_hour[0], 'count': peak_hour[1]},
            'total_days': len(daily_counts)
        }

    def _analyze_actions(self) -> Dict[str, Any]:
        """Analyze action distribution (LOG vs DROP)."""
        actions = Counter(event['action'] for event in self.events)
        total = sum(actions.values())
        return {
            'distribution': {
                action: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for action, count in actions.items()
            }
        }

    def _analyze_interfaces(self) -> Dict[str, Any]:
        """Analyze interface distribution."""
        interfaces = Counter(event['interface'] for event in self.events)
        total = sum(interfaces.values())
        return {
            'distribution': {
                interface: {
                    'count': count,
                    'percentage': (count / total) * 100
                }
                for interface, count in interfaces.items()
            }
        }

    def _analyze_packet_sizes(self) -> Dict[str, Any]:
        """Analyze packet size distribution."""
        sizes = [event['packet_size'] for event in self.events]
        if not sizes:
            return {}
        return {
            'min': min(sizes),
            'max': max(sizes),
            'mean': statistics.mean(sizes),
            'median': statistics.median(sizes),
            'std_dev': statistics.stdev(sizes) if len(sizes) > 1 else 0,
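            # statistics.quantiles(data, n=k) returns k-1 cut points; index i
            # is the (i+1)/k quantile, so n=20 index 18 is the 95th percentile
            # and n=100 index 98 is the 99th.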
            'percentiles': {
                '25th': statistics.quantiles(sizes, n=4)[0] if len(sizes) > 3 else 0,
                '75th': statistics.quantiles(sizes, n=4)[2] if len(sizes) > 3 else 0,
                '95th': statistics.quantiles(sizes, n=20)[18] if len(sizes) > 19 else 0,
                '99th': statistics.quantiles(sizes, n=100)[98] if len(sizes) > 99 else 0
            }
        }

    def _analyze_threats(self) -> Dict[str, Any]:
        """Analyze potential security threats."""
        threats = {
            'port_scanners': self._detect_port_scanners(),
            'high_volume_sources': self._detect_high_volume_sources(),
            'unusual_protocols': self._detect_unusual_protocols(),
            'suspicious_patterns': self._detect_suspicious_patterns()
        }
        return threats

    def _detect_port_scanners(self) -> List[Dict[str, Any]]:
        """Detect potential port scanning activity."""
        src_port_counts = defaultdict(lambda: defaultdict(int))
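        # Maps src_ip -> {dst_port: attempt count}; one source touching many
        # distinct destination ports is the classic scan signature.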
        for event in self.events:
            src_ip = event['src_ip']
            dst_port = event['dst_port']
            src_port_counts[src_ip][dst_port] += 1
        scanners = []
        for src_ip, ports in src_port_counts.items():
            unique_ports = len(ports)
            if unique_ports >= 10:  # Threshold for port scanning
                scanners.append({
                    'ip': src_ip,
                    'unique_ports_accessed': unique_ports,
                    'total_attempts': sum(ports.values()),
                    'ports': list(ports.keys())[:20]  # Show first 20 ports
                })
        return sorted(scanners, key=lambda x: x['unique_ports_accessed'], reverse=True)

    def _detect_high_volume_sources(self) -> List[Dict[str, Any]]:
        """Detect sources with unusually high traffic volume."""
        source_bytes = defaultdict(int)
        for event in self.events:
            source_bytes[event['src_ip']] += event['packet_size']
        if not source_bytes:
            return []
        # Calculate threshold (95th percentile)
        byte_counts = list(source_bytes.values())
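        # With fewer than 20 sources a 95th-percentile cutoff is not
        # meaningful, so skip detection rather than flag noise.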
        if len(byte_counts) < 20:
            return []
        threshold = statistics.quantiles(byte_counts, n=20)[18]  # 95th percentile
        high_volume = []
        for ip, bytes_sent in source_bytes.items():
            if bytes_sent > threshold:
                high_volume.append({
                    'ip': ip,
                    'bytes_sent': bytes_sent,
                    'threshold_ratio': bytes_sent / threshold
                })
        return sorted(high_volume, key=lambda x: x['bytes_sent'], reverse=True)

    def _detect_unusual_protocols(self) -> List[Dict[str, Any]]:
        """Detect unusual or rare protocols."""
        protocol_counts = Counter(event['protocol'] for event in self.events)
        total_events = len(self.events)
        unusual = []
        for protocol, count in protocol_counts.items():
            percentage = (count / total_events) * 100
            if percentage < 1.0 and protocol not in ['TCP', 'UDP', 'ICMP']:  # Less than 1%
                unusual.append({
                    'protocol': protocol,
                    'count': count,
                    'percentage': percentage
                })
        return sorted(unusual, key=lambda x: x['count'], reverse=True)

    def _detect_suspicious_patterns(self) -> List[Dict[str, Any]]:
        """Detect other suspicious patterns."""
        patterns = []
        # Check for repeated identical flows
        flow_patterns = defaultdict(int)
        for event in self.events:
            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
            flow_patterns[flow_key] += 1
        for flow, count in flow_patterns.items():
            if count >= 100:  # Threshold for suspicious repetition
                patterns.append({
                    'type': 'repeated_flow',
                    'src_ip': flow[0],
                    'dst_ip': flow[1],
                    'dst_port': flow[2],
                    'protocol': flow[3],
                    'count': count
                })
        return patterns

    def _analyze_flows(self) -> Dict[str, Any]:
        """Analyze network flows."""
        flows = defaultdict(lambda: {'count': 0, 'bytes': 0, 'first_seen': None, 'last_seen': None})
        for event in self.events:
            flow_key = (event['src_ip'], event['dst_ip'], event['dst_port'], event['protocol'])
            flows[flow_key]['count'] += 1
            flows[flow_key]['bytes'] += event['packet_size']
            timestamp = event['timestamp']
            if flows[flow_key]['first_seen'] is None:
                flows[flow_key]['first_seen'] = timestamp
                flows[flow_key]['last_seen'] = timestamp
            else:
                # Track both ends so out-of-order events don't skew duration
                flows[flow_key]['first_seen'] = min(flows[flow_key]['first_seen'], timestamp)
                flows[flow_key]['last_seen'] = max(flows[flow_key]['last_seen'], timestamp)
        # Convert to list for analysis
        flow_list = []
        for flow_key, stats in flows.items():
            duration = stats['last_seen'] - stats['first_seen']
            flow_list.append({
                'src_ip': flow_key[0],
                'dst_ip': flow_key[1],
                'dst_port': flow_key[2],
                'protocol': flow_key[3],
                'packet_count': stats['count'],
                'total_bytes': stats['bytes'],
                'duration_seconds': duration,
                'avg_packet_size': stats['bytes'] / stats['count'] if stats['count'] > 0 else 0
            })
        # Sort by packet count and get top flows
        top_flows = sorted(flow_list, key=lambda x: x['packet_count'], reverse=True)[:20]
        return {
            'total_flows': len(flows),
            'top_flows': top_flows,
            'avg_packets_per_flow': statistics.mean([f['packet_count'] for f in flow_list]) if flow_list else 0,
            'avg_bytes_per_flow': statistics.mean([f['total_bytes'] for f in flow_list]) if flow_list else 0
        }

    def print_analysis(self) -> None:
        """Print comprehensive analysis report."""
        if not self.stats:
            print("No analysis data available. Run analyze() first.")
            return
        print("\n" + "=" * 80)
        print("TRAFFIC MONITOR LOG ANALYSIS REPORT")
        print("=" * 80)
        # Summary
        summary = self.stats['summary']
        print("\n📊 SUMMARY:")
        print(f" Total Events: {summary['total_events']:,}")
        print(f" Total Bytes: {summary['total_bytes']:,}")
        print(f" Time Range: {summary['time_range_seconds']:,} seconds")
        print(f" Unique Sources: {summary['unique_sources']:,}")
        print(f" Unique Destinations: {summary['unique_destinations']:,}")
        print(f" Avg Packet Size: {summary['avg_packet_size']:.1f} bytes")
        print(f" Events/Second: {summary['events_per_second']:.2f}")
        # Top Sources
        print("\n🔍 TOP SOURCE IPs:")
        for src in self.stats['top_sources'][:10]:
            print(f" {src['ip']:15} - {src['count']:6,} packets ({src['percentage']:.1f}%) - {src['bytes']:,} bytes")
        # Protocol Distribution
        print("\n📡 PROTOCOL DISTRIBUTION:")
        for protocol, data in self.stats['protocol_distribution']['distribution'].items():
            print(f" {protocol:10} - {data['count']:6,} packets ({data['percentage']:.1f}%)")
        # Temporal Analysis
        temporal = self.stats['temporal_analysis']
        if temporal:
            print("\n⏰ TEMPORAL ANALYSIS:")
            print(f" Peak Hour: {temporal['peak_hour']['hour']}:00 ({temporal['peak_hour']['count']:,} events)")
            print(f" Days in Dataset: {temporal['total_days']}")
        # Threat Analysis
        threats = self.stats['threat_analysis']
        if threats['port_scanners']:
            print("\n🚨 POTENTIAL PORT SCANNERS:")
            for scanner in threats['port_scanners'][:5]:
                print(f" {scanner['ip']:15} - {scanner['unique_ports_accessed']} unique ports, {scanner['total_attempts']} attempts")
        if threats['high_volume_sources']:
            print("\n📈 HIGH VOLUME SOURCES:")
            for source in threats['high_volume_sources'][:5]:
                print(f" {source['ip']:15} - {source['bytes_sent']:,} bytes ({source['threshold_ratio']:.1f}x threshold)")
        # Packet Size Analysis
        sizes = self.stats['packet_sizes']
        if sizes:
            print("\n📦 PACKET SIZE ANALYSIS:")
            print(f" Min: {sizes['min']} bytes")
            print(f" Max: {sizes['max']} bytes")
            print(f" Mean: {sizes['mean']:.1f} bytes")
            print(f" Median: {sizes['median']:.1f} bytes")
            print(f" 95th Percentile: {sizes['percentiles']['95th']:.1f} bytes")
        # Flow Analysis
        flows = self.stats['flow_analysis']
        if flows:
            print("\n🌊 FLOW ANALYSIS:")
            print(f" Total Flows: {flows['total_flows']:,}")
            print(f" Avg Packets per Flow: {flows['avg_packets_per_flow']:.1f}")
            print(f" Avg Bytes per Flow: {flows['avg_bytes_per_flow']:.1f}")
        print("\n" + "=" * 80)

    def export_report(self, output_file: str, format_type: str = "json") -> None:
        """Export analysis report to file."""
        if not self.stats:
            raise ValueError("No analysis data available. Run analyze() first.")
        if format_type == "json":
            with open(output_file, 'w') as f:
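                # default=str coerces anything the encoder cannot serialize
                # natively, instead of raising TypeError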
                json.dump(self.stats, f, indent=2, default=str)
        elif format_type == "csv":
            # Export summary statistics as CSV
            with open(output_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['Metric', 'Value'])
                for key, value in self.stats['summary'].items():
                    writer.writerow([key, value])
        else:
            raise ValueError(f"Unsupported export format: {format_type}")
        print(f"Report exported to: {output_file}")


def main():
    """Main CLI function."""
    parser = argparse.ArgumentParser(
        description="Analyze traffic monitor logs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  analyze_logs.py traffic.jsonl
  analyze_logs.py traffic.csv --format csv
  analyze_logs.py traffic.json --export-report analysis.json
"""
    )
    parser.add_argument('log_file', help='Path to log file')
    parser.add_argument('--format', choices=['auto', 'json', 'jsonl', 'csv'],
                        default='auto', help='Log file format')
    parser.add_argument('--export-report', help='Export analysis report to file')
    parser.add_argument('--export-format', choices=['json', 'csv'],
                        default='json', help='Export format')
    args = parser.parse_args()
    try:
        analyzer = TrafficLogAnalyzer(args.log_file, args.format)
        analyzer.load_logs()
        analyzer.analyze()
        analyzer.print_analysis()
        if args.export_report:
            analyzer.export_report(args.export_report, args.export_format)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()