Flattened the commands hierarchy

This commit is contained in:
Joshua Ogunyinka
2019-08-16 14:31:39 +01:00
parent bcc4599dad
commit b9487fec9c
3 changed files with 58 additions and 117 deletions

View File

@@ -1,29 +1,23 @@
#!/usr/bin/python3
import sys
from Interlace.lib.core.input import InputParser, InputHelper
from Interlace.lib.core.output import OutputHelper, Level
from Interlace.lib.threader import Pool, TaskBlock
def print_command(level, command, message, output):
if isinstance(command, TaskBlock):
for c in command:
print_command(level, c, message, output)
else:
output.terminal(Level.THREAD, command.name(), "Added to Queue")
from Interlace.lib.threader import Pool
def build_queue(arguments, output):
task_list = InputHelper.process_commands(arguments)
for task in task_list:
print_command(Level.THREAD, task, "Added to Queue", output)
output.terminal(Level.THREAD, task.name(), "Added to Queue")
return task_list
def main():
parser = InputParser()
arguments = parser.parse(sys.argv[1:])
args = ["-cL", "C:\\Users\\user\\Documents\\PythonProjects\\Interlace\\foo.test",
"-tL", "C:\\Users\\user\\Documents\\PythonProjects\\Interlace\\bar.test"]
arguments = parser.parse(args)
# arguments = parser.parse(sys.argv[1:])
output = OutputHelper(arguments)

View File

@@ -2,11 +2,11 @@ import os.path
import sys
from argparse import ArgumentParser
from math import ceil
from os import access, W_OK
from random import sample
from netaddr import IPNetwork, IPRange, IPGlob
from Interlace.lib.threader import TaskBlock, Task
from Interlace.lib.threader import Task
class InputHelper(object):
@@ -75,9 +75,10 @@ class InputHelper(object):
return [port_type]
@staticmethod
def _pre_process_commands(command_list, task_name):
task_block = TaskBlock(task_name)
parent_task = None
def _pre_process_commands(command_list, task_name, is_global_task=True):
task_block = []
sibling = None
global_task = None
for command in command_list:
command = str(command).strip()
if not command:
@@ -86,16 +87,23 @@ class InputHelper(object):
new_task_name = command.split('_block:')[1][:-1].strip()
if task_name == new_task_name:
return task_block
task = InputHelper._pre_process_commands(command_list, new_task_name)
for task in InputHelper._pre_process_commands(command_list, new_task_name, False):
task_block.append(task)
sibling = task
continue
else:
task = Task(command)
if command == '_blocker_':
parent_task = task_block.last()
parent_task.set_lock()
global_task = sibling
continue
if parent_task:
task.wait_for(parent_task.get_lock())
task_block.add_task(task)
task = Task(command)
if is_global_task and global_task:
print('{} must wait for GLOBAL {}'.format(task.name(), global_task.name()))
task.wait_for(global_task.get_lock())
elif sibling:
task.wait_for(sibling.get_lock())
print('{} is waiting for {}'.format(task.name(), sibling.name()))
task_block.append(task)
sibling = task
return task_block
@staticmethod
@@ -121,28 +129,20 @@ class InputHelper(object):
@staticmethod
def _replace_variable_with_commands(commands, variable, replacements):
foo = []
def add_task(t):
if t not in set(foo):
foo.append(t)
def add_task(t, item_list):
if t not in set(item_list):
item_list.append(t)
tasks = []
for command in commands:
is_task = not isinstance(command, TaskBlock)
for replacement in replacements:
if is_task and command.name().find(variable) != -1:
if command.name().find(variable) != -1:
new_task = command.clone()
new_task.replace(variable, replacement)
add_task(new_task)
elif is_task and command not in set(foo):
add_task(command)
elif not is_task:
tasks = [task for task in command.get_tasks()]
command.clear_tasks()
for r in InputHelper._replace_variable_with_commands(tasks, variable, replacements):
command.add_task(r)
add_task(command)
return foo
add_task(new_task, tasks)
else:
add_task(command, tasks)
return tasks
@staticmethod
def _replace_variable_array(commands, variable, replacement):
@@ -151,10 +151,7 @@ class InputHelper(object):
return
for counter, command in enumerate(commands):
if isinstance(command, TaskBlock):
InputHelper._replace_variable_array(command, variable, replacement)
else:
command.replace(variable, str(replacement[counter]))
command.replace(variable, str(replacement[counter]))
@staticmethod
def process_commands(arguments):
@@ -178,8 +175,8 @@ class InputHelper(object):
ranges.add(arguments.target)
else:
target_file = arguments.target_list
if not sys.stdin.isatty():
target_file = sys.stdin
# if not sys.stdin.isatty():
# target_file = sys.stdin
ranges.update([target.strip() for target in target_file if target.strip()])
# process exclusions first
@@ -205,8 +202,7 @@ class InputHelper(object):
if arguments.command:
commands.append(arguments.command.rstrip('\n'))
else:
tasks = InputHelper._pre_process_commands(arguments.command_list, '')
commands = tasks.get_tasks()
commands = InputHelper._pre_process_commands(arguments.command_list, '')
commands = InputHelper._replace_variable_with_commands(commands, "_target_", targets)
commands = InputHelper._replace_variable_with_commands(commands, "_host_", targets)

