From 7a24d85db4d13818f5b526d10ba9f2bc2a5d8615 Mon Sep 17 00:00:00 2001 From: epi052 <43392618+epi052@users.noreply.github.com> Date: Tue, 4 Feb 2020 06:33:00 -0600 Subject: [PATCH] Add scan tests (#12) - tests of current codebase complete * recon.targets tests added * restructured tests logically * fixed yaml error * fixed job names * recon.__init__ tests added * recon.config tests added * recon.amass.ParseAmassScan tests added * fixed test destined to fail on CI pipeline * testing amass partially complete * Changed the dir layout (#6) and fixed paths (#8) this commit closes #6 and #8 updated existing tests to utilize new paths * tests of current codebase complete * added is_kali check to searchsploit test * added test_web action to pipeline --- .flake8 | 2 +- .github/workflows/pythonapp.yml | 20 +++ pyproject.toml | 2 +- recon-pipeline.py | 53 ++------ recon/__init__.py | 79 +++-------- recon/amass.py | 12 +- recon/masscan.py | 22 +--- recon/nmap.py | 28 +--- recon/web/gobuster.py | 8 +- recon/web/targets.py | 4 +- recon/web/webanalyze.py | 12 +- recon/wrappers.py | 8 +- tests/test_install/test_install_command.py | 20 +-- tests/test_recon/test_amass.py | 16 +-- tests/test_recon/test_masscan.py | 35 +---- tests/test_recon/test_nmap.py | 38 ++++++ tests/test_recon/test_targets.py | 34 ++--- tests/test_web/__init__.py | 0 tests/test_web/test_targets.py | 144 +++++++++++++++++++++ tests/utils.py | 6 +- 20 files changed, 279 insertions(+), 264 deletions(-) create mode 100644 tests/test_recon/test_nmap.py create mode 100644 tests/test_web/__init__.py create mode 100644 tests/test_web/test_targets.py diff --git a/.flake8 b/.flake8 index 6964a09..1fe22e5 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,5 @@ [flake8] -max-line-length = 88 +max-line-length = 120 select = C,E,F,W,B,B950 ignore = E203, E501, W503 max-complexity = 13 \ No newline at end of file diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index abe3207..3d28e57 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -69,3 +69,23 @@ jobs: run: | pipenv install pytest pipenv run python -m pytest tests/test_recon + + test-web: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up Python 3.7 + uses: actions/setup-python@v1 + with: + python-version: 3.7 + - name: Set up pipenv + run: | + python -m pip install --upgrade pip + pip install pipenv + pipenv install -d + - name: Test with pytest + run: | + pipenv install pytest + pipenv run python -m pytest tests/test_web \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b3600fa..edd2751 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,4 @@ [tool.black] -line-length = 88 +line-length = 120 include = '\.pyi?$' exclude = '.*config.*py$|\.git' \ No newline at end of file diff --git a/recon-pipeline.py b/recon-pipeline.py index 8927009..3aa4ae9 100755 --- a/recon-pipeline.py +++ b/recon-pipeline.py @@ -9,12 +9,8 @@ import threading import subprocess from pathlib import Path -__version__ = "0.7.3" - # fix up the PYTHONPATH so we can simply execute the shell from wherever in the filesystem -os.environ[ - "PYTHONPATH" -] = f"{os.environ.get('PYTHONPATH')}:{str(Path(__file__).parent.resolve())}" +os.environ["PYTHONPATH"] = f"{os.environ.get('PYTHONPATH')}:{str(Path(__file__).parent.resolve())}" # suppress "You should consider upgrading via the 'pip install --upgrade pip' command." warning os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = "1" @@ -115,9 +111,7 @@ class ReconShell(cmd2.Cmd): words = output.split() - self.async_alert( - style(f"[-] {words[5].split('_')[0]} queued", fg="bright_white") - ) + self.async_alert(style(f"[-] {words[5].split('_')[0]} queued", fg="bright_white")) elif output.startswith("INFO: ") and "running" in output: # luigi Task is currently running @@ -134,9 +128,7 @@ class ReconShell(cmd2.Cmd): words = output.split() - self.async_alert( - style(f"[+] {words[5].split('_')[0]} complete!", fg="bright_green") - ) + self.async_alert(style(f"[+] {words[5].split('_')[0]} complete!", fg="bright_green")) @cmd2.with_argparser(scan_parser) def do_scan(self, args): @@ -172,14 +164,10 @@ class ReconShell(cmd2.Cmd): subprocess.run(command) else: # suppress luigi messages in favor of less verbose/cleaner output - proc = subprocess.Popen( - command, stderr=subprocess.PIPE, stdout=subprocess.PIPE - ) + proc = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) # add stderr to the selector loop for processing when there's something to read from the fd - selector.register( - proc.stderr, selectors.EVENT_READ, self._luigi_pretty_printer - ) + selector.register(proc.stderr, selectors.EVENT_READ, self._luigi_pretty_printer) @cmd2.with_argparser(install_parser) def do_install(self, args): @@ -219,29 +207,21 @@ class ReconShell(cmd2.Cmd): continue self.async_alert( - style( - f"[!] {args.tool} has an unmet dependency; installing {dependency}", - fg="yellow", - bold=True, - ) + style(f"[!] {args.tool} has an unmet dependency; installing {dependency}", fg="yellow", bold=True,) ) # install the dependency before continuing with installation self.do_install(dependency) if tools.get(args.tool).get("installed"): - return self.async_alert( - style(f"[!] {args.tool} is already installed.", fg="yellow") - ) + return self.async_alert(style(f"[!] {args.tool} is already installed.", fg="yellow")) else: # list of return values from commands run during each tool installation # used to determine whether the tool installed correctly or not retvals = list() - self.async_alert( - style(f"[*] Installing {args.tool}...", fg="bright_yellow") - ) + self.async_alert(style(f"[*] Installing {args.tool}...", fg="bright_yellow")) for command in tools.get(args.tool).get("commands"): # run all commands required to install the tool @@ -252,20 +232,11 @@ class ReconShell(cmd2.Cmd): if tools.get(args.tool).get("shell"): # go tools use subshells (cmd1 && cmd2 && cmd3 ...) during install, so need shell=True - proc = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) + proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) else: # "normal" command, split up the string as usual and run it - proc = subprocess.Popen( - shlex.split(command), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) + proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE,) out, err = proc.communicate() @@ -298,7 +269,5 @@ class ReconShell(cmd2.Cmd): if __name__ == "__main__": - rs = ReconShell( - persistent_history_file="~/.reconshell_history", persistent_history_length=10000 - ) + rs = ReconShell(persistent_history_file="~/.reconshell_history", persistent_history_length=10000) sys.exit(rs.cmdloop()) diff --git a/recon/__init__.py b/recon/__init__.py index 65d8f69..227bc78 100644 --- a/recon/__init__.py +++ b/recon/__init__.py @@ -26,16 +26,8 @@ tools = { ], "shell": True, }, - "luigi": { - "installed": False, - "dependencies": ["pipenv"], - "commands": ["pipenv install luigi"], - }, - "pipenv": { - "installed": False, - "dependencies": None, - "commands": ["sudo apt-get install -y -q pipenv"], - }, + "luigi": {"installed": False, "dependencies": ["pipenv"], "commands": ["pipenv install luigi"],}, + "pipenv": {"installed": False, "dependencies": None, "commands": ["sudo apt-get install -y -q pipenv"],}, "masscan": { "installed": False, "dependencies": None, @@ -46,11 +38,7 @@ tools = { "rm -rf /tmp/masscan", ], }, - "amass": { - "installed": False, - "dependencies": None, - "commands": ["sudo apt-get install -y -q amass"], - }, + "amass": {"installed": False, "dependencies": None, "commands": ["sudo apt-get install -y -q amass"],}, "aquatone": { "installed": False, "dependencies": None, @@ -94,10 +82,7 @@ tools = { "subjack": { "installed": False, "dependencies": ["go"], - "commands": [ - "go get github.com/haccer/subjack", - "(cd ~/go/src/github.com/haccer/subjack && go install)", - ], + "commands": ["go get github.com/haccer/subjack", "(cd ~/go/src/github.com/haccer/subjack && go install)",], "shell": True, }, "webanalyze": { @@ -117,11 +102,7 @@ tools = { f"sudo bash -c 'if [[ -d {Path(tool_paths.get('recursive-gobuster')).parent} ]] ; then cd {Path(tool_paths.get('recursive-gobuster')).parent} && git pull; else git clone https://github.com/epi052/recursive-gobuster.git {Path(tool_paths.get('recursive-gobuster')).parent}; fi'", ], }, - "go": { - "installed": False, - "dependencies": None, - "commands": ["sudo apt-get install -y -q golang"], - }, + "go": {"installed": False, "dependencies": None, "commands": ["sudo apt-get install -y -q golang"],}, } @@ -141,9 +122,7 @@ def get_scans(): # recursively walk packages; import each module in each package # walk_packages yields ModuleInfo objects for all modules recursively on path # prefix is a string to output on the front of every module name on output. - for loader, module_name, is_pkg in pkgutil.walk_packages( - path=recon.__path__, prefix="recon." - ): + for loader, module_name, is_pkg in pkgutil.walk_packages(path=recon.__path__, prefix="recon."): importlib.import_module(module_name) # walk all modules, grabbing classes that we've written and add them to the classlist defaultdict @@ -162,9 +141,7 @@ def get_scans(): # options for ReconShell's 'install' command install_parser = cmd2.Cmd2ArgumentParser() -install_parser.add_argument( - "tool", help="which tool to install", choices=list(tools.keys()) + ["all"] -) +install_parser.add_argument("tool", help="which tool to install", choices=list(tools.keys()) + ["all"]) # options for ReconShell's 'scan' command @@ -176,54 +153,34 @@ scan_parser.add_argument( help="file created by the user that defines the target's scope; list of ips/domains", ) scan_parser.add_argument( - "--exempt-list", - completer_method=cmd2.Cmd.path_complete, - help="list of blacklisted ips/domains", + "--exempt-list", completer_method=cmd2.Cmd.path_complete, help="list of blacklisted ips/domains", ) scan_parser.add_argument( - "--results-dir", - completer_method=cmd2.Cmd.path_complete, - help="directory in which to save scan results", + "--results-dir", completer_method=cmd2.Cmd.path_complete, help="directory in which to save scan results", ) scan_parser.add_argument( - "--wordlist", - completer_method=cmd2.Cmd.path_complete, - help="path to wordlist used by gobuster", + "--wordlist", completer_method=cmd2.Cmd.path_complete, help="path to wordlist used by gobuster", ) scan_parser.add_argument( "--interface", choices_function=lambda: [x[1] for x in socket.if_nameindex()], help="which interface masscan should use", ) -scan_parser.add_argument( - "--recursive", action="store_true", help="whether or not to recursively gobust" -) +scan_parser.add_argument("--recursive", action="store_true", help="whether or not to recursively gobust") scan_parser.add_argument("--rate", help="rate at which masscan should scan") scan_parser.add_argument( - "--top-ports", - help="ports to scan as specified by nmap's list of top-ports (only meaningful to around 5000)", + "--top-ports", help="ports to scan as specified by nmap's list of top-ports (only meaningful to around 5000)", ) scan_parser.add_argument( - "--ports", - help="port specification for masscan (all ports example: 1-65535,U:1-65535)", -) -scan_parser.add_argument( - "--threads", help="number of threads for all of the threaded applications to use" + "--ports", help="port specification for masscan (all ports example: 1-65535,U:1-65535)", ) +scan_parser.add_argument("--threads", help="number of threads for all of the threaded applications to use") scan_parser.add_argument("--scan-timeout", help="scan timeout for aquatone") +scan_parser.add_argument("--proxy", help="proxy for gobuster if desired (ex. 127.0.0.1:8080)") +scan_parser.add_argument("--extensions", help="list of extensions for gobuster (ex. asp,html,aspx)") scan_parser.add_argument( - "--proxy", help="proxy for gobuster if desired (ex. 127.0.0.1:8080)" + "--local-scheduler", action="store_true", help="use the local scheduler instead of the central scheduler (luigid)", ) scan_parser.add_argument( - "--extensions", help="list of extensions for gobuster (ex. asp,html,aspx)" -) -scan_parser.add_argument( - "--local-scheduler", - action="store_true", - help="use the local scheduler instead of the central scheduler (luigid)", -) -scan_parser.add_argument( - "--verbose", - action="store_true", - help="shows debug messages from luigi, useful for troubleshooting", + "--verbose", action="store_true", help="shows debug messages from luigi, useful for troubleshooting", ) diff --git a/recon/amass.py b/recon/amass.py index f133a41..f900c60 100644 --- a/recon/amass.py +++ b/recon/amass.py @@ -171,9 +171,7 @@ class ParseAmassOutput(luigi.Task): unique_ip6s = set() unique_subs = set() - Path(self.output().get("target-ips").path).parent.mkdir( - parents=True, exist_ok=True - ) + Path(self.output().get("target-ips").path).parent.mkdir(parents=True, exist_ok=True) amass_json = self.input().open() ip_file = self.output().get("target-ips").open("w") @@ -187,13 +185,9 @@ class ParseAmassOutput(luigi.Task): for address in entry.get("addresses"): ipaddr = address.get("ip") - if isinstance( - ipaddress.ip_address(ipaddr), ipaddress.IPv4Address - ): # ipv4 addr + if isinstance(ipaddress.ip_address(ipaddr), ipaddress.IPv4Address): # ipv4 addr unique_ips.add(ipaddr) - elif isinstance( - ipaddress.ip_address(ipaddr), ipaddress.IPv6Address - ): # ipv6 + elif isinstance(ipaddress.ip_address(ipaddr), ipaddress.IPv6Address): # ipv6 unique_ip6s.add(ipaddr) # send gathered results to their appropriate destination diff --git a/recon/masscan.py b/recon/masscan.py index b0fe0c4..a431912 100644 --- a/recon/masscan.py +++ b/recon/masscan.py @@ -50,9 +50,7 @@ class MasscanScan(luigi.Task): rate = luigi.Parameter(default=defaults.get("masscan-rate", "")) interface = luigi.Parameter(default=defaults.get("masscan-iface", "")) - top_ports = luigi.IntParameter( - default=0 - ) # IntParameter -> top_ports expected as int + top_ports = luigi.IntParameter(default=0) # IntParameter -> top_ports expected as int ports = luigi.Parameter(default="") def output(self): @@ -93,27 +91,21 @@ class MasscanScan(luigi.Task): if self.top_ports: # if --top-ports used, format the top_*_ports lists as strings and then into a proper masscan --ports option - top_tcp_ports_str = ",".join( - str(x) for x in top_tcp_ports[: self.top_ports] - ) - top_udp_ports_str = ",".join( - str(x) for x in top_udp_ports[: self.top_ports] - ) + top_tcp_ports_str = ",".join(str(x) for x in top_tcp_ports[: self.top_ports]) + top_udp_ports_str = ",".join(str(x) for x in top_udp_ports[: self.top_ports]) self.ports = f"{top_tcp_ports_str},U:{top_udp_ports_str}" self.top_ports = 0 - target_list = yield TargetList( - target_file=self.target_file, results_dir=self.results_dir - ) + target_list = yield TargetList(target_file=self.target_file, results_dir=self.results_dir) + + Path(self.output().path).parent.mkdir(parents=True, exist_ok=True) Path(self.output().path).parent.mkdir(parents=True, exist_ok=True) if target_list.path.endswith("domains"): yield ParseAmassOutput( - target_file=self.target_file, - exempt_list=self.exempt_list, - results_dir=self.results_dir, + target_file=self.target_file, exempt_list=self.exempt_list, results_dir=self.results_dir, ) command = [ diff --git a/recon/nmap.py b/recon/nmap.py index a53257a..d213499 100644 --- a/recon/nmap.py +++ b/recon/nmap.py @@ -81,9 +81,7 @@ class ThreadedNmapScan(luigi.Task): try: self.threads = abs(int(self.threads)) except TypeError: - return logging.error( - "The value supplied to --threads must be a non-negative integer." - ) + return logging.error("The value supplied to --threads must be a non-negative integer.") ip_dict = pickle.load(open(self.input().path, "rb")) @@ -121,9 +119,7 @@ class ThreadedNmapScan(luigi.Task): # arg to -oA, will drop into subdir off curdir tmp_cmd[10] = ",".join(ports) - tmp_cmd.append( - str(Path(self.output().path) / f"nmap.{target}-{protocol}") - ) + tmp_cmd.append(str(Path(self.output().path) / f"nmap.{target}-{protocol}")) tmp_cmd.append(target) # target as final arg to nmap @@ -132,9 +128,7 @@ class ThreadedNmapScan(luigi.Task): # basically mkdir -p, won't error out if already there Path(self.output().path).mkdir(parents=True, exist_ok=True) - with concurrent.futures.ThreadPoolExecutor( - max_workers=self.threads - ) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor: executor.map(subprocess.run, commands) @@ -205,22 +199,12 @@ class SearchsploitScan(luigi.Task): def run(self): """ Grabs the xml files created by ThreadedNmap and runs searchsploit --nmap on each one, saving the output. """ - Path(self.output().path).mkdir(parents=True, exist_ok=True) - for entry in Path(self.input().path).glob("nmap*.xml"): - proc = subprocess.run( - ["searchsploit", "--nmap", str(entry)], stderr=subprocess.PIPE - ) + proc = subprocess.run(["searchsploit", "--nmap", str(entry)], stderr=subprocess.PIPE) if proc.stderr: Path(self.output().path).mkdir(parents=True, exist_ok=True) # change wall-searchsploit-results/nmap.10.10.10.157-tcp to 10.10.10.157 - target = ( - entry.stem.replace("nmap.", "") - .replace("-tcp", "") - .replace("-udp", "") - ) + target = entry.stem.replace("nmap.", "").replace("-tcp", "").replace("-udp", "") - Path( - f"{self.output().path}/searchsploit.{target}-{entry.stem[-3:]}.txt" - ).write_bytes(proc.stderr) + Path(f"{self.output().path}/searchsploit.{target}-{entry.stem[-3:]}.txt").write_bytes(proc.stderr) diff --git a/recon/web/gobuster.py b/recon/web/gobuster.py index ba7d00c..9c819ea 100644 --- a/recon/web/gobuster.py +++ b/recon/web/gobuster.py @@ -97,9 +97,7 @@ class GobusterScan(luigi.Task): try: self.threads = abs(int(self.threads)) except TypeError: - return logging.error( - "The value supplied to --threads must be a non-negative integer." - ) + return logging.error("The value supplied to --threads must be a non-negative integer.") commands = list() @@ -108,9 +106,7 @@ class GobusterScan(luigi.Task): target = target.strip() try: - if isinstance( - ipaddress.ip_address(target), ipaddress.IPv6Address - ): # ipv6 + if isinstance(ipaddress.ip_address(target), ipaddress.IPv6Address): # ipv6 target = f"[{target}]" except ValueError: # domain names raise ValueErrors, just assume we have a domain and keep on keepin on diff --git a/recon/web/targets.py b/recon/web/targets.py index e066e3f..99b5dd3 100644 --- a/recon/web/targets.py +++ b/recon/web/targets.py @@ -43,9 +43,7 @@ class GatherWebTargets(luigi.Task): return { "masscan-output": ParseMasscanOutput(**args), "amass-output": ParseAmassOutput( - exempt_list=self.exempt_list, - target_file=self.target_file, - results_dir=self.results_dir, + exempt_list=self.exempt_list, target_file=self.target_file, results_dir=self.results_dir, ), } diff --git a/recon/web/webanalyze.py b/recon/web/webanalyze.py index cdc15df..59f2ab0 100644 --- a/recon/web/webanalyze.py +++ b/recon/web/webanalyze.py @@ -82,9 +82,7 @@ class WebanalyzeScan(luigi.Task): return luigi.LocalTarget(results_subfolder.resolve()) def _wrapped_subprocess(self, cmd): - with open( - f"webanalyze.{cmd[2].replace('//', '_').replace(':', '')}.txt", "wb" - ) as f: + with open(f"webanalyze.{cmd[2].replace('//', '_').replace(':', '')}.txt", "wb") as f: subprocess.run(cmd, stderr=f) def run(self): @@ -96,9 +94,7 @@ class WebanalyzeScan(luigi.Task): try: self.threads = abs(int(self.threads)) except TypeError: - return logging.error( - "The value supplied to --threads must be a non-negative integer." - ) + return logging.error("The value supplied to --threads must be a non-negative integer.") commands = list() @@ -107,9 +103,7 @@ class WebanalyzeScan(luigi.Task): target = target.strip() try: - if isinstance( - ipaddress.ip_address(target), ipaddress.IPv6Address - ): # ipv6 + if isinstance(ipaddress.ip_address(target), ipaddress.IPv6Address): # ipv6 target = f"[{target}]" except ValueError: # domain names raise ValueErrors, just assume we have a domain and keep on keepin on diff --git a/recon/wrappers.py b/recon/wrappers.py index 3ec3e56..14e72a9 100644 --- a/recon/wrappers.py +++ b/recon/wrappers.py @@ -10,13 +10,7 @@ from recon.web.webanalyze import WebanalyzeScan @inherits( - SearchsploitScan, - AquatoneScan, - TKOSubsScan, - SubjackScan, - CORScannerScan, - GobusterScan, - WebanalyzeScan, + SearchsploitScan, AquatoneScan, TKOSubsScan, SubjackScan, CORScannerScan, GobusterScan, WebanalyzeScan, ) class FullScan(luigi.WrapperTask): """ Wraps multiple scan types in order to run tasks on the same hierarchical level at the same time. diff --git a/tests/test_install/test_install_command.py b/tests/test_install/test_install_command.py index c90176b..61a467e 100644 --- a/tests/test_install/test_install_command.py +++ b/tests/test_install/test_install_command.py @@ -155,9 +155,7 @@ def test_update_corscanner(): setup_install_test() if not corscanner.parent.exists(): - subprocess.run( - f"sudo git clone https://github.com/chenjj/CORScanner.git {corscanner.parent}".split() - ) + subprocess.run(f"sudo git clone https://github.com/chenjj/CORScanner.git {corscanner.parent}".split()) rs = recon_pipeline.ReconShell() @@ -203,16 +201,12 @@ def test_install_luigi_service(): setup_install_test(luigi_service) - proc = subprocess.run( - "systemctl is-enabled luigid.service".split(), stdout=subprocess.PIPE - ) + proc = subprocess.run("systemctl is-enabled luigid.service".split(), stdout=subprocess.PIPE) if proc.stdout.decode().strip() == "enabled": subprocess.run("systemctl disable luigid.service".split()) - proc = subprocess.run( - "systemctl is-active luigid.service".split(), stdout=subprocess.PIPE - ) + proc = subprocess.run("systemctl is-active luigid.service".split(), stdout=subprocess.PIPE) if proc.stdout.decode().strip() == "active": subprocess.run("systemctl stop luigid.service".split()) @@ -226,14 +220,10 @@ def test_install_luigi_service(): assert Path("/lib/systemd/system/luigid.service").exists() - proc = subprocess.run( - "systemctl is-enabled luigid.service".split(), stdout=subprocess.PIPE - ) + proc = subprocess.run("systemctl is-enabled luigid.service".split(), stdout=subprocess.PIPE) assert proc.stdout.decode().strip() == "enabled" - proc = subprocess.run( - "systemctl is-active luigid.service".split(), stdout=subprocess.PIPE - ) + proc = subprocess.run("systemctl is-active luigid.service".split(), stdout=subprocess.PIPE) assert proc.stdout.decode().strip() == "active" assert Path("/usr/local/bin/luigid").exists() diff --git a/tests/test_recon/test_amass.py b/tests/test_recon/test_amass.py index d861475..02e7be9 100644 --- a/tests/test_recon/test_amass.py +++ b/tests/test_recon/test_amass.py @@ -41,13 +41,7 @@ subdomains = [ "tenable.bitdiscovery.com", ] -amass_json = ( - Path(__file__).parent.parent - / "data" - / "recon-results" - / "amass-results" - / "amass.json" -) +amass_json = Path(__file__).parent.parent / "data" / "recon-results" / "amass-results" / "amass.json" def test_amassscan_output_location(tmp_path): @@ -59,12 +53,8 @@ def test_amassscan_output_location(tmp_path): def test_parse_amass_output_locations(tmp_path): pao = ParseAmassOutput(target_file=tf, exempt_list=el, results_dir=str(tmp_path)) - assert pao.output().get("target-ips").path == str( - (Path(tmp_path) / "target-results" / "ipv4_addresses").resolve() - ) - assert pao.output().get("target-ip6s").path == str( - (Path(tmp_path) / "target-results" / "ipv6_addresses").resolve() - ) + assert pao.output().get("target-ips").path == str((Path(tmp_path) / "target-results" / "ipv4_addresses").resolve()) + assert pao.output().get("target-ip6s").path == str((Path(tmp_path) / "target-results" / "ipv6_addresses").resolve()) assert pao.output().get("target-subdomains").path == str( (Path(tmp_path) / "target-results" / "subdomains").resolve() ) diff --git a/tests/test_recon/test_masscan.py b/tests/test_recon/test_masscan.py index 0913e6a..42ae397 100644 --- a/tests/test_recon/test_masscan.py +++ b/tests/test_recon/test_masscan.py @@ -7,8 +7,6 @@ tf = Path(tfp).stem el = "../data/blacklist" rd = "../data/recon-results" -ips = [] - test_dict = { "104.20.60.51": {"tcp": {"8443", "443"}}, "104.20.61.51": {"tcp": {"8080", "80", "443"}}, @@ -20,45 +18,24 @@ test_dict = { def test_massscan_output_location(tmp_path): - asc = MasscanScan( - target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100 - ) + asc = MasscanScan(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) assert asc.output().path == str(Path(tmp_path) / "masscan-results" / "masscan.json") def test_parsemassscan_output_location(tmp_path): - pmo = ParseMasscanOutput( - target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100 - ) + pmo = ParseMasscanOutput(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) - assert pmo.output().path == str( - Path(tmp_path) / "masscan-results" / "masscan.parsed.pickle" - ) + assert pmo.output().path == str(Path(tmp_path) / "masscan-results" / "masscan.parsed.pickle") def test_parsemassscan_output_dictionary(tmp_path): - # pmo = ParseMasscanOutput( - # target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100 - # ) - - # masscan_results = ( - # Path(__file__) / ".." / ".." / "data" / "recon-results" / f"masscan.{tf}.json" - # ) - # shutil.copy(masscan_results.resolve(), tmp_path) ip_dict = pickle.load( - ( - Path(__file__).parent.parent - / "data" - / "recon-results" - / "masscan-results" - / "masscan.parsed.pickle" - ).open("rb") + (Path(__file__).parent.parent / "data" / "recon-results" / "masscan-results" / "masscan.parsed.pickle").open( + "rb" + ) ) - from pprint import pprint - pprint(ip_dict) for ip, proto_dict in test_dict.items(): for proto, ports in proto_dict.items(): - print(ip, proto) assert not ip_dict.get(ip).get(proto).difference(ports) diff --git a/tests/test_recon/test_nmap.py b/tests/test_recon/test_nmap.py new file mode 100644 index 0000000..7cbecbd --- /dev/null +++ b/tests/test_recon/test_nmap.py @@ -0,0 +1,38 @@ +from pathlib import Path +from recon.nmap import ThreadedNmapScan, SearchsploitScan + +import luigi + +from ..utils import is_kali + +tfp = "../data/bitdiscovery" +tf = Path(tfp).stem +el = "../data/blacklist" +rd = "../data/recon-results" + +nmap_results = Path(__file__).parent.parent / "data" / "recon-results" / "nmap-results" + + +def test_nmap_output_location(tmp_path): + tns = ThreadedNmapScan(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + assert tns.output().path == str(Path(tmp_path) / "nmap-results") + + +def test_searchsploit_output_location(tmp_path): + sss = SearchsploitScan(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + assert sss.output().path == str(Path(tmp_path) / "searchsploit-results") + + +def test_searchsploit_produces_results(tmp_path): + sss = SearchsploitScan(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + sss.input = lambda: luigi.LocalTarget(nmap_results) + + if not is_kali(): + return True + + sss.run() + + assert len([x for x in Path(sss.output().path).glob("searchsploit*.txt")]) > 0 diff --git a/tests/test_recon/test_targets.py b/tests/test_recon/test_targets.py index 5f31414..bcb5966 100644 --- a/tests/test_recon/test_targets.py +++ b/tests/test_recon/test_targets.py @@ -7,29 +7,21 @@ def test_creates_ips(tmp_path): targetfile = tmp_path / "test_targetlist" targetfile.write_text("127.0.0.1") - tl = TargetList( - target_file=str(targetfile), results_dir=str(tmp_path / "recon-results") - ) + tl = TargetList(target_file=str(targetfile), results_dir=str(tmp_path / "recon-results")) out = tl.output() - assert out.path == str( - (tmp_path / "recon-results" / "target-results" / "ip_addresses").resolve() - ) + assert out.path == str((tmp_path / "recon-results" / "target-results" / "ip_addresses").resolve()) def test_creates_domains(tmp_path): targetfile = tmp_path / "test_targetlist" targetfile.write_text("stuff.com") - tl = TargetList( - target_file=str(targetfile), results_dir=str(tmp_path / "recon-results") - ) + tl = TargetList(target_file=str(targetfile), results_dir=str(tmp_path / "recon-results")) out = tl.output() - assert out.path == str( - (tmp_path / "recon-results" / "target-results" / "domains").resolve() - ) + assert out.path == str((tmp_path / "recon-results" / "target-results" / "domains").resolve()) def test_filenotfound(tmp_path): @@ -44,30 +36,20 @@ def test_results_dir_relative(tmp_path): targetfile = tmp_path / "test_targetlist" targetfile.write_text("stuff.com") - tl = TargetList( - target_file=str(targetfile), - results_dir=str((tmp_path / ".." / tmp_path / "recon-results")), - ) + tl = TargetList(target_file=str(targetfile), results_dir=str((tmp_path / ".." / tmp_path / "recon-results")),) out = tl.output() - assert out.path == str( - (tmp_path / "recon-results" / "target-results" / "domains").resolve() - ) + assert out.path == str((tmp_path / "recon-results" / "target-results" / "domains").resolve()) def test_results_dir_absolute(tmp_path): targetfile = tmp_path / "test_targetlist" targetfile.write_text("stuff.com") - tl = TargetList( - target_file=str(targetfile), - results_dir=str((tmp_path / "recon-results").resolve()), - ) + tl = TargetList(target_file=str(targetfile), results_dir=str((tmp_path / "recon-results").resolve()),) out = tl.output() - assert out.path == str( - (tmp_path / "recon-results" / "target-results" / "domains").resolve() - ) + assert out.path == str((tmp_path / "recon-results" / "target-results" / "domains").resolve()) def test_results_dir_empty(tmp_path): diff --git a/tests/test_web/__init__.py b/tests/test_web/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_web/test_targets.py b/tests/test_web/test_targets.py new file mode 100644 index 0000000..24904e7 --- /dev/null +++ b/tests/test_web/test_targets.py @@ -0,0 +1,144 @@ +import pickle +import ipaddress +from pathlib import Path + +import luigi + +from recon.config import web_ports +from recon.web.targets import GatherWebTargets + +test_dict = { + "10.10.10.161": { + "tcp": { + "135", + "139", + "3268", + "3269", + "389", + "445", + "464", + "47001", + "49664", + "49665", + "49666", + "49667", + "49671", + "49676", + "49677", + "49684", + "49703", + "49903", + "53", + "593", + "5985", + "636", + "88", + "9389", + "80", + "443", + "8080", + } + } +} + +tfp = "../data/bitdiscovery" +tf = Path(tfp).stem +el = "../data/blacklist" +rd = "../data/recon-results" + +input_dict = {"masscan-output": None, "amass-output": {}} + + +def test_webtargets_creates_webtargets_txt(tmp_path): + gwt = GatherWebTargets(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + mass_pickle = tmp_path / "masscan.parsed.pickle" + + pickle.dump(test_dict, mass_pickle.open("wb")) + + input_dict["masscan-output"] = luigi.LocalTarget(mass_pickle) + + gwt.input = lambda: input_dict + gwt.run() + + assert Path(gwt.output().path) == tmp_path / "target-results" / "webtargets.txt" + + +def test_webtargets_finds_all_web_targets_with_non_web_ports(tmp_path): + gwt = GatherWebTargets(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + mass_pickle = tmp_path / "masscan.parsed.pickle" + + pickle.dump(test_dict, mass_pickle.open("wb")) + + input_dict["masscan-output"] = luigi.LocalTarget(mass_pickle) + + gwt.input = lambda: input_dict + gwt.run() + + contents = (Path(gwt.output().path)).read_text() + + for line in contents.splitlines(): + if ":" in line: + assert line.split(":")[1] in web_ports + else: + assert line.strip() == "10.10.10.161" + + +def test_webtargets_finds_all_web_targets_with_multiple_targets(tmp_path): + gwt = GatherWebTargets(target_file=tf, exempt_list=el, results_dir=str(tmp_path), top_ports=100) + + mass_pickle = Path(__file__).parent.parent / "data" / "recon-results" / "masscan-results" / "masscan.parsed.pickle" + ipv4_addys = Path(__file__).parent.parent / "data" / "recon-results" / "target-results" / "ipv4_addresses" + sumdomains_prod = Path(__file__).parent.parent / "data" / "recon-results" / "target-results" / "subdomains" + ipv6_addys = Path(__file__).parent.parent / "data" / "recon-results" / "target-results" / "ipv6_addresses" + + input_dict["masscan-output"] = luigi.LocalTarget(mass_pickle) + input_dict["amass-output"] = { + "target-ips": luigi.LocalTarget(ipv4_addys), + "target-ip6s": luigi.LocalTarget(sumdomains_prod), + "target-subdomains": luigi.LocalTarget(ipv6_addys), + } + + gwt.input = lambda: input_dict + gwt.run() + + contents = (Path(gwt.output().path)).read_text() + + subdomains = [ + "blog.bitdiscovery.com", + "bitdiscovery.com", + "staging.bitdiscovery.com", + "tenable.bitdiscovery.com", + "ibm.bitdiscovery.com", + ] + + ips = [ + "13.225.54.22", + "13.57.162.100", + "52.53.92.161", + "104.20.61.51", + "54.183.32.157", + "104.20.60.51", + "13.225.54.58", + "13.225.54.41", + "52.9.23.177", + "13.225.54.100", + ] + + ip6s = ["2606:4700:10::6814:3c33", "2606:4700:10::6814:3d33"] + + for line in contents.splitlines(): + if "." in line and ":" in line: # ipv4 w/ port + tgt, port = line.split(":") + assert port in web_ports and tgt in ips + elif ":" in line: # ipv6 + assert line.strip() in ip6s + else: # domain or bare ip + try: + # bare ip + ipaddress.ip_interface(line.strip()) + assert line.strip() in ips + except ValueError: + # domain + assert line.strip() in subdomains diff --git a/tests/utils.py b/tests/utils.py index 3941a43..25e0507 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,11 +10,7 @@ def is_kali(): return any( [ "kali" in x - for x in subprocess.run( - "cat /etc/lsb-release".split(), stdout=subprocess.PIPE - ) - .stdout.decode() - .split() + for x in subprocess.run("cat /etc/lsb-release".split(), stdout=subprocess.PIPE).stdout.decode().split() ] )