sinetstat
Zur Navigation springen
Zur Suche springen
Eine Alternative zum Linux netstat-Programm das auch IPv4inIPv6 Sockets anzeigen kann.
Es ist nur auf Netzwerksocken ausgelegt, also nur TCP und UDP, aber keine Unix-Sockets.
Eine Uebersicht der Parameter erhält man mit: sinetstat -h
Die aktuellste Version steht hier: sinetstat-1.0.tar.gz zum Download.
Updates:
- V0.93:
- neuer Parameter '-e' für 'Established' Verbindungen. Als established gelten alle Verbindungen die nicht in den Zuständen LISTEN|SYN_SENT|SYN_RECV sind.
- V0.95:
- Anzeige von UDP 'ESTABLISHED'. Ja, scheinbar gibts das, obwohl UDP verbindungslos ist.
- Und zwar dann, wenn eine Apllikation einen
connect(2)Systemcall auf einem SOCK_DGRAM Socket zu einem Partner macht. Dabei werden dann nur Verbindungen von diesem Partner akzeptiert.
- V1.0:
- Namensänderung si_netstat -> sinetstat
- Neue Parameter '-w' und '-W'.
- -w zeigt das Kommando wie es in /proc/<pid>/cmdline zu finden ist.
- -W listet zusätzlich die Kommandozeilen-Optionen auf.
Und hier für die Ungeduldigen der Python 2.x Code:
#!/usr/bin/python2
# (c) 2016 by Siegrist(SystemLoesungen) <PSS @ ZweierNet.ch>
# Website: [https://wiki.zweiernet.ch/wiki/Sinetstat]
#
# This program is free software.
#
# The program is based on a python netstat script that was written by da667 available on https://github.com/da667/netstat
# who had it adapted from Ricardo Pascal, available on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code.
#
# This version has some improvements make it an acceptable alternative to the original netstat command.
# So it can explore IPv4 in IPv6 listening sockets and some other information over and above the original netstat.
#
# Simply try: 'sinetstat -h'
#
import pwd
import os
import re
import glob
import socket
import sys
import string
import fcntl
import struct
import argparse
VERSION = '1.0'
PROC_TCP4 = "/proc/net/tcp"
PROC_UDP4 = "/proc/net/udp"
PROC_TCP6 = "/proc/net/tcp6"
PROC_UDP6 = "/proc/net/udp6"
PROC_PACKET = "/proc/net/packet"
MAX_IPV4_ADDRESS = 0xffffffff
MAX_IPV6_ADDRESS = 0xffffffffffffffffffffffffffffffff
TCP_STATE = {
'01':'ESTABLISHED',
'02':'SYN_SENT',
'03':'SYN_RECV',
'04':'FIN_WAIT1',
'05':'FIN_WAIT2',
'06':'TIME_WAIT',
'07':'CLOSE',
'08':'CLOSE_WAIT',
'09':'LAST_ACK',
'0A':'LISTEN',
'0B':'CLOSING'
}
#MYIPV4 = os.popen('ip addr show eth0 | grep "\<inet\>" | awk \'{ print $2 }\' | awk -F "/" \'{ print $1 }\'').read().strip()
#MYIPV6 = os.popen('ip addr show eth0 | grep "\<inet\>" | awk \'{ print $2 }\' | awk -F "/" \'{ print $1 }\'').read().strip()
#MYIFS = os.popen("ifconfig | grep '^[a-z0-9]' | awk '{print $1}'").read().strip()
v4ports = []
opt_l = True
def xx():
for a in MYIFS.split():
print 'IFF:', a
print 'v6:', os.popen('ip addr show ' + a + ' | grep "\<inet6\>" | awk \'{ print $2 }\' | awk -F "/" \'{ print $1 }\'').read().strip()
def grep_b(list, search):
return [True for i in list if search in i]
def get_ip_address(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])
def _tcp4load():
''' Read the table of tcp connections & remove the header '''
with open(PROC_TCP4,'r') as f:
content = f.readlines()
content.pop(0)
return content
def _tcp6load():
''' Read the table of tcpv6 connections & remove the header'''
with open(PROC_TCP6,'r') as f:
content = f.readlines()
content.pop(0)
return content
def _udp4load():
'''Read the table of udp connections & remove the header '''
with open(PROC_UDP4,'r') as f:
content = f.readlines()
content.pop(0)
return content
def _udp6load():
'''Read the table of udp connections & remove the header '''
with open(PROC_UDP6,'r') as f:
content = f.readlines()
content.pop(0)
return content
def _hex2dec(s):
return str(int(s,16))
def _ip(s):
ip = [(_hex2dec(s[6:8])),(_hex2dec(s[4:6])),(_hex2dec(s[2:4])),(_hex2dec(s[0:2]))]
return '.'.join(ip)
def _ip6(s):
ip = [s[6:8],s[4:6],s[2:4],s[0:2],s[12:14],s[14:16],s[10:12],s[8:10],s[22:24],s[20:22],s[18:20],s[16:18],s[30:32],s[28:30],s[26:28],s[24:26]]
#print '66666:', ':'.join(ip), s
return ':'.join(ip)
def _ip6q(s):
ip = [s[6:8]+s[4:6],s[2:4]+s[0:2],s[12:14]+s[14:16],s[10:12]+s[8:10],s[22:24]+s[20:22],s[18:20]+s[16:18],s[30:32]+s[28:30],s[26:28]+s[24:26]]
#print '666qqq:', ':'.join(ip), s
return ':'.join(ip)
def _conv_v6(s):
return s
def _remove_empty(array):
return [x for x in array if x !='']
def _convert_ipv4_port(array):
host,port = array.split(':')
return _ip(host),_hex2dec(port)
def _convert_ipv6_port(array):
host,port = array.split(':')
return _ip6(host),_hex2dec(port)
def _convert_ipv6(array):
host,port = array.split(':')
return _ip6q(host)
def _addr_normal(s):
return ':'.join(["%x" % x for x in [int(x, 16) for x in s.split(':')]])
def _countFollowingZeros(l):
"""Return number of elements containing 0 at the beginning of the list."""
#print 'aaa:', l
if len(l) == 0:
return 0
elif l[0] != 0:
return 0
else:
return 1 + _countFollowingZeros(l[1:])
def _compress_v6(addr):
hextets = [int(x, 16) for x in addr.split(':')]
#print hextets
followingzeros = [0] * 8
for i in xrange(len(hextets)):
followingzeros[i] = _countFollowingZeros(hextets[i:])
# compressionpos is the position where we can start removing zeros
compressionpos = followingzeros.index(max(followingzeros))
if max(followingzeros) > 1:
# genererate string with the longest number of zeros cut out
# now we need hextets as strings
hextets = [x for x in _addr_normal(addr).split(':')]
while compressionpos < len(hextets) and hextets[compressionpos] == '0':
del(hextets[compressionpos])
hextets.insert(compressionpos, '')
if compressionpos + 1 >= len(hextets):
hextets.append('')
if compressionpos == 0:
hextets = [''] + hextets
return ':'.join(hextets)
else:
return _addr_normal(addr)
def _resolve_ip(host):
"""
resolve ip und update dictionary res_cache {'ip': 'name'}.
If resolution for a ip failed, 'name' is n_try ... 0.
"""
try:
hname = socket.gethostbyaddr(host)[0]
return str(hname)
except:
return str(host)
def netstat_tcp4():
'''
Function to return a list with status of tcp4 connections on Linux systems.
'''
tcpcontent =_tcp4load()
tcpresult = []
for line in tcpcontent:
line_array = _remove_empty(line.split(' ')) # Split lines and remove empty spaces.
l_host,l_port = _convert_ipv4_port(line_array[1]) # Convert ipaddress and port from hex to decimal.
r_host,r_port = _convert_ipv4_port(line_array[2])
tcp_id = line_array[0]
state = TCP_STATE[line_array[3]]
if state != 'LISTEN' and o_listen == True:
continue
if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True:
continue
try:
uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID.
except:
uid = line_array[7]
inode = line_array[9] # Need the inode to get process pid.
if int(inode) > 0:
pid = _get_pid_of_inode(inode)
try: # try read the process name.
if o_wide == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00')).split(' ')[0]
elif o_wider == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00'))
else:
exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1]
except:
exe = '-'
else:
pid = '-'
exe = '-'
if o_numeric == False:
r_host = _resolve_ip(r_host)
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4', l_host+': '+l_port, r_host+': '+r_port, state, uid, pid, exe)
tcpresult.append(nline)
# update v4inv6check list
v4ports.append(l_port)
return tcpresult
def netstat_tcp6():
'''
This function returns a list of tcp connections utilizing ipv6.
'''
tcpcontent = _tcp6load()
tcpresult = []
for line in tcpcontent:
line_array = _remove_empty(line.split(' '))
l_host,l_port = _convert_ipv6_port(line_array[1])
r_host,r_port = _convert_ipv6_port(line_array[2])
tcp_id = line_array[0]
state = TCP_STATE[line_array[3]]
if state != 'LISTEN' and o_listen == True:
continue
if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True:
continue
try:
uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID.
except:
uid = line_array[7]
inode = line_array[9]
if int(inode) > 0:
pid = _get_pid_of_inode(inode)
try: # try read the process name.
if o_wide == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00')).split(' ')[0]
elif o_wider == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00'))
else:
exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1]
except:
exe = '-'
else:
pid = '-'
exe = '-'
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP6', _compress_v6(_convert_ipv6(line_array[1]))+': '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe)
tcpresult.append(nline)
return tcpresult
def netstat_tcp4in6():
'''
Returns a list of tcp ipv4 in ipv6 listen sockets.
'''
#print xx()
tcpcontent = _tcp6load()
tcpresult = []
for line in tcpcontent:
line_array = _remove_empty(line.split(' '))
#if TCP_STATE[line_array[3]] != 'LISTEN':
# continue
l_host,l_port = _convert_ipv6_port(line_array[1])
r_host,r_port = _convert_ipv6_port(line_array[2])
if grep_b(v4ports,l_port):
continue
tcp_id = line_array[0]
state = TCP_STATE[line_array[3]]
if state != 'LISTEN' and o_listen == True:
continue
if ( state == 'LISTEN' or state == 'SYN_SENT' or state == 'SYN_RECV' ) and o_estab == True:
continue
try:
uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID.
except:
uid = line_array[7]
inode = line_array[9]
if int(inode) > 0:
pid = _get_pid_of_inode(inode)
try: # try read the process name.
if o_wide == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00')).split(' ')[0]
elif o_wider == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00'))
else:
exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1]
except:
exe = '-'
else:
pid = '-'
exe = '-'
if l_host == '00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:01':
if _check_v4inv6_port("127.0.0.1",l_port):
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4in6', '127.0.0.1: '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe)
tcpresult.append(nline)
if l_host == "00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00":
if _check_v4inv6_port("0.0.0.0",l_port):
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('TCP4in6', '0.0.0.0: '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, state, uid, pid, exe)
tcpresult.append(nline)
#else:
# for a in MYIFS.split():
# _check_v4inv6_port(get_ip_address(a),l_port)
return tcpresult
def netstat_udp4():
'''
Function to return a list with status of udp connections.
'''
udpcontent =_udp4load()
udpresult = []
for line in udpcontent:
line_array = _remove_empty(line.split(' '))
l_host,l_port = _convert_ipv4_port(line_array[1])
r_host,r_port = _convert_ipv4_port(line_array[2])
udp_id = line_array[0]
udp_state = TCP_STATE[line_array[3]]
if ( udp_state != 'ESTABLISHED' and o_estab == True ) or o_estab == False:
continue
if udp_state != 'ESTABLISHED':
udp_state =' ' #UDP is stateless
try:
uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID.
except:
uid = line_array[7]
inode = line_array[9]
if int(inode) > 0:
pid = _get_pid_of_inode(inode)
try: # try read the process name.
if o_wide == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00')).split(' ')[0]
elif o_wider == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00'))
else:
exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1]
except:
exe = '-'
else:
pid = '-'
exe = '-'
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('UDP4', l_host+': '+l_port, r_host+': '+r_port, udp_state, uid, pid, exe)
udpresult.append(nline)
return udpresult
def netstat_udp6():
'''
Function to return a list of udp connection utilizing ipv6
'''
udpcontent =_udp6load()
udpresult = []
for line in udpcontent:
line_array = _remove_empty(line.split(' '))
l_host,l_port = _convert_ipv6_port(line_array[1])
r_host,r_port = _convert_ipv6_port(line_array[2])
udp_id = line_array[0]
udp_state = TCP_STATE[line_array[3]]
if ( udp_state != 'ESTABLISHED' and o_estab == True ) or o_estab == False:
continue
if udp_state != 'ESTABLISHED':
udp_state =' ' #UDP is stateless
try:
uid = pwd.getpwuid(int(line_array[7]))[0] # Get user from UID.
except:
uid = line_array[7]
inode = line_array[9]
if int(inode) > 0:
pid = _get_pid_of_inode(inode)
try: # try read the process name.
if o_wide == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00')).split(' ')[0]
elif o_wider == True:
with open('/proc/'+pid+'/cmdline','r') as f:
exe = ' '.join(f.read().split('\x00'))
else:
exe = os.readlink('/proc/'+pid+'/exe').split('/')[-1]
except:
exe = '-'
else:
pid = '-'
exe = '-'
nline = '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('UDP6', _compress_v6(_convert_ipv6(line_array[1]))+': '+l_port, _compress_v6(_convert_ipv6(line_array[2]))+': '+r_port, udp_state, uid, pid, exe)
udpresult.append(nline)
return udpresult
def _get_pid_of_inode(inode):
'''
To retrieve the process pid, check every running process and look for one using
the given inode.
'''
for item in glob.glob('/proc/[0-9]*/fd/[0-9]*'):
try:
if re.search(inode,os.readlink(item)):
return item.split('/')[2]
except:
pass
return None
def check_root():
if os.getuid() == 0:
return True
else:
return False
def _check_v4inv6_port(addr,portnr):
'''
check if a v4 port is listening over ip6. Strange, we do a SYN connect for every port not listening v4.
thats because I think there is no image in the /proc filesystem
'''
#print 'aaacc:', addr, portnr
is_onnected = False
try:
try:
t_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except:
print("Error: Can't open socket!\n")
return False
t_socket.connect((addr, int(portnr)))
is_onnected = True
except:
is_onnected = False
finally:
if(is_onnected and portnr != t_socket.getsockname()[1]):
#print("{}:{} Open \n".format(addr, portnr))
t_socket.close()
return True
t_socket.close()
return False
if __name__ == '__main__':
if not check_root():
print("This program needs root privileges !\nThats because of reading the /proc filesystem and using functions like getpwuid().\nSo I give up\n")
sys.exit()
#print
# commandline params
o_numeric = True
o_listen = False
o_estab = None
o_wide = False
o_wider = False
o_udp = True
o_tcp = True
o_v6 = True
o_v4 = True
parser = argparse.ArgumentParser(description='netstat utility V'+VERSION+"\n2017 by sigi <https://wiki.zweiernet.ch/wiki/Informatik>",
formatter_class=argparse.RawDescriptionHelpFormatter )
parser.add_argument('-l', help="Only listening sockets", action="store_true")
parser.add_argument('-e', help="Only established sockets", action="store_true")
parser.add_argument('-r', help="Resolve IP-Addresses", action="store_true")
parser.add_argument('-w', help="Wide (show cmd)", action="store_true")
parser.add_argument('-W', help="Wider (show cmd with arguments)", action="store_true")
parser.add_argument('-t', help="Only TCP", action="store_true")
parser.add_argument('-u', help="Only UDP", action="store_true")
parser.add_argument('-4', dest='v4', help="Only IPv4", action="store_true")
parser.add_argument('-6', dest='v6', help="Only IPv6", action="store_true")
args = parser.parse_args()
if args.r:
o_numeric = False
if args.l:
o_listen = True
if args.e:
o_estab = True
o_listen = False
if args.w:
o_wide = True
if args.W:
o_wider = True
o_wide = False
if args.t:
o_udp = False
if args.u:
o_tcp = False
if args.v4:
o_v6 = False
if args.v6:
o_v4 = False
# Output
print '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('Proto', 'Local Address', 'Remote Address', 'State', 'UID', 'PID', 'Program')
print '%-7s %-24s %-24s %-11s %-8s %-6s %-s' % ('-----', '-------------', '--------------', '-----', '---', '---', '-------')
#print "\nTCP (v4) Results:\n"
if o_v4 == True and o_tcp == True:
for conn_tcp in netstat_tcp4():
print conn_tcp
#print "\nTCP (v4inv6) Results:\n"
if o_v4 == True and o_tcp == True:
for conn_tcp46 in netstat_tcp4in6():
print conn_tcp46
#print "\nTCP (v6) Results:\n"
if o_v6 == True and o_tcp == True:
for conn_tcp6 in netstat_tcp6():
print conn_tcp6
#print "\nUDP (v4) Results:\n"
if o_v4 == True and o_udp == True and not args.l:
for conn_udp in netstat_udp4():
print conn_udp
#print "\nUDP (v6) Results:\n"
if o_v6 == True and o_udp == True and not args.l:
for conn_udp6 in netstat_udp6():
print conn_udp6