# Authors: # Pavel Zuna <pzuna@redhat.com> # John Dennis <jdennis@redhat.com> # # Copyright (C) 2009 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. Backend plugin for LDAP. """
# Entries are represented as (dn, entry_attrs), where entry_attrs is a dict # mapping attribute names to values. Values can be a single value or list/tuple # of virtually any type. Each method passing these values to the python-ldap # binding encodes them into the appropriate representation. This applies to # everything except the CrudBackend methods, where dn is part of the entry dict.
except ImportError: """ python-ldap 2.4.x introduced a new API for effective rights control, which needs to be used or otherwise bind dn is not passed correctly. The following class is created for backward compatibility with python-ldap 2.3.x. Relevant BZ: https://bugzilla.redhat.com/show_bug.cgi?id=802675 """ from ldap.controls import LDAPControl class GetEffectiveRightsControl(LDAPControl): def __init__(self, criticality, authzId=None): LDAPControl.__init__(self, '1.3.6.1.4.1.42.2.27.9.5.2', criticality, authzId)
""" LDAP Backend Take 2. """
schema=None): except AttributeError: ldap_uri = 'ldap://example.com'
else: except AttributeError: self.base_dn = DN()
# Connectible.conn is a proxy to thread-local storage; # do not set it
self.disconnect()
return self.ldap_uri
tls_cacertfile=None, tls_certfile=None, tls_keyfile=None, debug_level=0, autobind=False): """ Connect to LDAP server.
Keyword arguments: ldapuri -- the LDAP server to connect to ccache -- Kerberos V5 ccache object or name bind_dn -- dn used to bind to the server bind_pw -- password used to bind to the server debug_level -- LDAP debug level option tls_cacertfile -- TLS CA certificate filename tls_certfile -- TLS certificate filename tls_keyfile -- TLS bind key filename autobind -- autobind as the current user
Extends backend.Connectible.create_connection. """ _ldap.set_option(_ldap.OPT_X_TLS_CACERTFILE, tls_cacertfile) _ldap.set_option(_ldap.OPT_X_TLS_CERTFILE, tls_certfile) _ldap.set_option(_ldap.OPT_X_TLS_KEYFILE, tls_keyfile)
_ldap.set_option(_ldap.OPT_DEBUG_LEVEL, debug_level)
self.ldap_uri, force_schema_updates=force_updates) conn.set_option(_ldap.OPT_HOST_NAME, api.env.host) # Always connect with at least an SSF of 56, confidentiality # This also protects us from a broken ldap.conf conn.set_option(_ldap.OPT_X_SASL_SSF_MAX, minssf) # Get a fully qualified CCACHE name (schema+name) # As we do not use the krbV.CCache object later, # we can safely overwrite it name=ccache.name) else: context=krbV.default_context()).principal().name
else: # no kerberos ccache, use simple bind or external sasl else:
"""Disconnect from LDAP server.""" except _ldap.LDAPError: # ignore when trying to unbind multiple times pass
"""Returns the IPA configuration entry (dn, entry_attrs)."""
# Not in our context yet None, attrs_list, base_dn=dn, scope=self.SCOPE_BASE, time_limit=2, size_limit=10 ) raise errors.LimitsExceeded() except errors.NotFound: config_entry = self.make_entry(dn) config_entry[a] = self.config_defaults[a]
"""Returns True/False whether User-Private Groups are enabled. This is determined based on whether the UPG Template exists. """
('cn', 'etc'), api.env.basedn)
attrlist=['*'])[0] else: return False except _ldap.NO_SUCH_OBJECT, e: return False
"""Returns the rights the currently bound user has for the given DN.
Returns 2 attributes, the attributeLevelRights for the given list of attributes and the entryLevelRights for the entry itself. """
"krbPrincipalAux", base_dn=api.env.basedn) # remove the control so subsequent operations don't include GER
"""Returns True/False if the currently bound user has write permissions on the attribute. This only operates on a single attribute at a time. """
return False
"""Returns True/False if the currently bound user has read permissions on the attribute. This only operates on a single attribute at a time. """ assert isinstance(dn, DN)
(dn, attrs) = self.get_effective_rights(dn, [attr]) if 'attributelevelrights' in attrs: attr_rights = attrs.get('attributelevelrights')[0].decode('UTF-8') (attr, rights) = attr_rights.split(':') if 'r' in rights: return True
return False
# # Entry-level effective rights # # a - Add # d - Delete # n - Rename the DN # v - View the entry #
"""Returns True/False if the currently bound user has delete permissions on the entry. """
assert isinstance(dn, DN)
(dn, attrs) = self.get_effective_rights(dn, ["*"]) if 'entrylevelrights' in attrs: entry_rights = attrs['entrylevelrights'][0].decode('UTF-8') if 'd' in entry_rights: return True
return False
"""Returns True/False if the currently bound user has add permissions on the entry. """ assert isinstance(dn, DN) (dn, attrs) = self.get_effective_rights(dn, ["*"]) if 'entrylevelrights' in attrs: entry_rights = attrs['entrylevelrights'][0].decode('UTF-8') if 'a' in entry_rights: return True
return False
"""Set user password."""
# The python-ldap passwd command doesn't verify the old password # so we'll do a simple bind to validate it. with self.error_handler(): conn = IPASimpleLDAPObject( self.ldap_uri, force_schema_updates=False) conn.simple_bind_s(dn, old_pass) conn.unbind_s()
""" Add entry designated by dn to group group_dn in the member attribute member_attr.
Adding a group as a member of itself is not allowed unless allow_same is True. """
"add_entry_to_group: dn=%s group_dn=%s member_attr=%s", dn, group_dn, member_attr) # check if the entry exists
# get group entry
"add_entry_to_group: group_entry_attrs=%s", group_entry_attrs) # check if we're not trying to add group into itself raise errors.SameGroupError()
# add dn to group entry's `member_attr` attribute
# update group entry
"""Remove entry from group."""
"remove_entry_from_group: dn=%s group_dn=%s member_attr=%s", dn, group_dn, member_attr) # get group entry
"remove_entry_from_group: group_entry_attrs=%s", group_entry_attrs) # remove dn from group entry's `member_attr` attribute
# update group entry
"""Mark entry active/inactive."""
# get the entry in question
# check nsAccountLock attribute raise errors.AlreadyActive() else: raise errors.AlreadyInactive()
# LDAP expects a string rather than a bool, and that string must be TRUE or FALSE, # not True or False as Python's str() produces. Thus, we uppercase it.
"""Mark entry active."""
"""Mark entry inactive."""
"""Remove a kerberos principal key."""
# We need to do this directly using the LDAP library because we # don't have read access to krbprincipalkey so we need to delete # it in the blind. (_ldap.MOD_REPLACE, 'krblastpwdchange', None)]
# CrudBackend methods
assert isinstance(dn, DN)
(dn, entry_attrs) = self.get_entry(dn, attrs_list) return entry_attrs
""" Create a new entry and return it as one dict (DN included).
Extends CrudBackend.create. """ assert 'dn' in kw dn = kw['dn'] assert isinstance(dn, DN) del kw['dn'] self.add_entry(dn, kw) return self._get_normalized_entry_for_crud(dn)
""" Get entry by primary_key (DN) as one dict (DN included).
Extends CrudBackend.retrieve. """ return self._get_normalized_entry_for_crud(primary_key, attributes)
""" Update entry's attributes and return it as one dict (DN included).
Extends CrudBackend.update. """ self.update_entry(primary_key, kw) return self._get_normalized_entry_for_crud(primary_key)
""" Delete entry by primary_key (DN).
Extends CrudBackend.delete. """ self.delete_entry(primary_key)
""" Return a list of entries (each entry is one dict, DN included) matching the specified criteria.
Keyword arguments: filter -- search filter (default: '') attrs_list -- list of attributes to return, all if None (default None) base_dn -- dn of the entry at which to start the search (default '') scope -- search scope, see LDAP docs (default ldap2.SCOPE_SUBTREE)
Extends CrudBackend.search. """ # get keyword arguments filter = kw.pop('filter', None) attrs_list = kw.pop('attrs_list', None) base_dn = kw.pop('base_dn', DN()) assert isinstance(base_dn, DN) scope = kw.pop('scope', self.SCOPE_SUBTREE)
# generate filter filter_tmp = self.make_filter(kw) if filter: filter = self.combine_filters((filter, filter_tmp), self.MATCH_ALL) else: filter = filter_tmp if not filter: filter = '(objectClass=*)'
# find entries and normalize the output for CRUD output = [] (entries, truncated) = self.find_entries( filter, attrs_list, base_dn, scope ) for (dn, entry_attrs) in entries: output.append(entry_attrs)
if truncated: return (-1, output) return (len(output), output)