mirror of
https://github.com/aljazceru/recon-pipeline.git
synced 2025-12-20 15:54:25 +01:00
Completed store results in a database project (#32)
Co-authored-by: Ryan Good <usafaryangood@gmail.com> * added initial skeleton; restructured project directories * removed workers directive from luigi; changed input to tko-subs * changed masscan command to use config.tool_paths * linted __init__ files and updated docstring for get_scans * added per-file-ignores for linting * recon-pipeline linted * PoC working for amass results -> db; rudimentary db mgmt commands also * more linting * added database management commands to the shell * db_location passes through to all tasks; masscan results added to db * removed unused imports from masscan.py * added ParseNmapOutput class to handle parsing for database storage * cleaned up repeat code * searchsploit results stored in db * lint/format * gobuster scans now stored in database * fixed test_recon tests to use db_location * fixed web tests * tkosub entries recorded in db * subjack scan results stored in database * webanalyze results stored in db * refactored older commits to use newer helper functions * refactored older commits to use newer helper functions * aquatone results stored in database refactored a few scans to use dbmanager helper functions refactored db structure wrt headers/screenshots added 80/443 to web_ports in config.py * fixed a few queries and re-added webanalyze to FullScan * view targets/endpoints done * overhauled nmap parsing * print all nmap_results good, next to focus on filtering * complex nmap filters complete * nmap printing done * updated pipfile * view web-technologies complete * view searchsploit results complete * removed filesystem code from amass * targetlist moved to db only * targets,amass,masscan all cutover to full database; added view ports * nmap fully db compliant * aquatone and webtargets db compliant * gobuster uses db now * webanalyze db compliant * all scans except corscanner are db compliant * recon tests passing * web tests passing * linted files * added tests for helpers.py and parsers.py * refactored some redundant code * added tests to pre-commit * updated amass tests and pre-commit version * updated recon.targets tests * updated nmap tests * updated masscan tests * updated config tests * updated web targets tests * added gobuster tests * added aquatone tests * added subdomain takeover and webanalyze tests; updated test data * removed homegrown sqlite target in favor of the sqla implementation * added tests for recon-pipeline.py * fixed cluge function to set __package__ globally * updated amass tests * updated targets tests * updated nmap tests * updated masscan tests * updated aquatone tests * updated nmap tests to account for no searchsploit * updated nmap tests to account for no searchsploit * updated masscan tests * updated subjack/tkosub tests * updated web targets tests * updated webanalyze tests * added corscanner tests * linted DBManager a bit * fixed weird cyclic import issue that only happened during docs build; housekeeping * added models tests, removed test_install dir * updated docs a bit; sidenav is wonky * fixed readthedocs requirements.txt * fixed issue where view results werent populated directly after scan * added new tests to pipeline; working on docs * updated a few overlooked view command items * updated tests to reflect changes to shell * incremental push of docs update * documentation done * updated exploitdb install * updated exploitdb install * updated seclists install * parseamass updates db in the event of no amass output * removed corscanner * added pipenv shell to install instructions per @GreaterGoodest * added pipenv shell to install instructions per @GreaterGoodest * added check for chromium-browser during aquatone install; closes #26 * added check for old recon-tools dir; updated Path.resolve calls to Path.expanduser.resolve; fixed very specific import bug due to filesystem location * added CONTIBUTING.md; updated pre-commit hooks/README * added .gitattributes for linguist reporting * updated tests * fixed a few weird bugs found during test * updated README * updated asciinema links in README * updated README with view command video * updated other location for url scheme /status * add ability to specify single target using --target (#31) * updated a few items in docs and moved tool-dict to tools-dir * fixed issue where removing tempfile without --verbose caused scan to fail
This commit is contained in:
391
tests/test_shell/test_recon_pipeline_shell.py
Normal file
391
tests/test_shell/test_recon_pipeline_shell.py
Normal file
@@ -0,0 +1,391 @@
|
||||
import sys
|
||||
import time
|
||||
import shutil
|
||||
import importlib
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from pipeline.recon.config import defaults
|
||||
from pipeline.models.port_model import Port
|
||||
from pipeline.models.target_model import Target
|
||||
from pipeline.models.db_manager import DBManager
|
||||
from pipeline.models.ip_address_model import IPAddress
|
||||
|
||||
recon_shell = importlib.import_module("pipeline.recon-pipeline")
|
||||
existing_db = Path(__file__).parent.parent / "data" / "existing-database-test"
|
||||
|
||||
|
||||
class TestReconShell:
|
||||
luigi_logs = [
|
||||
(
|
||||
"INFO: Informed scheduler that task SearchsploitScan__home_epi__local_bl_eno1_7c290 has status DONE\n",
|
||||
"SearchsploitScan complete!",
|
||||
),
|
||||
(
|
||||
"INFO: Informed scheduler that task SearchsploitScan__home_epi__local_bl_eno1_7c290 has status PENDING\n",
|
||||
"SearchsploitScan queued",
|
||||
),
|
||||
("", ""),
|
||||
(
|
||||
"INFO: [pid 31387] Worker Worker(pid=31387) running FullScan(target_file=bitdiscovery\n",
|
||||
"FullScan running...",
|
||||
),
|
||||
("===== Luigi Execution Summary =====", "Luigi Execution Summary"),
|
||||
]
|
||||
|
||||
def setup_method(self):
|
||||
self.shell = recon_shell.ReconShell()
|
||||
self.shell.async_alert = print
|
||||
self.shell.poutput = print
|
||||
self.db_location = Path(__file__).parent.parent / "data" / "recon-results" / "updated-tests"
|
||||
self.realdb = DBManager(self.db_location)
|
||||
|
||||
def create_temp_target(self):
|
||||
tgt = Target(
|
||||
hostname="localhost",
|
||||
ip_addresses=[IPAddress(ipv4_address="127.0.0.1"), IPAddress(ipv6_address="::1")],
|
||||
open_ports=[Port(port_number=443, protocol="tcp"), Port(port_number=80, protocol="tcp")],
|
||||
)
|
||||
return tgt
|
||||
|
||||
@pytest.mark.parametrize("test_input", ["tools-dir", "database-dir"])
|
||||
def test_scan_creates_results_dir(self, test_input):
|
||||
assert Path(defaults.get(test_input)).exists()
|
||||
|
||||
def test_selector_thread_starts(self):
|
||||
self.shell._preloop_hook()
|
||||
assert self.shell.selectorloop.is_alive()
|
||||
|
||||
def test_selector_thread_stops(self):
|
||||
with patch("selectors.DefaultSelector.select"), patch("selectors.DefaultSelector.get_map"):
|
||||
self.shell._preloop_hook()
|
||||
assert self.shell.selectorloop.is_alive()
|
||||
time.sleep(0.5)
|
||||
self.shell._postloop_hook()
|
||||
time.sleep(1)
|
||||
assert self.shell.selectorloop.stopped()
|
||||
|
||||
@pytest.mark.parametrize("test_input", ["tools-dir\n", ""])
|
||||
def test_install_error_reporter(self, test_input, capsys):
|
||||
stderr = MagicMock()
|
||||
stderr.readline.return_value = test_input.encode()
|
||||
|
||||
self.shell._install_error_reporter(stderr)
|
||||
|
||||
if not test_input:
|
||||
assert not capsys.readouterr().out
|
||||
else:
|
||||
assert test_input.strip() in capsys.readouterr().out
|
||||
|
||||
@pytest.mark.parametrize("test_input, expected", luigi_logs)
|
||||
def test_luigi_pretty_printer(self, test_input, expected, capsys):
|
||||
stderr = MagicMock()
|
||||
stderr.readline.return_value = test_input.encode()
|
||||
self.shell._luigi_pretty_printer(stderr)
|
||||
if not test_input:
|
||||
assert not capsys.readouterr().out
|
||||
else:
|
||||
assert expected in capsys.readouterr().out
|
||||
|
||||
def test_do_scan_without_db(self, capsys):
|
||||
self.shell.do_scan(f"FullScan --target-file {__file__}")
|
||||
assert "You are not connected to a database" in capsys.readouterr().out
|
||||
|
||||
def test_get_databases(self):
|
||||
testdb = Path(defaults.get("database-dir")) / "testdb6"
|
||||
testdb.touch()
|
||||
assert testdb in list(recon_shell.ReconShell.get_databases())
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_database_list_bad(self, capsys):
|
||||
def empty_gen():
|
||||
yield from ()
|
||||
|
||||
self.shell.get_databases = empty_gen
|
||||
|
||||
self.shell.database_list("")
|
||||
assert "There are no databases" in capsys.readouterr().out
|
||||
|
||||
def test_database_attach_new(self, capsys, tmp_path):
|
||||
testdb = Path(tmp_path) / "testdb1"
|
||||
shell = recon_shell.ReconShell()
|
||||
|
||||
shell.select = lambda x: "create new database"
|
||||
shell.read_input = lambda x: str(testdb)
|
||||
shell.database_attach("")
|
||||
time.sleep(1)
|
||||
assert "created database @" in capsys.readouterr().out
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_database_attach_existing(self, capsys, tmp_path):
|
||||
testdb = Path(tmp_path) / "testdb2"
|
||||
shutil.copy(self.db_location, testdb)
|
||||
assert testdb.exists()
|
||||
shell = recon_shell.ReconShell()
|
||||
|
||||
shell.select = lambda x: str(testdb.expanduser().resolve())
|
||||
shell.get_databases = MagicMock(return_value=[str(testdb)])
|
||||
shell.database_attach("")
|
||||
time.sleep(1)
|
||||
assert "attached to sqlite database @" in capsys.readouterr().out
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_database_detach_connected(self, capsys):
|
||||
self.shell.db_mgr = MagicMock()
|
||||
self.shell.db_mgr.location = "stuff"
|
||||
self.shell.database_detach("")
|
||||
assert "detached from sqlite database @" in capsys.readouterr().out
|
||||
|
||||
def test_database_detach_not_connected(self, capsys):
|
||||
self.shell.database_detach("")
|
||||
assert "you are not connected to a database" in capsys.readouterr().out
|
||||
|
||||
def test_database_delete_without_index(self, capsys, tmp_path):
|
||||
testdb = Path(tmp_path) / "testdb3"
|
||||
testdb.touch()
|
||||
self.shell.select = lambda x: str(testdb.expanduser().resolve())
|
||||
self.shell.get_databases = MagicMock(return_value=[str(testdb)])
|
||||
self.shell.database_delete("")
|
||||
try:
|
||||
assert not testdb.exists()
|
||||
except AssertionError:
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
raise AssertionError
|
||||
assert "[+] deleted sqlite database" in capsys.readouterr().out
|
||||
|
||||
def test_database_delete_with_index(self, capsys):
|
||||
testdb = Path(defaults.get("database-dir")) / "testdb4"
|
||||
testdb.touch()
|
||||
shell = recon_shell.ReconShell()
|
||||
shell.select = lambda x: str(testdb.expanduser().resolve())
|
||||
shell.prompt = f"[db-1] {recon_shell.DEFAULT_PROMPT}> "
|
||||
shell.db_mgr = MagicMock()
|
||||
shell.db_mgr.location = "stuff"
|
||||
shell.get_databases = MagicMock(return_value=[str(testdb)])
|
||||
shell.database_delete("")
|
||||
try:
|
||||
assert not testdb.exists()
|
||||
except AssertionError:
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
raise AssertionError
|
||||
out = capsys.readouterr().out
|
||||
assert "[+] deleted sqlite database" in out
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input, expected",
|
||||
[
|
||||
(None, "you are not connected to a database"),
|
||||
("", "View results of completed scans"),
|
||||
("ports", "blog.bitdiscovery.com: 443,80"),
|
||||
("ports --paged", "blog.bitdiscovery.com: 443,80"),
|
||||
("ports --host assetinventory.bugcrowd.com", "assetinventory.bugcrowd.com: 8443,8080,443,80"),
|
||||
("ports --host assetinventory.bugcrowd.com --paged", "assetinventory.bugcrowd.com: 8443,8080,443,80"),
|
||||
("ports --port-number 8443", "assetinventory.bugcrowd.com: 8443,8080,443,80"),
|
||||
("endpoints", " https://ibm.bitdiscovery.com/user\n[\x1b[91m403\x1b[39m] https://52.8.186.88/ADMIN"),
|
||||
("endpoints --host 52.8.186.88", "[\x1b[32m200\x1b[39m] https://52.8.186.88/favicon.ico"),
|
||||
("endpoints --host 52.8.186.88 --status-code 200", "[\x1b[32m200\x1b[39m] https://52.8.186.88/favicon.ico"),
|
||||
(
|
||||
"endpoints --host 52.8.186.88 --status-code 200 --paged",
|
||||
"[\x1b[32m200\x1b[39m] https://52.8.186.88/favicon.ico",
|
||||
),
|
||||
("endpoints --host 52.8.186.88 --status-code 200 --paged --plain", "https://52.8.186.88/favicon.ico"),
|
||||
(
|
||||
"endpoints --host 52.8.186.88 --status-code 200 --paged --plain --headers",
|
||||
"http://52.8.186.88/\n Access-Control-Allow-Headers: X-Requested-With",
|
||||
),
|
||||
(
|
||||
"endpoints --host 52.8.186.88 --status-code 200 --paged --headers",
|
||||
"[\x1b[32m200\x1b[39m] https://52.8.186.88/\n\x1b[36m Content-Security-Policy:\x1b[39m default-src 'self' https: 'unsafe-inline' 'unsafe-eval' data: client-api.arkoselabs.com",
|
||||
),
|
||||
(
|
||||
"nmap-scans",
|
||||
"52.8.163.50 - http\n==================\n\ntcp port: 443 - open - syn-ack\nproduct: nginx :: 1.16.1\nnse script(s) output:\n http-server-header",
|
||||
),
|
||||
(
|
||||
"nmap-scans --commandline",
|
||||
"nmap --open -sT -n -sC -T 4 -sV -Pn -p 8443,8080,443,80 -oA /home/epi/PycharmProjects/recon-pipeline/tests/data/updated-tests/nmap-results/nmap.104.20.60.51-tcp 104.20.60.51",
|
||||
),
|
||||
(
|
||||
"nmap-scans --host synopsys.bitdiscovery.com",
|
||||
"52.53.89.219 - http\n===================\n\ntcp port: 443 - open - syn-ack",
|
||||
),
|
||||
(
|
||||
"nmap-scans --port 443 --product nginx",
|
||||
"52.8.163.50 - http\n==================\n\ntcp port: 443 - open - syn-ack\nproduct: nginx :: 1.16.1",
|
||||
),
|
||||
("nmap-scans --port 443 --nse-script http-title", "http-title\n bugcrowd - Asset Inventory"),
|
||||
(
|
||||
"nmap-scans --host synopsys.bitdiscovery.com",
|
||||
"52.53.89.219 - http\n===================\n\ntcp port: 443 - open - syn-ack",
|
||||
),
|
||||
(
|
||||
"searchsploit-results",
|
||||
"==================================================================================",
|
||||
),
|
||||
("searchsploit-results --type webapps", "webapps | 37450.txt| Amazon S3"),
|
||||
("searchsploit-results --host synopsys.bitdiscovery.com --fullpath", "exploits/linux/local/40768.sh"),
|
||||
("targets", "email.assetinventory.bugcrowd.com"),
|
||||
("targets --vuln-to-subdomain-takeover --paged", ""),
|
||||
("targets --type ipv4 --paged", "13.226.182.120"),
|
||||
("targets --type ipv6", "2606:4700:10::6814:3c33"),
|
||||
("targets --type domain-name --paged", "email.assetinventory.bugcrowd.com"),
|
||||
("web-technologies", "CloudFlare (CDN)"),
|
||||
("web-technologies --host blog.bitdiscovery.com --type CDN", "Amazon Cloudfront (CDN)"),
|
||||
("web-technologies --host blog.bitdiscovery.com --product WordPress", "WordPress (CMS,Blogs)"),
|
||||
("web-technologies --product WordPress", "13.226.191.61"),
|
||||
("web-technologies --type Miscellaneous", "13.226.191.85"),
|
||||
],
|
||||
)
|
||||
def test_do_view_with_real_database(self, test_input, expected, capsys):
|
||||
if test_input is None:
|
||||
self.shell.do_view("")
|
||||
assert expected in capsys.readouterr().out
|
||||
else:
|
||||
self.shell.db_mgr = self.realdb
|
||||
self.shell.add_dynamic_parser_arguments()
|
||||
|
||||
self.shell.do_view(test_input)
|
||||
assert expected in capsys.readouterr().out
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input, expected",
|
||||
[
|
||||
(None, "Manage database connections (list/attach/detach/delete)"),
|
||||
("list", "View results of completed scans"),
|
||||
],
|
||||
)
|
||||
def test_do_database(self, test_input, expected, capsys):
|
||||
if test_input is None:
|
||||
self.shell.do_database("")
|
||||
assert expected in capsys.readouterr().out
|
||||
else:
|
||||
testdb = Path(defaults.get("database-dir")) / "testdb5"
|
||||
testdb.touch()
|
||||
self.shell.do_database(test_input)
|
||||
assert str(testdb) in capsys.readouterr().out
|
||||
try:
|
||||
testdb.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
@patch("webbrowser.open", autospec=True)
|
||||
def test_do_status(self, mock_browser):
|
||||
self.shell.do_status("--host 127.0.0.1 --port 1111")
|
||||
assert mock_browser.called
|
||||
|
||||
# ("all", "commands failed and may have not installed properly", 1)
|
||||
# after tools moved to DB, update this test
|
||||
@pytest.mark.parametrize("test_input, expected, return_code", [("all", "is already installed", 0)])
|
||||
def test_do_install(self, test_input, expected, return_code, capsys, tmp_path, monkeypatch):
|
||||
process_mock = MagicMock()
|
||||
attrs = {"communicate.return_value": (b"output", b"error"), "returncode": return_code}
|
||||
process_mock.configure_mock(**attrs)
|
||||
|
||||
def mockreturn():
|
||||
return tmp_path
|
||||
|
||||
monkeypatch.setattr(Path, "home", mockreturn)
|
||||
|
||||
with patch("subprocess.Popen", autospec=True) as mocked_popen:
|
||||
mocked_popen.return_value = process_mock
|
||||
self.shell.do_install(test_input)
|
||||
out = capsys.readouterr().out
|
||||
assert mocked_popen.called or expected in out
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input, expected, db_mgr",
|
||||
[
|
||||
(
|
||||
"FullScan --target-file required",
|
||||
"You are not connected to a database; run database attach before scanning",
|
||||
None,
|
||||
),
|
||||
(
|
||||
"FullScan --target-file required --sausage --verbose",
|
||||
"If anything goes wrong, rerun your command with --verbose",
|
||||
True,
|
||||
),
|
||||
("FullScan --target-file required ", "If anything goes wrong, rerun your command with --verbose", True),
|
||||
("FullScan --target required ", "If anything goes wrong, rerun your command with --verbose", True),
|
||||
],
|
||||
)
|
||||
def test_do_scan(self, test_input, expected, db_mgr, capsys, tmp_path):
|
||||
process_mock = MagicMock()
|
||||
attrs = {"communicate.return_value": (b"output", b"error"), "returncode": 0}
|
||||
process_mock.configure_mock(**attrs)
|
||||
|
||||
with patch("subprocess.run", autospec=True) as mocked_popen, patch(
|
||||
"webbrowser.open", autospec=True
|
||||
) as mocked_web, patch("selectors.DefaultSelector.register", autospec=True) as mocked_selector:
|
||||
mocked_popen.return_value = process_mock
|
||||
if db_mgr is None:
|
||||
self.shell.do_scan(test_input)
|
||||
assert expected in capsys.readouterr().out
|
||||
else:
|
||||
self.shell.db_mgr = MagicMock()
|
||||
self.shell.db_mgr.location = tmp_path / "stuff"
|
||||
self.shell.do_scan(test_input)
|
||||
if "--sausage" in test_input:
|
||||
assert mocked_web.called
|
||||
if "--verbose" not in test_input:
|
||||
assert mocked_selector.called
|
||||
|
||||
def test_cluge_package_imports(self):
|
||||
pathlen = len(sys.path)
|
||||
recon_shell.cluge_package_imports(name="__main__", package=None)
|
||||
assert len(sys.path) > pathlen
|
||||
|
||||
def test_main(self):
|
||||
with patch("cmd2.Cmd.cmdloop") as mocked_loop, patch("sys.exit"), patch("cmd2.Cmd.select") as mocked_select:
|
||||
mocked_select.return_value = "No"
|
||||
recon_shell.main(name="__main__")
|
||||
assert mocked_loop.called
|
||||
|
||||
@pytest.mark.parametrize("test_input", ["Yes", "No"])
|
||||
def test_remove_old_recon_tools(self, test_input, tmp_path):
|
||||
tooldict = tmp_path / ".tool-dict.pkl"
|
||||
tooldir = tmp_path / ".recon-tools"
|
||||
searchsploit_rc = tmp_path / ".searchsploit_rc"
|
||||
|
||||
tooldict.touch()
|
||||
assert tooldict.exists()
|
||||
|
||||
searchsploit_rc.touch()
|
||||
assert searchsploit_rc.exists()
|
||||
|
||||
tooldir.mkdir()
|
||||
assert tooldir.exists()
|
||||
|
||||
subfile = tooldir / "subfile"
|
||||
|
||||
subfile.touch()
|
||||
assert subfile.exists()
|
||||
|
||||
with patch("cmd2.Cmd.cmdloop"), patch("sys.exit"), patch("cmd2.Cmd.select") as mocked_select:
|
||||
mocked_select.return_value = test_input
|
||||
recon_shell.main(
|
||||
name="__main__", old_tools_dir=tooldir, old_tools_dict=tooldict, old_searchsploit_rc=searchsploit_rc
|
||||
)
|
||||
|
||||
for file in [subfile, tooldir, tooldict, searchsploit_rc]:
|
||||
if test_input == "Yes":
|
||||
assert not file.exists()
|
||||
else:
|
||||
assert file.exists()
|
||||
Reference in New Issue
Block a user