diff --git a/Interlace/interlace.py b/Interlace/interlace.py index a112786..77cff21 100644 --- a/Interlace/interlace.py +++ b/Interlace/interlace.py @@ -1,29 +1,23 @@ #!/usr/bin/python3 -import sys from Interlace.lib.core.input import InputParser, InputHelper from Interlace.lib.core.output import OutputHelper, Level -from Interlace.lib.threader import Pool, TaskBlock - - -def print_command(level, command, message, output): - if isinstance(command, TaskBlock): - for c in command: - print_command(level, c, message, output) - else: - output.terminal(Level.THREAD, command.name(), "Added to Queue") +from Interlace.lib.threader import Pool def build_queue(arguments, output): task_list = InputHelper.process_commands(arguments) for task in task_list: - print_command(Level.THREAD, task, "Added to Queue", output) + output.terminal(Level.THREAD, task.name(), "Added to Queue") return task_list def main(): parser = InputParser() - arguments = parser.parse(sys.argv[1:]) + args = ["-cL", "C:\\Users\\user\\Documents\\PythonProjects\\Interlace\\foo.test", + "-tL", "C:\\Users\\user\\Documents\\PythonProjects\\Interlace\\bar.test"] + arguments = parser.parse(args) + # arguments = parser.parse(sys.argv[1:]) output = OutputHelper(arguments) diff --git a/Interlace/lib/core/input.py b/Interlace/lib/core/input.py index e6566fb..6c55098 100644 --- a/Interlace/lib/core/input.py +++ b/Interlace/lib/core/input.py @@ -2,11 +2,11 @@ import os.path import sys from argparse import ArgumentParser from math import ceil -from os import access, W_OK from random import sample from netaddr import IPNetwork, IPRange, IPGlob -from Interlace.lib.threader import TaskBlock, Task + +from Interlace.lib.threader import Task class InputHelper(object): @@ -75,9 +75,10 @@ class InputHelper(object): return [port_type] @staticmethod - def _pre_process_commands(command_list, task_name): - task_block = TaskBlock(task_name) - parent_task = None + def _pre_process_commands(command_list, task_name, is_global_task=True): + task_block = [] + sibling = None + global_task = None for command in command_list: command = str(command).strip() if not command: @@ -86,16 +87,23 @@ class InputHelper(object): new_task_name = command.split('_block:')[1][:-1].strip() if task_name == new_task_name: return task_block - task = InputHelper._pre_process_commands(command_list, new_task_name) + for task in InputHelper._pre_process_commands(command_list, new_task_name, False): + task_block.append(task) + sibling = task + continue else: - task = Task(command) if command == '_blocker_': - parent_task = task_block.last() - parent_task.set_lock() + global_task = sibling continue - if parent_task: - task.wait_for(parent_task.get_lock()) - task_block.add_task(task) + task = Task(command) + if is_global_task and global_task: + print('{} must wait for GLOBAL {}'.format(task.name(), global_task.name())) + task.wait_for(global_task.get_lock()) + elif sibling: + task.wait_for(sibling.get_lock()) + print('{} is waiting for {}'.format(task.name(), sibling.name())) + task_block.append(task) + sibling = task return task_block @staticmethod @@ -121,28 +129,20 @@ class InputHelper(object): @staticmethod def _replace_variable_with_commands(commands, variable, replacements): - foo = [] - - def add_task(t): - if t not in set(foo): - foo.append(t) + def add_task(t, item_list): + if t not in set(item_list): + item_list.append(t) + tasks = [] for command in commands: - is_task = not isinstance(command, TaskBlock) for replacement in replacements: - if is_task and command.name().find(variable) != -1: + if command.name().find(variable) != -1: new_task = command.clone() new_task.replace(variable, replacement) - add_task(new_task) - elif is_task and command not in set(foo): - add_task(command) - elif not is_task: - tasks = [task for task in command.get_tasks()] - command.clear_tasks() - for r in InputHelper._replace_variable_with_commands(tasks, variable, replacements): - command.add_task(r) - add_task(command) - return foo + add_task(new_task, tasks) + else: + add_task(command, tasks) + return tasks @staticmethod def _replace_variable_array(commands, variable, replacement): @@ -151,10 +151,7 @@ class InputHelper(object): return for counter, command in enumerate(commands): - if isinstance(command, TaskBlock): - InputHelper._replace_variable_array(command, variable, replacement) - else: - command.replace(variable, str(replacement[counter])) + command.replace(variable, str(replacement[counter])) @staticmethod def process_commands(arguments): @@ -178,8 +175,8 @@ class InputHelper(object): ranges.add(arguments.target) else: target_file = arguments.target_list - if not sys.stdin.isatty(): - target_file = sys.stdin + # if not sys.stdin.isatty(): + # target_file = sys.stdin ranges.update([target.strip() for target in target_file if target.strip()]) # process exclusions first @@ -205,8 +202,7 @@ class InputHelper(object): if arguments.command: commands.append(arguments.command.rstrip('\n')) else: - tasks = InputHelper._pre_process_commands(arguments.command_list, '') - commands = tasks.get_tasks() + commands = InputHelper._pre_process_commands(arguments.command_list, '') commands = InputHelper._replace_variable_with_commands(commands, "_target_", targets) commands = InputHelper._replace_variable_with_commands(commands, "_host_", targets) diff --git a/Interlace/lib/threader.py b/Interlace/lib/threader.py index a88cbf9..26d11df 100644 --- a/Interlace/lib/threader.py +++ b/Interlace/lib/threader.py @@ -1,6 +1,6 @@ import subprocess +from concurrent.futures import ThreadPoolExecutor from multiprocessing import Event -from threading import Thread from tqdm import tqdm @@ -8,8 +8,8 @@ from tqdm import tqdm class Task(object): def __init__(self, command): self.task = command - self._lock = None - self._waiting_for_task = False + self.self_lock = None + self.sibling_lock = None def __cmp__(self, other): return self.name() == other.name() @@ -19,36 +19,31 @@ class Task(object): def clone(self): new_task = Task(self.task) - new_task._lock = self._lock - new_task._waiting_for_task = self._waiting_for_task + new_task.self_lock = self.self_lock + new_task.sibling_lock = self.sibling_lock return new_task def replace(self, old, new): self.task = self.task.replace(old, new) def run(self, t=False): - if not self._waiting_for_task: - self._run_task(t) - if self._lock: - self._lock.set() - else: - self._lock.wait() - self._run_task(t) + if self.sibling_lock: + self.sibling_lock.wait() + self._run_task(t) + if self.self_lock: + self.self_lock.set() - def wait_for(self, lock): - self._lock = lock - self._waiting_for_task = True - - def set_lock(self): - if not self._lock: - self._lock = Event() - self._lock.clear() + def wait_for(self, _lock): + self.sibling_lock = _lock def name(self): return self.task def get_lock(self): - return self._lock + if not self.self_lock: + self.self_lock = Event() + self.self_lock.clear() + return self.self_lock def _run_task(self, t=False): if t: @@ -58,44 +53,6 @@ class Task(object): subprocess.Popen(self.task, shell=True) -class TaskBlock(Task): - def __init__(self, name): - super().__init__('') - self._name = name - self.tasks = [] - - def name(self): - return self._name - - def add_task(self, task): - self.tasks.append(task) - - def clear_tasks(self): - self.tasks.clear() - - def __len__(self): - return len(self.tasks) - - def __hash__(self): - hash_value = 0 - for t in self.tasks: - hash_value ^= t.__hash__() - return hash_value - - def __iter__(self): - return self.tasks.__iter__() - - def last(self): - return self.tasks[-1] - - def _run_task(self, t=False): - for task in self.tasks: - task._run_task(t) - - def get_tasks(self): - return self.tasks - - class Worker(object): def __init__(self, task_queue, timeout, output, tqdm): self.queue = task_queue @@ -144,17 +101,11 @@ class Pool(object): def run(self): workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)] - threads = [] # run - for worker in workers: - thread = Thread(target=worker) - thread.start() - threads.append(thread) - - # wait until all workers have completed their tasks - for thread in threads: - thread.join() + with ThreadPoolExecutor(self.max_workers) as executors: + for worker in workers: + executors.submit(worker) # test harness