Network Traffic Analysis with Python: A Practical Guide

Introduction

Network traffic analysis is crucial for:

  • Detecting security threats
  • Optimizing network performance
  • Understanding user behavior

In this guide, we’ll explore practical approaches to analyzing network traffic with Python, from raw packet capture through machine-learning-based classification, anomaly detection, and visualization.

1. Basic Packet Capture and Analysis

First, let’s capture and analyze network packets:

# Note: live packet capture with Scapy typically requires root/administrator privileges
from scapy.all import *
from collections import Counter
import pandas as pd

class NetworkAnalyzer:
    def __init__(self):
        self.packets = []
        self.flow_data = {}
    
    def capture_packets(self, duration=60):
        print(f"Capturing packets for {duration} seconds...")
        packets = sniff(timeout=duration)
        self.packets = packets
        return len(packets)
    
    def analyze_basic_stats(self):
        protocols = Counter()
        ip_sources = Counter()
        ip_destinations = Counter()
        
        for packet in self.packets:
            if IP in packet:
                protocols[packet[IP].proto] += 1
                ip_sources[packet[IP].src] += 1
                ip_destinations[packet[IP].dst] += 1
        
        return {
            'protocol_stats': protocols,
            'source_ips': ip_sources,
            'dest_ips': ip_destinations
        }
    
    def extract_flow_features(self):
        flows = {}
        
        for packet in self.packets:
            if IP in packet and (TCP in packet or UDP in packet):
                if TCP in packet:
                    sport, dport = packet[TCP].sport, packet[TCP].dport
                    flags = packet[TCP].flags
                else:
                    sport, dport = packet[UDP].sport, packet[UDP].dport
                    flags = 0
                
                flow_tuple = (packet[IP].src, packet[IP].dst, sport, dport, packet[IP].proto)
                
                if flow_tuple not in flows:
                    flows[flow_tuple] = {
                        'bytes': 0,
                        'packets': 0,
                        'start_time': packet.time,
                        'end_time': packet.time,
                        'flags': set()
                    }
                
                flows[flow_tuple]['bytes'] += len(packet)
                flows[flow_tuple]['packets'] += 1
                flows[flow_tuple]['end_time'] = packet.time
                if TCP in packet:
                    flows[flow_tuple]['flags'].add(flags)
        
        # Convert flows to feature vectors
        flow_features = []
        for flow_tuple, flow_data in flows.items():
            # packet.time may be a scapy EDecimal, so cast to float for pandas
            duration = float(flow_data['end_time'] - flow_data['start_time'])
            feature_vector = {
                'src_ip': flow_tuple[0],
                'dst_ip': flow_tuple[1],
                'src_port': flow_tuple[2],
                'dst_port': flow_tuple[3],
                'protocol': flow_tuple[4],
                'start_time': float(flow_data['start_time']),  # kept for time-based plots
                'duration': duration,
                'bytes': flow_data['bytes'],
                'packets': flow_data['packets'],
                'bytes_per_second': flow_data['bytes'] / duration if duration > 0 else 0,
                'packets_per_second': flow_data['packets'] / duration if duration > 0 else 0,
                'avg_packet_size': flow_data['bytes'] / flow_data['packets']
            }
            flow_features.append(feature_vector)
        
        self.flow_data = pd.DataFrame(flow_features)
        return self.flow_data

# Example usage
analyzer = NetworkAnalyzer()
num_packets = analyzer.capture_packets(duration=30)
print(f"Captured {num_packets} packets")

basic_stats = analyzer.analyze_basic_stats()
print("\nProtocol Statistics:")
for proto, count in basic_stats['protocol_stats'].most_common():
    # proto is the IANA protocol number (6 = TCP, 17 = UDP, 1 = ICMP)
    print(f"Protocol {proto}: {count} packets")

flow_features = analyzer.extract_flow_features()
print("\nFlow Features:")
print(flow_features.describe())
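
On busy links, capturing everything quickly becomes expensive. Here is a minimal sketch of a narrower capture using Scapy’s BPF filter support (the interface name "eth0" and the HTTPS filter are illustrative assumptions; substitute your own):

def capture_filtered(bpf_filter="tcp port 443", iface="eth0", duration=30):
    # The BPF expression is applied by the capture engine itself,
    # so uninteresting packets never reach Python
    return sniff(filter=bpf_filter, iface=iface, timeout=duration)

filtered_packets = capture_filtered()
print(f"Captured {len(filtered_packets)} packets matching the filter")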

2. Machine Learning for Traffic Classification

Now let’s use ML to classify network traffic:

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib

class TrafficClassifier:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = RandomForestClassifier(n_estimators=100)
        self.feature_columns = ['duration', 'bytes', 'packets', 
                               'bytes_per_second', 'packets_per_second', 
                               'avg_packet_size']
    
    def prepare_data(self, flow_data):
        # Assuming we have some labeled data
        # In reality, you'd need to label your flows (e.g., normal, attack, etc.)
        flow_data['label'] = flow_data.apply(self._label_flow, axis=1)
        
        X = flow_data[self.feature_columns]
        y = flow_data['label']
        
        return train_test_split(X, y, test_size=0.2, random_state=42)
    
    def _label_flow(self, flow):
        # This is a simplified labeling function
        # In reality, you'd need more sophisticated rules or manual labeling
        if flow['bytes_per_second'] > 1000000:  # 1 MB/s
            return 'high_traffic'
        elif flow['dst_port'] in [80, 443]:
            return 'web_traffic'
        else:
            return 'other'
    
    def train(self, X_train, y_train):
        X_train_scaled = self.scaler.fit_transform(X_train)
        self.model.fit(X_train_scaled, y_train)
    
    def predict(self, X):
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)
    
    def evaluate(self, X_test, y_test):
        X_test_scaled = self.scaler.transform(X_test)
        y_pred = self.model.predict(X_test_scaled)
        return classification_report(y_test, y_pred)
    
    def save_model(self, filename):
        joblib.dump((self.scaler, self.model), filename)
    
    @classmethod
    def load_model(cls, filename):
        classifier = cls()
        classifier.scaler, classifier.model = joblib.load(filename)
        return classifier

# Example usage
analyzer = NetworkAnalyzer()
analyzer.capture_packets(duration=60)
flow_data = analyzer.extract_flow_features()

classifier = TrafficClassifier()
X_train, X_test, y_train, y_test = classifier.prepare_data(flow_data)

classifier.train(X_train, y_train)
evaluation_report = classifier.evaluate(X_test, y_test)
print("\nTraffic Classification Report:")
print(evaluation_report)

# Save the model for future use
classifier.save_model('traffic_classifier.joblib')
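
Since random forests expose per-feature importances, it is worth checking which flow features the model actually leans on. A quick optional inspection (feature_importances_ is a standard attribute of scikit-learn's RandomForestClassifier, aligned with the training column order):

importances = pd.Series(classifier.model.feature_importances_,
                        index=classifier.feature_columns).sort_values(ascending=False)
print("\nFeature Importances:")
print(importances)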

3. Anomaly Detection in Network Traffic

Let’s implement anomaly detection to find unusual network behavior:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class NetworkAnomalyDetector:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.feature_columns = ['bytes_per_second', 'packets_per_second', 
                               'avg_packet_size']
    
    def train(self, flow_data):
        X = flow_data[self.feature_columns]
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
    
    def detect_anomalies(self, flow_data):
        X = flow_data[self.feature_columns]
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        
        # -1 indicates anomaly, 1 indicates normal
        flow_data['is_anomaly'] = predictions == -1
        return flow_data[flow_data['is_anomaly']]
    
    def calculate_anomaly_scores(self, flow_data):
        X = flow_data[self.feature_columns]
        X_scaled = self.scaler.transform(X)
        scores = self.model.score_samples(X_scaled)
        return scores

class RealTimeTrafficMonitor:
    def __init__(self, analyzer, classifier, anomaly_detector):
        self.analyzer = analyzer
        self.classifier = classifier
        self.anomaly_detector = anomaly_detector
        self.baseline_stats = None
    
    def establish_baseline(self, duration=300):
        print(f"Establishing baseline over {duration} seconds...")
        self.analyzer.capture_packets(duration=duration)
        flow_data = self.analyzer.extract_flow_features()
        
        self.baseline_stats = {
            'avg_bytes_per_second': flow_data['bytes_per_second'].mean(),
            'avg_packets_per_second': flow_data['packets_per_second'].mean(),
            'std_bytes_per_second': flow_data['bytes_per_second'].std(),
            'std_packets_per_second': flow_data['packets_per_second'].std()
        }
        
        self.anomaly_detector.train(flow_data)
        return self.baseline_stats
    
    def monitor_traffic(self, duration=60):
        print(f"Monitoring traffic for {duration} seconds...")
        self.analyzer.capture_packets(duration=duration)
        flow_data = self.analyzer.extract_flow_features()
        
        # Classify traffic
        classifications = self.classifier.predict(flow_data[self.classifier.feature_columns])
        flow_data['classification'] = classifications
        
        # Detect anomalies (score first so the anomaly slice carries its scores)
        flow_data['anomaly_score'] = self.anomaly_detector.calculate_anomaly_scores(flow_data)
        anomalies = self.anomaly_detector.detect_anomalies(flow_data)
        
        return {
            'flow_data': flow_data,
            'anomalies': anomalies,
            'summary': {
                'total_flows': len(flow_data),
                'anomaly_flows': len(anomalies),
                'traffic_types': Counter(classifications)
            }
        }

# Example usage
analyzer = NetworkAnalyzer()
classifier = TrafficClassifier.load_model('traffic_classifier.joblib')
anomaly_detector = NetworkAnomalyDetector()

monitor = RealTimeTrafficMonitor(analyzer, classifier, anomaly_detector)
baseline = monitor.establish_baseline(duration=120)
print("\nBaseline Statistics:")
for key, value in baseline.items():
    print(f"{key}: {value}")

monitoring_result = monitor.monitor_traffic(duration=60)
print("\nMonitoring Results:")
print(f"Total Flows: {monitoring_result['summary']['total_flows']}")
print(f"Anomalous Flows: {monitoring_result['summary']['anomaly_flows']}")
print("\nTraffic Types:")
for traffic_type, count in monitoring_result['summary']['traffic_types'].items():
    print(f"{traffic_type}: {count} flows")

if not monitoring_result['anomalies'].empty:
    print("\nTop Anomalies:")
    # score_samples assigns lower values to more anomalous flows, so sort ascending
    print(monitoring_result['anomalies'].sort_values('anomaly_score', ascending=True).head())
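
The monitor records baseline_stats but never consults them afterward. One lightweight way to put them to work is a z-score check against baseline throughput; the helper below is a sketch, and the 3-sigma threshold is an assumption you should tune per network:

def flag_baseline_deviations(flow_data, baseline_stats, z_threshold=3.0):
    # Flag flows whose throughput deviates strongly from the baseline mean
    std = baseline_stats['std_bytes_per_second'] or 1.0  # guard against zero std
    z_scores = (flow_data['bytes_per_second'] - baseline_stats['avg_bytes_per_second']) / std
    return flow_data[z_scores.abs() > z_threshold]

deviating_flows = flag_baseline_deviations(monitoring_result['flow_data'], baseline)
print(f"\nFlows deviating from baseline throughput: {len(deviating_flows)}")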

4. Visualization of Network Traffic

Let’s create visualizations to help understand the traffic patterns:

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 (only needed on older matplotlib)

class TrafficVisualizer:
    def __init__(self):
        # matplotlib removed the bare 'seaborn' style name; use seaborn's own
        # theming (or plt.style.use('seaborn-v0_8') on older versions)
        sns.set_theme()
    
    def plot_traffic_volume_over_time(self, flow_data):
        plt.figure(figsize=(12, 6))
        flow_data['timestamp'] = pd.to_datetime(flow_data['start_time'], unit='s')
        per_minute = flow_data.set_index('timestamp')['bytes'].resample('1min').sum()
        
        plt.plot(per_minute.index, per_minute.values)
        plt.title('Network Traffic Volume Over Time')
        plt.xlabel('Time')
        plt.ylabel('Bytes')
        plt.xticks(rotation=45)
        plt.tight_layout()
        return plt
    
    def plot_protocol_distribution(self, flow_data):
        plt.figure(figsize=(10, 6))
        protocol_counts = flow_data['protocol'].value_counts()
        protocol_counts.plot(kind='bar')
        plt.title('Protocol Distribution')
        plt.xlabel('Protocol')
        plt.ylabel('Count')
        plt.tight_layout()
        return plt
    
    def plot_anomaly_visualization(self, flow_data):
        plt.figure(figsize=(10, 6))
        plt.scatter(flow_data['bytes_per_second'], 
                   flow_data['packets_per_second'], 
                   c=flow_data['anomaly_score'], 
                   cmap='viridis')
        plt.colorbar(label='Anomaly Score')
        plt.xlabel('Bytes per Second')
        plt.ylabel('Packets per Second')
        plt.title('Network Flow Anomalies')
        plt.tight_layout()
        return plt
    
    def plot_3d_traffic_visualization(self, flow_data):
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')
        
        scatter = ax.scatter(flow_data['bytes_per_second'],
                            flow_data['packets_per_second'],
                            flow_data['avg_packet_size'],
                            c=flow_data['anomaly_score'],
                            cmap='viridis')
        
        ax.set_xlabel('Bytes per Second')
        ax.set_ylabel('Packets per Second')
        ax.set_zlabel('Avg Packet Size')
        plt.colorbar(scatter, label='Anomaly Score')
        plt.title('3D Network Traffic Visualization')
        return plt

# Example usage
visualizer = TrafficVisualizer()

# Get monitoring results
monitoring_result = monitor.monitor_traffic(duration=60)
flow_data = monitoring_result['flow_data']

# Create visualizations
volume_plot = visualizer.plot_traffic_volume_over_time(flow_data)
volume_plot.savefig('traffic_volume.png')

protocol_plot = visualizer.plot_protocol_distribution(flow_data)
protocol_plot.savefig('protocol_distribution.png')

anomaly_plot = visualizer.plot_anomaly_visualization(flow_data)
anomaly_plot.savefig('anomaly_visualization.png')

plot_3d = visualizer.plot_3d_traffic_visualization(flow_data)
plot_3d.savefig('3d_traffic_visualization.png')
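
The seaborn import is also handy for a quick look at the traffic-class mix. A small optional addition, assuming the 'classification' column added by monitor_traffic is present:

plt.figure(figsize=(8, 5))
sns.countplot(x='classification', data=flow_data)
plt.title('Flows by Traffic Class')
plt.tight_layout()
plt.savefig('traffic_classes.png')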

5. Practical Considerations and Best Practices

  1. Performance Optimization

# Use PyShark for better performance with large packet captures
import pyshark

def capture_with_pyshark(interface, duration):
    capture = pyshark.LiveCapture(interface=interface)
    capture.sniff(timeout=duration)
    return capture

# Use multiprocessing for faster analysis
from multiprocessing import Pool

def analyze_packet_chunk(packets):
    # Analysis code here
    pass

def parallel_analysis(all_packets, num_processes=4):
    # Split the capture into roughly equal chunks and map them across a
    # worker pool (a straightforward sketch of the split-and-map pattern)
    chunk_size = max(1, len(all_packets) // num_processes)
    chunks = [all_packets[i:i + chunk_size]
              for i in range(0, len(all_packets), chunk_size)]
    with Pool(num_processes) as pool:
        return pool.map(analyze_packet_chunk, chunks)