added ParseMasscanOutput class; stage-2 complete

This commit is contained in:
epi052
2019-09-12 09:08:34 -05:00
parent 88d41a3047
commit 4738c51bb0

View File

@@ -1,4 +1,7 @@
import json
import pickle
import logging
from collections import defaultdict
import luigi
from luigi.util import inherits
@@ -26,6 +29,7 @@ class Masscan(ExternalProgramTask):
interface: use the named raw network interface, such as "eth0"
top_ports: Scan top N most popular ports
ports: specifies the port(s) to be scanned
target_file: specifies the file on disk containing a list of ips or domains *--* Required by upstream Task
"""
rate = luigi.Parameter(default=masscan_config.get("rate"))
@@ -104,3 +108,81 @@ class Masscan(ExternalProgramTask):
]
return command
@inherits(Masscan)
class ParseMasscanOutput(luigi.Task):
""" Read masscan JSON results and create a pickled dictionary of pertinent information for processing.
Args:
top_ports: Scan top N most popular ports *--* Required by upstream Task
ports: specifies the port(s) to be scanned *--* Required by upstream Task
interface: use the named raw network interface, such as "eth0" *--* Required by upstream Task
rate: desired rate for transmitting packets (packets per second) *--* Required by upstream Task
target_file: specifies the file on disk containing a list of ips or domains *--* Required by upstream Task
"""
def requires(self):
""" ParseMasscanOutput depends on Masscan to run.
Masscan expects rate, target_file, interface, and either ports or top_ports as parameters.
Returns:
luigi.Task - Masscan
"""
args = {
"rate": self.rate,
"target_file": self.target_file,
"top_ports": self.top_ports,
"interface": self.interface,
"ports": self.ports,
}
return Masscan(**args)
def output(self):
""" Returns the target output for this task.
Naming convention for the output file is masscan.TARGET_FILE.parsed.pickle.
Returns:
luigi.local_target.LocalTarget
"""
return luigi.LocalTarget(f"masscan.{self.target_file}.parsed.pickle")
def run(self):
""" Reads masscan JSON results and creates a pickled dictionary of pertinent information for processing. """
ip_dict = defaultdict(lambda: defaultdict(set)) # nested defaultdict
try:
entries = json.load(self.input().open()) # load masscan results from Masscan Task
except json.decoder.JSONDecodeError as e:
# return on exception; no output file created; pipeline should start again from
# this task if restarted because we never hit pickle.dump
return print(e)
"""
build out ip_dictionary from the loaded JSON
masscan JSON structure over which we're looping
[
{ "ip": "10.10.10.146", "timestamp": "1567856130", "ports": [ {"port": 22, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
,
{ "ip": "10.10.10.146", "timestamp": "1567856130", "ports": [ {"port": 80, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
]
ip_dictionary structure that is built out from each JSON entry
{
"IP_ADDRESS":
{'udp': {"161", "5000", ... },
...
i.e. {protocol: set(ports) }
}
"""
for entry in entries:
single_target_ip = entry.get("ip")
for port_entry in entry.get("ports"):
protocol = port_entry.get("proto")
ip_dict[single_target_ip][protocol].add(str(port_entry.get("port")))
with open(self.output().path, "wb") as f:
pickle.dump(dict(ip_dict), f)