separate out auth.py and checkers.py

This commit is contained in:
Michel Oosterhof
2015-09-28 18:41:59 +00:00
parent c0d9652a41
commit dbf45b4d31
3 changed files with 118 additions and 106 deletions

View File

@@ -9,16 +9,7 @@ from random import randint
from zope.interface import implements from zope.interface import implements
from twisted.cred.checkers import ICredentialsChecker from twisted.python import log
from twisted.cred.credentials import ISSHPrivateKey
from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
from twisted.internet import defer
from twisted.python import log, failure
from twisted.conch import error
from twisted.conch.ssh import keys
import credentials
# by Walter de Jong <walter@sara.nl> # by Walter de Jong <walter@sara.nl>
class UserDB(object): class UserDB(object):
@@ -240,97 +231,4 @@ class AuthRandom(object):
self.savevars() self.savevars()
return auth return auth
class HoneypotPublicKeyChecker:
implements(ICredentialsChecker)
"""
Checker that accepts, logs and denies public key authentication attempts
"""
credentialInterfaces = (ISSHPrivateKey,)
def __init__(self, cfg):
pass
def requestAvatarId(self, credentials):
_pubKey = keys.Key.fromString(credentials.blob)
log.msg(format='public key attempt for user %(username)s with fingerprint %(fingerprint)s',
username=credentials.username,
fingerprint=_pubKey.fingerprint())
return failure.Failure(error.ConchError('Incorrect signature'))
class HoneypotNoneChecker:
implements(ICredentialsChecker)
"""
Checker that does no authentication check
"""
credentialInterfaces = (credentials.IUsername,)
def __init__(self):
pass
def requestAvatarId(self, credentials):
return defer.succeed(credentials.username)
class HoneypotPasswordChecker:
implements(ICredentialsChecker)
"""
Checker that accepts "keyboard-interactive" and "password"
"""
credentialInterfaces = (credentials.IUsernamePasswordIP,
credentials.IPluggableAuthenticationModulesIP)
def __init__(self, cfg):
self.cfg = cfg
def requestAvatarId(self, credentials):
if hasattr(credentials, 'password'):
if self.checkUserPass(credentials.username, credentials.password,
credentials.ip):
return defer.succeed(credentials.username)
else:
return defer.fail(UnauthorizedLogin())
elif hasattr(credentials, 'pamConversion'):
return self.checkPamUser(credentials.username,
credentials.pamConversion, credentials.ip)
return defer.fail(UnhandledCredentials())
def checkPamUser(self, username, pamConversion, ip):
r = pamConversion((('Password:', 1),))
return r.addCallback(self.cbCheckPamUser, username, ip)
def cbCheckPamUser(self, responses, username, ip):
for (response, zero) in responses:
if self.checkUserPass(username, response, ip):
return defer.succeed(username)
return defer.fail(UnauthorizedLogin())
def checkUserPass(self, theusername, thepassword, ip):
# UserDB is the default auth_class
authname = UserDB
# Is the auth_class defined in the config file?
if self.cfg.has_option('honeypot', 'auth_class'):
authclass = self.cfg.get('honeypot', 'auth_class')
# Check if authclass exists in this module
if hasattr(modules[__name__], authclass):
authname = getattr(modules[__name__], authclass)
else:
log.msg('auth_class: %s not found in %s' % (authclass, __name__))
theauth = authname(self.cfg)
if theauth.checklogin(theusername, thepassword, ip):
log.msg(eventid='KIPP0002',
format='login attempt [%(username)s/%(password)s] succeeded',
username=theusername, password=thepassword)
return True
else:
log.msg(eventid='KIPP0003',
format='login attempt [%(username)s/%(password)s] failed',
username=theusername, password=thepassword)
return False
# vim: set sw=4 et: # vim: set sw=4 et:

113
cowrie/core/checkers.py Normal file
View File

