diff --git a/cowrie/output/jsonlog.py b/cowrie/output/jsonlog.py index 50d287b..0edff7b 100644 --- a/cowrie/output/jsonlog.py +++ b/cowrie/output/jsonlog.py @@ -32,7 +32,10 @@ Docstring import json import os +import Queue +import threading +from twisted.python import log import twisted.python.logfile import cowrie.core.output @@ -47,8 +50,21 @@ class Output(cowrie.core.output.Output): fn = cfg.get('output_jsonlog', 'logfile') dirs = os.path.dirname(fn) base = os.path.basename(fn) + + # create the log queue with a default buffer size if none is specified in + # the log file. + buffer_size = 10000 + if cfg.has_option('output_jsonlog', 'buffer_size'): + buffer_size = int(cfg.get('output_jsonlog', 'buffer_size')) + self._log_writer_queue = Queue.Queue(maxsize=buffer_size) + + # allocate the output file self.outfile = twisted.python.logfile.DailyLogFile(base, dirs, defaultMode=0o664) + # start the log writer thread + self._log_writer_thread = threading.Thread(target=self._write_log) + self._log_writer_thread.daemon = True + self._log_writer_thread.start() def start(self): """ @@ -59,6 +75,7 @@ class Output(cowrie.core.output.Output): def stop(self): """ """ + self._log_queue.join() self.outfile.flush() @@ -69,7 +86,26 @@ class Output(cowrie.core.output.Output): # Remove twisted 15 legacy keys if i.startswith('log_'): del logentry[i] - json.dump(logentry, self.outfile) - self.outfile.write('\n') - self.outfile.flush() + + # TODO: There's a possibility that the queue is full when we do this put, which means + # we'll lose the log item. We specifically use put_nowait so in that case it doesn't + # block the main writer thread. + try: + self._log_writer_queue.put_nowait(json.dumps(logentry)) + except Queue.Full: + log.err('Could not queue jsonlog item. Consider increasing buffer_size in [output_jsonlog] of your cowrie configuration') + + def _write_log(self): + # there's a probability of hitting IO errors while attempting to write + # for various reasons (for example, the disk is full). So, regardless + # of what happens during the write, we always mark the queue item as done + # so self.stop() can properly join on any remaining items. + while True: + item = self._log_writer_queue.get() + try: + self.outfile.write(item) + self.outfile.write('\n') + self.outfile.flush() + finally: + self._log_writer_queue.task_done()