From 68e5d2ec49b3d8462023b145cf33f505d3cff765 Mon Sep 17 00:00:00 2001 From: desaster Date: Sat, 14 Aug 2010 07:27:38 +0000 Subject: [PATCH] Rewrote the mysql code to use adbapi for non-blocking operation NOTE: schema changes are needed! (update5.sql) git-svn-id: https://kippo.googlecode.com/svn/trunk@162 951d7100-d841-11de-b865-b3884708a8e2 --- doc/sql/update5.sql | 7 +++ kippo/dblog/mysql.py | 134 +++++++++++++++++++++++++++---------------- 2 files changed, 92 insertions(+), 49 deletions(-) create mode 100644 doc/sql/update5.sql diff --git a/doc/sql/update5.sql b/doc/sql/update5.sql new file mode 100644 index 0000000..40ea911 --- /dev/null +++ b/doc/sql/update5.sql @@ -0,0 +1,7 @@ +/* For the asynchronous mysql code, change session to use a 32 character + * string instead of int(11) */ + +ALTER TABLE `auth` CHANGE `session` `session` CHAR( 32 ) NOT NULL ; +ALTER TABLE `input` CHANGE `session` `session` CHAR( 32 ) NOT NULL ; +ALTER TABLE `sessions` CHANGE `id` `id` CHAR( 32 ) NOT NULL ; +ALTER TABLE `ttylog` CHANGE `session` `session` CHAR( 32 ) NOT NULL ; diff --git a/kippo/dblog/mysql.py b/kippo/dblog/mysql.py index 9dcc51a..67d8153 100644 --- a/kippo/dblog/mysql.py +++ b/kippo/dblog/mysql.py @@ -1,107 +1,143 @@ from kippo.core import dblog -import MySQLdb +from twisted.enterprise import adbapi +from twisted.internet import defer +from twisted.python import log +import MySQLdb, uuid + +class ReconnectingConnectionPool(adbapi.ConnectionPool): + """Reconnecting adbapi connection pool for MySQL. + + This class improves on the solution posted at + http://www.gelens.org/2008/09/12/reinitializing-twisted-connectionpool/ + by checking exceptions by error code and only disconnecting the current + connection instead of all of them. + + Also see: + http://twistedmatrix.com/pipermail/twisted-python/2009-July/020007.html + + """ + def _runInteraction(self, interaction, *args, **kw): + try: + return adbapi.ConnectionPool._runInteraction( + self, interaction, *args, **kw) + except MySQLdb.OperationalError, e: + if e[0] not in (2006, 2013): + raise + log.msg("RCP: got error %s, retrying operation" %(e)) + conn = self.connections.get(self.threadID()) + self.disconnect(conn) + # try the interaction again + return adbapi.ConnectionPool._runInteraction( + self, interaction, *args, **kw) class DBLogger(dblog.DBLogger): def start(self, cfg): - self.db = MySQLdb.connect( + self.db = ReconnectingConnectionPool('MySQLdb', host = cfg.get('database', 'host'), db = cfg.get('database', 'database'), user = cfg.get('database', 'username'), passwd = cfg.get('database', 'password'), - reconnect = True) + cp_min = 1, + cp_max = 1) - def query(self, sql, params): - cursor = self.db.cursor() - try: - cursor.execute(sql, params) - return cursor - except MySQLdb.MySQLError, e: - print 'MySQL error:', e - return None + def sqlerror(self, error): + print 'SQL Error:', error.value + + def simpleQuery(self, sql, args): + """ Just run a deferred sql query, only care about errors """ + d = self.db.runQuery(sql, args) + d.addErrback(self.sqlerror) def createSession(self, peerIP, peerPort, hostIP, hostPort): - sensorid = self.getSensorID(self.getSensor() or hostIP) - cursor = self.query( - 'INSERT INTO `sessions` (`starttime`, `sensor`, `ip`)' + \ - ' VALUES (FROM_UNIXTIME(%s), %s, %s)', - (self.nowUnix(), sensorid, peerIP)) - if not cursor: - return None - return int(cursor.lastrowid) + sid = uuid.uuid1().hex + self.createSessionWhenever(sid, peerIP, hostIP) + return sid - def getSensorID(self, ip): - cursor = self.query( - 'SELECT `id` FROM `sensors` WHERE `ip` = %s', (ip,)) - if cursor.rowcount: - return cursor.fetchone()[0] - - cursor = self.query( - 'INSERT INTO `sensors` (`ip`) VALUES (%s)', (ip,)) - return cursor.lastrowid - - def getVersionID(self, version): - cursor = self.query( - 'SELECT `id` FROM `clients` WHERE `version` = %s', (version,)) - if cursor.rowcount: - return cursor.fetchone()[0] - - cursor = self.query( - 'INSERT INTO `clients` (`version`) VALUES (%s)', (version,)) - return cursor.lastrowid + # This is separate since we can't return with a value + @defer.inlineCallbacks + def createSessionWhenever(self, sid, peerIP, hostIP): + sensorname = self.getSensor() or hostIP + r = yield self.db.runQuery( + 'SELECT `id` FROM `sensors` WHERE `ip` = %s', (sensorname,)) + if r: + id = r[0][0] + else: + yield self.db.runQuery( + 'INSERT INTO `sensors` (`ip`) VALUES (%s)', (sensorname,)) + r = yield self.db.runQuery('SELECT LAST_INSERT_ID()') + id = int(r[0][0]) + # now that we have a sensorID, continue creating the session + self.simpleQuery( + 'INSERT INTO `sessions` (`id`, `starttime`, `sensor`, `ip`)' + \ + ' VALUES (%s, FROM_UNIXTIME(%s), %s, %s)', + (sid, self.nowUnix(), id, peerIP)) def handleConnectionLost(self, session, args): ttylog = self.ttylog(session) if ttylog: - self.query( + self.simpleQuery( 'INSERT INTO `ttylog` (`session`, `ttylog`) VALUES (%s, %s)', (session, self.ttylog(session))) - self.query('UPDATE `sessions` SET `endtime` = FROM_UNIXTIME(%s)' + \ + self.simpleQuery( + 'UPDATE `sessions` SET `endtime` = FROM_UNIXTIME(%s)' + \ ' WHERE `id` = %s', (self.nowUnix(), session)) def handleLoginFailed(self, session, args): - self.query('INSERT INTO `auth` (`session`, `success`' + \ + self.simpleQuery('INSERT INTO `auth` (`session`, `success`' + \ ', `username`, `password`, `timestamp`)' + \ ' VALUES (%s, %s, %s, %s, FROM_UNIXTIME(%s))', (session, 0, args['username'], args['password'], self.nowUnix())) def handleLoginSucceeded(self, session, args): - self.query('INSERT INTO `auth` (`session`, `success`' + \ + self.simpleQuery('INSERT INTO `auth` (`session`, `success`' + \ ', `username`, `password`, `timestamp`)' + \ ' VALUES (%s, %s, %s, %s, FROM_UNIXTIME(%s))', (session, 1, args['username'], args['password'], self.nowUnix())) def handleCommand(self, session, args): - self.query('INSERT INTO `input`' + \ + self.simpleQuery('INSERT INTO `input`' + \ ' (`session`, `timestamp`, `success`, `input`)' + \ ' VALUES (%s, FROM_UNIXTIME(%s), %s, %s)', (session, self.nowUnix(), 1, args['input'])) def handleUnknownCommand(self, session, args): - self.query('INSERT INTO `input`' + \ + self.simpleQuery('INSERT INTO `input`' + \ ' (`session`, `timestamp`, `success`, `input`)' + \ ' VALUES (%s, FROM_UNIXTIME(%s), %s, %s)', (session, self.nowUnix(), 0, args['input'])) def handleInput(self, session, args): - self.query('INSERT INTO `input`' + \ + self.simpleQuery('INSERT INTO `input`' + \ ' (`session`, `timestamp`, `realm`, `input`)' + \ ' VALUES (%s, FROM_UNIXTIME(%s), %s, %s)', (session, self.nowUnix(), args['realm'], args['input'])) def handleTerminalSize(self, session, args): - self.query('UPDATE `sessions` SET `termsize` = %s' + \ + self.simpleQuery('UPDATE `sessions` SET `termsize` = %s' + \ ' WHERE `id` = %s', ('%sx%s' % (args['width'], args['height']), session)) def handleTerminalTitle(self, session, args): - self.query('UPDATE `sessions` SET `termtitle` = %s' + \ + self.simpleQuery('UPDATE `sessions` SET `termtitle` = %s' + \ ' WHERE `id` = %s', (args['title'], session)) + @defer.inlineCallbacks def handleClientVersion(self, session, args): - cursor = self.query( + r = yield self.db.runQuery( + 'SELECT `id` FROM `clients` WHERE `version` = %s', \ + (args['version'],)) + if r: + id = int(r[0][0]) + else: + yield self.db.runQuery( + 'INSERT INTO `clients` (`version`) VALUES (%s)', \ + (args['version'],)) + r = yield self.db.runQuery('SELECT LAST_INSERT_ID()') + id = int(r[0][0]) + self.simpleQuery( 'UPDATE `sessions` SET `client` = %s WHERE `id` = %s', - (self.getVersionID(args['version']), session)) + (id, session)) # vim: set sw=4 et: