sisniff
Zur Navigation springen
Zur Suche springen
Ein Netzwerk-Sniffer, der für jedes Paket nebst Adresse und Port die lokal verbundene Anwendung und deren PID ermittelt und anzeigt.
Es wird TCP, UDP und ICMP unterstützt.
Der Sniffer akzeptiert Filter, wie sie bei tcpdump üblich sind.
Bei HTTP-Verbindungen kann ausserdem ein Teil der Payload angezeigt werden.
Die Option -h gibt eine Argumentübersicht aus und listet die verfügbaren Interfaces auf.
# ./sisniff.py -h
usage: sisniff.py [-h] -i {eth0,lo,tun0,wlan0} [-n] [-pH] [filter]
positional arguments:
filter Pcap filter (BPF syntax) on top of IP (in dbl-quotes "...")
optional arguments:
-h, --help show this help message and exit
-i {eth0,lo,tun0,wlan0}
Interface (mandatory)
-n Do not resolve IP-Addresses
-pH Show HTTP Payload
- Downloads der aktuellen Version
- Source: https://git.zweiernet.ch/sigi/sisniff/raw/master/sisniff.py
- gzipped: https://git.zweiernet.ch/sigi/sisniff/archive/master.tar.gz
- GitLab Projekt: https://git.zweiernet.ch/sigi/sisniff
| i |
Da
|
Python2 Code of sisniff.py:
#!/usr/bin/python2
# (c) 2017 by Siegrist(SystemLoesungen) <PSS@ZweierNet.ch>
from scapy.all import *
import pwd
import os
import re
import glob
import sys
import string
import fcntl
import struct
import commands
import argparse
# Program version; used in the argparse description string.
VERSION = "0.76"

# Socket tables under /proc/net, one path per protocol / address family.
_PROC_NET = "/proc/net/"
PROC_TCP4 = _PROC_NET + "tcp"
PROC_UDP4 = _PROC_NET + "udp"
PROC_ICMP4 = _PROC_NET + "icmp"
PROC_TCP6 = _PROC_NET + "tcp6"
PROC_UDP6 = _PROC_NET + "udp6"
PROC_PACKET = _PROC_NET + "packet"
# Reverse service tables: value -> key of TCP_SERVICES / UDP_SERVICES, so a
# port number can be looked up to get its service name.
# (TCP_SERVICES / UDP_SERVICES are not defined in this file; presumably they
# come from the scapy star import -- verify.)
TSERV = {TCP_SERVICES[name]: name for name in TCP_SERVICES.keys()}
USERV = {UDP_SERVICES[name]: name for name in UDP_SERVICES.keys()}
# IP protocol numbers (decimal)
IPPROTO_ICMP = 1
IPPROTO_TCP = 6
IPROTOP_IGP = 9  # NOTE(review): name looks like a typo of IPPROTO_IGP; kept for compatibility
IPPROTO_UDP = 17

# TCP states (two-digit hex "st" column of /proc/net/tcp) that _proc4load
# skips. The original literal read '06''07' (missing comma), which Python
# concatenates to the single string '0607', so states 06 and 07 were never
# skipped.
nostate = set(['04', '05', '06', '07', '08', '09', '0C', '0D'])

# HTTP method / status-line prefixes, kept as a single alternation string.
tcp_payload_hdrs = ['GET|POST|HTTP|HEAD|PUT|PATCH|DELETE|TRACE|OPTIONS|CONNECT']

# Runtime switches; overwritten by the command-line handling further down.
numeric = False    # True: do not resolve remote IP addresses (-n)
payloadH = False   # True: show the first line of HTTP payloads (-pH)
fillter = ""       # extra BPF expression appended to the base "ip" filter
def get_conn_info(proto, hosts, ports):
    """Find the local process that owns a connection.

    proto, hosts and ports are passed straight on to _proc4load().

    Returns a three-item list [pid, exe, uid]:
      ['?', '?', '?']  _proc4load() found no matching table row
      ['.', '.', '.']  a row was found but its inode column is "0"
      ['-', '-', uid]  no process holds a descriptor for the inode
      [pid, exe, uid]  otherwise; exe is None when /proc/<pid>/exe
                       cannot be read
    uid is always 0 at present: the user lookup is disabled (see below).
    """
    uid = 0
    line_array = _proc4load(proto, hosts, ports)
    if line_array == 0:
        return ['?', '?', '?']

    # Disabled in the original (it was parked inside a string literal):
    # resolve the owning user from column 7 via pwd.getpwuid(), falling back
    # to the raw column value.

    inode = str(line_array[9])
    if inode == "0":
        return ['.', '.', '.']

    pid = _get_pid_of_inode(inode)  # try to get a pid
    if pid == "NoPid":
        return ['-', '-', uid]

    try:  # try to read the process name
        exe = os.readlink('/proc/' + pid + '/exe').split('/')[-1]
    except OSError:
        # The process may have exited meanwhile, or the link is unreadable.
        exe = None
    return [pid, exe, uid]
