Files
Interlace/Interlace/lib/core/input.py
Ira Lun d1c90a49cd Improve memory usage in the generation of tasks (to fix issue #119).
Switch to generating tasks using iterators, and make other changes such
as using netaddr's IPSet to store ranges of IP addresses, in order to
reduce the use of memory where possible.
2020-09-29 07:15:15 +01:00

511 lines
18 KiB
Python

import functools
import itertools
import os.path
import sys
from io import TextIOWrapper
from argparse import ArgumentParser
from random import choice
from netaddr import (
IPRange,
IPSet,
glob_to_iprange,
)
from Interlace.lib.threader import Task
class InputHelper(object):
    """Static helpers that validate CLI input and expand it into tasks.

    The methods fall into three groups: ``argparse`` ``type=`` validators
    (``check_path``, ``readable_file``, ``check_positive``), parsers for the
    target / port / command specifications, and generator factories that
    lazily substitute ``_variable_`` placeholders in task commands.
    """

    @staticmethod
    def check_path(parser, arg):
        """Return ``arg`` if it names an existing path.

        :param parser: object whose ``error()`` method reports the failure
        :param arg: path supplied on the command line
        :return: ``arg`` unchanged when the path exists; otherwise
            ``parser.error()`` is called with an explanatory message
        """
        if not os.path.exists(arg):
            parser.error("The path %s does not exist!" % arg)
        else:
            return arg

    @staticmethod
    def readable_file(parser, arg):
        """Open an existing path for reading and return the file handle.

        The handle is intentionally returned open: the other helpers in this
        class iterate over it lazily while building targets and tasks.

        :param parser: object whose ``error()`` method reports a missing path
        :param arg: path supplied on the command line
        :return: a text-mode file object opened with mode ``'r'``
        """
        if InputHelper.check_path(parser, arg):
            return open(arg, 'r')  # return an open file handle

    @staticmethod
    def check_positive(parser, arg):
        """Validate that ``arg`` is the text of a strictly positive integer.

        :param parser: unused; kept so the existing ``type=`` lambdas that
            pass the parser keep working
        :param arg: raw command-line value
        :return: ``arg`` unchanged (still a string, exactly as before)
        :raises argparse.ArgumentTypeError: if ``arg`` is not an integer or
            is not greater than zero
        """
        # ArgumentParser has no ArgumentTypeError / ArgumentValueError
        # attributes, so the exception class is taken from the module.
        from argparse import ArgumentTypeError

        try:
            ivalue = int(arg)
        except ValueError:
            raise ArgumentTypeError("%s is not a number!" % arg) from None
        if ivalue <= 0:
            raise ArgumentTypeError(
                "%s is not a valid positive integer!" % arg
            )
        return arg

    @staticmethod
    def _get_files_from_directory(arg):
        """Return the paths of the regular files directly inside ``arg``.

        :param arg: directory path
        :return: list of ``os.path.join(arg, name)`` for every entry that is
            a file; sub-directories are not descended into
        """
        candidates = (os.path.join(arg, name) for name in os.listdir(arg))
        return [location for location in candidates
                if os.path.isfile(location)]

    @staticmethod
    def _process_port(port_type):
        """Expand a port specification into a list of replacement values.

        :param port_type: ``"a,b,c"``, ``"low-high"`` or a single value
        :return: list of strings for the comma form, list of ints for the
            range form (both ends included), or ``[port_type]`` otherwise
        :raises Exception: if a range does not satisfy ``low < high``
        """
        if "," in port_type:
            return port_type.split(",")
        if "-" in port_type:
            bounds = port_type.split("-")
            begin_range = int(bounds[0])
            end_range = int(bounds[1])
            if begin_range >= end_range:
                raise Exception("Invalid range provided")
            return list(range(begin_range, end_range + 1))
        return [port_type]

    @staticmethod
    def _pre_process_commands(command_list, task_name=None, is_global_task=True):
        """Turn raw command lines into ``Task`` objects with dependencies.

        :param command_list: iterable of command lines (e.g. an open file)
        :param task_name: all tasks have 'scope' and all scopes have unique
            names, global scope defaults None
        :param is_global_task: when True, signifies that all global tasks are
            meant to be run concurrently
        :return: list of possibly re-adjusted commands
        """
        # Nested blocks are parsed by recursing on the *same* iterator so the
        # inner call continues where the outer one stopped. ``iter()`` is a
        # no-op for file handles and makes plain lists behave the same way.
        command_list = iter(command_list)
        task_block = []
        sibling = None
        blocker = None
        for command in command_list:
            command = str(command).strip()
            if not command:
                continue
            # the start or end of a command block
            if (command.startswith('_block:') and command.endswith('_')) or \
                    command == '_block_':
                # if this is the end of a block, then we're done
                new_task_name = ''
                if command.startswith('_block:'):
                    new_task_name = command.split('_block:')[1][:-1].strip()
                if task_name and task_name == new_task_name:
                    return task_block
                # otherwise pre-process all the commands in this new
                # `new_task_name` block
                tasks = InputHelper._pre_process_commands(
                    command_list, new_task_name, False)
                if blocker:
                    # every task of the block waits for everything collected
                    # so far in this scope
                    for task in tasks:
                        task.wait_for(task_block)
                task_block += tasks
                if len(tasks) > 0:
                    sibling = tasks[-1]
                continue
            else:
                # if a blocker is encountered, all commands following the
                # blocker must wait until the last command in the block is
                # executed. All block commands are synchronous
                if command == '_blocker_':
                    blocker = sibling
                    continue
                task = Task(command)
                # if we're in the global scope and there was a previous
                # _blocker_ encountered, we wait for the last child of the
                # block
                if is_global_task and blocker:
                    task.wait_for(task_block)
                # all but the first command in a block scope wait for its
                # predecessor
                elif sibling and not is_global_task:
                    task.wait_for([sibling])
                task_block.append(task)
                sibling = task
        return task_block

    @staticmethod
    def _replace_target_variables_in_commands(tasks, str_targets, ipset_targets):
        """Yield one clone of each task per target, placeholders filled in.

        ``_target_`` and ``_host_`` receive the target as given;
        ``_cleantarget_`` receives it with ``http://`` / ``https://`` removed,
        trailing ``/`` stripped and remaining ``/`` turned into ``-``. Tasks
        whose command contains none of the three placeholders are yielded
        once, untouched.

        :param tasks: iterable of ``Task`` objects
        :param str_targets: iterable of string targets
        :param ipset_targets: iterable of IP targets (converted with ``str``)
        """
        TARGET_VAR = "_target_"
        HOST_VAR = "_host_"
        CLEANTARGET_VAR = "_cleantarget_"
        target_vars = (TARGET_VAR, HOST_VAR, CLEANTARGET_VAR)
        for task in tasks:
            command = task.name()
            # "_cleantarget_" does not contain "_target_", so it has to be
            # checked explicitly or commands using only it are never expanded.
            if not any(var in command for var in target_vars):
                yield task
                continue
            for dirty_target in itertools.chain(str_targets, ipset_targets):
                yielded_task = task.clone()
                dirty_target = str(dirty_target)
                clean_target = dirty_target.replace("http://", "").replace(
                    "https://", "").rstrip("/").replace("/", "-")
                yielded_task.replace(TARGET_VAR, dirty_target)
                yielded_task.replace(HOST_VAR, dirty_target)
                yielded_task.replace(CLEANTARGET_VAR, clean_target)
                yield yielded_task

    @staticmethod
    def _replace_variable_in_commands(tasks_generator_func, variable, replacements):
        """Yield a clone of each task for every replacement of ``variable``.

        :param tasks_generator_func: zero-argument callable returning an
            iterable of tasks
        :param variable: placeholder text to look for in ``task.name()``
        :param replacements: re-iterable collection of values; each is
            converted with ``str`` before substitution
        :return: generator of tasks; tasks that do not contain ``variable``
            are passed through as-is
        """
        for task in tasks_generator_func():
            if variable not in task.name():
                yield task
                continue
            for replacement in replacements:
                yielded_task = task.clone()
                yielded_task.replace(variable, str(replacement))
                yield yielded_task

    @staticmethod
    def _replace_variable_array(
        tasks_generator_func, variable, replacements_iterator
    ):
        """Substitute ``variable`` in each task with the next iterator value.

        Unlike ``_replace_variable_in_commands`` this does not multiply the
        tasks: every task is modified in place and consumes exactly one value
        from ``replacements_iterator``.

        :param tasks_generator_func: zero-argument callable returning an
            iterable of tasks
        :param variable: placeholder text to replace
        :param replacements_iterator: iterator advanced with ``next()``
        """
        for task in tasks_generator_func():
            task.replace(variable, str(next(replacements_iterator)))
            yield task

    @staticmethod
    def _process_targets(arguments):
        """Collect the targets named by ``arguments`` minus any exclusions.

        Targets come from ``arguments.target``, else from
        ``arguments.target_list``, else from ``sys.stdin`` when it is not a
        TTY. Exclusions come from ``arguments.exclusions`` or
        ``arguments.exclusions_list``.

        :param arguments: parsed command-line namespace
        :return: ``(str_targets, ipset_targets)`` - a ``set`` of targets kept
            as text and a netaddr ``IPSet`` of IP addresses
        """
        def pre_process_target_spec(target_spec):
            # Drop spaces/newlines, then split on commas. A spec without a
            # comma yields a one-element list, so a list is always returned.
            target_spec = target_spec.replace(" ", "").replace("\n", "")
            return target_spec.split(",")

        def read_specs(lines):
            # Lazily flatten the comma-separated specs of every non-blank
            # line without materialising the whole input first.
            stripped = (line.strip() for line in lines)
            return itertools.chain.from_iterable(
                pre_process_target_spec(line) for line in stripped if line
            )

        def parse_and_group_target_specs(target_specs, nocidr):
            str_targets = set()
            ipset_targets = IPSet()
            for target_spec in target_specs:
                if not target_spec:
                    # e.g. a trailing or doubled comma; indexing below would
                    # otherwise raise IndexError
                    continue
                if (
                    target_spec.startswith(".") or
                    (
                        (target_spec[0].isalpha() or target_spec[-1].isalpha())
                        and "." in target_spec
                    ) or
                    (nocidr and "/" in target_spec)
                ):
                    str_targets.add(target_spec)
                else:
                    if "-" in target_spec:
                        start_ip, post_dash_segment = target_spec.split("-")
                        if post_dash_segment.count(".") == 3:
                            # the end was written out in full (a.b.c.d)
                            end_ip = post_dash_segment
                        else:
                            # shorthand: the end replaces the last octet
                            end_ip = start_ip.rsplit(".", maxsplit=1)[0] + \
                                "." + post_dash_segment
                        target_spec = IPRange(start_ip, end_ip)
                    elif "*" in target_spec:
                        target_spec = glob_to_iprange(target_spec)
                    else:  # str IP addresses and str CIDR notations
                        target_spec = (target_spec,)
                    ipset_targets.update(IPSet(target_spec))
            return (str_targets, ipset_targets)

        if arguments.target:
            target_specs = pre_process_target_spec(arguments.target)
        else:
            target_specs_file = arguments.target_list
            if not isinstance(target_specs_file, TextIOWrapper):
                if not sys.stdin.isatty():
                    target_specs_file = sys.stdin
            if target_specs_file is None:
                # no source at all: behave like an empty list so the caller
                # reports "no target" instead of failing on iteration
                target_specs_file = ()
            target_specs = read_specs(target_specs_file)

        str_targets, ipset_targets = parse_and_group_target_specs(
            target_specs=target_specs,
            nocidr=arguments.nocidr,
        )

        if arguments.exclusions or arguments.exclusions_list:
            if arguments.exclusions:
                exclusion_specs = pre_process_target_spec(arguments.exclusions)
            else:
                exclusion_specs = read_specs(arguments.exclusions_list)
            str_exclusions, ipset_exclusions = parse_and_group_target_specs(
                target_specs=exclusion_specs,
                nocidr=arguments.nocidr,
            )
            str_targets -= str_exclusions
            ipset_targets -= ipset_exclusions

        return (str_targets, ipset_targets)

    @staticmethod
    def process_data_for_tasks_iterator(arguments):
        """Gather everything ``make_tasks_generator_func`` needs.

        :param arguments: parsed command-line namespace; ``arguments.output``
            is rewritten in place to drop one trailing ``/``
        :return: dict with keys ``tasks``, ``str_targets``, ``ipset_targets``,
            ``ports``, ``real_ports``, ``random_file``, ``output``,
            ``protocols``, ``proxy_list`` and ``tasks_count``
        :raises Exception: if no target remains after exclusions
        """
        # removing the trailing slash if any
        if arguments.output and arguments.output[-1] == "/":
            arguments.output = arguments.output[:-1]

        ports = InputHelper._process_port(arguments.port) if arguments.port \
            else None
        real_ports = InputHelper._process_port(arguments.realport) if \
            arguments.realport else None

        str_targets, ipset_targets = InputHelper._process_targets(
            arguments=arguments,
        )
        targets_count = len(str_targets) + ipset_targets.size
        if not targets_count:
            raise Exception("No target provided, or empty target list")

        if arguments.random:
            files = InputHelper._get_files_from_directory(arguments.random)
            random_file = choice(files)
        else:
            random_file = None

        if arguments.command:
            tasks = [Task(arguments.command.rstrip('\n'))]
        else:
            tasks = InputHelper._pre_process_commands(arguments.command_list)

        # "a".split(",") == ["a"], so a single protocol still gives a list
        protocols = arguments.proto.split(",") if arguments.proto else None

        # Calculate the tasks count, as we will not have access to the len()
        # of the tasks iterator
        tasks_count = len(tasks) * targets_count
        for multiplier in (ports, real_ports, protocols):
            if multiplier:
                tasks_count *= len(multiplier)

        return {
            "tasks": tasks,
            "str_targets": str_targets,
            "ipset_targets": ipset_targets,
            "ports": ports,
            "real_ports": real_ports,
            "random_file": random_file,
            "output": arguments.output,
            "protocols": protocols,
            "proxy_list": arguments.proxy_list,
            "tasks_count": tasks_count,
        }

    @staticmethod
    def make_tasks_generator_func(tasks_data):
        """Build a zero-argument callable that lazily generates all tasks.

        Each substitution stage wraps the previous one with
        ``functools.partial``, so nothing is expanded until the returned
        callable is invoked and its result iterated.

        :param tasks_data: dict produced by
            ``process_data_for_tasks_iterator``
        :return: callable returning a generator of fully substituted tasks
        """
        tasks_generator_func = functools.partial(
            InputHelper._replace_target_variables_in_commands,
            tasks=tasks_data["tasks"],
            str_targets=tasks_data["str_targets"],
            ipset_targets=tasks_data["ipset_targets"],
        )

        random_file = tasks_data["random_file"]
        output = tasks_data["output"]
        # (placeholder, replacement values) in the order they are applied;
        # a falsy second element means the stage is skipped
        substitutions = (
            ("_port_", tasks_data["ports"]),
            ("_realport_", tasks_data["real_ports"]),
            ("_random_", [random_file] if random_file else None),
            ("_output_", [output] if output else None),
            ("_proto_", tasks_data["protocols"]),
        )
        for variable, replacements in substitutions:
            if replacements:
                tasks_generator_func = functools.partial(
                    InputHelper._replace_variable_in_commands,
                    tasks_generator_func=tasks_generator_func,
                    variable=variable,
                    replacements=replacements,
                )

        proxy_list = tasks_data["proxy_list"]
        if proxy_list:
            # cycle through the non-blank proxies, one per generated task
            stripped_proxies = (proxy.strip() for proxy in proxy_list)
            proxy_list_iterator = itertools.cycle(
                proxy for proxy in stripped_proxies if proxy
            )
            tasks_generator_func = functools.partial(
                InputHelper._replace_variable_array,
                tasks_generator_func=tasks_generator_func,
                variable="_proxy_",
                replacements_iterator=proxy_list_iterator,
            )

        return tasks_generator_func
class InputParser(object):
    """Owns the command-line ``ArgumentParser`` used by the tool."""

    def __init__(self):
        self._parser = self.setup_parser()

    def parse(self, argv):
        """Parse ``argv`` (a list of strings) and return the namespace."""
        return self._parser.parse_args(argv)

    @staticmethod
    def setup_parser():
        """Create the parser and register every supported option.

        A target option (``-t`` / ``-tL``) is mandatory only when stdin is a
        TTY; exactly one of ``-c`` / ``-cL`` is always mandatory.

        :return: the configured ``ArgumentParser``
        """
        parser = ArgumentParser()

        # Converters shared by several options; each reports problems
        # through this parser.
        as_open_file = lambda x: InputHelper.readable_file(parser, x)
        as_positive = lambda x: InputHelper.check_positive(parser, x)
        as_existing_path = lambda x: InputHelper.check_path(parser, x)

        # Is stdin attached? If it is not a TTY a target option is optional.
        target_group = parser.add_mutually_exclusive_group(
            required=bool(sys.stdin.isatty())
        )
        target_group.add_argument(
            '-t', dest='target',
            help='Specify a target or domain name either in comma format, '
                 'CIDR notation, glob notation, or a single target.'
        )
        target_group.add_argument(
            '-tL', dest='target_list', metavar="FILE", type=as_open_file,
            help='Specify a list of targets or domain names.'
        )

        # exclusions group
        exclusion_group = parser.add_mutually_exclusive_group()
        exclusion_group.add_argument(
            '-e', dest='exclusions',
            help='Specify an exclusion either in comma format, '
                 'CIDR notation, or a single target.'
        )
        exclusion_group.add_argument(
            '-eL', dest='exclusions_list', metavar="FILE", type=as_open_file,
            help='Specify a list of exclusions.'
        )

        parser.add_argument(
            '-threads', dest='threads', default=5, type=as_positive,
            help="Specify the maximum number of threads to run (DEFAULT:5)"
        )
        parser.add_argument(
            '-timeout', dest='timeout', default=600, type=as_positive,
            help="Command timeout in seconds (DEFAULT:600)"
        )
        parser.add_argument(
            '-pL', dest='proxy_list', metavar="FILE", type=as_open_file,
            help='Specify a list of proxies.'
        )

        command_group = parser.add_mutually_exclusive_group(required=True)
        command_group.add_argument(
            '-c', dest='command',
            help='Specify a single command to execute.'
        )
        command_group.add_argument(
            '-cL', dest='command_list', metavar="FILE", type=as_open_file,
            help='Specify a list of commands to execute'
        )

        # plain string options that feed the _variable_ placeholders
        placeholder_options = (
            ('-o', 'output',
             'Specify an output folder variable that can be used in commands as _output_'),
            ('-p', 'port',
             'Specify a port variable that can be used in commands as _port_'),
            ('--proto', 'proto',
             'Specify protocols that can be used in commands as _proto_'),
            ('-rp', 'realport',
             'Specify a real port variable that can be used in commands as _realport_'),
        )
        for flag, dest, help_text in placeholder_options:
            parser.add_argument(flag, dest=dest, help=help_text)

        parser.add_argument(
            '-random', dest='random', type=as_existing_path,
            help='Specify a directory of files that can be randomly used in commands as _random_'
        )

        # boolean switches, all off by default
        switches = (
            (('--no-cidr',), 'nocidr',
             'If set then CIDR notation in a target file will not be automatically '
             'be expanded into individual hosts.'),
            (('--no-color',), 'nocolor',
             'If set then any foreground or background colours will be '
             'stripped out.'),
            (('--no-bar', '--sober'), 'sober',
             'If set then progress bar will be stripped out'),
        )
        for flags, dest, help_text in switches:
            parser.add_argument(
                *flags, dest=dest, action='store_true', default=False,
                help=help_text
            )

        parser.add_argument(
            '--repeat', dest='repeat',
            help='repeat the given command x number of times.'
        )

        verbosity_group = parser.add_mutually_exclusive_group()
        verbosity_group.add_argument(
            '-v', '--verbose', dest='verbose', action='store_true',
            default=False,
            help='If set then verbose output will be displayed in the terminal.'
        )
        verbosity_group.add_argument(
            '--silent', dest='silent', action='store_true', default=False,
            help='If set only findings will be displayed and banners '
                 'and other information will be redacted.'
        )

        return parser