From b9f416bbe186520c7be1dafc3cb30705983bd740 Mon Sep 17 00:00:00 2001 From: Chris <christianilhoefer@gmail.com> Date: Fri, 10 Jan 2025 14:12:51 +0100 Subject: [PATCH] rate based anomaly detection --- .idea/PSE_Code.iml | 2 +- README.md | 2 +- code/src/packet_capturing.py | 283 +++++++++++++++++------------------ 3 files changed, 142 insertions(+), 145 deletions(-) diff --git a/.idea/PSE_Code.iml b/.idea/PSE_Code.iml index 52d8cca..f1e0c5e 100644 --- a/.idea/PSE_Code.iml +++ b/.idea/PSE_Code.iml @@ -5,7 +5,7 @@ <sourceFolder url="file://$MODULE_DIR$/code/src" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$/code/test" isTestSource="true" /> </content> - <orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" /> + <orderEntry type="jdk" jdkName="Python 3.13" jdkType="Python SDK" /> <orderEntry type="sourceFolder" forTests="false" /> </component> </module> \ No newline at end of file diff --git a/README.md b/README.md index 8381c8e..0040e17 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ - [ ] Module 1: Package capture - [X] 1. Checksom Verification - [X] 2. Payload Pattern Matching - - [ ] 3. Rate-Based Anomaly Detection + - [X] 3. Rate-Based Anomaly Detection - [X] 4. Malformed Packet Detection - [X] 5. ICMP Flood Detection - [ ] 6. DNS Spoofing Detection diff --git a/code/src/packet_capturing.py b/code/src/packet_capturing.py index 84b1bb3..1c5c4d7 100644 --- a/code/src/packet_capturing.py +++ b/code/src/packet_capturing.py @@ -35,193 +35,190 @@ packet_max_size = 1500 icmp_threshold = 100 syn_flood_threshold = 100 icmp_flood_periodical_check = 10 -syn_flood_periodical_check = 10 +syn_flood_periodical_check = 1 ############### def ip_spoofing(src_mac, src_ip, packet): - if src_ip not in reserved_ips: - if src_ip.startswith("10.") or src_ip.startswith("192.168.") or src_ip.startswith("169.254."): - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="IP Spoofing") - print("Possible IP spoofing using private networks detected.") - elif src_ip.startswith("172."): - octet = int(src_ip.split(".")[1]) - if 16 <= octet <= 31: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="IP Spoofing") - print("Possible IP spoofing using private networks detected.") + if src_ip not in reserved_ips: + if src_ip.startswith("10.") or src_ip.startswith("192.168.") or src_ip.startswith("169.254."): + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="IP Spoofing") + print("Possible IP spoofing using private networks detected.") + elif src_ip.startswith("172."): + octet = int(src_ip.split(".")[1]) + if 16 <= octet <= 31: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="IP Spoofing") + print("Possible IP spoofing using private networks detected.") def syn_fin(packet): - if TCP in packet: - tcp_flags = packet[TCP].flags - if 'S' in tcp_flags and 'F' in tcp_flags: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="SYN-FIN") - print("Malicious packet detected: SYN-FIN combination.") - print(f"MAC Address of malicious agent: {packet[Ether].src}") + if TCP in packet: + tcp_flags = packet[TCP].flags + if 'S' in tcp_flags and 'F' in tcp_flags: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="SYN-FIN") + print("Malicious packet detected: SYN-FIN combination.") + print(f"MAC Address of malicious agent: {packet[Ether].src}") def null_packet(packet): - if TCP in packet: - tcp_flags = packet[TCP].flags - if tcp_flags == 0: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Null Packet") - print("Malicious null packet found.") - print(f"MAC Address of malicious agent: {packet[Ether].src}") + if TCP in packet: + tcp_flags = packet[TCP].flags + if tcp_flags == 0: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Null Packet") + print("Malicious null packet found.") + print(f"MAC Address of malicious agent: {packet[Ether].src}") def port_check(packet): - if TCP in packet: - src_port = packet[TCP].sport - dst_port = packet[TCP].dport - if src_port == 0 or dst_port == 0: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="TCP Port 0") - print("Illegal packet with source or destination port 0.") - print(f"MAC Address of malicious agent: {packet[Ether].src}") + if TCP in packet: + src_port = packet[TCP].sport + dst_port = packet[TCP].dport + if src_port == 0 or dst_port == 0: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="TCP Port 0") + print("Illegal packet with source or destination port 0.") + print(f"MAC Address of malicious agent: {packet[Ether].src}") def destination_check(packet): - if IP in packet: - dest_ip = packet[IP].dst - if dest_ip.endswith(".0") or dest_ip.endswith(".255"): - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Broadcast Address") - print("Packets with broadcast destination address detected.") - print(f"MAC Address of malicious agent: {packet[Ether].src}") + if IP in packet: + dest_ip = packet[IP].dst + if dest_ip.endswith(".0") or dest_ip.endswith(".255"): + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Broadcast Address") + print("Packets with broadcast destination address detected.") + print(f"MAC Address of malicious agent: {packet[Ether].src}") # Simple Check to see if the Packet has a valid format def malformed_check(packet): - if len(packet) > packet_max_size: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Malformed Packet") - print(f"Malformed over-sized packet detected: {packet[IP].src}") - if TCP in packet: - if packet[TCP].dataofs is None or packet[TCP].dataofs * 4 < 20: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Malformed Packet") - print(f"Malformed packet with short TCP detected: {packet[IP].src}") - + if len(packet) > packet_max_size: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Malformed Packet") + print(f"Malformed over-sized packet detected: {packet[IP].src}") + if TCP in packet: + if packet[TCP].dataofs is None or packet[TCP].dataofs * 4 < 20: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Malformed Packet") + print(f"Malformed packet with short TCP detected: {packet[IP].src}") # Detect if a certain IP is sending a suspiciously large amount of SYN packets -def syn_flood_detection(packet): - global syn_counts_last_checked - if TCP in packet and packet[TCP].flags == 'S': - syn_counts[packet[IP].src] += 1 - - # Check counts periodically (every 10s) - if time.time() - syn_counts_last_checked > syn_flood_periodical_check: - for ip, count in syn_counts.items(): - if count > syn_flood_threshold: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="SYN Flood") - print(f"Possible SYN flood from {ip}. Number of SYN Packets in last {time.time() - syn_counts_last_checked} seconds: {count}") - syn_counts.clear() - syn_counts_last_checked = time.time() +def ip_rate_based_anomaly_detection(packet): + global syn_counts_last_checked + if TCP in packet and packet[TCP].flags == 'S': + syn_counts[packet[IP].src] += 1 + + # Check counts periodically (every 10s) + if time.time() - syn_counts_last_checked > syn_flood_periodical_check: + for ip, count in syn_counts.items(): + if count > syn_flood_threshold: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="SYN Flood") + print(f"Possible SYN flood from {ip}. Number of SYN Packets in last {time.time() - syn_counts_last_checked} seconds: {count}") + syn_counts.clear() + syn_counts_last_checked = time.time() # Detect if a certain IP is sending too many echo requests, functionality mimics 'syn_flood_detection'. def icmp_flood_detection(packet): - # Echo requests: ICMP.type = 8, ICMP.code = 0 - if packet[ICMP].type != 8 and packet[ICMP].code != 0: return + # Echo requests: ICMP.type = 8, ICMP.code = 0 + if packet[ICMP].type != 8 and packet[ICMP].code != 0: return - global icmp_counts_last_checked - icmp_counts[packet[IP].src] += 1 + global icmp_counts_last_checked + icmp_counts[packet[IP].src] += 1 - # Check counts periodically (every 10s) - if time.time() - icmp_counts_last_checked > icmp_flood_periodical_check: - for ip, count in icmp_counts.items(): - if count > icmp_threshold: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="ICMP Flood") - print(f"Possible ICMP flood from {ip}. Number of ICMP Packets in last {time.time() - icmp_counts_last_checked} seconds: {count}") - icmp_counts.clear() - icmp_counts_last_checked = time.time() + # Check counts periodically (every 10s) + if time.time() - icmp_counts_last_checked > icmp_flood_periodical_check: + for ip, count in icmp_counts.items(): + if count > icmp_threshold: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="ICMP Flood") + print(f"Possible ICMP flood from {ip}. Number of ICMP Packets in last {time.time() - icmp_counts_last_checked} seconds: {count}") + icmp_counts.clear() + icmp_counts_last_checked = time.time() # Checks if the same IP has a consistent MAC. # Otherwise, an attacker might try to use an IP already in use, like e.g. the gateway's IP, to intercept traffic def arp_spoofing(packet): - # op == 2 checks for an ARP reply (they say what their MAC is) - if ARP in packet and packet[ARP].op == 2: - src_ip = packet[ARP].psrc - src_mac = packet[ARP].hwsrc - if src_ip in arp_table: - if arp_table[src_ip] != src_mac: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="ARP Spoofing") - print(f"Possible ARP spoofing detected from IP {src_ip}. Conflicting MACs: {arp_table[src_ip]} and {src_mac}") - else: - arp_table[src_ip] = src_mac + # op == 2 checks for an ARP reply (they say what their MAC is) + if ARP in packet and packet[ARP].op == 2: + src_ip = packet[ARP].psrc + src_mac = packet[ARP].hwsrc + if src_ip in arp_table: + if arp_table[src_ip] != src_mac: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="ARP Spoofing") + print(f"Possible ARP spoofing detected from IP {src_ip}. Conflicting MACs: {arp_table[src_ip]} and {src_mac}") + else: + arp_table[src_ip] = src_mac def tcp_reset_attack(packet): - if packet[TCP].flags == 'R': - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="TCP Reset") - print(f"Possible TCP reset attack from {packet[IP].src}") + if packet[TCP].flags == 'R': + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="TCP Reset") + print(f"Possible TCP reset attack from {packet[IP].src}") # Checks if the package has DNS layers def dns_spoofing(packet): - if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1: # DNS response - dns_response = packet.getlayer(DNS) - if dns_response.an is not None: - for i in range(dns_response.ancount): - dns_rr = dns_response.an[i] - if dns_rr.rdata in reserved_ips: - print(f"Possible DNS spoofing detected: {dns_rr.rrname} -> {dns_rr.rdata}") + if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1: # DNS response + dns_response = packet.getlayer(DNS) + if dns_response.an is not None: + for i in range(dns_response.ancount): + dns_rr = dns_response.an[i] + if dns_rr.rdata in reserved_ips: + print(f"Possible DNS spoofing detected: {dns_rr.rrname} -> {dns_rr.rdata}") # Checks if checksum is corrupted def checksum_verification(packet): - if IP in packet: - original_checksum = packet[IP].chksum - del packet[IP].chksum # Remove checksum to force recalculation - recalculated_checksum = IP(raw(packet[IP])).chksum - if original_checksum != recalculated_checksum: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Checksum Mismatch") - print(f"Failed checksum verification from {packet[IP].src}") + if IP in packet: + original_checksum = packet[IP].chksum + del packet[IP].chksum # Remove checksum to force recalculation + recalculated_checksum = IP(raw(packet[IP])).chksum + if original_checksum != recalculated_checksum: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Checksum Mismatch") + print(f"Failed checksum verification from {packet[IP].src}") def payload_pattern_matching(packet): - payload = packet[Raw].load.decode(errors="ignore") - malicious_patterns = ["DROP TABLE", "<script>", "SELECT * FROM"] - if any(pattern in payload for pattern in malicious_patterns): - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Payload") - print(f"Possible Payload matching from {packet[IP].src}") + payload = packet[Raw].load.decode(errors="ignore") + malicious_patterns = ["DROP TABLE", "<script>", "SELECT * FROM"] + if any(pattern in payload for pattern in malicious_patterns): + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Payload") + print(f"Possible Payload matching from {packet[IP].src}") def content_length_mismatch(packet): - http_data = packet[Raw].load.decode(errors='ignore') - headers = http_data.split('\r\n') - for header in headers: - if header.startswith("Content-Length"): - content_length = int(header.split(":")[1].strip()) - actual_length = len(http_data.split('\r\n\r\n', 1)[1]) - if content_length != actual_length: - db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Content-Length Mismatch") - print(f"Content length mismatch from {packet[IP].src}") + http_data = packet[Raw].load.decode(errors='ignore') + headers = http_data.split('\r\n') + for header in headers: + if header.startswith("Content-Length"): + content_length = int(header.split(":")[1].strip()) + actual_length = len(http_data.split('\r\n\r\n', 1)[1]) + if content_length != actual_length: + db_conn.update_address(connection=connection, packet=packet, is_dangerous=True, type_of_threat="Content-Length Mismatch") + print(f"Content length mismatch from {packet[IP].src}") def packet_handler(packet): - db_conn.add_address(connection=connection, packet=packet) - src = packet[IP].src if IP in packet else packet[Ether].src - dst = packet[IP].dst if IP in packet else packet[Ether].dst - print(f"Packet captured: {src} -> {dst}") - - if IP in packet: - src_mac = packet[Ether].src if Ether in packet else None - malformed_check(packet) - ip_spoofing(src_mac, src, packet) - destination_check(packet) - syn_flood_detection(packet) - arp_spoofing(packet) - checksum_verification(packet) - if ICMP in packet: - icmp_flood_detection(packet) - if TCP in packet: - syn_fin(packet) - null_packet(packet) - port_check(packet) - tcp_reset_attack(packet) - if DNS in packet: - dns_spoofing(packet) - if Raw in packet: - payload_pattern_matching(packet) - content_length_mismatch(packet) - - + db_conn.add_address(connection=connection, packet=packet) + src = packet[IP].src if IP in packet else packet[Ether].src + dst = packet[IP].dst if IP in packet else packet[Ether].dst + print(f"Packet captured: {src} -> {dst}") + + if IP in packet: + src_mac = packet[Ether].src if Ether in packet else None + malformed_check(packet) + ip_spoofing(src_mac, src, packet) + destination_check(packet) + ip_rate_based_anomaly_detection(packet) + arp_spoofing(packet) + checksum_verification(packet) + if ICMP in packet: + icmp_flood_detection(packet) + if TCP in packet: + syn_fin(packet) + null_packet(packet) + port_check(packet) + tcp_reset_attack(packet) + if DNS in packet: + dns_spoofing(packet) + if Raw in packet: + payload_pattern_matching(packet) + content_length_mismatch(packet) # Allows unit testing of the dict def get_dicts(): - return dicts + return dicts def dict_clear(): - for dictionary in dicts: - dictionary.clear() + for dictionary in dicts: + dictionary.clear() def main(): - db_conn.initiate_database(connection) - print("Starting packet capture...") - sniff(prn=packet_handler, store=False) + db_conn.initiate_database(connection) + print("Starting packet capture...") + sniff(prn=packet_handler, store=False) if __name__ == "__main__": - main() + main() -- GitLab