sinetstat
Aus Si:Wiki von Siegrist SystemLösungen - Informatik und Rezepte
Version vom 16. Juni 2017, 11:15 Uhr von Sigi (Diskussion | Beiträge)
Eine Alternative zum Linux netstat-Programm das auch IPv4inIPv6 Sockets anzeigen kann.
Es ist nur auf Netzwerksocken ausgelegt, also nur TCP und UDP, aber keine Unix-Sockets.
Eine Uebersicht der Parameter gibt sinetstat -h
:
# sinetstat -h usage: sinetstat [-h] [-l] [-e] [-r] [-w] [-W] [-t] [-u] [-4] [-6] netstat utility V1.0 2017 by sigi <https://wiki.zweiernet.ch/wiki/sinetstat> optional arguments: -h, --help show this help message and exit -l Only listening sockets -e Only established sockets -r Resolve IP-Addresses -w Wide (show cmd) -W Wider (show cmd with arguments) -t Only TCP -u Only UDP -4 Only IPv4 -6 Only IPv6
Die aktuellste Version steht hier: sinetstat-1.0.tar.gz zum Download.
Updates:
- V0.93:
- neuer Parameter
'-e'
für 'Established' Verbindungen. Als established gelten alle Verbindungen die nicht in den Zuständen LISTEN|SYN_SENT|SYN_RECV sind.
- neuer Parameter
- V0.95:
- Anzeige von UDP 'ESTABLISHED'. Ja, scheinbar gibts das, obwohl UDP verbindungslos ist.
- Und zwar dann, wenn eine Apllikation einen
connect(2)
Systemcall auf einem SOCK_DGRAM Socket zu einem Partner macht. Dabei werden dann nur Verbindungen von diesem Partner akzeptiert.
- V1.0:
- Namensänderung
si_netstat
->sinetstat
- Neue Parameter
'-w'
und'-W'
. -
-w
zeigt das Kommando wie es in/proc/<pid>/cmdline
zu finden ist. -
-W
listet zusätzlich die Kommandozeilen-Optionen auf.
- Namensänderung
Und hier für die Ungeduldigen der Python 2.x Code:
#!/usr/bin/python2 # (c) 2016 by Siegrist(SystemLoesungen) <PSS @ ZweierNet.ch> # Website: [https://wiki.zweiernet.ch/wiki/Sinetstat] # # This program is free software. # # The program is based on a python netstat script that was written by da667 available on https://github.com/da667/netstat # who had it adapted from Ricardo Pascal, available on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code. # # This version has some improvements make it an acceptable alternative to the original netstat command. # So it can explore IPv4 in IPv6 listening sockets and some other information over and above the original netstat. # # Simply try: 'sinetstat -h' # import pwd import os import re import glob import socket import sys import string import fcntl import struct import argparse VERSION = '1.01' PROC_TCP4 = "/proc/net/tcp" PROC_UDP4 = "/proc/net/udp" PROC_TCP6 = "/proc/net/tcp6" PROC_UDP6 = "/proc/net/udp6" MAX_IPV4_ADDRESS = 0xffffffff MAX_IPV6_ADDRESS = 0xffffffffffffffffffffffffffffffff TCP_STATE = { '01':'ESTABLISHED', '02':'SYN_SENT', '03':'SYN_RECV', '04':'FIN_WAIT1', '05':'FIN_WAIT2', '06':'TIME_WAIT', '07':'CLOSE', '08':'CLOSE_WAIT', '09':'LAST_ACK', '0A':'LISTEN', '0B':'CLOSING' } v4ports = [] opt_l = True def grep_b(list, search): return [True for i in list if search in i] def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15]) )[20:24]) def _tcp4load(): ''' Read the table of tcp connections & remove the header ''' with open(PROC_TCP4,'r') as f: content = f.readlines() content.pop(0) return content def _tcp6load(): ''' Read the table of tcpv6 connections & remove the header''' with open(PROC_TCP6,'r') as f: content = f.readlines() content.pop(0) return content def _udp4load(): '''Read the table of udp connections & remove the header ''' with open(PROC_UDP4,'r') as f: content = f.readlines() content.pop(0) return content def _udp6load(): '''Read the table of udp connections & remove the header ''' with open(PROC_UDP6,'r') as f: content = f.readlines() content.pop(0) return content def _hex2dec(s): return str(int(s,16)) def _ip(s): ip = [(_hex2dec(s[6:8])),(_hex2dec(s[4:6])),(_hex2dec(s[2:4])),(_hex2dec(s[0:2]))] return '.'.join(ip) def _ip6(s): ip = [s[6:8],s[4:6],s[2:4],s[0:2],s[12:14],s[14:16],s[10:12],s[8:10],s[22:24],s[20:22],s[18:20],s[16:18],s[30:32],s[28:30],s[26:28],s[24:26]] #print '66666:', ':'.join(ip), s return ':'.join(ip) def _ip6q(s): ip = [s[6:8]+s[4:6],s[2:4]+s[0:2],s[12:14]+s[14:16],s[10:12]+s[8:10],s[22:24]+s[20:22],s[18:20]+s[16:18],s[30:32]+s[28:30],s[26:28]+s[24:26]] #print '666qqq:', ':'.join(ip), s return ':'.join(ip) def _conv_v6(s): return s def _remove_empty(array): return [x for x in array if x !=''] def _convert_ipv4_port(array): host,port = array.split(':') return _ip(host),_hex2dec(port) def _convert_ipv6_port(array): host,port = array.split(':') return _ip6(host),_hex2dec(port) def _convert_ipv6(array): host,port = array.split(':') return _ip6q(host) def _addr_normal(s): return ':'.join(["%x" % x for x in [int(x, 16) for x in s.split(':')]]) def _countFollowingZeros(l): """Return number of elements containing 0 at the beginning of the list.""" #print 'aaa:', l if len(l) == 0: return 0 elif l[0] != 0: return 0 else: return 1 + _countFollowingZeros(l[1:]) def _compress_v6(addr): hextets = [int(x, 16) for x in addr.split(':')] #print hextets followingzeros = [0] * 8 for i in xrange(len(hextets)): followingzeros[i] = _countFollowingZeros(hextets[i:]) # compressionpos is the position where we can start removing zeros compressionpos = followingzeros.index(max(followingzeros)) if max(followingzeros) > 1: # genererate string with the longest number of zeros cut out # now we need hextets as strings hextets = [x for x in _addr_normal(addr).split(':')] while compressionpos < len(hextets) and hextets[compressionpos] == '0': del(hextets[compressionpos]) hextets.insert(compressionpos, '') if compressionpos + 1 >= len(hextets): hextets.append('') if compressionpos == 0: hextets = [''] + hextets return ':'.join(hextets) else: return _addr_normal(addr) def _resolve_ip(host): """ resolve ip und update dictionary res_cache {'ip': 'name'}. If resolution for a ip failed, 'name' is n_try ... 0. """ try: hname = socket.gethostbyaddr(host)[0] return str(hname) except: return str(host) def netstat_tcp4(): ''' Function to return a list with status of tcp4 connections on Linux systems. ''' tcpcontent =_tcp4load() tcpresult = [] for line in tcpcontent: line_array = _remove_empty(line.split(' ')) # Split lines and remove empty spaces. l_host,l_port = _convert_ipv4_port(line_array[1]) # Convert ipaddress and port from hex to decimal. r_host,r_port = _convert_ipv4_port(line_array[2]) tcp_id = line_array[0] state = TCP_STATE[line_array[3]] if state != 'LISTEN' and o_listen == True: continue if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True: continue try: uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID. except: uid = line_array[7] inode = line_array[9] # Need the inode to get process pid. if int(inode) > 0: pid = _get_pid_of_inode(inode) try: # try read the process name. if o_wide == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')).split(' ')[0] elif o_wider == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')) else: exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1] except: exe = '-' else: pid = '-' exe = '-' if o_numeric == False: r_host = _resolve_ip(r_host) nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4', l_host+': '+l_port, r_host+': '+r_port, state, uid, pid, exe) tcpresult.append(nline) # update v4inv6check list v4ports.append(l_port) return tcpresult def netstat_tcp6(): ''' This function returns a list of tcp connections utilizing ipv6. ''' tcpcontent = _tcp6load() tcpresult = [] for line in tcpcontent: line_array = _remove_empty(line.split(' ')) l_host,l_port = _convert_ipv6_port(line_array[1]) r_host,r_port = _convert_ipv6_port(line_array[2]) tcp_id = line_array[0] state = TCP_STATE[line_array[3]] if state != 'LISTEN' and o_listen == True: continue if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True: continue try: uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID. except: uid = line_array[7] inode = line_array[9] if int(inode) > 0: pid = _get_pid_of_inode(inode) try: # try read the process name. if o_wide == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')).split(' ')[0] elif o_wider == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')) else: exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1] except: exe = '-' else: pid = '-' exe = '-' nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP6', _compress_v6(_convert_ipv6(line_array[1]))+': '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe) tcpresult.append(nline) return tcpresult def netstat_tcp4in6(): ''' Returns a list of tcp ipv4 in ipv6 listen sockets. ''' #print xx() tcpcontent = _tcp6load() tcpresult = [] for line in tcpcontent: line_array = _remove_empty(line.split(' ')) #if TCP_STATE[line_array[3]] != 'LISTEN': # continue l_host,l_port = _convert_ipv6_port(line_array[1]) r_host,r_port = _convert_ipv6_port(line_array[2]) if grep_b(v4ports,l_port): continue tcp_id = line_array[0] state = TCP_STATE[line_array[3]] if state != 'LISTEN' and o_listen == True: continue if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True: continue try: uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID. except: uid = line_array[7] inode = line_array[9] if int(inode) > 0: pid = _get_pid_of_inode(inode) try: # try read the process name. if o_wide == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')).split(' ')[0] elif o_wider == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')) else: exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1] except: exe = '-' else: pid = '-' exe = '-' if l_host == '00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:01': if _check_v4inv6_port("127.0.0.1",l_port): nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4in6', '127.0.0.1: '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe) tcpresult.append(nline) if l_host == "00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00": if _check_v4inv6_port("0.0.0.0",l_port): nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4in6', '0.0.0.0: '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe) tcpresult.append(nline) #else: # for a in MYIFS.split(): # _check_v4inv6_port(get_ip_address(a),l_port) return tcpresult def netstat_udp4(): ''' Function to return a list with status of udp connections. ''' udpcontent =_udp4load() udpresult = [] for line in udpcontent: line_array = _remove_empty(line.split(' ')) l_host,l_port = _convert_ipv4_port(line_array[1]) r_host,r_port = _convert_ipv4_port(line_array[2]) udp_id = line_array[0] udp_state = TCP_STATE[line_array[3]] if ( udp_state != 'ESTABLISHED' and o_estab == True ) or o_estab == False: continue if udp_state != 'ESTABLISHED': udp_state =' ' #UDP is stateless try: uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID. except: uid = line_array[7] inode = line_array[9] if int(inode) > 0: pid = _get_pid_of_inode(inode) try: # try read the process name. if o_wide == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')).split(' ')[0] elif o_wider == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')) else: exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1] except: exe = '-' else: pid = '-' exe = '-' nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('UDP4', l_host+': '+l_port, r_host+': '+r_port, udp_state, uid, pid, exe) udpresult.append(nline) return udpresult def netstat_udp6(): ''' Function to return a list of udp connection utilizing ipv6 ''' udpcontent =_udp6load() udpresult = [] for line in udpcontent: line_array = _remove_empty(line.split(' ')) l_host,l_port = _convert_ipv6_port(line_array[1]) r_host,r_port = _convert_ipv6_port(line_array[2]) udp_id = line_array[0] udp_state = TCP_STATE[line_array[3]] if ( udp_state != 'ESTABLISHED' and o_estab == True ) or o_estab == False: continue if udp_state != 'ESTABLISHED': udp_state =' ' #UDP is stateless try: uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID. except: uid = line_array[7] inode = line_array[9] if int(inode) > 0: pid = _get_pid_of_inode(inode) try: # try read the process name. if o_wide == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')).split(' ')[0] elif o_wider == True: with open('/proc/'+pid+'/cmdline','r') as f: exe = ' '.join(f.read().split('\x00')) else: exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1] except: exe = '-' else: pid = '-' exe = '-' nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('UDP6', _compress_v6(_convert_ipv6(line_array[1]))+': '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, udp_state, uid, pid, exe) udpresult.append(nline) return udpresult def _get_pid_of_inode(inode): ''' To retrieve the process pid, check every running process and look for one using the given inode. ''' for item in glob.glob('/proc/[0-9]*/fd/[0-9]*'): try: if re.search(inode,os.readlink(item)): return item.split('/')[2] except: pass return None def check_root(): if os.getuid() == 0: return True else: return False def _check_v4inv6_port(addr,portnr): ''' check if a v4 port is listening over ip6. Strange, we do a SYN connect for every port not listening v4. thats because I think there is no image in the /proc filesystem ''' #print 'aaacc:', addr, portnr is_onnected = False try: try: t_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) except: print("Error: Can't open socket!\n") return False t_socket.connect((addr, int(portnr))) is_onnected = True except: is_onnected = False finally: if(is_onnected and portnr != t_socket.getsockname()[1]): #print("{}:{} Open \n".format(addr, portnr)) t_socket.close() return True t_socket.close() return False if __name__ == '__main__': if not check_root(): print("This program needs root privileges !\nThats because of reading the /proc filesystem and using functions like getpwuid().\nSo I give up\n") sys.exit() #print # commandline params o_numeric = True o_listen = False o_estab = None o_wide = False o_wider = False o_udp = True o_tcp = True o_v6 = True o_v4 = True parser = argparse.ArgumentParser(description='netstat utility V'+VERSION+"\n2017 by sigi <https://wiki.zweiernet.ch/wiki/sinetstat>", formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument('-l', help="Only listening sockets", action="store_true") parser.add_argument('-e', help="Only established sockets", action="store_true") parser.add_argument('-r', help="Resolve IP-Addresses", action="store_true") parser.add_argument('-w', help="Wide (show cmd)", action="store_true") parser.add_argument('-W', help="Wider (show cmd with arguments)", action="store_true") parser.add_argument('-t', help="Only TCP", action="store_true") parser.add_argument('-u', help="Only UDP", action="store_true") parser.add_argument('-4', dest='v4', help="Only IPv4", action="store_true") parser.add_argument('-6', dest='v6', help="Only IPv6", action="store_true") args = parser.parse_args() if args.r: o_numeric = False if args.l: o_listen = True if args.e: o_estab = True o_listen = False if args.w: o_wide = True if args.W: o_wider = True o_wide = False if args.t: o_udp = False if args.u: o_tcp = False if args.v4: o_v6 = False if args.v6: o_v4 = False # Output print '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('Proto', 'Local Address', 'Remote Address', 'State', 'UID', 'PID', 'Program') print '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('-----', '-------------', '--------------', '-----', '---', '---', '-------') #print "\nTCP (v4) Results:\n" if o_v4 == True and o_tcp == True: for conn_tcp in netstat_tcp4(): print conn_tcp #print "\nTCP (v4inv6) Results:\n" if o_v4 == True and o_tcp == True: for conn_tcp46 in netstat_tcp4in6(): print conn_tcp46 #print "\nTCP (v6) Results:\n" if o_v6 == True and o_tcp == True: for conn_tcp6 in netstat_tcp6(): print conn_tcp6 #print "\nUDP (v4) Results:\n" if o_v4 == True and o_udp == True and not args.l: for conn_udp in netstat_udp4(): print conn_udp #print "\nUDP (v6) Results:\n" if o_v6 == True and o_udp == True and not args.l: for conn_udp6 in netstat_udp6(): print conn_udp6