def _proc4load(proto, hosts, ports):
    """Find the /proc/net row of a local IPv4 socket by its local port.

    The table read depends on proto: PROC_UDP4, PROC_TCP4 or PROC_ICMP4.
    Row layout (tcp/udp):
      "sl, local_address, rem_address, st, tx_queue rx_queue, tr tm->when,
       retrnsmt, uid, timeout, inode, ..."

    A row matches when its local address is in xMYADDRS and its local port
    equals `ports`. For TCP, rows whose state column is listed in `nostate`
    are skipped. `hosts` is accepted but not used for matching.

    TCP states, from the kernel's include/net/tcp_states.h:
      1 ESTABLISHED, 2 SYN_SENT, 3 SYN_RECV, 4 FIN_WAIT1, 5 FIN_WAIT2,
      6 TIME_WAIT, 7 CLOSE, 8 CLOSE_WAIT, 9 LAST_ACK, 10 LISTEN,
      11 CLOSING, 12 NEW_SYN_RECV

    Returns the matching row as a list of its non-empty space-separated
    fields, or 0 when nothing matches, the protocol is not handled, or the
    table cannot be read.
    """
    # The kernel prints ports as four upper-case hex digits (e.g. "0050"),
    # so the search key must be zero-padded or ports below 0x1000 never match.
    xports = _dec2hex(ports).zfill(4)

    if proto == IPPROTO_UDP:
        path, label, check_state = PROC_UDP4, "proc_udp4", False
    elif proto == IPPROTO_TCP:
        path, label, check_state = PROC_TCP4, "proc_tcp", True
    elif proto == IPPROTO_ICMP:
        path, label, check_state = PROC_ICMP4, "proc_icmp4", False
    else:
        return 0

    try:
        with open(path, 'r') as f:
            next(f)  # skip the header line
            for line in f:
                fields = _remove_empty(line.split(' '))
                if check_state and fields[3] in nostate:
                    continue
                l_xhost, l_xport = fields[1].split(':')
                if l_xhost not in xMYADDRS:
                    continue
                if l_xport == xports:
                    return fields
        return 0
    except Exception:
        # Best effort: an unreadable or malformed table only costs the
        # process info for this packet. Unlike a bare except, this lets
        # KeyboardInterrupt / SystemExit through.
        print("open " + label + " error")
        return 0
def _convert_ipv4_port(array):
    """Decode a "HEXADDR:HEXPORT" string into (dotted address, port text)."""
    hex_host, hex_port = array.split(':')
    dotted = _ip(hex_host)
    port_text = _hex2dec(hex_port)
    return dotted, port_text
def _hex2dec(s):
return str(int(s,16))
def _dec2hex(p):
return hex(int(p)).split('x')[-1].upper()
def _ip(s):
    """Turn an 8-digit hex address into dotted decimal, last byte pair first."""
    octets = [_hex2dec(s[start:start + 2]) for start in (6, 4, 2, 0)]
    return '.'.join(octets)