View File

@@ -1,6 +1,6 @@
import subprocess
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Event
from threading import Thread
from tqdm import tqdm
@@ -8,8 +8,8 @@ from tqdm import tqdm
class Task(object):
def __init__(self, command):
self.task = command
self._lock = None
self._waiting_for_task = False
self.self_lock = None
self.sibling_lock = None
def __cmp__(self, other):
return self.name() == other.name()
@@ -19,36 +19,31 @@ class Task(object):
def clone(self):
new_task = Task(self.task)
new_task._lock = self._lock
new_task._waiting_for_task = self._waiting_for_task
new_task.self_lock = self.self_lock
new_task.sibling_lock = self.sibling_lock
return new_task
def replace(self, old, new):
self.task = self.task.replace(old, new)
def run(self, t=False):
if not self._waiting_for_task:
self._run_task(t)
if self._lock:
self._lock.set()
else:
self._lock.wait()
self._run_task(t)
if self.sibling_lock:
self.sibling_lock.wait()
self._run_task(t)
if self.self_lock:
self.self_lock.set()
def wait_for(self, lock):
self._lock = lock
self._waiting_for_task = True
def set_lock(self):
if not self._lock:
self._lock = Event()
self._lock.clear()
def wait_for(self, _lock):
self.sibling_lock = _lock
def name(self):
return self.task
def get_lock(self):
return self._lock
if not self.self_lock:
self.self_lock = Event()
self.self_lock.clear()
return self.self_lock
def _run_task(self, t=False):
if t:
@@ -58,44 +53,6 @@ class Task(object):
subprocess.Popen(self.task, shell=True)
class TaskBlock(Task):
def __init__(self, name):
super().__init__('')
self._name = name
self.tasks = []
def name(self):
return self._name
def add_task(self, task):
self.tasks.append(task)
def clear_tasks(self):
self.tasks.clear()
def __len__(self):
return len(self.tasks)
def __hash__(self):
hash_value = 0
for t in self.tasks:
hash_value ^= t.__hash__()
return hash_value
def __iter__(self):
return self.tasks.__iter__()
def last(self):
return self.tasks[-1]
def _run_task(self, t=False):
for task in self.tasks:
task._run_task(t)
def get_tasks(self):
return self.tasks
class Worker(object):
def __init__(self, task_queue, timeout, output, tqdm):
self.queue = task_queue
@@ -144,17 +101,11 @@ class Pool(object):
def run(self):
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)]
threads = []
# run
for worker in workers:
thread = Thread(target=worker)
thread.start()
threads.append(thread)
# wait until all workers have completed their tasks
for thread in threads:
thread.join()
with ThreadPoolExecutor(self.max_workers) as executors:
for worker in workers:
executors.submit(worker)
# test harness