Threading Library Base

This commit is contained in:
Michael Skelton
2018-10-23 21:15:33 +10:00
parent 6aa0396b1f
commit 1843319861
9 changed files with 755 additions and 19 deletions

View File

@@ -3,6 +3,7 @@ import sys
from lib.core.input import InputParser, InputHelper from lib.core.input import InputParser, InputHelper
from lib.core.output import OutputHelper, Level from lib.core.output import OutputHelper, Level
def main(): def main():
parser = InputParser() parser = InputParser()
arguments = parser.parse(sys.argv[1:]) arguments = parser.parse(sys.argv[1:])

View File

@@ -10,6 +10,14 @@ class InputHelper(object):
else: else:
return open(arg, 'r') # return an open file handle return open(arg, 'r') # return an open file handle
@staticmethod
def check_positive(parser, arg):
ivalue = int(arg)
if ivalue <= 0:
raise parser.ArgumentTypeError("%s is not a valid positive integer!" % arg)
return arg
@staticmethod @staticmethod
def process_targets(arguments): def process_targets(arguments):
targets = set() targets = set()
@@ -59,6 +67,13 @@ class InputParser(object):
type=lambda x: InputHelper.readable_file(parser, x) type=lambda x: InputHelper.readable_file(parser, x)
) )
parser.add_argument(
'-threads', dest='threads', required=False,
help="Specify the maximum number of threads to run (DEFAULT:5).",
default=5,
type=lambda x: InputHelper.check_positive(parser, x)
)
commands = parser.add_mutually_exclusive_group(required=True) commands = parser.add_mutually_exclusive_group(required=True)
commands.add_argument( commands.add_argument(
'-c', dest='command', '-c', dest='command',
@@ -72,25 +87,6 @@ class InputParser(object):
type=lambda x: InputHelper.readable_file(parser, x) type=lambda x: InputHelper.readable_file(parser, x)
) )
output = parser.add_mutually_exclusive_group()
output.add_argument(
'-oN', dest='output_normal',
help='Normal output printed to a file when the -oN option is '
'specified with a filename argument.'
)
output.add_argument(
'-oJ', dest='output_json',
help='JSON output printed to a file when the -oJ option is '
'specified with a filename argument.'
)
output.add_argument(
'-oG', dest='output_grepable',
help='Grepable output printed to a file when the -oG option is '
'specified with a filename argument.'
)
parser.add_argument( parser.add_argument(
'--no-color', dest='nocolor', action='store_true', default=False, '--no-color', dest='nocolor', action='store_true', default=False,

View File

View File

@@ -0,0 +1,123 @@
from threading import Thread
class Loop:
"""A simple loop, with some callbacks for:
on_start, on_stop and on_update"""
# the current state of the pool
_state = 0
# should the queue clear on exit
_clear = False
# kills the thread if true
_kill = False
# true if the pool thread is running
_has_thread = False
def has_thread(self):
"""Returns true if the pool thread is running
Parameters:
- None
"""
return self._has_thread
def state(self):
"""returns the latest state of the pool
- 0 = off
- 1 = running
- 2 = stopping
Parameters:
- None
"""
return self._state
def start_thread(self):
"""Starts the pool thread if it is not already started
Parameters:
- None
"""
# check there is not a thread already running
if self._has_thread is False and self._kill is False:
# start the pool thread
Thread(target=self._thread, daemon=True).start()
def stop_thread(self):
"""Stops the pool thread if it is running
Parameters:
- None
"""
# check that the pool thread is running
if self._kill is False and self._has_thread is True:
# set kill trigger
self._kill = True
def _thread(self):
"""[Protected]
The main pool thread.
Updates schedule and executes task in queue
Parameters:
- None
"""
# the pool now has a running thread
self._has_thread = True
# state is 1 (running)
self._state = 1
self.on_start()
# while the kill flag is not set
while self._kill is False:
# call virtual update
self.on_update()
# state is now 0 (stopped)
self.state = 0
self.on_stop()
# reset the kill flag
self._kill = False
# the pool no longer has a running thread
self._has_thread = False
def on_start(self):
pass
def on_stop(self):
pass
def on_update(self):
"""Overridable method.
Called on every loop update.
no return
Parameters:
- None
"""
pass

View File

@@ -0,0 +1,107 @@
from .queue import Queue
from .loop import Loop
class Pool(Queue, Loop):
"""Pool is a loop that processes a Queue"""
def __init__(self):
"""Definitions:
- None
Parameters:
- None
"""
# init super class Queue
super(Pool, self).__init__()
def stop_thread(self, clear_queue=False):
"""Stops the pool thread if it is running
Parameters:
- None
"""
# check that the pool thread is running
if self._kill is False and self._has_thread is True:
# set kill trigger
self._kill = True
# set clear flag
self._clear = clear_queue
def _thread(self):
"""[Protected]
The main pool thread.
Updates schedule and executes task in queue
Parameters:
- None
"""
# the pool now has a running thread
self._has_thread = True
# state is 1 (running)
self._state = 1
# while the kill flag is not set
while self._kill is False:
# call virtual update
self.on_update()
# once the loop exits,
# state is 2 (stopping)
self.state = 2
# if the clear flag is set and there are items still to be processed
while self._clear and self.length() > 0:
# dequeue the first item
item = self.dequeue()
# try to execute it
if not self.on_clear(item):
# if execution not possible append to back and try again
self.enqueue(item)
# state is now 0 (stopped)
self.state = 0
# reset the kill flag
self._kill = False
# the pool no longer has a running thread
self._has_thread = False
def on_update(self):
"""Overridable method.
Called on every loop update.
no return
Parameters:
- None
"""
pass
def on_clear(self, item):
"""Overridable method.
Called every loop while thread is stopping until queue is empty.
Runs item and return True and item will be removed from queue.
Returns False if item was not run and item will be appended to back of queue
Parameters:
- item: the latest item being processed
"""
return True

View File

@@ -0,0 +1,116 @@
class Queue:
"""Queue was intended as a c# style queue"""
def __init__(self):
"""Definitions:
- self._queue: the queue of items
Parameters:
- None
"""
# the queue of items
self._queue = []
def enqueue(self, item):
"""Adds a new item to the back of the queue"""
# append item to back of queue
self._queue.append(item)
def dequeue(self):
"""Removes an item from the beginning of the queue and returns it.
Returns none if the queue is empty
Parameters:
- None
"""
# if the queue length is more than zero get the first item
# otherwise return None
return self._queue.pop(0) if len(self._queue) > 0 else None
def dequeue_at(self, index):
"""Removes an item from the queue at a given index and returns it.
Returns none if the queue is empty
Parameters:
- index: the index of the item you wish to remove and return
"""
# if the queue length is longer than the given index, return item at index,
# otherwise return None
return self._queue.pop(index) if len(self._queue) > index else None
def remove(self, item):
"""Removes an item from the queue
Parameters:
- item: the item you wish to remove and return
"""
# remove the item from the list
self._queue.remove(item)
def length(self):
"""Returns the integer number of items in the queue
Parameters:
- None
"""
# return the number of items in the queue
return len(self._queue)
def items(self):
"""A get property for the queue
Parameters:
- None
"""
# returns the queue
return self._queue
def wipe(self):
"""Deletes every item in the queue immediately
Parameters:
- None
"""
# create a fresh queue
self._queue = []
def index_of(self, item):
"""Returns the index of a given item
Parameters:
- The item you wish to retrieve an index for
"""
# return the index of item
return self._queue.index(item)
def index(self, index):
"""Returns the item at a given index
Parameters:
- The index of the item you wish to retrieve
"""
# return queue item at index
return self._queue[index]

View File

@@ -0,0 +1,140 @@
# requires .worker for 'force run' technique
from .worker import Worker
# some standard stuff required
from threading import Thread
from time import time
class TaskScheduler:
"""TaskScheduler is intended to be an extension to the TaskManager class.
It allows task to be added to a queue and executed based on their time deadline,
(as opposed to the order they sit in the queue).
Scheduled task have the same options as regular ones, with a few extras.
- time: the expected time of execution.
- reoccurring: should the task be triggered again?
- delay: will cause the task to be triggered every <delay> seconds.
- run_now: should the task be run immediately (as well).
"""
def __init__(self):
"""Definitions:
- self.task: the list of scheduled task
- self.next_index: the task list index of the next scheduled task
- self.next_time: the expected execution time of the next scheduled task
Parameters:
- None
"""
# task waiting for deadline
self.tasks = []
# expected execution time of next scheduled task
self.next_index = -1
# index of next scheduled task in task list
self.next_time = -1.0
def update(self):
"""Called every time you want to update the schedule,
this should really be as often as possible (every frame, if from a loop).
Runs any over due task and prepares the next
Parameters:
- None
"""
# if a task is due, it is force run
if self.next_index >= 0 and time() > self.next_time:
# find the task in the list and force run it
task = self.tasks.pop(self.next_index)
self._force_run(task)
# reset next time and index
self.next_time = -1
self.next_index = -1
# if the task is reoccurring, it is set up again
if task['reoccurring']:
task['time'] = time() + task['delay']
task['run_now'] = False
self.tasks.append(task)
# set up the next time and index
self._prepare_next()
def schedule_task(self, task):
"""Adds a new task to the schedule.
Also triggers _prepare_next in case this task is expected earlier than the latest
Parameters:
- task: the task to run
"""
# add the task to the list
self.tasks.append(task)
# if it has the 'run_now' flag,
# force run it now too
if task['run_now']:
self._force_run(task)
# prepare next index and time,
# in case new task is earlier than one currently set
self._prepare_next()
def _force_run(self, task):
"""[Protected]
Force run is a way of bypassing thread restrictions enforced by the pool and its modules.
A temp worker is created, used and destroyed.
Parameters:
- task: the task to run
"""
# create a temp worker
worker = Worker(None)
# set the task within the worker
worker.set_task(task)
# run task on new thread
Thread(target=worker.run_task).start()
def _prepare_next(self):
"""[Protected]
Finds the item in the queue with the lowest timestamp and
records its expected execution time and index within queue
Parameters:
- None
"""
# for each task in the schedule
for task in self.tasks:
# if the task is already overdue,
# set it as next and skip the rest of search
if time() > task['time']:
self.next_index = self.tasks.index(task)
self.next_time = task['time']
break
# otherwise if there is no next task
# or this one is required sooner than the one currently set,
# set it but continue searching
elif self.next_index < 0 or task['time'] < self.next_time:
self.next_index = self.tasks.index(task)
self.next_time = task['time']

View File

@@ -0,0 +1,171 @@
# module and pool can be used for other stuff,
# so it may be useful to be able it import it from here...
from .queue import Queue
from .pool import Pool as Pool
# should only be used by TaskManager, so obfuscating name
from .scheduler import TaskScheduler as _TaskScheduler
# some standard stuff required
from threading import Thread
from time import time, sleep
class TaskManager(Pool):
"""TaskManager is responsible for executing a large number of task on behalf of a group of modules.
task can be appended in two different ways.
- Immediate: task is added to the back of the queue and run asap
- Scheduled: task is added to a schedule and executed at a given time (see TaskScheduler)
"""
# seconds to sleep when there is nothing to do
_SLEEP = 0.1
def set_thread_sleep(self, seconds):
self._SLEEP = seconds
def __init__(self):
"""Definitions:
- super: Pool
- scheduler: TaskScheduler instance
"""
# init super class (Pool)
super(TaskManager, self).__init__()
# instance of TaskScheduler to handle scheduled task
self.scheduler = _TaskScheduler()
def schedule_task(self, task):
"""Schedules a new task, see TaskScheduler.schedule_task
Parameters:
- The task to schedule
"""
# when adding a task directly,
# there is no need to specify the time
# it is always calculated this way
if 'time' not in task:
task['time'] = time() + delay
# call method in TaskScheduler instance
self.scheduler.schedule_task(task)
def on_update(self):
"""[Callback]
Called on every Pool frame,
runs overdue scheduled task and one from the queue
Parameters:
- None
"""
# prioritises any overdue scheduled task waiting to be executed
self.scheduler.update()
# loop through all task until we find one we can use
for task in self._queue:
# if the task has been added through a module,
# it must adhere to the module limit
if "module" in task:
# if we can find a worker on this module
this_worker = task['module'].get_worker()
if this_worker is not None:
# remove the task from the queue
self.remove(task)
# set the ask in the worker
this_worker.set_task(task)
# run the task on a new thread
Thread(target=this_worker.run_task).start()
# stop searching for usable task
break
# the task was added directly
# no limits to adhere to
else:
# create a temp worker
worker = Worker(None)
# set the task within the worker
worker.set_task(task)
# run task on new thread
Thread(target=worker.run_task).start()
# sleep thread based on state
self._sleep()
def on_clear(self, item):
"""[Callback]
Called on every Pool frame when _kill has been activated.
If the item can be processed, it is processed and True is returned.
Otherwise, if the item can not be processed, False is returned
Parameters:
- the latest item
"""
# if we can find a worker on this module
this_worker = item['module'].get_worker()
if this_worker is not None:
# remove the task from the queue
self.remove(item)
# set the ask in the worker
this_worker.set_task(item)
# run the task on a new thread
Thread(target=this_worker.run_task).start()
# return true to delete the item
return True
# the item could not be processed,
# return false to append to back of queue and try again
return False
def _sleep(self):
"""[Protected]
Used by on_update as an optimization technique.
If there are task to be processed, or an overdue scheduled task,
the thread will not sleep at all.
If the next scheduled task is less that the default sleep time,
it will sleep until the next scheduled task expected execution time.
Otherwise it will sleep for its default sleep time before running the loop again
Parameters:
- None
"""
# get the current time
time_now = time()
# if there are items to process, either in queue or schedule
if len(self._queue) > 0 or time_now > self.scheduler.next_time:
# do not sleep
return
# if the time to the next scheduled task is less than the default sleep time
elif self.scheduler.next_time - time_now < self._SLEEP:
# sleep for time to next scheduled task
sleep(self.scheduler.next_time - time_now)
# otherwise, if inactive
else:
# sleep for default sleep time
sleep(self._SLEEP)

View File

@@ -0,0 +1,82 @@
class Worker:
"""Worker instances are used by Modules to regulate their resource usage"""
def __init__(self, mod):
"""
Definition:
- module: the module 'master' of this worker
- task: the workers latest task at any given time
Parameters:
- mod: the module 'master' of this worker
"""
# set reference to the module 'master' of this worker
self.module = mod
# set current task to null
self.task = None
# if this is not a temp worker
if self.module is not None:
# register with the module master
self.module.inactive_workers.append(self)
def set_task(self, task):
"""Sets a task for the worker
Parameters:
- task: the task to be executed next
"""
# if this is not a temp worker
if self.module is not None:
# remove self from modules waiting list
self.module.inactive_workers.remove(self)
# set current task
self.task = task
def run_task(self):
"""Runs latest task
Parameters:
- task: None
"""
if self.task['kwargs'] is not None:
# call the task method with args and kwargs
data = self.task['method'](
*self.task['args'],
**self.task['kwargs']
)
# if there is a callback set
if self.task['callback'] and data is not None:
if type(self.task['callback']) == list or type(self.task['callback']) == tuple:
for c in self.task['callback']:
c(data)
else:
# call the callback with the return of method
self.task['callback'](data)
# set current task to None
self.task = None
# if this is not a temp module
if self.module is not None:
# register with module master
self.module.inactive_workers.append(self)