def _ip6(s):
ip = [s[6:8],s[4:6],s[2:4],s[0:2],s[12:14],s[14:16],s[10:12],s[8:10],s[22:24],s[20:22],s[18:20],s[16:18],s[30:32],s[28:30],s[26:28],s[24:26]]
#print '66666:', ':'.join(ip), s
return ':'.join(ip)
def _ip_hexrev(ip):
return ''.join([hex(int(x)+256)[3:] for x in ip.split('.')][::-1]).upper()
def _remove_empty(array):
return [x for x in array if x != '']
def _get_pid_of_inode(inode):
s_term = r'^socket\:\['+ inode +r'\]$'
for item in glob.iglob('/proc/[0-9]*/fd/[0-9]*'):
try:
if re.match(s_term,os.readlink(item)):
return item.split('/')[2]
except:
pass
return "NoPid"
def _resolve_ip(host):
    """Reverse-resolve an IP address and record the result in res_cache.

    On success the primary host name is stored as res_cache[host] and
    returned. When the lookup fails, the address text itself is stored and
    returned, so a failing address is not looked up again by callers that
    consult res_cache first.
    """
    try:
        hname = socket.gethostbyaddr(host)[0]
    except Exception:
        # Best effort. Not a bare except: Ctrl-C during a slow DNS lookup
        # must still stop the program.
        res_cache[host] = str(host)
        return str(host)
    res_cache[host] = str(hname)
    return str(hname)
def check_root():
    """Return True when the process runs with real user id 0, else False."""
    return os.getuid() == 0
## Packet callback (passed to sniff() as prn=) and its helper
def _port_label(services, port):
    """Return the bold service name for port, or the port number as text."""
    try:
        return "\033[1m" + services[port] + "\033[0m"
    except (KeyError, TypeError):
        return str(port)


def doPackets(packet):
    """Format one sniffed packet as a single output line.

    The local side is packet[0][1].src when that address is in MYADDRS
    (outgoing, red "->>>" arrow), otherwise packet[0][1].dst (incoming,
    cyan "<<<-" arrow). For TCP/UDP/ICMP packets the owning program and pid
    are looked up with get_conn_info() and remembered in conn_cache, a list
    of at most cc_maxlen entries kept in least-recently-used order.

    Reads the module globals MYADDRS, conn_cache, cc_maxlen, res_cache,
    numeric, payloadH, TSERV and USERV.

    Returns the formatted line:
      "program/pid - PROTO: local:port <arrow> remote:port flags Len:n : payload"
    """
    program = "-"
    pid = "-"
    o_proto = ""
    o_dport = "none"
    o_sport = "none"
    flags = ""
    o_payload = ""

    ip = packet[0][1]
    is_l4 = packet.haslayer(TCP) or packet.haslayer(UDP) or packet.haslayer(ICMP)

    # Which end of the packet is ours? Only local addresses count as source.
    outgoing = ip.src in MYADDRS
    conn_addr = ip.src if outgoing else ip.dst

    if is_l4:
        try:
            conn_port = packet[0][2].sport if outgoing else packet[0][2].dport
        except (AttributeError, IndexError):
            # No port on this layer; 99999 is the "no port" sentinel
            # tested in the ICMP branch below.
            conn_port = 99999

        # Look in the cache first. The protocol is part of the key because
        # TCP and UDP port numbers are independent of each other.
        c_hash = str(ip.proto) + '=:=' + conn_addr + '=:=' + str(conn_port)
        cached_keys = [entry[0] for entry in conn_cache]
        if c_hash not in cached_keys:
            # Not cached: get the connection info for this packet.
            spid, sexe, suid = get_conn_info(ip.proto, conn_addr, conn_port)
            program = sexe
            pid = spid
            if re.match("^[0-9]+$", spid):
                # Only real pids are cached; drop the oldest entry when full.
                if len(conn_cache) >= cc_maxlen:
                    conn_cache.pop(0)
                conn_cache.append([c_hash, program, pid])
        else:
            # Cache hit: move the entry to the end (most recently used).
            indx = cached_keys.index(c_hash)
            renew = conn_cache.pop(indx)
            conn_cache.append(renew)
            program = renew[1]
            pid = renew[2]

    if packet.haslayer(UDP):
        o_proto = "UDP"
        o_dport = _port_label(USERV, packet[0][2].dport)
        o_sport = _port_label(USERV, packet[0][2].sport)
    elif packet.haslayer(TCP):
        o_proto = "TCP"
        o_dport = _port_label(TSERV, packet[0][2].dport)
        o_sport = _port_label(TSERV, packet[0][2].sport)
        flags = packet[0].sprintf('%3s,TCP.flags%')
        if payloadH and packet.haslayer(Raw):
            tpld = packet[0].sprintf('%TCP.payload%')
            if re.match("^GET|POST|HTTP|HEAD|PUT|PATCH|DELETE|TRACE|OPTIONS|CONNECT.*", tpld[0:8]):
                # partition() instead of a two-target split(): a payload
                # without "\r\n" used to raise ValueError inside the callback.
                o_payload = str(tpld.partition('\r\n')[0])
    elif packet.haslayer(ICMP):
        o_proto = "ICMP"
        if conn_port == 99999:
            o_dport = "-"
            o_sport = "-"
        else:
            # sport/dport are deliberately crossed here, as in the original.
            o_dport = _port_label(USERV, packet[0][2].sport)
            o_sport = _port_label(USERV, packet[0][2].dport)
        flags = "[" + packet[0].sprintf('%ICMP.type%') + "/" + packet[0].sprintf('%ICMP.code%') + "]"
    else:
        o_proto = "UNKNOWN"

    if outgoing:
        local_addr, remote_addr = ip.src, ip.dst
        local_port, remote_port = o_sport, o_dport
        arrow = "\033[1m\033[31m ->>> \033[0m"
    else:
        local_addr, remote_addr = ip.dst, ip.src
        local_port, remote_port = o_dport, o_sport
        arrow = "\033[1m\033[36m <<<- \033[0m"

    if numeric:
        rem_name = remote_addr
    elif remote_addr in res_cache:
        rem_name = res_cache[remote_addr]
    else:
        rem_name = _resolve_ip(remote_addr)

    return ("\033[1m" + str(program) + "\033[0m" + "/" + str(pid) + " - " + o_proto + ": "
            + local_addr + ":" + local_port + arrow + rem_name + ":" + remote_port
            + " " + flags + " Len:" + str(ip.len) + " : " + o_payload)
