mirror of
https://github.com/codingo/Interlace.git
synced 2026-01-04 23:54:24 +01:00
Threading base
This commit is contained in:
@@ -2,6 +2,16 @@
|
|||||||
import sys
|
import sys
|
||||||
from lib.core.input import InputParser, InputHelper
|
from lib.core.input import InputParser, InputHelper
|
||||||
from lib.core.output import OutputHelper, Level
|
from lib.core.output import OutputHelper, Level
|
||||||
|
from lib.core.threader import Pool
|
||||||
|
|
||||||
|
|
||||||
|
def build_queue(arguments, output):
|
||||||
|
queue = ""
|
||||||
|
for target in InputHelper.process_targets(arguments):
|
||||||
|
for command in InputHelper.process_commands(arguments):
|
||||||
|
output.terminal(Level.VERBOSE, target, command, "Added to Queue")
|
||||||
|
queue += command
|
||||||
|
return queue
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -11,9 +21,10 @@ def main():
|
|||||||
|
|
||||||
output.print_banner()
|
output.print_banner()
|
||||||
|
|
||||||
for target in InputHelper.process_targets(arguments):
|
#pool = Pool(arguments.threads, )
|
||||||
for command in InputHelper.process_commands(arguments):
|
pool = build_queue(arguments, output)
|
||||||
output.terminal(Level.THREAD, target, command)
|
print(pool)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
@@ -35,7 +35,7 @@ class InputHelper(object):
|
|||||||
commands = set()
|
commands = set()
|
||||||
|
|
||||||
if arguments.command:
|
if arguments.command:
|
||||||
commands.add(arguments.target)
|
commands.add(arguments.command)
|
||||||
else:
|
else:
|
||||||
for command in arguments.command_list:
|
for command in arguments.command_list:
|
||||||
commands.add(command.strip())
|
commands.add(command.strip())
|
||||||
@@ -69,11 +69,19 @@ class InputParser(object):
|
|||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-threads', dest='threads', required=False,
|
'-threads', dest='threads', required=False,
|
||||||
help="Specify the maximum number of threads to run (DEFAULT:5).",
|
help="Specify the maximum number of threads to run (DEFAULT:5)",
|
||||||
default=5,
|
default=5,
|
||||||
type=lambda x: InputHelper.check_positive(parser, x)
|
type=lambda x: InputHelper.check_positive(parser, x)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'-timeout', dest='timeout', required=False,
|
||||||
|
help="Command timeout in seconds (DEFAULT:600)",
|
||||||
|
default=600,
|
||||||
|
type=lambda x: InputHelper.check_positive(parser, x)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
commands = parser.add_mutually_exclusive_group(required=True)
|
commands = parser.add_mutually_exclusive_group(required=True)
|
||||||
commands.add_argument(
|
commands.add_argument(
|
||||||
'-c', dest='command',
|
'-c', dest='command',
|
||||||
|
|||||||
46
Interlace/lib/core/threader.py
Normal file
46
Interlace/lib/core/threader.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
import threading
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
|
class Worker(object):
|
||||||
|
def __init__(self, pool, output, timeout):
|
||||||
|
self.pool = pool
|
||||||
|
self.output = output
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
def __call__(self, task):
|
||||||
|
self.run_task(task, self.timeout)
|
||||||
|
self.pool.workers.add(self)
|
||||||
|
|
||||||
|
def run_task(self, task, timeout):
|
||||||
|
try:
|
||||||
|
subprocess.run(task)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
self.output.terminal(3, "", task, message="Timeout when running %s" % task)
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
self.output.terminal(3, "", task, message="Process error when running %s"
|
||||||
|
% task)
|
||||||
|
|
||||||
|
|
||||||
|
class Pool(object):
|
||||||
|
def __init__(self, max_workers, queue, timeout):
|
||||||
|
self.queue = queue
|
||||||
|
self.workers = [Worker(self) for w in range(max_workers)]
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
def run(self, command):
|
||||||
|
while True:
|
||||||
|
|
||||||
|
# make sure resources are available
|
||||||
|
if not self.queue or not self.workers:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# get a worker
|
||||||
|
worker = self.workers.pop(0)
|
||||||
|
|
||||||
|
# get task from queue
|
||||||
|
task = self.queue.pop(0)
|
||||||
|
|
||||||
|
# run
|
||||||
|
thread = threading.Thread(worker, args=(task, self.output, self.timeout))
|
||||||
|
thread.start()
|
||||||
@@ -1,123 +0,0 @@
|
|||||||
|
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
|
|
||||||
class Loop:
|
|
||||||
|
|
||||||
"""A simple loop, with some callbacks for:
|
|
||||||
on_start, on_stop and on_update"""
|
|
||||||
|
|
||||||
# the current state of the pool
|
|
||||||
_state = 0
|
|
||||||
|
|
||||||
# should the queue clear on exit
|
|
||||||
_clear = False
|
|
||||||
|
|
||||||
# kills the thread if true
|
|
||||||
_kill = False
|
|
||||||
|
|
||||||
# true if the pool thread is running
|
|
||||||
_has_thread = False
|
|
||||||
|
|
||||||
def has_thread(self):
|
|
||||||
|
|
||||||
"""Returns true if the pool thread is running
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self._has_thread
|
|
||||||
|
|
||||||
def state(self):
|
|
||||||
|
|
||||||
"""returns the latest state of the pool
|
|
||||||
- 0 = off
|
|
||||||
- 1 = running
|
|
||||||
- 2 = stopping
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self._state
|
|
||||||
|
|
||||||
def start_thread(self):
|
|
||||||
|
|
||||||
"""Starts the pool thread if it is not already started
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# check there is not a thread already running
|
|
||||||
if self._has_thread is False and self._kill is False:
|
|
||||||
|
|
||||||
# start the pool thread
|
|
||||||
Thread(target=self._thread, daemon=True).start()
|
|
||||||
|
|
||||||
def stop_thread(self):
|
|
||||||
|
|
||||||
"""Stops the pool thread if it is running
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# check that the pool thread is running
|
|
||||||
if self._kill is False and self._has_thread is True:
|
|
||||||
|
|
||||||
# set kill trigger
|
|
||||||
self._kill = True
|
|
||||||
|
|
||||||
def _thread(self):
|
|
||||||
|
|
||||||
"""[Protected]
|
|
||||||
The main pool thread.
|
|
||||||
Updates schedule and executes task in queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# the pool now has a running thread
|
|
||||||
self._has_thread = True
|
|
||||||
|
|
||||||
# state is 1 (running)
|
|
||||||
self._state = 1
|
|
||||||
|
|
||||||
self.on_start()
|
|
||||||
|
|
||||||
# while the kill flag is not set
|
|
||||||
while self._kill is False:
|
|
||||||
|
|
||||||
# call virtual update
|
|
||||||
self.on_update()
|
|
||||||
|
|
||||||
# state is now 0 (stopped)
|
|
||||||
self.state = 0
|
|
||||||
|
|
||||||
self.on_stop()
|
|
||||||
|
|
||||||
# reset the kill flag
|
|
||||||
self._kill = False
|
|
||||||
|
|
||||||
# the pool no longer has a running thread
|
|
||||||
self._has_thread = False
|
|
||||||
|
|
||||||
def on_start(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_stop(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_update(self):
|
|
||||||
|
|
||||||
"""Overridable method.
|
|
||||||
Called on every loop update.
|
|
||||||
no return
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
@@ -1,107 +0,0 @@
|
|||||||
|
|
||||||
from .queue import Queue
|
|
||||||
from .loop import Loop
|
|
||||||
|
|
||||||
|
|
||||||
class Pool(Queue, Loop):
|
|
||||||
|
|
||||||
"""Pool is a loop that processes a Queue"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
"""Definitions:
|
|
||||||
- None
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# init super class Queue
|
|
||||||
super(Pool, self).__init__()
|
|
||||||
|
|
||||||
def stop_thread(self, clear_queue=False):
|
|
||||||
|
|
||||||
"""Stops the pool thread if it is running
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# check that the pool thread is running
|
|
||||||
if self._kill is False and self._has_thread is True:
|
|
||||||
|
|
||||||
# set kill trigger
|
|
||||||
self._kill = True
|
|
||||||
|
|
||||||
# set clear flag
|
|
||||||
self._clear = clear_queue
|
|
||||||
|
|
||||||
def _thread(self):
|
|
||||||
|
|
||||||
"""[Protected]
|
|
||||||
The main pool thread.
|
|
||||||
Updates schedule and executes task in queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# the pool now has a running thread
|
|
||||||
self._has_thread = True
|
|
||||||
|
|
||||||
# state is 1 (running)
|
|
||||||
self._state = 1
|
|
||||||
|
|
||||||
# while the kill flag is not set
|
|
||||||
while self._kill is False:
|
|
||||||
|
|
||||||
# call virtual update
|
|
||||||
self.on_update()
|
|
||||||
|
|
||||||
# once the loop exits,
|
|
||||||
# state is 2 (stopping)
|
|
||||||
self.state = 2
|
|
||||||
|
|
||||||
# if the clear flag is set and there are items still to be processed
|
|
||||||
while self._clear and self.length() > 0:
|
|
||||||
|
|
||||||
# dequeue the first item
|
|
||||||
item = self.dequeue()
|
|
||||||
|
|
||||||
# try to execute it
|
|
||||||
if not self.on_clear(item):
|
|
||||||
|
|
||||||
# if execution not possible append to back and try again
|
|
||||||
self.enqueue(item)
|
|
||||||
|
|
||||||
# state is now 0 (stopped)
|
|
||||||
self.state = 0
|
|
||||||
|
|
||||||
# reset the kill flag
|
|
||||||
self._kill = False
|
|
||||||
|
|
||||||
# the pool no longer has a running thread
|
|
||||||
self._has_thread = False
|
|
||||||
|
|
||||||
def on_update(self):
|
|
||||||
|
|
||||||
"""Overridable method.
|
|
||||||
Called on every loop update.
|
|
||||||
no return
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_clear(self, item):
|
|
||||||
|
|
||||||
"""Overridable method.
|
|
||||||
Called every loop while thread is stopping until queue is empty.
|
|
||||||
Runs item and return True and item will be removed from queue.
|
|
||||||
Returns False if item was not run and item will be appended to back of queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- item: the latest item being processed
|
|
||||||
"""
|
|
||||||
return True
|
|
||||||
@@ -1,116 +0,0 @@
|
|||||||
|
|
||||||
|
|
||||||
class Queue:
|
|
||||||
|
|
||||||
"""Queue was intended as a c# style queue"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
"""Definitions:
|
|
||||||
- self._queue: the queue of items
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# the queue of items
|
|
||||||
self._queue = []
|
|
||||||
|
|
||||||
def enqueue(self, item):
|
|
||||||
|
|
||||||
"""Adds a new item to the back of the queue"""
|
|
||||||
|
|
||||||
# append item to back of queue
|
|
||||||
self._queue.append(item)
|
|
||||||
|
|
||||||
def dequeue(self):
|
|
||||||
|
|
||||||
"""Removes an item from the beginning of the queue and returns it.
|
|
||||||
Returns none if the queue is empty
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# if the queue length is more than zero get the first item
|
|
||||||
# otherwise return None
|
|
||||||
return self._queue.pop(0) if len(self._queue) > 0 else None
|
|
||||||
|
|
||||||
def dequeue_at(self, index):
|
|
||||||
"""Removes an item from the queue at a given index and returns it.
|
|
||||||
Returns none if the queue is empty
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- index: the index of the item you wish to remove and return
|
|
||||||
"""
|
|
||||||
|
|
||||||
# if the queue length is longer than the given index, return item at index,
|
|
||||||
# otherwise return None
|
|
||||||
return self._queue.pop(index) if len(self._queue) > index else None
|
|
||||||
|
|
||||||
def remove(self, item):
|
|
||||||
|
|
||||||
"""Removes an item from the queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- item: the item you wish to remove and return
|
|
||||||
"""
|
|
||||||
|
|
||||||
# remove the item from the list
|
|
||||||
self._queue.remove(item)
|
|
||||||
|
|
||||||
def length(self):
|
|
||||||
|
|
||||||
"""Returns the integer number of items in the queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# return the number of items in the queue
|
|
||||||
return len(self._queue)
|
|
||||||
|
|
||||||
def items(self):
|
|
||||||
|
|
||||||
"""A get property for the queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# returns the queue
|
|
||||||
return self._queue
|
|
||||||
|
|
||||||
def wipe(self):
|
|
||||||
|
|
||||||
"""Deletes every item in the queue immediately
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# create a fresh queue
|
|
||||||
self._queue = []
|
|
||||||
|
|
||||||
def index_of(self, item):
|
|
||||||
|
|
||||||
"""Returns the index of a given item
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- The item you wish to retrieve an index for
|
|
||||||
"""
|
|
||||||
|
|
||||||
# return the index of item
|
|
||||||
return self._queue.index(item)
|
|
||||||
|
|
||||||
def index(self, index):
|
|
||||||
|
|
||||||
"""Returns the item at a given index
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- The index of the item you wish to retrieve
|
|
||||||
"""
|
|
||||||
|
|
||||||
# return queue item at index
|
|
||||||
return self._queue[index]
|
|
||||||
|
|
||||||
@@ -1,140 +0,0 @@
|
|||||||
|
|
||||||
# requires .worker for 'force run' technique
|
|
||||||
from .worker import Worker
|
|
||||||
|
|
||||||
# some standard stuff required
|
|
||||||
from threading import Thread
|
|
||||||
from time import time
|
|
||||||
|
|
||||||
|
|
||||||
class TaskScheduler:
|
|
||||||
|
|
||||||
"""TaskScheduler is intended to be an extension to the TaskManager class.
|
|
||||||
It allows task to be added to a queue and executed based on their time deadline,
|
|
||||||
(as opposed to the order they sit in the queue).
|
|
||||||
|
|
||||||
Scheduled task have the same options as regular ones, with a few extras.
|
|
||||||
- time: the expected time of execution.
|
|
||||||
- reoccurring: should the task be triggered again?
|
|
||||||
- delay: will cause the task to be triggered every <delay> seconds.
|
|
||||||
- run_now: should the task be run immediately (as well).
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
"""Definitions:
|
|
||||||
- self.task: the list of scheduled task
|
|
||||||
- self.next_index: the task list index of the next scheduled task
|
|
||||||
- self.next_time: the expected execution time of the next scheduled task
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# task waiting for deadline
|
|
||||||
self.tasks = []
|
|
||||||
|
|
||||||
# expected execution time of next scheduled task
|
|
||||||
self.next_index = -1
|
|
||||||
|
|
||||||
# index of next scheduled task in task list
|
|
||||||
self.next_time = -1.0
|
|
||||||
|
|
||||||
def update(self):
|
|
||||||
|
|
||||||
"""Called every time you want to update the schedule,
|
|
||||||
this should really be as often as possible (every frame, if from a loop).
|
|
||||||
|
|
||||||
Runs any over due task and prepares the next
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# if a task is due, it is force run
|
|
||||||
if self.next_index >= 0 and time() > self.next_time:
|
|
||||||
|
|
||||||
# find the task in the list and force run it
|
|
||||||
task = self.tasks.pop(self.next_index)
|
|
||||||
self._force_run(task)
|
|
||||||
|
|
||||||
# reset next time and index
|
|
||||||
self.next_time = -1
|
|
||||||
self.next_index = -1
|
|
||||||
|
|
||||||
# if the task is reoccurring, it is set up again
|
|
||||||
if task['reoccurring']:
|
|
||||||
task['time'] = time() + task['delay']
|
|
||||||
task['run_now'] = False
|
|
||||||
self.tasks.append(task)
|
|
||||||
|
|
||||||
# set up the next time and index
|
|
||||||
self._prepare_next()
|
|
||||||
|
|
||||||
def schedule_task(self, task):
|
|
||||||
|
|
||||||
"""Adds a new task to the schedule.
|
|
||||||
Also triggers _prepare_next in case this task is expected earlier than the latest
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- task: the task to run
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
# add the task to the list
|
|
||||||
self.tasks.append(task)
|
|
||||||
|
|
||||||
# if it has the 'run_now' flag,
|
|
||||||
# force run it now too
|
|
||||||
if task['run_now']:
|
|
||||||
self._force_run(task)
|
|
||||||
|
|
||||||
# prepare next index and time,
|
|
||||||
# in case new task is earlier than one currently set
|
|
||||||
self._prepare_next()
|
|
||||||
|
|
||||||
def _force_run(self, task):
|
|
||||||
|
|
||||||
"""[Protected]
|
|
||||||
Force run is a way of bypassing thread restrictions enforced by the pool and its modules.
|
|
||||||
A temp worker is created, used and destroyed.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- task: the task to run
|
|
||||||
"""
|
|
||||||
|
|
||||||
# create a temp worker
|
|
||||||
worker = Worker(None)
|
|
||||||
|
|
||||||
# set the task within the worker
|
|
||||||
worker.set_task(task)
|
|
||||||
|
|
||||||
# run task on new thread
|
|
||||||
Thread(target=worker.run_task).start()
|
|
||||||
|
|
||||||
def _prepare_next(self):
|
|
||||||
|
|
||||||
"""[Protected]
|
|
||||||
Finds the item in the queue with the lowest timestamp and
|
|
||||||
records its expected execution time and index within queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# for each task in the schedule
|
|
||||||
for task in self.tasks:
|
|
||||||
|
|
||||||
# if the task is already overdue,
|
|
||||||
# set it as next and skip the rest of search
|
|
||||||
if time() > task['time']:
|
|
||||||
self.next_index = self.tasks.index(task)
|
|
||||||
self.next_time = task['time']
|
|
||||||
break
|
|
||||||
|
|
||||||
# otherwise if there is no next task
|
|
||||||
# or this one is required sooner than the one currently set,
|
|
||||||
# set it but continue searching
|
|
||||||
elif self.next_index < 0 or task['time'] < self.next_time:
|
|
||||||
self.next_index = self.tasks.index(task)
|
|
||||||
self.next_time = task['time']
|
|
||||||
@@ -1,171 +0,0 @@
|
|||||||
# module and pool can be used for other stuff,
|
|
||||||
# so it may be useful to be able it import it from here...
|
|
||||||
from .queue import Queue
|
|
||||||
from .pool import Pool as Pool
|
|
||||||
|
|
||||||
# should only be used by TaskManager, so obfuscating name
|
|
||||||
from .scheduler import TaskScheduler as _TaskScheduler
|
|
||||||
|
|
||||||
# some standard stuff required
|
|
||||||
from threading import Thread
|
|
||||||
from time import time, sleep
|
|
||||||
|
|
||||||
|
|
||||||
class TaskManager(Pool):
|
|
||||||
"""TaskManager is responsible for executing a large number of task on behalf of a group of modules.
|
|
||||||
task can be appended in two different ways.
|
|
||||||
- Immediate: task is added to the back of the queue and run asap
|
|
||||||
- Scheduled: task is added to a schedule and executed at a given time (see TaskScheduler)
|
|
||||||
"""
|
|
||||||
|
|
||||||
# seconds to sleep when there is nothing to do
|
|
||||||
_SLEEP = 0.1
|
|
||||||
|
|
||||||
def set_thread_sleep(self, seconds):
|
|
||||||
self._SLEEP = seconds
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
|
|
||||||
"""Definitions:
|
|
||||||
- super: Pool
|
|
||||||
- scheduler: TaskScheduler instance
|
|
||||||
"""
|
|
||||||
|
|
||||||
# init super class (Pool)
|
|
||||||
super(TaskManager, self).__init__()
|
|
||||||
|
|
||||||
# instance of TaskScheduler to handle scheduled task
|
|
||||||
self.scheduler = _TaskScheduler()
|
|
||||||
|
|
||||||
def schedule_task(self, task):
|
|
||||||
|
|
||||||
"""Schedules a new task, see TaskScheduler.schedule_task
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- The task to schedule
|
|
||||||
"""
|
|
||||||
|
|
||||||
# when adding a task directly,
|
|
||||||
# there is no need to specify the time
|
|
||||||
# it is always calculated this way
|
|
||||||
if 'time' not in task:
|
|
||||||
task['time'] = time() + delay
|
|
||||||
|
|
||||||
# call method in TaskScheduler instance
|
|
||||||
self.scheduler.schedule_task(task)
|
|
||||||
|
|
||||||
def on_update(self):
|
|
||||||
|
|
||||||
"""[Callback]
|
|
||||||
Called on every Pool frame,
|
|
||||||
runs overdue scheduled task and one from the queue
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# prioritises any overdue scheduled task waiting to be executed
|
|
||||||
self.scheduler.update()
|
|
||||||
|
|
||||||
# loop through all task until we find one we can use
|
|
||||||
for task in self._queue:
|
|
||||||
|
|
||||||
# if the task has been added through a module,
|
|
||||||
# it must adhere to the module limit
|
|
||||||
if "module" in task:
|
|
||||||
|
|
||||||
# if we can find a worker on this module
|
|
||||||
this_worker = task['module'].get_worker()
|
|
||||||
if this_worker is not None:
|
|
||||||
# remove the task from the queue
|
|
||||||
self.remove(task)
|
|
||||||
|
|
||||||
# set the ask in the worker
|
|
||||||
this_worker.set_task(task)
|
|
||||||
|
|
||||||
# run the task on a new thread
|
|
||||||
Thread(target=this_worker.run_task).start()
|
|
||||||
|
|
||||||
# stop searching for usable task
|
|
||||||
break
|
|
||||||
|
|
||||||
# the task was added directly
|
|
||||||
# no limits to adhere to
|
|
||||||
else:
|
|
||||||
|
|
||||||
# create a temp worker
|
|
||||||
worker = Worker(None)
|
|
||||||
|
|
||||||
# set the task within the worker
|
|
||||||
worker.set_task(task)
|
|
||||||
|
|
||||||
# run task on new thread
|
|
||||||
Thread(target=worker.run_task).start()
|
|
||||||
|
|
||||||
# sleep thread based on state
|
|
||||||
self._sleep()
|
|
||||||
|
|
||||||
def on_clear(self, item):
|
|
||||||
|
|
||||||
"""[Callback]
|
|
||||||
Called on every Pool frame when _kill has been activated.
|
|
||||||
If the item can be processed, it is processed and True is returned.
|
|
||||||
Otherwise, if the item can not be processed, False is returned
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- the latest item
|
|
||||||
"""
|
|
||||||
|
|
||||||
# if we can find a worker on this module
|
|
||||||
this_worker = item['module'].get_worker()
|
|
||||||
if this_worker is not None:
|
|
||||||
# remove the task from the queue
|
|
||||||
self.remove(item)
|
|
||||||
|
|
||||||
# set the ask in the worker
|
|
||||||
this_worker.set_task(item)
|
|
||||||
|
|
||||||
# run the task on a new thread
|
|
||||||
Thread(target=this_worker.run_task).start()
|
|
||||||
|
|
||||||
# return true to delete the item
|
|
||||||
return True
|
|
||||||
|
|
||||||
# the item could not be processed,
|
|
||||||
# return false to append to back of queue and try again
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _sleep(self):
|
|
||||||
|
|
||||||
"""[Protected]
|
|
||||||
Used by on_update as an optimization technique.
|
|
||||||
If there are task to be processed, or an overdue scheduled task,
|
|
||||||
the thread will not sleep at all.
|
|
||||||
If the next scheduled task is less that the default sleep time,
|
|
||||||
it will sleep until the next scheduled task expected execution time.
|
|
||||||
Otherwise it will sleep for its default sleep time before running the loop again
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- None
|
|
||||||
"""
|
|
||||||
|
|
||||||
# get the current time
|
|
||||||
time_now = time()
|
|
||||||
|
|
||||||
# if there are items to process, either in queue or schedule
|
|
||||||
if len(self._queue) > 0 or time_now > self.scheduler.next_time:
|
|
||||||
|
|
||||||
# do not sleep
|
|
||||||
return
|
|
||||||
|
|
||||||
# if the time to the next scheduled task is less than the default sleep time
|
|
||||||
elif self.scheduler.next_time - time_now < self._SLEEP:
|
|
||||||
|
|
||||||
# sleep for time to next scheduled task
|
|
||||||
sleep(self.scheduler.next_time - time_now)
|
|
||||||
|
|
||||||
# otherwise, if inactive
|
|
||||||
else:
|
|
||||||
|
|
||||||
# sleep for default sleep time
|
|
||||||
sleep(self._SLEEP)
|
|
||||||
@@ -1,82 +0,0 @@
|
|||||||
|
|
||||||
|
|
||||||
class Worker:
|
|
||||||
|
|
||||||
"""Worker instances are used by Modules to regulate their resource usage"""
|
|
||||||
|
|
||||||
def __init__(self, mod):
|
|
||||||
|
|
||||||
"""
|
|
||||||
Definition:
|
|
||||||
- module: the module 'master' of this worker
|
|
||||||
- task: the workers latest task at any given time
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- mod: the module 'master' of this worker
|
|
||||||
"""
|
|
||||||
|
|
||||||
# set reference to the module 'master' of this worker
|
|
||||||
self.module = mod
|
|
||||||
|
|
||||||
# set current task to null
|
|
||||||
self.task = None
|
|
||||||
|
|
||||||
# if this is not a temp worker
|
|
||||||
if self.module is not None:
|
|
||||||
|
|
||||||
# register with the module master
|
|
||||||
self.module.inactive_workers.append(self)
|
|
||||||
|
|
||||||
def set_task(self, task):
|
|
||||||
|
|
||||||
"""Sets a task for the worker
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- task: the task to be executed next
|
|
||||||
"""
|
|
||||||
|
|
||||||
# if this is not a temp worker
|
|
||||||
if self.module is not None:
|
|
||||||
|
|
||||||
# remove self from modules waiting list
|
|
||||||
self.module.inactive_workers.remove(self)
|
|
||||||
|
|
||||||
# set current task
|
|
||||||
self.task = task
|
|
||||||
|
|
||||||
def run_task(self):
|
|
||||||
|
|
||||||
"""Runs latest task
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
- task: None
|
|
||||||
"""
|
|
||||||
|
|
||||||
if self.task['kwargs'] is not None:
|
|
||||||
# call the task method with args and kwargs
|
|
||||||
data = self.task['method'](
|
|
||||||
*self.task['args'],
|
|
||||||
**self.task['kwargs']
|
|
||||||
)
|
|
||||||
|
|
||||||
# if there is a callback set
|
|
||||||
if self.task['callback'] and data is not None:
|
|
||||||
|
|
||||||
if type(self.task['callback']) == list or type(self.task['callback']) == tuple:
|
|
||||||
|
|
||||||
for c in self.task['callback']:
|
|
||||||
c(data)
|
|
||||||
|
|
||||||
else:
|
|
||||||
|
|
||||||
# call the callback with the return of method
|
|
||||||
self.task['callback'](data)
|
|
||||||
|
|
||||||
# set current task to None
|
|
||||||
self.task = None
|
|
||||||
|
|
||||||
# if this is not a temp module
|
|
||||||
if self.module is not None:
|
|
||||||
|
|
||||||
# register with module master
|
|
||||||
self.module.inactive_workers.append(self)
|
|
||||||
@@ -2,10 +2,3 @@
|
|||||||
A threading management application that allows controlled execution of multiple commands, over multiple targets.
|
A threading management application that allows controlled execution of multiple commands, over multiple targets.
|
||||||
|
|
||||||
[](https://www.python.org/) [](https://www.gnu.org/licenses/gpl-3.0.en.html) [](https://travis-ci.org/codingo/Reconnoitre) [](https://twitter.com/codingo_)
|
[](https://www.python.org/) [](https://www.gnu.org/licenses/gpl-3.0.en.html) [](https://travis-ci.org/codingo/Reconnoitre) [](https://twitter.com/codingo_)
|
||||||
|
|
||||||
## Scratch
|
|
||||||
|
|
||||||
Read hostnames.txt - create loop
|
|
||||||
Read commands.txt - replace $target, $port, etc', create loop
|
|
||||||
|
|
||||||
Specify threads to run based on max threads - output as each command finishes.
|
|
||||||
|
|||||||
Reference in New Issue
Block a user