Files
Interlace/Interlace/lib/core/input.py
Ira Lun 7034a09e09 Accept hostname targets that do not contain a period.
This fixes issue #123, which was due to a misinterpretation of the
line

`if host.split(".")[0][0].isalpha() or host.split(".")[-1][-1].isalpha():`

(see commit 8d46d14731, L156). Because of
how Python's str.split() works, a hostname that does not contain a
period when .split(".") results in a single-item list, so this
expression evaluates to True. However, this also means that the
.split()s had no effect: the line was equivalent to

`if host[0].isalpha() or host[-1].isalpha()`.
2020-10-12 04:56:54 +01:00

508 lines
18 KiB
Python

import functools
import itertools
import os.path
import sys
from io import TextIOWrapper
from argparse import ArgumentParser
from random import choice
from netaddr import (
IPRange,
IPSet,
glob_to_iprange,
)
from Interlace.lib.threader import Task
class InputHelper(object):
@staticmethod
def check_path(parser, arg):
if not os.path.exists(arg):
parser.error("The path %s does not exist!" % arg)
else:
return arg
@staticmethod
def readable_file(parser, arg):
if InputHelper.check_path(parser, arg):
return open(arg, 'r') # return an open file handle
@staticmethod
def check_positive(parser, arg):
try:
ivalue = int(arg)
if ivalue <= 0:
raise parser.ArgumentTypeError("%s is not a valid positive integer!" % arg)
except ValueError as e:
raise parser.ArgumentValueError("%s is not a a number!" % arg)
return arg
@staticmethod
def _get_files_from_directory(arg):
files = list()
for file in os.listdir(arg):
location = os.path.join(arg, file)
if os.path.isfile(location):
files.append(location)
return files
@staticmethod
def _process_port(port_type):
if "," in port_type:
return port_type.split(",")
elif "-" in port_type:
tmp = port_type.split("-")
begin_range = int(tmp[0])
end_range = int(tmp[1])
if begin_range >= end_range:
raise Exception("Invalid range provided")
return list(range(begin_range, end_range + 1))
return [port_type]
@staticmethod
def _pre_process_commands(command_list, task_name=None, is_global_task=True):
"""
:param command_list:
:param task_name: all tasks have 'scope' and all scopes have unique names, global scope defaults None
:param is_global_task: when True, signifies that all global tasks are meant to be run concurrently
:return: list of possibly re-adjusted commands
"""
task_block = []
sibling = None
blocker = None
for command in command_list:
command = str(command).strip()
if len(command) == 0:
continue
# the start or end of a command block
if (command.startswith('_block:') and command.endswith('_')) or\
command == '_block_':
# if this is the end of a block, then we're done
new_task_name = ''
if command.startswith('_block:'):
new_task_name = command.split('_block:')[1][:-1].strip()
if task_name and task_name == new_task_name:
return task_block
# otherwise pre-process all the commands in this new `new_task_name` block
tasks = InputHelper._pre_process_commands(command_list, new_task_name, False)
if blocker:
for task in tasks:
task.wait_for(task_block)
task_block += tasks
if len(tasks) > 0:
sibling = tasks[-1]
continue
else:
# if a blocker is encountered, all commands following the blocker must wait until the last
# command in the block is executed. All block commands are synchronous
if command == '_blocker_':
blocker = sibling
continue
task = Task(command)
# if we're in the global scope and there was a previous _blocker_ encountered, we wait for the last
# child of the block
if is_global_task and blocker:
task.wait_for(task_block)
# all but the first command in a block scope wait for its predecessor
elif sibling and not is_global_task:
task.wait_for([sibling])
task_block.append(task)
sibling = task
return task_block
@staticmethod
def _replace_target_variables_in_commands(tasks, str_targets, ipset_targets):
TARGET_VAR = "_target_"
HOST_VAR = "_host_"
CLEANTARGET_VAR = "_cleantarget_"
for task in tasks:
command = task.name()
if TARGET_VAR in command or HOST_VAR in command:
for dirty_target in itertools.chain(str_targets, ipset_targets):
yielded_task = task.clone()
dirty_target = str(dirty_target)
yielded_task.replace(TARGET_VAR, dirty_target)
yielded_task.replace(HOST_VAR, dirty_target)
yielded_task.replace(
CLEANTARGET_VAR,
dirty_target.replace("http://", "").replace(
"https://", "").rstrip("/").replace("/", "-"),
)
yield yielded_task
else:
yield task
@staticmethod
def _replace_variable_in_commands(tasks_generator_func, variable, replacements):
for task in tasks_generator_func():
if variable in task.name():
for replacement in replacements:
yielded_task = task.clone()
yielded_task.replace(variable, str(replacement))
yield yielded_task
else:
yield task
@staticmethod
def _replace_variable_array(
tasks_generator_func, variable, replacements_iterator
):
for task in tasks_generator_func():
task.replace(variable, str(next(replacements_iterator)))
yield task
@staticmethod
def _process_targets(arguments):
def pre_process_target_spec(target_spec):
target_spec = "".join(
filter(lambda char: char not in (" ", "\n"), target_spec)
)
return target_spec.split(",")
# If ","s not in target_spec, this returns [target_spec], so this
# static method always returns a list
if arguments.target:
target_specs = pre_process_target_spec(arguments.target)
else:
target_specs_file = arguments.target_list
if not isinstance(target_specs_file, TextIOWrapper):
if not sys.stdin.isatty():
target_specs_file = sys.stdin
target_specs = (
target_spec.strip() for target_spec in target_specs_file
)
target_specs = (
pre_process_target_spec(target_spec) for target_spec in
target_specs if target_spec
)
target_specs = itertools.chain(*target_specs)
def parse_and_group_target_specs(target_specs, nocidr):
str_targets = set()
ipset_targets = IPSet()
for target_spec in target_specs:
if (
target_spec.startswith(".") or
(target_spec[0].isalpha() or target_spec[-1].isalpha()) or
(nocidr and "/" in target_spec)
):
str_targets.add(target_spec)
else:
if "-" in target_spec:
start_ip, post_dash_segment = target_spec.split("-")
end_ip = start_ip.rsplit(".", maxsplit=1)[0] + "." + \
post_dash_segment
target_spec = IPRange(start_ip, end_ip)
elif "*" in target_spec:
target_spec = glob_to_iprange(target_spec)
else: # str IP addresses and str CIDR notations
target_spec = (target_spec,)
ipset_targets.update(IPSet(target_spec))
return (str_targets, ipset_targets)
str_targets, ipset_targets = parse_and_group_target_specs(
target_specs=target_specs,
nocidr=arguments.nocidr,
)
if arguments.exclusions or arguments.exclusions_list:
if arguments.exclusions:
exclusion_specs = pre_process_target_spec(arguments.exclusions)
elif arguments.exclusions_list:
exclusion_specs = (
exclusion_spec.strip() for exclusion_spec in
arguments.exclusions_list
)
exclusion_specs = (
pre_process_target_spec(exclusion_spec) for exclusion_spec
in exclusion_specs if exclusion_spec
)
exclusion_specs = itertools.chain(*exclusion_specs)
str_exclusions, ipset_exclusions = parse_and_group_target_specs(
target_specs=exclusion_specs,
nocidr=arguments.nocidr,
)
str_targets -= str_exclusions
ipset_targets -= ipset_exclusions
return (str_targets, ipset_targets)
@staticmethod
def process_data_for_tasks_iterator(arguments):
# removing the trailing slash if any
if arguments.output and arguments.output[-1] == "/":
arguments.output = arguments.output[:-1]
ports = InputHelper._process_port(arguments.port) if arguments.port \
else None
real_ports = InputHelper._process_port(arguments.realport) if \
arguments.realport else None
str_targets, ipset_targets = InputHelper._process_targets(
arguments=arguments,
)
targets_count = len(str_targets) + ipset_targets.size
if not targets_count:
raise Exception("No target provided, or empty target list")
if arguments.random:
files = InputHelper._get_files_from_directory(arguments.random)
random_file = choice(files)
else:
random_file = None
tasks = list()
if arguments.command:
tasks.append(Task(arguments.command.rstrip('\n')))
else:
tasks = InputHelper._pre_process_commands(arguments.command_list)
if arguments.proto:
protocols = arguments.proto.split(",")
# if "," not in arguments.proto, [arguments.proto] is returned by
# .split()
else:
protocols = None
# Calculate the tasks count, as we will not have access to the len() of
# the tasks iterator
tasks_count = len(tasks) * targets_count
if ports:
tasks_count *= len(ports)
if real_ports:
tasks_count *= len(real_ports)
if protocols:
tasks_count *= len(protocols)
return {
"tasks": tasks,
"str_targets": str_targets,
"ipset_targets": ipset_targets,
"ports": ports,
"real_ports": real_ports,
"random_file": random_file,
"output": arguments.output,
"protocols": protocols,
"proxy_list": arguments.proxy_list,
"tasks_count": tasks_count,
}
@staticmethod
def make_tasks_generator_func(tasks_data):
tasks_generator_func = functools.partial(
InputHelper._replace_target_variables_in_commands,
tasks=tasks_data["tasks"],
str_targets=tasks_data["str_targets"],
ipset_targets=tasks_data["ipset_targets"],
)
ports = tasks_data["ports"]
if ports:
tasks_generator_func = functools.partial(
InputHelper._replace_variable_in_commands,
tasks_generator_func=tasks_generator_func,
variable="_port_",
replacements=ports,
)
real_ports = tasks_data["real_ports"]
if real_ports:
tasks_generator_func = functools.partial(
InputHelper._replace_variable_in_commands,
tasks_generator_func=tasks_generator_func,
variable="_realport_",
replacements=real_ports,
)
random_file = tasks_data["random_file"]
if random_file:
tasks_generator_func = functools.partial(
InputHelper._replace_variable_in_commands,
tasks_generator_func=tasks_generator_func,
variable="_random_",
replacements=[random_file],
)
output = tasks_data["output"]
if output:
tasks_generator_func = functools.partial(
InputHelper._replace_variable_in_commands,
tasks_generator_func=tasks_generator_func,
variable="_output_",
replacements=[output],
)
protocols = tasks_data["protocols"]
if protocols:
tasks_generator_func = functools.partial(
InputHelper._replace_variable_in_commands,
tasks_generator_func=tasks_generator_func,
variable="_proto_",
replacements=protocols,
)
proxy_list = tasks_data["proxy_list"]
if proxy_list:
proxy_list_iterator = itertools.cycle(
proxy for proxy in (
proxy.strip() for proxy in proxy_list
) if proxy
)
tasks_generator_func = functools.partial(
InputHelper._replace_variable_array,
tasks_generator_func=tasks_generator_func,
variable="_proxy_",
replacements_iterator=proxy_list_iterator,
)
return tasks_generator_func
class InputParser(object):
def __init__(self):
self._parser = self.setup_parser()
def parse(self, argv):
return self._parser.parse_args(argv)
@staticmethod
def setup_parser():
parser = ArgumentParser()
#Is stdin attached?
requireTargetArg = True
if not sys.stdin.isatty():
requireTargetArg = False
targets = parser.add_mutually_exclusive_group(required=requireTargetArg)
targets.add_argument(
'-t', dest='target', required=False,
help='Specify a target or domain name either in comma format, '
'CIDR notation, glob notation, or a single target.'
)
targets.add_argument(
'-tL', dest='target_list', required=False,
help='Specify a list of targets or domain names.',
metavar="FILE",
type=lambda x: InputHelper.readable_file(parser, x)
)
# exclusions group
exclusions = parser.add_mutually_exclusive_group()
exclusions.add_argument(
'-e', dest='exclusions', required=False,
help='Specify an exclusion either in comma format, '
'CIDR notation, or a single target.'
)
exclusions.add_argument(
'-eL', dest='exclusions_list', required=False,
help='Specify a list of exclusions.',
metavar="FILE",
type=lambda x: InputHelper.readable_file(parser, x)
)
parser.add_argument(
'-threads', dest='threads', required=False,
help="Specify the maximum number of threads to run (DEFAULT:5)",
default=5,
type=lambda x: InputHelper.check_positive(parser, x)
)
parser.add_argument(
'-timeout', dest='timeout', required=False,
help="Command timeout in seconds (DEFAULT:600)",
default=600,
type=lambda x: InputHelper.check_positive(parser, x)
)
parser.add_argument(
'-pL', dest='proxy_list', required=False,
help='Specify a list of proxies.',
metavar="FILE",
type=lambda x: InputHelper.readable_file(parser, x)
)
commands = parser.add_mutually_exclusive_group(required=True)
commands.add_argument(
'-c', dest='command',
help='Specify a single command to execute.'
)
commands.add_argument(
'-cL', dest='command_list', required=False,
help='Specify a list of commands to execute',
metavar="FILE",
type=lambda x: InputHelper.readable_file(parser, x)
)
parser.add_argument(
'-o', dest='output',
help='Specify an output folder variable that can be used in commands as _output_'
)
parser.add_argument(
'-p', dest='port',
help='Specify a port variable that can be used in commands as _port_'
)
parser.add_argument(
'--proto', dest='proto',
help='Specify protocols that can be used in commands as _proto_'
)
parser.add_argument(
'-rp', dest='realport',
help='Specify a real port variable that can be used in commands as _realport_'
)
parser.add_argument(
'-random', dest='random',
help='Specify a directory of files that can be randomly used in commands as _random_',
type=lambda x: InputHelper.check_path(parser, x)
)
parser.add_argument(
'--no-cidr', dest='nocidr', action='store_true', default=False,
help='If set then CIDR notation in a target file will not be automatically '
'be expanded into individual hosts.'
)
parser.add_argument(
'--no-color', dest='nocolor', action='store_true', default=False,
help='If set then any foreground or background colours will be '
'stripped out.'
)
parser.add_argument(
'--no-bar', '--sober', dest='sober', action='store_true', default=False,
help='If set then progress bar will be stripped out'
)
parser.add_argument(
'--repeat', dest='repeat',
help='repeat the given command x number of times.'
)
output_types = parser.add_mutually_exclusive_group()
output_types.add_argument(
'-v', '--verbose', dest='verbose', action='store_true', default=False,
help='If set then verbose output will be displayed in the terminal.'
)
output_types.add_argument(
'--silent', dest='silent', action='store_true', default=False,
help='If set only findings will be displayed and banners '
'and other information will be redacted.'
)
return parser