mirror of
https://github.com/codingo/Interlace.git
synced 2025-12-17 06:44:23 +01:00
Faster mapping of CIDRs to IPs
This commit is contained in:
@@ -1,18 +1,15 @@
|
|||||||
import functools
|
import functools
|
||||||
import itertools
|
import itertools
|
||||||
import os.path
|
import os.path
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
import sys
|
import sys
|
||||||
from io import TextIOWrapper
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
from io import TextIOWrapper
|
||||||
from random import choice
|
from random import choice
|
||||||
|
|
||||||
from netaddr import (
|
|
||||||
IPRange,
|
|
||||||
IPSet,
|
|
||||||
glob_to_iprange,
|
|
||||||
)
|
|
||||||
|
|
||||||
from Interlace.lib.threader import Task
|
from Interlace.lib.threader import Task
|
||||||
|
from netaddr import IPGlob, IPRange, IPSet, glob_to_iprange
|
||||||
|
|
||||||
|
|
||||||
class InputHelper(object):
|
class InputHelper(object):
|
||||||
@@ -34,7 +31,7 @@ class InputHelper(object):
|
|||||||
ivalue = int(arg)
|
ivalue = int(arg)
|
||||||
if ivalue <= 0:
|
if ivalue <= 0:
|
||||||
raise parser.ArgumentTypeError("%s is not a valid positive integer!" % arg)
|
raise parser.ArgumentTypeError("%s is not a valid positive integer!" % arg)
|
||||||
except ValueError as e:
|
except ValueError:
|
||||||
raise parser.ArgumentValueError("%s is not a a number!" % arg)
|
raise parser.ArgumentValueError("%s is not a a number!" % arg)
|
||||||
|
|
||||||
return arg
|
return arg
|
||||||
@@ -50,6 +47,42 @@ class InputHelper(object):
|
|||||||
|
|
||||||
return files
|
return files
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_ips_from_range(ip_range):
|
||||||
|
ips = set()
|
||||||
|
ip_range = ip_range.split("-")
|
||||||
|
|
||||||
|
# parsing the above structure into an array and then making into an IP address with the end value
|
||||||
|
end_ip = ".".join(ip_range[0].split(".")[0:-1]) + "." + ip_range[1]
|
||||||
|
|
||||||
|
# creating an IPRange object to get all IPs in between
|
||||||
|
range_obj = IPRange(ip_range[0], end_ip)
|
||||||
|
|
||||||
|
for ip in range_obj:
|
||||||
|
ips.add(str(ip))
|
||||||
|
|
||||||
|
return ips
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_ips_from_glob(glob_ips):
|
||||||
|
ip_glob = IPGlob(glob_ips)
|
||||||
|
|
||||||
|
ips = set()
|
||||||
|
|
||||||
|
for ip in ip_glob:
|
||||||
|
ips.add(str(ip))
|
||||||
|
|
||||||
|
return ips
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_cidr_to_ips(cidr_range):
|
||||||
|
(ip, cidr) = cidr_range.split("/")
|
||||||
|
mask = 32 - int(cidr)
|
||||||
|
first_ip = struct.unpack(">I", socket.inet_aton(ip))[0]
|
||||||
|
last_ip = first_ip | ((1 << mask) - 1)
|
||||||
|
ips = frozenset([socket.inet_ntoa(struct.pack('>I', x)) for x in range(first_ip, last_ip)])
|
||||||
|
return ips
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _process_port(port_type):
|
def _process_port(port_type):
|
||||||
if "," in port_type:
|
if "," in port_type:
|
||||||
@@ -492,7 +525,7 @@ class InputParser(object):
|
|||||||
'--repeat', dest='repeat',
|
'--repeat', dest='repeat',
|
||||||
help='repeat the given command x number of times.'
|
help='repeat the given command x number of times.'
|
||||||
)
|
)
|
||||||
|
|
||||||
output_types = parser.add_mutually_exclusive_group()
|
output_types = parser.add_mutually_exclusive_group()
|
||||||
output_types.add_argument(
|
output_types.add_argument(
|
||||||
'-v', '--verbose', dest='verbose', action='store_true', default=False,
|
'-v', '--verbose', dest='verbose', action='store_true', default=False,
|
||||||
|
|||||||
Reference in New Issue
Block a user