Support ranges in fixed_ip_list

This commit is contained in:
Itay Mizeretz 2018-02-20 16:15:40 +02:00
parent 0e9206728f
commit 7ee0ceda75
1 changed files with 46 additions and 19 deletions

View File

@ -3,6 +3,8 @@ import socket
import struct import struct
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
import ipaddress
from model.host import VictimHost from model.host import VictimHost
__author__ = 'itamar' __author__ = 'itamar'
@ -16,6 +18,14 @@ class NetworkRange(object):
self._shuffle = shuffle self._shuffle = shuffle
self._config = __import__('config').WormConfiguration self._config = __import__('config').WormConfiguration
@staticmethod
def _ip_to_number(address):
return struct.unpack(">L", socket.inet_aton(address))[0]
@staticmethod
def _number_to_ip(num):
return socket.inet_ntoa(struct.pack(">L", num))
@abstractmethod @abstractmethod
def _get_range(self): def _get_range(self):
raise NotImplementedError() raise NotImplementedError()
@ -26,7 +36,7 @@ class NetworkRange(object):
random.shuffle(base_range) random.shuffle(base_range)
for x in base_range: for x in base_range:
yield VictimHost(socket.inet_ntoa(struct.pack(">L", self._base_address + x))) yield VictimHost(self._number_to_ip(self._base_address + x))
class ClassCRange(NetworkRange): class ClassCRange(NetworkRange):
@ -35,8 +45,8 @@ class ClassCRange(NetworkRange):
super(ClassCRange, self).__init__(base_address, shuffle=shuffle) super(ClassCRange, self).__init__(base_address, shuffle=shuffle)
def __repr__(self): def __repr__(self):
return "<ClassCRange %s-%s>" % (socket.inet_ntoa(struct.pack(">L", self._base_address + 1)), return "<ClassCRange %s-%s>" % (self._number_to_ip(self._base_address + 1),
socket.inet_ntoa(struct.pack(">L", self._base_address + 254))) self._number_to_ip(self._base_address + 254))
def _get_range(self): def _get_range(self):
return range(1, 254) return range(1, 254)
@ -49,8 +59,8 @@ class RelativeRange(NetworkRange):
self._size = 1 self._size = 1
def __repr__(self): def __repr__(self):
return "<RelativeRange %s-%s>" % (socket.inet_ntoa(struct.pack(">L", self._base_address - self._size)), return "<RelativeRange %s-%s>" % (self._number_to_ip(self._base_address - self._size),
socket.inet_ntoa(struct.pack(">L", self._base_address + self._size))) self._number_to_ip(self._base_address + self._size))
def _get_range(self): def _get_range(self):
lower_end = -(self._size / 2) lower_end = -(self._size / 2)
@ -59,24 +69,41 @@ class RelativeRange(NetworkRange):
class FixedRange(NetworkRange): class FixedRange(NetworkRange):
def __init__(self, fixed_addresses=None, shuffle=True): def __init__(self, fixed_addresses, shuffle=True):
base_address = 0 base_address = 0
super(FixedRange, self).__init__(base_address, shuffle=shuffle) super(FixedRange, self).__init__(base_address, shuffle=shuffle)
if not fixed_addresses: self._fixed_addresses = fixed_addresses
self._fixed_addresses = self._config.range_fixed
else:
if type(fixed_addresses) is str:
self._fixed_addresses = [fixed_addresses]
else:
self._fixed_addresses = list(fixed_addresses)
def __repr__(self): def __repr__(self):
return "<FixedRange %s>" % (",".join(self._fixed_addresses)) return "<FixedRange %s>" % (",".join(self._fixed_addresses))
@staticmethod
def _cidr_range_to_ip_list(address_str):
return [FixedRange._ip_to_number(str(x)) for x in ipaddress.ip_network(unicode(address_str), strict=False)]
@staticmethod
def _ip_range_to_ip_list(address_str):
addresses = address_str.split('-')
if len(addresses) != 2:
raise ValueError('Illegal address format: %s' % address_str)
lower_end, higher_end = [FixedRange._ip_to_number(x.strip()) for x in addresses]
if higher_end < lower_end:
raise ValueError('Illegal address range: %s' % address_str)
return range(lower_end, higher_end + 1)
@staticmethod
def _parse_address_str(address_str):
address_str = address_str.strip()
if not address_str: # Empty string
return []
if -1 != address_str.find('-'):
return FixedRange._ip_range_to_ip_list(address_str)
if -1 != address_str.find('/'):
return FixedRange._cidr_range_to_ip_list(address_str)
return [FixedRange._ip_to_number(address_str)]
def _get_range(self): def _get_range(self):
address_range = [] ip_list = list(reduce(
for address in self._fixed_addresses: lambda x, y: x.union(y),
if not address: # Empty string [set(self._parse_address_str(z)) for z in self._fixed_addresses]))
continue return [x for x in ip_list if (x & 0xFF != 0)] # remove broadcast ips
address_range.append(struct.unpack(">L", socket.inet_aton(address.strip()))[0])
return address_range