mirror of
https://github.com/aljazceru/recon-pipeline.git
synced 2025-12-22 16:54:19 +01:00
Add scan tests (#10)
* recon.targets tests added * restructured tests logically * fixed yaml error * fixed job names * recon.__init__ tests added * recon.config tests added * recon.amass.ParseAmassScan tests added * fixed test destined to fail on CI pipeline * testing amass partially complete this commit closes #6 and #8 updated existing tests to utilize new paths
This commit is contained in:
@@ -12,7 +12,9 @@ from pathlib import Path
|
||||
__version__ = "0.7.3"
|
||||
|
||||
# fix up the PYTHONPATH so we can simply execute the shell from wherever in the filesystem
|
||||
os.environ["PYTHONPATH"] = f"{os.environ.get('PYTHONPATH')}:{str(Path(__file__).parent.resolve())}"
|
||||
os.environ[
|
||||
"PYTHONPATH"
|
||||
] = f"{os.environ.get('PYTHONPATH')}:{str(Path(__file__).parent.resolve())}"
|
||||
|
||||
# suppress "You should consider upgrading via the 'pip install --upgrade pip' command." warning
|
||||
os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = "1"
|
||||
@@ -113,7 +115,9 @@ class ReconShell(cmd2.Cmd):
|
||||
|
||||
words = output.split()
|
||||
|
||||
self.async_alert(style(f"[-] {words[5].split('_')[0]} queued", fg="bright_white"))
|
||||
self.async_alert(
|
||||
style(f"[-] {words[5].split('_')[0]} queued", fg="bright_white")
|
||||
)
|
||||
elif output.startswith("INFO: ") and "running" in output:
|
||||
# luigi Task is currently running
|
||||
|
||||
@@ -130,7 +134,9 @@ class ReconShell(cmd2.Cmd):
|
||||
|
||||
words = output.split()
|
||||
|
||||
self.async_alert(style(f"[+] {words[5].split('_')[0]} complete!", fg="bright_green"))
|
||||
self.async_alert(
|
||||
style(f"[+] {words[5].split('_')[0]} complete!", fg="bright_green")
|
||||
)
|
||||
|
||||
@cmd2.with_argparser(scan_parser)
|
||||
def do_scan(self, args):
|
||||
@@ -166,10 +172,14 @@ class ReconShell(cmd2.Cmd):
|
||||
subprocess.run(command)
|
||||
else:
|
||||
# suppress luigi messages in favor of less verbose/cleaner output
|
||||
proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
proc = subprocess.Popen(
|
||||
command, stderr=subprocess.PIPE, stdout=subprocess.PIPE
|
||||
)
|
||||
|
||||
# add stderr to the selector loop for processing when there's something to read from the fd
|
||||
selector.register(proc.stderr, selectors.EVENT_READ, self._luigi_pretty_printer)
|
||||
selector.register(
|
||||
proc.stderr, selectors.EVENT_READ, self._luigi_pretty_printer
|
||||
)
|
||||
|
||||
@cmd2.with_argparser(install_parser)
|
||||
def do_install(self, args):
|
||||
@@ -220,14 +230,18 @@ class ReconShell(cmd2.Cmd):
|
||||
self.do_install(dependency)
|
||||
|
||||
if tools.get(args.tool).get("installed"):
|
||||
return self.async_alert(style(f"[!] {args.tool} is already installed.", fg="yellow"))
|
||||
return self.async_alert(
|
||||
style(f"[!] {args.tool} is already installed.", fg="yellow")
|
||||
)
|
||||
else:
|
||||
|
||||
# list of return values from commands run during each tool installation
|
||||
# used to determine whether the tool installed correctly or not
|
||||
retvals = list()
|
||||
|
||||
self.async_alert(style(f"[*] Installing {args.tool}...", fg="bright_yellow"))
|
||||
self.async_alert(
|
||||
style(f"[*] Installing {args.tool}...", fg="bright_yellow")
|
||||
)
|
||||
|
||||
for command in tools.get(args.tool).get("commands"):
|
||||
# run all commands required to install the tool
|
||||
@@ -239,13 +253,18 @@ class ReconShell(cmd2.Cmd):
|
||||
|
||||
# go tools use subshells (cmd1 && cmd2 && cmd3 ...) during install, so need shell=True
|
||||
proc = subprocess.Popen(
|
||||
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
else:
|
||||
|
||||
# "normal" command, split up the string as usual and run it
|
||||
proc = subprocess.Popen(
|
||||
shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
shlex.split(command),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
out, err = proc.communicate()
|
||||
|
||||
Reference in New Issue
Block a user