## -- And now the main program

# root check
if not check_root():
    print("This program needs root privileges !\nThats because of reading the /proc filesystem and using libpcap functions.\nSo I give up\n")
    # NOTE(review): this setting sits directly before the exit and so has no
    # effect on sniffing. It was assigned twice in the original; confirm
    # whether it was meant to run on the normal (root) path instead.
    conf.sniff_promisc = 0
    # Exit with a non-zero status so callers can see the failure.
    sys.exit(1)

# get the interfaces (names of the entries in /sys/class/net)
iface_list = sorted(os.listdir("/sys/class/net"))
print("")

# commandline params
parser = argparse.ArgumentParser(description='sisniff V' + VERSION)
parser.add_argument('-i', help="Interface (mandatory)", choices=iface_list, required=True)
parser.add_argument('-n', help="Do not resolve IP-Addresses", action="store_true")
parser.add_argument('-pH', help="Show HTTP Payload", action="store_true")
parser.add_argument('filter', nargs='?', help="Filter (BPF syntax) on top of IP (in dbl-quotes \"...\")", type=str)
args = parser.parse_args()
iface = args.i
if args.n:
    numeric = True
if args.pH:
    payloadH = True
if args.filter:
    fillter = " and (" + args.filter + ")"
    print("> Applying Filter: \"ip" + fillter + "\"")

# local addresses: every IPv4 address that "ip addr show" reports for the
# interface. The original took only the first one and raised IndexError when
# the interface had none.
ip_pipe = os.popen('ip addr show ' + iface)
try:
    ip_output = ip_pipe.read()
finally:
    ip_pipe.close()
MYADDRS = re.findall(r'\binet\s+(\d+\.\d+\.\d+\.\d+)', ip_output)
if not MYADDRS:
    print("> Warning: no IPv4 address found on interface " + iface)
MYADDRS.append('0.0.0.0')
MYADDRS.append('127.0.0.1')
# Same addresses in the hex form used by the /proc/net tables.
xMYADDRS = [_ip_hexrev(x) for x in MYADDRS]
print("> My IP-Addresses: " + str(MYADDRS))

# confirmed connections cache (ring buffer)
conn_cache = []
cc_maxlen = 20

# resolver cache
res_cache = {}
n_try = 3

print("")
print("Prog/PID mavericks: ?/? = No entry in /proc/net/xxx; -/- = No PID for Inode found; ./. = Inode=0;")
print("")
print("Program/PID: Local addr:port <<->> Remote addr:port [Flags] Len:length : [Payload]")
print("-------------------------------------------------------------------------------")

# sniff, filtering for IP traffic
sniff(filter="ip" + fillter, iface=iface, prn=doPackets, store=0)

## -- and that's the end