Fix #230: jsonlog threading corruption (#283)

Derek Abdine
2016-10-10 05:21:40 -07:00
committed by Michel Oosterhof
parent 05283b60c3
commit aafef68e39


@@ -32,7 +32,10 @@ Docstring
import json
import os
import Queue
import threading
from twisted.python import log
import twisted.python.logfile
import cowrie.core.output
@@ -47,8 +50,21 @@ class Output(cowrie.core.output.Output):
fn = cfg.get('output_jsonlog', 'logfile')
dirs = os.path.dirname(fn)
base = os.path.basename(fn)
# create the log queue with a default buffer size if none is specified
# in the config file.
buffer_size = 10000
if cfg.has_option('output_jsonlog', 'buffer_size'):
buffer_size = int(cfg.get('output_jsonlog', 'buffer_size'))
self._log_writer_queue = Queue.Queue(maxsize=buffer_size)
# allocate the output file
self.outfile = twisted.python.logfile.DailyLogFile(base, dirs, defaultMode=0o664)
# start the log writer thread
self._log_writer_thread = threading.Thread(target=self._write_log)
self._log_writer_thread.daemon = True
self._log_writer_thread.start()
def start(self):
"""
@@ -59,6 +75,7 @@ class Output(cowrie.core.output.Output):
def stop(self):
"""
"""
self._log_writer_queue.join()
self.outfile.flush()
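
The join() in stop() relies on the Queue module's task_done()/join() contract: join() blocks until task_done() has been called once for every item that was put(), so stop() waits for the writer thread to drain any pending entries before flushing. A minimal sketch of that contract, independent of Cowrie (Python 2, same Queue module as the diff):

import Queue
import threading

q = Queue.Queue()

def worker():
    while True:
        item = q.get()
        try:
            print item  # stand-in for the actual file write
        finally:
            q.task_done()  # always mark done, even if the write failed

t = threading.Thread(target=worker)
t.daemon = True  # a stuck writer can't keep the process alive
t.start()

for n in range(3):
    q.put('entry %d' % n)
q.join()  # returns only after task_done() has been called three times
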
@@ -69,7 +86,26 @@ class Output(cowrie.core.output.Output):
# Remove twisted 15 legacy keys
if i.startswith('log_'):
del logentry[i]
json.dump(logentry, self.outfile)
self.outfile.write('\n')
self.outfile.flush()
# TODO: There's a possibility that the queue is full when we do this put,
# which means we'll lose the log item. We specifically use put_nowait so
# that in that case the put doesn't block the main thread.
try:
self._log_writer_queue.put_nowait(json.dumps(logentry))
except Queue.Full:
log.err('Could not queue jsonlog item. Consider increasing buffer_size in [output_jsonlog] of your cowrie configuration')
def _write_log(self):
# there's a possibility of hitting IO errors while attempting to write
# (for example, when the disk is full). So, regardless of what happens
# during the write, we always mark the queue item as done so that
# self.stop() can properly join on any remaining items.
while True:
item = self._log_writer_queue.get()
try:
self.outfile.write(item)
self.outfile.write('\n')
self.outfile.flush()
finally:
self._log_writer_queue.task_done()
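
Putting the pieces together, the pattern this commit adopts is a bounded, non-blocking producer with a single daemon consumer: the reactor thread only ever calls put_nowait() (dropping the entry and logging an error when the queue is full), while one background thread owns the file handle and performs every write. A standalone sketch of that pattern under the same Python 2 Queue module; the class and names are illustrative, not Cowrie's actual plugin:

import Queue
import sys
import threading

class ThreadedWriter(object):
    """Illustrative writer: all file I/O happens on one background thread."""

    def __init__(self, outfile, buffer_size=10000):
        self.outfile = outfile
        self.queue = Queue.Queue(maxsize=buffer_size)
        thread = threading.Thread(target=self._drain)
        thread.daemon = True
        thread.start()

    def write(self, line):
        # Called from the main (reactor) thread; never blocks, may drop.
        try:
            self.queue.put_nowait(line)
        except Queue.Full:
            sys.stderr.write('log queue full, dropping entry\n')

    def stop(self):
        self.queue.join()  # wait for everything queued so far
        self.outfile.flush()

    def _drain(self):
        while True:
            line = self.queue.get()
            try:
                self.outfile.write(line + '\n')
                self.outfile.flush()
            finally:
                self.queue.task_done()

writer = ThreadedWriter(sys.stdout)
writer.write('{"eventid": "cowrie.session.connect"}')
writer.stop()

Because only _drain() ever touches self.outfile, concurrent log events can no longer interleave partial writes, which is the corruption #230 reported.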