Merge pull request #184 from 0xtavian/master

fix: improve thread safety and execution logging with minimal changes
This commit is contained in:
Sajeeb Lohani (sml555 / prodigysml)
2025-04-19 17:07:37 +10:00
committed by GitHub
2 changed files with 27 additions and 28 deletions

View File

@@ -15,7 +15,7 @@ def task_queue_generator_func(arguments, output, repeat):
for i in range(repeat): for i in range(repeat):
tasks_iterator = tasks_generator_func() tasks_iterator = tasks_generator_func()
for task in tasks_iterator: for task in tasks_iterator:
output.terminal(Level.THREAD, task.name(), "Added to Queue") # output.terminal(Level.THREAD, task.name(), "Added to Queue")
yield task yield task
@@ -38,6 +38,7 @@ def main():
output, output,
arguments.sober, arguments.sober,
silent=arguments.silent, silent=arguments.silent,
output_helper=output
) )
pool.run() pool.run()

View File

@@ -1,10 +1,12 @@
import subprocess import subprocess
import os import os
import queue
import platform
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Event from multiprocessing import Event
from tqdm import tqdm from tqdm import tqdm
import platform from Interlace.lib.core.output import OutputHelper, Level
if platform.system().lower() == 'linux': if platform.system().lower() == 'linux':
shell = os.getenv("SHELL") if os.getenv("SHELL") else "/bin/sh" shell = os.getenv("SHELL") if os.getenv("SHELL") else "/bin/sh"
@@ -60,8 +62,7 @@ class Task(object):
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
encoding="utf-8", encoding="utf-8",
executable=shell) executable=shell)
out, _ = s.communicate() s.communicate()
return return
else: else:
s = subprocess.Popen(self.task, shell=True, s = subprocess.Popen(self.task, shell=True,
@@ -78,57 +79,54 @@ class Task(object):
class Worker(object): class Worker(object):
def __init__(self, task_queue, timeout, output, tq): def __init__(self, task_queue, timeout, output, tq, output_helper):
self.queue = task_queue self.queue = task_queue
self.timeout = timeout self.timeout = timeout
self.output = output self.output = output
self.tqdm = tq self.tqdm = tq
self.output_helper = output_helper
def __call__(self): def __call__(self):
queue = self.queue
while True: while True:
try: try:
task = next(queue) task = self.queue.get(timeout=1)
except queue.Empty:
return
self.output_helper.terminal(Level.THREAD, task.name(), "Added to Queue")
try:
if isinstance(self.tqdm, tqdm): if isinstance(self.tqdm, tqdm):
self.tqdm.update(1) self.tqdm.update(1)
# run task
task.run(self.tqdm) task.run(self.tqdm)
else: else:
task.run() task.run()
except StopIteration: except Exception as e:
break self.output_helper.terminal(Level.ERROR, task.name(), f"Task failed: {e}")
class Pool(object): class Pool(object):
def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False): def __init__(self, max_workers, task_queue, timeout, output, progress_bar, silent=False, output_helper=None):
# convert stdin input to integer
max_workers = int(max_workers) max_workers = int(max_workers)
# check if there are enough workers
if max_workers <= 0:
raise ValueError("Workers must be >= 1")
tasks_count = next(task_queue) tasks_count = next(task_queue)
# check if the queue is empty
if not tasks_count: if not tasks_count:
raise ValueError("The queue is empty") raise ValueError("The queue is empty")
self.queue = task_queue self.queue = queue.Queue()
for task in task_queue:
self.queue.put(task)
self.timeout = timeout self.timeout = timeout
self.output = output self.output = output
self.max_workers = min(tasks_count, max_workers) self.max_workers = min(tasks_count, max_workers)
if not progress_bar and not silent: self.output_helper = output_helper or OutputHelper()
self.tqdm = tqdm(total=tasks_count) self.tqdm = tqdm(total=tasks_count) if not progress_bar and not silent else True
else:
self.tqdm = True
def run(self): def run(self):
workers = [Worker(self.queue, self.timeout, self.output, self.tqdm) for w in range(self.max_workers)] workers = [Worker(self.queue, self.timeout, self.output, self.tqdm, self.output_helper)
for _ in range(self.max_workers)]
# run
with ThreadPoolExecutor(self.max_workers) as executors: with ThreadPoolExecutor(self.max_workers) as executors:
for worker in workers: for worker in workers:
executors.submit(worker) executors.submit(worker)
@@ -147,5 +145,5 @@ if __name__ == "__main__":
"sleep 9", "sleep 9",
"sleep 1", "sleep 1",
"echo 'Char!'"] "echo 'Char!'"]
p = Pool(4, tasks, 0, 0, True) p = Pool(4, iter([len(tasks)] + [Task(t) for t in tasks]), 0, 0, True, output_helper=OutputHelper())
p.run() p.run()