"""
This module serves to connect to and Ldap server.
It works in principle for ESRF, Soleil Proxima and MAXIV beamlines
"""
import ldap
from mxcubecore.HardwareObjects.abstract.AbstractAuthenticator import (
AbstractAuthenticator,
)
"""
ldapou is optional, if ldapou is not defined,
the bind_str (simple_bind) will be "uid=xxx,dc=xx,dc=xx",
otherwise it is uid=xxx,ou=xxx,dc=xx,dc=xx
<procedure class="LdapAuthenticator">
<ldaphost>ldaphost.mydomain</ldaphost>
<ldapport>389</ldapport>
<ldapdomain>example.com</ldapdomain>
<ldapou>users</ldapou>
</procedure>
"""
[docs]class LdapAuthenticator(AbstractAuthenticator):
def __init__(self, name):
super().__init__(name)
self._ldapConnection = None
# Initializes the hardware object
[docs] def init(self):
self._field_values = None
self._connect()
def _connect(self):
ldaphost = self.get_property("ldaphost")
ldapport = self.get_property("ldapport")
domain = self.get_property("ldapdomain")
if ldaphost is None:
self.log.error("LdapAuthenticator: you must specify the LDAP hostname")
else:
if ldapport is None:
self.log.debug(
"LdapAuthenticator: connecting to LDAP server %s", ldaphost
)
self._ldapConnection = ldap.initialize("ldap://" + ldaphost)
else:
self.log.debug(
"LdapAuthenticator: connecting to LDAP server %s:%s",
ldaphost,
ldapport,
)
self._ldapConnection = ldap.initialize(
"ldap://%s:%s" % (ldaphost, int(ldapport))
)
self.log.debug(
"LdapAuthenticator: got connection %s" % str(self._ldapConnection)
)
if domain is not None:
domparts = domain.split(".")
domstr = ""
comma = ""
for part in domparts:
domstr += "%sdc=%s" % (comma, part)
comma = ","
self.domstr = domstr
self.log.debug(
"LdapAuthenticator: got connection %s" % str(self._ldapConnection)
)
else:
self.domstr = "dc=esrf,dc=fr" # default is esrf.fr
# Creates a new connection to LDAP if there's an exception on the current connection
def _reconnect(self):
if self._ldapConnection is not None:
try:
self._ldapConnection.result(timeout=0)
except ldap.LDAPError:
ldaphost = self.get_property("ldaphost")
ldapport = self.get_property("ldapport")
if ldapport is None:
self.log.debug(
"LdapAuthenticator: reconnecting to LDAP server %s", ldaphost
)
self._connect()
else:
self.log.debug(
"LdapAuthenticator: reconnecting to LDAP server %s:%s",
ldaphost,
ldapport,
)
self._connect()
def _cleanup(self, ex=None, msg=None):
if ex is not None:
try:
msg = ex[0]["desc"]
except (IndexError, KeyError, ValueError, TypeError):
msg = "generic LDAP error"
self._reconnect()
self.log.info("LdapAuthenticator: %s" % msg)
return False
def get_field_values(self):
return self._field_values
[docs] def invalidate(self):
pass
[docs] def authenticate(self, username, password, retry=True, fields=None):
# fields can be used in local implementation to retrieve user information from
# ldap. In ALBA for example, it is used to obtain homeDirectory upon successful
# login and use that value for programming session hwo directories
self._field_values = None
if self._ldapConnection is None:
return self._cleanup(msg="no LDAP server configured")
self.log.debug(
"LdapAuthenticator: searching for %s / %s" % (username, self.domstr)
)
try:
search_str = self.domstr
if fields is None:
found = self._ldapConnection.search_s(
search_str, ldap.SCOPE_SUBTREE, "uid=" + username, ["uid"]
)
else:
found = self._ldapConnection.search_s(
search_str, ldap.SCOPE_SUBTREE, "uid=" + username, fields
)
except ldap.LDAPError as err:
if retry:
self._cleanup(ex=err)
return self.authenticate(username, password, retry=False)
else:
return self._cleanup(ex=err)
if not found:
return self._cleanup(msg="unknown proposal %s" % username)
if fields is not None:
self._field_values = found[0][1]
if password == "":
return self._cleanup(msg="invalid password for %s" % username)
self.log.debug("LdapAuthenticator: validating %s" % username)
try:
bind_str = "uid=%s, ou=%s, %s" % (username, self.ldapou, self.domstr)
except AttributeError:
bind_str = "uid=%s,%s" % (username, self.domstr)
self.log.debug("LdapAuthenticator: binding to %s" % bind_str)
handle = self._ldapConnection.simple_bind(bind_str, password)
try:
self._ldapConnection.result(handle)
except ldap.INVALID_CREDENTIALS:
# try second time with different bind_str
bind_str = "uid=%s, ou=people,%s" % (username, self.domstr)
handle = self._ldapConnection.simple_bind(bind_str, password)
try:
self._ldapConnection.result(handle)
except Exception:
return self._cleanup(msg="invalid password for %s" % username)
except ldap.LDAPError as err:
if retry:
self._cleanup(ex=err)
return self.authenticate(username, password, retry=False)
else:
return self._cleanup(ex=err)
return True