Rewrite get_host_subnets, drastically simplify Linux implementation.

Cleanup code in get_ips_from_interfaces

Modern python

bug fix
This commit is contained in:
Daniel Goldberg 2017-09-10 10:36:00 +03:00
parent 274f758239
commit 53a20308de
1 changed files with 41 additions and 61 deletions

View File

@ -1,34 +1,34 @@
import os import os
import sys import sys
import array
import socket import socket
import struct import struct
import psutil import psutil
import ipaddress import ipaddress
import itertools
import netifaces
from subprocess import check_output from subprocess import check_output
from random import randint from random import randint
if sys.platform == "win32": if sys.platform == "win32":
import netifaces
def local_ips(): def local_ips():
local_hostname = socket.gethostname() local_hostname = socket.gethostname()
return socket.gethostbyname_ex(local_hostname)[2] return socket.gethostbyname_ex(local_hostname)[2]
def get_host_subnets(only_ips=False): def get_host_subnets():
network_adapters = [] ipv4_nets = [netifaces.ifaddresses(interface)[netifaces.AF_INET]
valid_ips = local_ips() for interface in netifaces.interfaces()
if only_ips: if netifaces.AF_INET in netifaces.ifaddresses(interface)
return valid_ips ]
interfaces = [netifaces.ifaddresses(x) for x in netifaces.interfaces()] # flatten
for inte in interfaces: ipv4_nets = itertools.chain.from_iterable(ipv4_nets)
if netifaces.AF_INET in inte: # remove loopback
for add in inte[netifaces.AF_INET]: ipv4_nets = [network for network in ipv4_nets if network['addr'] != '127.0.0.1']
if "netmask" in add and add["addr"] in valid_ips: # remove auto conf
network_adapters.append((add["addr"], add["netmask"])) ipv4_nets = [network for network in ipv4_nets if not network['addr'].startswith('169.254')]
return network_adapters return ipv4_nets
def get_routes(): def get_routes():
raise NotImplementedError() raise NotImplementedError()
@ -36,46 +36,26 @@ if sys.platform == "win32":
else: else:
from fcntl import ioctl from fcntl import ioctl
def get_host_subnets(only_ips=False):
"""Get the list of Linux network adapters."""
max_bytes = 8096
is_64bits = sys.maxsize > 2 ** 32
if is_64bits:
offset1 = 16
offset2 = 40
else:
offset1 = 32
offset2 = 32
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
names = array.array('B', '\0' * max_bytes)
outbytes = struct.unpack('iL', ioctl(
sock.fileno(),
0x8912,
struct.pack('iL', max_bytes, names.buffer_info()[0])))[0]
adapter_names = [names.tostring()[n_cnt:n_cnt + offset1].split('\0', 1)[0]
for n_cnt in xrange(0, outbytes, offset2)]
network_adapters = []
for adapter_name in adapter_names:
ip_address = socket.inet_ntoa(ioctl(
sock.fileno(),
0x8915,
struct.pack('256s', adapter_name))[20:24])
if ip_address.startswith('127'):
continue
subnet_mask = socket.inet_ntoa(ioctl(
sock.fileno(),
0x891b,
struct.pack('256s', adapter_name))[20:24])
if only_ips: def get_host_subnets():
network_adapters.append(ip_address) ipv4_nets = [netifaces.ifaddresses(interface)[netifaces.AF_INET]
else: for interface in netifaces.interfaces()
network_adapters.append((ip_address, subnet_mask)) if netifaces.AF_INET in netifaces.ifaddresses(interface)
]
# flatten
ipv4_nets = list(itertools.chain.from_iterable(ipv4_nets))
# remove loopback
ipv4_nets = [network for network in ipv4_nets if network['addr'] != '127.0.0.1']
# remove auto conf
ipv4_nets = [network for network in ipv4_nets if not network['addr'].startswith('169.254')]
return ipv4_nets
return network_adapters
def local_ips(): def local_ips():
return get_host_subnets(only_ips=True) ipv4_nets = get_host_subnets()
valid_ips = [network['addr'] for network in ipv4_nets]
return valid_ips
def get_routes(): # based on scapy implementation for route parsing def get_routes(): # based on scapy implementation for route parsing
LOOPBACK_NAME = "lo" LOOPBACK_NAME = "lo"
@ -151,17 +131,17 @@ def check_internet_access(services):
def get_ips_from_interfaces(): def get_ips_from_interfaces():
res = [] res = []
ifs = get_host_subnets() ifs = get_host_subnets()
for interface in ifs: for net_interface in ifs:
ipint = ipaddress.ip_interface(u"%s/%s" % interface) host_addr = ipaddress.ip_address(net_interface['addr'])
ip_interface = ipaddress.ip_interface(u"%s/%s" % (net_interface['addr'], net_interface['netmask']))
# limit subnet scans to class C only # limit subnet scans to class C only
if ipint.network.num_addresses > 255: if ip_interface.network.num_addresses > 255:
ipint = ipaddress.ip_interface(u"%s/24" % interface[0]) ip_interface = ipaddress.ip_interface(u"%s/24" % net_interface['addr'])
for addr in ipint.network.hosts(): addrs = [str(addr) for addr in ip_interface.network.hosts() if addr != host_addr]
if str(addr) == interface[0]: res.extend(addrs)
continue
res.append(str(addr))
return res return res
if sys.platform == "win32": if sys.platform == "win32":
def get_ip_for_connection(target_ip): def get_ip_for_connection(target_ip):
return None return None
@ -171,7 +151,7 @@ else:
query_str = 'ip route get %s' % target_ip query_str = 'ip route get %s' % target_ip
resp = check_output(query_str.split()) resp = check_output(query_str.split())
substr = resp.split() substr = resp.split()
src = substr[substr.index('src')+1] src = substr[substr.index('src') + 1]
return src return src
except Exception: except Exception:
return None return None