mirror of
https://github.com/aljazceru/recon-pipeline.git
synced 2025-12-20 07:44:26 +01:00
fixed lint getting caught on CI but not locally
This commit is contained in:
@@ -3,7 +3,7 @@ repos:
|
||||
rev: stable
|
||||
hooks:
|
||||
- id: black
|
||||
language_version: python3.7
|
||||
language_version: python3
|
||||
args: ['pipeline', 'tests/test_web', 'tests/test_recon', 'tests/test_shell', 'tests/test_models']
|
||||
- repo: https://gitlab.com/pycqa/flake8
|
||||
rev: 3.7.9
|
||||
|
||||
@@ -46,7 +46,7 @@ class DBManager:
|
||||
self.session.add(item)
|
||||
self.session.commit()
|
||||
except (sqlite3.IntegrityError, exc.IntegrityError):
|
||||
print(ansi.style(f"[-] unique key constraint handled, moving on...", fg="bright_white"))
|
||||
print(ansi.style("[-] unique key constraint handled, moving on...", fg="bright_white"))
|
||||
self.session.rollback()
|
||||
|
||||
def get_or_create_target_by_ip_or_hostname(self, ip_or_host):
|
||||
@@ -86,15 +86,13 @@ class DBManager:
|
||||
|
||||
def get_all_ipv4_addresses(self) -> list:
|
||||
""" Simple helper to return all ipv4 addresses from Target records """
|
||||
return [
|
||||
x[0] for x in self.session.query(IPAddress.ipv4_address).filter(IPAddress.ipv4_address != None)
|
||||
] # noqa: E711
|
||||
query = self.session.query(IPAddress.ipv4_address).filter(IPAddress.ipv4_address != None) # noqa: E711
|
||||
return [x[0] for x in query]
|
||||
|
||||
def get_all_ipv6_addresses(self) -> list:
|
||||
""" Simple helper to return all ipv6 addresses from Target records """
|
||||
return [
|
||||
x[0] for x in self.session.query(IPAddress.ipv6_address).filter(IPAddress.ipv6_address != None)
|
||||
] # noqa: E711
|
||||
query = self.session.query(IPAddress.ipv6_address).filter(IPAddress.ipv6_address != None) # noqa: E711
|
||||
return [x[0] for x in query]
|
||||
|
||||
def close(self):
|
||||
""" Simple helper to close the database session """
|
||||
|
||||
@@ -36,7 +36,7 @@ class NmapResult(Base):
|
||||
msg += f"{'=' * (len(ip_address) + len(self.service) + 3)}\n\n"
|
||||
msg += f"{self.port.protocol} port: {self.port.port_number} - {'open' if self.open else 'closed'} - {self.reason}\n"
|
||||
msg += f"product: {self.product} :: {self.product_version}\n"
|
||||
msg += f"nse script(s) output:\n"
|
||||
msg += "nse script(s) output:\n"
|
||||
|
||||
if nse_results is None:
|
||||
# add all nse scripts
|
||||
@@ -53,7 +53,7 @@ class NmapResult(Base):
|
||||
msg += "\n"
|
||||
|
||||
if commandline:
|
||||
msg += f"command used:\n"
|
||||
msg += "command used:\n"
|
||||
msg += f"{pad}{self.commandline}\n"
|
||||
|
||||
return msg
|
||||
|
||||
@@ -239,10 +239,10 @@ class ReconShell(cmd2.Cmd):
|
||||
answer = self.select([("Resume", option_one), ("Remove", option_two), ("Save", option_three)])
|
||||
|
||||
if answer == "Resume":
|
||||
self.poutput(style(f"[+] Resuming scan from last known good state.", fg="bright_green"))
|
||||
self.poutput(style("[+] Resuming scan from last known good state.", fg="bright_green"))
|
||||
elif answer == "Remove":
|
||||
shutil.rmtree(Path(directory))
|
||||
self.poutput(style(f"[+] Old directory removed, starting fresh scan.", fg="bright_green"))
|
||||
self.poutput(style("[+] Old directory removed, starting fresh scan.", fg="bright_green"))
|
||||
elif answer == "Save":
|
||||
current = time.strftime("%Y%m%d-%H%M%S")
|
||||
directory.rename(f"{directory}-{current}")
|
||||
@@ -262,7 +262,7 @@ class ReconShell(cmd2.Cmd):
|
||||
"""
|
||||
if self.db_mgr is None:
|
||||
return self.poutput(
|
||||
style(f"[!] You are not connected to a database; run database attach before scanning", fg="bright_red")
|
||||
style("[!] You are not connected to a database; run database attach before scanning", fg="bright_red")
|
||||
)
|
||||
|
||||
self.check_scan_directory(args.results_dir)
|
||||
@@ -438,7 +438,7 @@ class ReconShell(cmd2.Cmd):
|
||||
try:
|
||||
next(self.get_databases())
|
||||
except StopIteration:
|
||||
return self.poutput(style(f"[-] There are no databases.", fg="bright_white"))
|
||||
return self.poutput(style("[-] There are no databases.", fg="bright_white"))
|
||||
|
||||
for i, location in enumerate(self.get_databases(), start=1):
|
||||
self.poutput(style(f" {i}. {location}"))
|
||||
@@ -515,7 +515,7 @@ class ReconShell(cmd2.Cmd):
|
||||
def database_detach(self, args):
|
||||
""" Detach from the currently attached database """
|
||||
if self.db_mgr is None:
|
||||
return self.poutput(style(f"[!] you are not connected to a database", fg="magenta"))
|
||||
return self.poutput(style("[!] you are not connected to a database", fg="magenta"))
|
||||
|
||||
self.db_mgr.close()
|
||||
self.poutput(style(f"[*] detached from sqlite database @ {self.db_mgr.location}", fg="bright_yellow"))
|
||||
@@ -752,7 +752,7 @@ class ReconShell(cmd2.Cmd):
|
||||
def do_view(self, args):
|
||||
""" View results of completed scans """
|
||||
if self.db_mgr is None:
|
||||
return self.poutput(style(f"[!] you are not connected to a database", fg="magenta"))
|
||||
return self.poutput(style("[!] you are not connected to a database", fg="magenta"))
|
||||
|
||||
func = getattr(args, "func", None)
|
||||
|
||||
@@ -774,7 +774,7 @@ def main(
|
||||
if old_tools_dir.exists() and old_tools_dir.is_dir():
|
||||
# want to try and ensure a smooth transition for folks who have used the pipeline before from
|
||||
# v0.8.4 and below to v0.9.0+
|
||||
print(style(f"[*] Found remnants of an older version of recon-pipeline.", fg="bright_yellow"))
|
||||
print(style("[*] Found remnants of an older version of recon-pipeline.", fg="bright_yellow"))
|
||||
print(
|
||||
style(
|
||||
f"[*] It's {style('strongly', fg='red')} advised that you allow us to remove them.",
|
||||
@@ -803,7 +803,7 @@ def main(
|
||||
old_searchsploit_rc.unlink()
|
||||
print(style(f"[+] {old_searchsploit_rc} removed", fg="bright_green"))
|
||||
|
||||
print(style(f"[=] Please run the install all command to complete setup", fg="bright_blue"))
|
||||
print(style("[=] Please run the install all command to complete setup", fg="bright_blue"))
|
||||
|
||||
rs = ReconShell(persistent_history_file="~/.reconshell_history", persistent_history_length=10000)
|
||||
sys.exit(rs.cmdloop())
|
||||
|
||||
Reference in New Issue
Block a user