diff --git a/pipeline/recon-pipeline.py b/pipeline/recon-pipeline.py index d7e80df..fe643b1 100755 --- a/pipeline/recon-pipeline.py +++ b/pipeline/recon-pipeline.py @@ -298,10 +298,14 @@ class ReconShell(cmd2.Cmd): # get_scans() returns mapping of {classname: [modulename, ...]} in the recon module # each classname corresponds to a potential recon-pipeline command, i.e. AmassScan, GobusterScan ... scans = get_scans() - # command is a list that will end up looking something like what's below # luigi --module pipeline.recon.web.webanalyze WebanalyzeScan --target abc.com --top-ports 100 --interface eth0 - command = ["luigi", "--module", scans.get(args.scantype)[0]] + try: + command = ["luigi", "--module", scans.get(args.scantype)[0]] + except TypeError: + return self.poutput( + style(f"[!] {args.scantype} or one of its dependencies is not installed", fg="bright_red") + ) tgt_file_path = None if args.target: diff --git a/pipeline/recon/amass.py b/pipeline/recon/amass.py index 04382a5..89f5ace 100644 --- a/pipeline/recon/amass.py +++ b/pipeline/recon/amass.py @@ -9,7 +9,7 @@ from luigi.contrib.sqla import SQLAlchemyTarget import pipeline.models.db_manager from ..tools import tools from .targets import TargetList -from .helpers import get_tool_state +from .helpers import meets_requirements from ..models.target_model import Target @@ -43,21 +43,14 @@ class AmassScan(luigi.Task): """ exempt_list = luigi.Parameter(default="") + requirements = ["go", "amass"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = (Path(self.results_dir) / "amass-results").expanduser().resolve() - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["amass"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ AmassScan depends on TargetList to run. @@ -66,6 +59,7 @@ class AmassScan(luigi.Task): Returns: luigi.ExternalTask - TargetList """ + meets_requirements(self.requirements, self.exception) args = {"target_file": self.target_file, "results_dir": self.results_dir, "db_location": self.db_location} return TargetList(**args) @@ -89,7 +83,6 @@ class AmassScan(luigi.Task): Returns: list: list of options/arguments, beginning with the name of the executable to run """ - self.results_subfolder.mkdir(parents=True, exist_ok=True) hostnames = self.db_mgr.get_all_hostnames() diff --git a/pipeline/recon/helpers.py b/pipeline/recon/helpers.py index bc646fd..ed19c51 100644 --- a/pipeline/recon/helpers.py +++ b/pipeline/recon/helpers.py @@ -6,11 +6,29 @@ import pkgutil import importlib import ipaddress from pathlib import Path +from cmd2.ansi import style + from collections import defaultdict from ..recon.config import defaults +def meets_requirements(requirements, exception): + """ Determine if tools required to perform task are installed. """ + tools = get_tool_state() + + for tool in requirements: + if not tools.get(tool).get("installed"): + if exception: + raise RuntimeError( + style(f"[!!] {tool} is not installed, and is required to run this scan", fg="bright_red") + ) + else: + return False + + return True + + def get_tool_state() -> typing.Union[dict, None]: """ Load current tool state from disk. """ tools = Path(defaults.get("tools-dir")) / ".tool-dict.pkl" @@ -60,7 +78,9 @@ def get_scans(): # final check, this ensures that the tools necessary to AT LEAST run this scan are present # does not consider upstream dependencies try: - if not sub_obj.meets_requirements(): + requirements = sub_obj.requirements + exception = False # let meets_req know we want boolean result + if not meets_requirements(requirements, exception): continue except AttributeError: # some scan's haven't implemented meets_requirements yet, silently allow them through diff --git a/pipeline/recon/masscan.py b/pipeline/recon/masscan.py index 2b95f98..68e98ca 100644 --- a/pipeline/recon/masscan.py +++ b/pipeline/recon/masscan.py @@ -14,7 +14,7 @@ from .amass import ParseAmassOutput from ..models.port_model import Port from ..models.ip_address_model import IPAddress -from .helpers import get_tool_state +from .helpers import meets_requirements from .config import top_tcp_ports, top_udp_ports, defaults, web_ports @@ -58,21 +58,14 @@ class MasscanScan(luigi.Task): interface = luigi.Parameter(default=defaults.get("masscan-iface")) top_ports = luigi.IntParameter(default=0) # IntParameter -> top_ports expected as int ports = luigi.Parameter(default="") + requirements = ["masscan"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = (Path(self.results_dir) / "masscan-results").expanduser().resolve() - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["masscan"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def output(self): """ Returns the target output for this task. @@ -91,6 +84,7 @@ class MasscanScan(luigi.Task): Returns: list: list of options/arguments, beginning with the name of the executable to run """ + meets_requirements(self.requirements, self.exception) if not self.ports and not self.top_ports: # need at least one, can't be put into argparse scanner because things like amass don't require ports option logging.error("Must specify either --top-ports or --ports.") diff --git a/pipeline/recon/nmap.py b/pipeline/recon/nmap.py index 51bacca..b772006 100644 --- a/pipeline/recon/nmap.py +++ b/pipeline/recon/nmap.py @@ -3,6 +3,8 @@ import logging import subprocess import concurrent.futures from pathlib import Path +from shutil import which +from cmd2.ansi import style import luigi import sqlalchemy @@ -13,10 +15,9 @@ from luigi.contrib.sqla import SQLAlchemyTarget import pipeline.models.db_manager from .masscan import ParseMasscanOutput from .config import defaults -from .helpers import get_ip_address_version, is_ip_address +from .helpers import get_ip_address_version, is_ip_address, meets_requirements from ..tools import tools -from .helpers import get_tool_state from ..models.port_model import Port from ..models.nse_model import NSEResult from ..models.target_model import Target @@ -58,6 +59,8 @@ class ThreadedNmapScan(luigi.Task): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + if not which("nmap"): + raise RuntimeError(style("[!] nmap is not installed", fg="bright_red")) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = (Path(self.results_dir) / "nmap-results").expanduser().resolve() @@ -238,19 +241,13 @@ class SearchsploitScan(luigi.Task): results_dir: specifies the directory on disk to which all Task results are written *Required by upstream Task* """ + requirements = ["searchsploit"] + exception = True + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["searchsploit"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ Searchsploit depends on ThreadedNmap to run. @@ -261,6 +258,7 @@ class SearchsploitScan(luigi.Task): Returns: luigi.Task - ThreadedNmap """ + meets_requirements(self.requirements, self.exception) args = { "rate": self.rate, "ports": self.ports, diff --git a/pipeline/recon/web/aquatone.py b/pipeline/recon/web/aquatone.py index cdb46c4..3963ef3 100644 --- a/pipeline/recon/web/aquatone.py +++ b/pipeline/recon/web/aquatone.py @@ -13,7 +13,7 @@ from ..config import defaults from ...tools import tools import pipeline.models.db_manager -from ..helpers import get_tool_state +from ..helpers import meets_requirements from ...models.port_model import Port from ...models.header_model import Header from ...models.endpoint_model import Endpoint @@ -58,21 +58,14 @@ class AquatoneScan(luigi.Task): threads = luigi.Parameter(default=defaults.get("threads", "")) scan_timeout = luigi.Parameter(default=defaults.get("aquatone-scan-timeout", "")) + requirements = ["aquatone", "masscan"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = Path(self.results_dir) / "aquatone-results" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["aquatone"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ AquatoneScan depends on GatherWebTargets to run. @@ -82,6 +75,7 @@ class AquatoneScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/pipeline/recon/web/gobuster.py b/pipeline/recon/web/gobuster.py index ee0f003..c0c222a 100644 --- a/pipeline/recon/web/gobuster.py +++ b/pipeline/recon/web/gobuster.py @@ -12,7 +12,7 @@ from luigi.contrib.sqla import SQLAlchemyTarget import pipeline.models.db_manager from ...tools import tools from ..config import defaults -from ..helpers import get_tool_state +from ..helpers import meets_requirements from .targets import GatherWebTargets from ...models.endpoint_model import Endpoint from ..helpers import get_ip_address_version, is_ip_address @@ -59,21 +59,14 @@ class GobusterScan(luigi.Task): threads = luigi.Parameter(default=defaults.get("threads")) wordlist = luigi.Parameter(default=defaults.get("gobuster-wordlist")) extensions = luigi.Parameter(default=defaults.get("gobuster-extensions")) + requirements = ["recursive-gobuster", "go", "gobuster", "masscan"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = Path(self.results_dir) / "gobuster-results" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["recursive-gobuster", "gobuster"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ GobusterScan depends on GatherWebTargets to run. @@ -83,6 +76,7 @@ class GobusterScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/pipeline/recon/web/subdomain_takeover.py b/pipeline/recon/web/subdomain_takeover.py index 57b64bc..ec24c99 100644 --- a/pipeline/recon/web/subdomain_takeover.py +++ b/pipeline/recon/web/subdomain_takeover.py @@ -10,7 +10,7 @@ from luigi.contrib.sqla import SQLAlchemyTarget import pipeline.models.db_manager from ...tools import tools from ..config import defaults -from ..helpers import get_tool_state +from ..helpers import meets_requirements from .targets import GatherWebTargets @@ -47,21 +47,15 @@ class TKOSubsScan(luigi.Task): results_dir: specifes the directory on disk to which all Task results are written *Required by upstream Task* """ + requirements = ["go", "tko-subs", "masscan"] + exception = True + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = (Path(self.results_dir) / "tkosubs-results").expanduser().resolve() self.output_file = self.results_subfolder / "tkosubs.csv" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["tko-subs"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ TKOSubsScan depends on GatherWebTargets to run. @@ -71,6 +65,7 @@ class TKOSubsScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, @@ -177,6 +172,8 @@ class SubjackScan(luigi.Task): """ threads = luigi.Parameter(default=defaults.get("threads")) + requirements = ["go", "subjack", "masscan"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -184,15 +181,6 @@ class SubjackScan(luigi.Task): self.results_subfolder = (Path(self.results_dir) / "subjack-results").expanduser().resolve() self.output_file = self.results_subfolder / "subjack.txt" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["subjack"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ SubjackScan depends on GatherWebTargets to run. @@ -202,6 +190,7 @@ class SubjackScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/pipeline/recon/web/waybackurls.py b/pipeline/recon/web/waybackurls.py index 6d97cdf..8195d7b 100644 --- a/pipeline/recon/web/waybackurls.py +++ b/pipeline/recon/web/waybackurls.py @@ -8,7 +8,7 @@ from luigi.contrib.sqla import SQLAlchemyTarget from .targets import GatherWebTargets from ...tools import tools -from ..helpers import get_tool_state +from ..helpers import meets_requirements from ...models.endpoint_model import Endpoint import pipeline.models.db_manager @@ -44,20 +44,14 @@ class WaybackurlsScan(luigi.Task): results_dir: specifes the directory on disk to which all Task results are written *Required by upstream Task* """ + requirements = ["go", "waybackurls", "masscan"] + exception = True + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = Path(self.results_dir) / "waybackurls-results" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["waybackurls"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ WaybackurlsScan depends on GatherWebTargets to run. @@ -67,6 +61,7 @@ class WaybackurlsScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/pipeline/recon/web/webanalyze.py b/pipeline/recon/web/webanalyze.py index 07b2f35..da9b97c 100644 --- a/pipeline/recon/web/webanalyze.py +++ b/pipeline/recon/web/webanalyze.py @@ -13,7 +13,7 @@ from luigi.contrib.sqla import SQLAlchemyTarget import pipeline.models.db_manager from ...tools import tools from ..config import defaults -from ..helpers import get_tool_state +from ..helpers import meets_requirements from .targets import GatherWebTargets from ...models.technology_model import Technology from ..helpers import get_ip_address_version, is_ip_address @@ -54,21 +54,14 @@ class WebanalyzeScan(luigi.Task): """ threads = luigi.Parameter(default=defaults.get("threads")) + requirements = ["go", "webanalyze", "masscan"] + exception = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.db_mgr = pipeline.models.db_manager.DBManager(db_location=self.db_location) self.results_subfolder = Path(self.results_dir) / "webanalyze-results" - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["webanalyze"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) - def requires(self): """ WebanalyzeScan depends on GatherWebTargets to run. @@ -78,6 +71,7 @@ class WebanalyzeScan(luigi.Task): Returns: luigi.Task - GatherWebTargets """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/pipeline/recon/wrappers.py b/pipeline/recon/wrappers.py index 2ee1ff3..35d7af1 100644 --- a/pipeline/recon/wrappers.py +++ b/pipeline/recon/wrappers.py @@ -2,7 +2,7 @@ import luigi from luigi.util import inherits from .nmap import SearchsploitScan -from .helpers import get_tool_state +from .helpers import meets_requirements from .web import AquatoneScan, GobusterScan, SubjackScan, TKOSubsScan, WaybackurlsScan, WebanalyzeScan @@ -28,28 +28,24 @@ class FullScan(luigi.WrapperTask): results_dir: specifes the directory on disk to which all Task results are written """ - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = [ - "amass", - "aquatone", - "masscan", - "tko-subs", - "recursive-gobuster", - "searchsploit", - "subjack", - "gobuster", - "webanalyze", - "waybackurls", - ] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) + requirements = [ + "amass", + "aquatone", + "masscan", + "tko-subs", + "recursive-gobuster", + "searchsploit", + "subjack", + "gobuster", + "webanalyze", + "waybackurls", + "go", + ] + exception = True def requires(self): """ FullScan is a wrapper, as such it requires any Tasks that it wraps. """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, @@ -111,17 +107,12 @@ class HTBScan(luigi.WrapperTask): results_dir: specifes the directory on disk to which all Task results are written """ - @staticmethod - def meets_requirements(): - """ Reports whether or not this scan's needed tool(s) are installed or not """ - needs = ["aquatone", "masscan", "recursive-gobuster", "searchsploit", "gobuster", "webanalyze"] - tools = get_tool_state() - - if tools: - return all([tools.get(x).get("installed") is True for x in needs]) + requirements = ["aquatone", "go", "masscan", "recursive-gobuster", "searchsploit", "gobuster", "webanalyze"] + exception = True def requires(self): """ HTBScan is a wrapper, as such it requires any Tasks that it wraps. """ + meets_requirements(self.requirements, self.exception) args = { "results_dir": self.results_dir, "rate": self.rate, diff --git a/tests/test_recon/test_amass.py b/tests/test_recon/test_amass.py index d37fef2..a52943c 100644 --- a/tests/test_recon/test_amass.py +++ b/tests/test_recon/test_amass.py @@ -23,8 +23,9 @@ class TestAmassScan: def test_scan_requires(self): with patch("pipeline.recon.TargetList"): - retval = self.scan.requires() - assert isinstance(retval, TargetList) + with patch("pipeline.recon.amass.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, TargetList) def test_scan_run(self): with patch("subprocess.run") as mocked_run: diff --git a/tests/test_recon/test_helpers.py b/tests/test_recon/test_helpers.py index 97f2c62..d3518c8 100644 --- a/tests/test_recon/test_helpers.py +++ b/tests/test_recon/test_helpers.py @@ -1,34 +1,61 @@ import pytest +from unittest.mock import patch -from pipeline.recon.helpers import get_ip_address_version, get_scans, is_ip_address +from pipeline.recon.helpers import get_ip_address_version, get_scans, is_ip_address, meets_requirements from pipeline.recon import AmassScan, MasscanScan, FullScan, HTBScan, SearchsploitScan, ThreadedNmapScan from pipeline.recon.web import GobusterScan, SubjackScan, TKOSubsScan, AquatoneScan, WaybackurlsScan, WebanalyzeScan def test_get_scans(): + with patch("pipeline.recon.helpers.meets_requirements"): + scan_names = [ + AmassScan, + GobusterScan, + MasscanScan, + SubjackScan, + TKOSubsScan, + AquatoneScan, + FullScan, + HTBScan, + SearchsploitScan, + ThreadedNmapScan, + WebanalyzeScan, + WaybackurlsScan, + ] - scan_names = [ - AmassScan, - GobusterScan, - MasscanScan, - SubjackScan, - TKOSubsScan, - AquatoneScan, - FullScan, - HTBScan, - SearchsploitScan, - ThreadedNmapScan, - WebanalyzeScan, - WaybackurlsScan, - ] + scans = get_scans() + for scan in scan_names: + if hasattr(scan, "requirements"): + assert scan.__name__ in scans.keys() + else: + assert scan not in scans.keys() - scans = get_scans() - for scan in scan_names: - if hasattr(scan, "meets_requirements") and scan.meets_requirements(): - assert scan.__name__ in scans.keys() - else: - assert scan not in scans.keys() +@pytest.mark.parametrize( + "requirements, exception", + [ + (["amass"], True), + (["masscan"], True), + ( + [ + "amass", + "aquatone", + "masscan", + "tko-subs", + "recursive-gobuster", + "searchsploit", + "subjack", + "gobuster", + "webanalyze", + "waybackurls", + ], + False, + ), + ], +) +def test_meets_requirements(requirements, exception): + with patch("pipeline.recon.helpers.get_tool_state"): + assert meets_requirements(requirements, exception) @pytest.mark.parametrize( diff --git a/tests/test_recon/test_masscan.py b/tests/test_recon/test_masscan.py index dfa8286..7cdfef3 100644 --- a/tests/test_recon/test_masscan.py +++ b/tests/test_recon/test_masscan.py @@ -17,6 +17,7 @@ class TestMasscanScan: self.scan = MasscanScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) diff --git a/tests/test_recon/test_nmap.py b/tests/test_recon/test_nmap.py index 5aa1b0a..763b4b7 100644 --- a/tests/test_recon/test_nmap.py +++ b/tests/test_recon/test_nmap.py @@ -15,18 +15,23 @@ nmap_results = Path(__file__).parent.parent / "data" / "recon-results" / "nmap-r class TestThreadedNmapScan: def setup_method(self): - self.tmp_path = Path(tempfile.mkdtemp()) - self.scan = ThreadedNmapScan( - target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") - ) + with patch("pipeline.recon.nmap.which"): + self.tmp_path = Path(tempfile.mkdtemp()) + shutil.which = MagicMock() + shutil.which.return_value = True + self.scan = ThreadedNmapScan( + target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") + ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.ParseMasscanOutput"): - retval = self.scan.requires() - assert isinstance(retval, ParseMasscanOutput) + with patch("pipeline.recon.nmap.which"): + retval = self.scan.requires() + assert isinstance(retval, ParseMasscanOutput) def test_scan_run(self): with patch("concurrent.futures.ThreadPoolExecutor.map") as mocked_run: @@ -75,14 +80,17 @@ class TestSearchsploitScan: self.scan = SearchsploitScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.ThreadedNmapScan"): - retval = self.scan.requires() - assert isinstance(retval, ThreadedNmapScan) + with patch("pipeline.recon.nmap.meets_requirements"): + with patch("pipeline.recon.nmap.which"): + retval = self.scan.requires() + assert isinstance(retval, ThreadedNmapScan) def test_scan_output(self): retval = self.scan.output() diff --git a/tests/test_web/test_aquatone.py b/tests/test_web/test_aquatone.py index 593b504..9ea56d6 100644 --- a/tests/test_web/test_aquatone.py +++ b/tests/test_web/test_aquatone.py @@ -15,14 +15,16 @@ class TestAquatoneScan: self.scan = AquatoneScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.aquatone.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_creates_results_dir(self): assert self.scan.results_subfolder == self.tmp_path / "aquatone-results" diff --git a/tests/test_web/test_gobuster.py b/tests/test_web/test_gobuster.py index 52b53b5..8071b4b 100644 --- a/tests/test_web/test_gobuster.py +++ b/tests/test_web/test_gobuster.py @@ -14,14 +14,16 @@ class TestGobusterScan: self.scan = GobusterScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.gobuster.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_run(self): with patch("concurrent.futures.ThreadPoolExecutor.map") as mocked_run: diff --git a/tests/test_web/test_subdomain_takeover.py b/tests/test_web/test_subdomain_takeover.py index 2c21454..726dc7e 100644 --- a/tests/test_web/test_subdomain_takeover.py +++ b/tests/test_web/test_subdomain_takeover.py @@ -17,14 +17,16 @@ class TestTKOSubsScanScan: self.scan = TKOSubsScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.subdomain_takeover.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_creates_results_dir(self): assert self.scan.results_subfolder == self.tmp_path / "tkosubs-results" @@ -84,14 +86,16 @@ class TestSubjackScan: self.scan = SubjackScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.subdomain_takeover.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_creates_results_dir(self): assert self.scan.results_subfolder == self.tmp_path / "subjack-results" diff --git a/tests/test_web/test_targets.py b/tests/test_web/test_targets.py index 50516ba..bcdf237 100644 --- a/tests/test_web/test_targets.py +++ b/tests/test_web/test_targets.py @@ -13,6 +13,7 @@ class TestGatherWebTargets: self.scan = GatherWebTargets( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) diff --git a/tests/test_web/test_waybackurls.py b/tests/test_web/test_waybackurls.py index 1d792e9..8c08f02 100644 --- a/tests/test_web/test_waybackurls.py +++ b/tests/test_web/test_waybackurls.py @@ -12,14 +12,16 @@ class TestGatherWebTargets: self.scan = WaybackurlsScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.waybackurls.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_creates_database(self): assert self.scan.db_mgr.location.exists() diff --git a/tests/test_web/test_webanalyze.py b/tests/test_web/test_webanalyze.py index 67803c1..41d6746 100644 --- a/tests/test_web/test_webanalyze.py +++ b/tests/test_web/test_webanalyze.py @@ -16,14 +16,16 @@ class TestWebanalyzeScan: self.scan = WebanalyzeScan( target_file=__file__, results_dir=str(self.tmp_path), db_location=str(self.tmp_path / "testing.sqlite") ) + self.scan.exception = False def teardown_method(self): shutil.rmtree(self.tmp_path) def test_scan_requires(self): with patch("pipeline.recon.web.GatherWebTargets"): - retval = self.scan.requires() - assert isinstance(retval, GatherWebTargets) + with patch("pipeline.recon.web.webanalyze.meets_requirements"): + retval = self.scan.requires() + assert isinstance(retval, GatherWebTargets) def test_scan_creates_results_dir(self): assert self.scan.results_subfolder == self.tmp_path / "webanalyze-results"