@@ -0,0 +1,113 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
from sys import modules
from zope.interface import implements
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.credentials import ISSHPrivateKey
from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
from twisted.internet import defer
from twisted.python import log, failure
from twisted.conch import error
from twisted.conch.ssh import keys
import credentials
import auth
class HoneypotPublicKeyChecker:
implements(ICredentialsChecker)
"""
Checker that accepts, logs and denies public key authentication attempts
"""
credentialInterfaces = (ISSHPrivateKey,)
def __init__(self, cfg):
pass
def requestAvatarId(self, credentials):
_pubKey = keys.Key.fromString(credentials.blob)
log.msg(format='public key attempt for user %(username)s with fingerprint %(fingerprint)s',
username=credentials.username,
fingerprint=_pubKey.fingerprint())
return failure.Failure(error.ConchError('Incorrect signature'))
class HoneypotNoneChecker:
implements(ICredentialsChecker)
"""
Checker that does no authentication check
"""
credentialInterfaces = (credentials.IUsername,)
def __init__(self):
pass
def requestAvatarId(self, credentials):
return defer.succeed(credentials.username)
class HoneypotPasswordChecker:
implements(ICredentialsChecker)
"""
Checker that accepts "keyboard-interactive" and "password"
"""
credentialInterfaces = (credentials.IUsernamePasswordIP,
credentials.IPluggableAuthenticationModulesIP)
def __init__(self, cfg):
self.cfg = cfg
def requestAvatarId(self, credentials):
if hasattr(credentials, 'password'):
if self.checkUserPass(credentials.username, credentials.password,
credentials.ip):
return defer.succeed(credentials.username)
else:
return defer.fail(UnauthorizedLogin())
elif hasattr(credentials, 'pamConversion'):
return self.checkPamUser(credentials.username,
credentials.pamConversion, credentials.ip)
return defer.fail(UnhandledCredentials())
def checkPamUser(self, username, pamConversion, ip):
r = pamConversion((('Password:', 1),))
return r.addCallback(self.cbCheckPamUser, username, ip)
def cbCheckPamUser(self, responses, username, ip):
for (response, zero) in responses:
if self.checkUserPass(username, response, ip):
return defer.succeed(username)
return defer.fail(UnauthorizedLogin())
def checkUserPass(self, theusername, thepassword, ip):
# UserDB is the default auth_class
authname = auth.UserDB
# Is the auth_class defined in the config file?
if self.cfg.has_option('honeypot', 'auth_class'):
authclass = self.cfg.get('honeypot', 'auth_class')
authmodule = "cowrie.core.auth"
# Check if authclass exists in this module
if hasattr(modules[authmodule], authclass):
authname = getattr(modules[authmodule], authclass)
else:
log.msg('auth_class: %s not found in %s' % (authclass, authmodule))
theauth = authname(self.cfg)
if theauth.checklogin(theusername, thepassword, ip):
log.msg(eventid='KIPP0002',
format='login attempt [%(username)s/%(password)s] succeeded',
username=theusername, password=thepassword)
return True
else:
log.msg(eventid='KIPP0003',
format='login attempt [%(username)s/%(password)s] failed',
username=theusername, password=thepassword)
return False
# vim: set sw=4 et:

View File

@@ -12,6 +12,7 @@ from twisted.cred import portal
from cowrie.core.config import readConfigFile from cowrie.core.config import readConfigFile
from cowrie import core from cowrie import core
import cowrie.core.ssh import cowrie.core.ssh
import cowrie.core.checkers
class Options(usage.Options): class Options(usage.Options):
optParameters = [ optParameters = [
@@ -51,13 +52,13 @@ class CowrieServiceMaker(object):
factory = core.ssh.HoneyPotSSHFactory(cfg) factory = core.ssh.HoneyPotSSHFactory(cfg)
factory.portal = portal.Portal(core.ssh.HoneyPotRealm(cfg)) factory.portal = portal.Portal(core.ssh.HoneyPotRealm(cfg))
factory.portal.registerChecker(core.auth.HoneypotPublicKeyChecker(cfg)) factory.portal.registerChecker(cowrie.core.checkers.HoneypotPublicKeyChecker(cfg))
factory.portal.registerChecker(core.auth.HoneypotPasswordChecker(cfg)) factory.portal.registerChecker(cowrie.core.checkers.HoneypotPasswordChecker(cfg))
if cfg.has_option('honeypot', 'auth_none_enabled') and \ if cfg.has_option('honeypot', 'auth_none_enabled') and \
cfg.get('honeypot', 'auth_none_enabled').lower() in \ cfg.get('honeypot', 'auth_none_enabled').lower() in \
('yes', 'true', 'on'): ('yes', 'true', 'on'):
factory.portal.registerChecker(core.auth.HoneypotNoneChecker()) factory.portal.registerChecker(cowrie.core.checkers.HoneypotNoneChecker())
top_service = top_service = service.MultiService() top_service = top_service = service.MultiService()