Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# Authors: Karl MacMillan <kmacmillan@mentalrootkit.com> # # Copyright (C) 2007 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. #
# Apache needs access to this database so we need to create it # where apache can reach
""" Given a cert blob (str) which may or may not contain leading and trailing text, pull out just the certificate part. This will return the FIRST cert in a stream of data.
Returns a tuple (certificate, last position in cert) """ s = cert.find('-----BEGIN CERTIFICATE-----', start) e = cert.find('-----END CERTIFICATE-----', s) if e > 0: e = e + 25
if s < 0 or e < 0: raise RuntimeError("Unable to find certificate")
cert = cert[s:e] return (cert, e)
""" Using the subject from cert come up with a nickname suitable for NSS. The caller can decide whether to use just the RDN or the whole subject.
Returns a tuple of (rdn, subject_dn) where rdn is the string representation of the first RDN in the subject and subject_dn is a DN object. """ nsscert = x509.load_certificate(cert) subject = str(nsscert.subject) dn = DN(subject)
return (str(dn[0]), dn)
"""A general-purpose wrapper around a NSS cert database
For permanent NSS databases, pass the cert DB directory to __init__
For temporary databases, do not pass nssdir, and call close() when done to remove the DB. Alternatively, a NSSDatabase can be used as a context manager that calls close() automatically. """ # Traditionally, we used CertDB for our NSS DB operations, but that class # got too tied to IPA server details, killing reusability. # BaseCertDB is a class that knows nothing about IPA. # Generic NSS DB code should be moved here. if nssdir is None: self.secdir = tempfile.mkdtemp() self._is_temporary = True else: self.secdir = nssdir self._is_temporary = False
if self._is_temporary: shutil.rmtree(self.secdir)
return self
self.close()
new_args = ["/usr/bin/certutil", "-d", self.secdir] new_args = new_args + args return ipautil.run(new_args, stdin)
"""Create cert DB
:param password_filename: Name of file containing the database password """ self.run_certutil(["-N", "-f", password_filename])
"""Return nicknames and cert flags for all certs in the database
:return: List of (name, trust_flags) tuples """ certs, stderr, returncode = self.run_certutil(["-L"]) certs = certs.splitlines()
# FIXME, this relies on NSS never changing the formatting of certutil certlist = [] for cert in certs: nickname = cert[0:61] trust = cert[61:] if re.match(r'\w*,\w*,\w*', trust): certlist.append((nickname.strip(), trust.strip()))
return tuple(certlist)
"""Return nicknames and cert flags for server certs in the database
Server certs have a "u" character in the trust flags.
:return: List of (name, trust_flags) tuples """ server_certs = [] for name, flags in self.list_certs(): if 'u' in flags: server_certs.append((name, flags))
return server_certs
"""Return names of certs in a given cert's trust chain
:param nickname: Name of the cert :return: List of certificate names """ root_nicknames = [] chain, stderr, returncode = self.run_certutil([ "-O", "-n", nickname]) chain = chain.splitlines()
for c in chain: m = re.match('\s*"(.*)" \[.*', c) if m: root_nicknames.append(m.groups()[0])
return root_nicknames
pkcs_password_filename=None): args = ["/usr/bin/pk12util", "-d", self.secdir, "-i", pkcs12_filename, "-k", db_password_filename, '-v'] if pkcs_password_filename: args = args + ["-w", pkcs_password_filename] try: ipautil.run(args) except ipautil.CalledProcessError, e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file %s" % pkcs12_filename) else: raise RuntimeError("unknown error import pkcs#12 file %s" % pkcs12_filename)
"""Given a PKCS#12 file, try to find any certificates that do not have a key. The assumption is that these are the root CAs. """ args = ["/usr/bin/pk12util", "-d", self.secdir, "-l", pkcs12_fname, "-k", passwd_fname] if passwd_fname: args = args + ["-w", passwd_fname] try: (stdout, stderr, returncode) = ipautil.run(args) except ipautil.CalledProcessError, e: if e.returncode == 17: raise RuntimeError("incorrect password for pkcs#12 file") else: raise RuntimeError("unknown error using pkcs#12 file")
lines = stdout.split('\n')
# A simple state machine. # 1 = looking for a line starting with 'Certificate' # 2 = looking for the Friendly name (nickname) nicknames = [] state = 1 for line in lines: if state == 2: m = re.match("\W+Friendly Name: (.*)", line) if m: nicknames.append( m.groups(0)[0]) state = 1 if line == "Certificate:": state = 2 elif not line.startswith(' '): # Top-level item that is not a certificate state = 1
return nicknames
if root_nickname[:7] == "Builtin": root_logger.debug( "No need to add trust for built-in root CAs, skipping %s" % root_nickname) else: try: self.run_certutil(["-M", "-n", root_nickname, "-t", "CT,CT,"]) except ipautil.CalledProcessError, e: raise RuntimeError( "Setting trust on %s failed" % root_nickname)
"""Export the given cert to PEM file in the given location""" cert, err, returncode = self.run_certutil(["-L", "-n", nickname, "-a"]) with open(location, "w+") as fd: fd.write(cert) os.chmod(location, 0444)
"""Import a cert from the given PEM file.
The file must contain exactly one certificate. """ with open(location) as fd: certs = fd.read()
cert, st = find_cert_from_txt(certs) self.add_single_pem_cert(nickname, flags, cert)
try: find_cert_from_txt(certs, st) except RuntimeError: pass else: raise ValueError('%s contains more than one certificate' % location)
"""Import a cert in PEM format""" self.run_certutil(["-A", "-n", nick, "-t", flags, "-a"], stdin=cert)
"""Verify a certificate is valid for a SSL server with given hostname
Raises a ValueError if the certificate is invalid. """ certdb = cert = None nss.nss_init(self.secdir) try: certdb = nss.get_default_certdb() cert = nss.find_cert_from_nickname(nickname) intended_usage = nss.certificateUsageSSLServer approved_usage = cert.verify_now(certdb, True, intended_usage) if not approved_usage & intended_usage: raise ValueError('invalid for a SSL server') if not cert.verify_hostname(hostname): raise ValueError('invalid for server %s' % hostname) finally: del certdb, cert nss.nss_shutdown()
return None
"""An IPA-server-specific wrapper around NSS
This class knows IPA-specific details such as nssdir location, or the CA cert name. """ # TODO: Remove all selfsign code self.nssdb = NSSDatabase(nssdir)
self.secdir = nssdir self.realm = realm
self.noise_fname = self.secdir + "/noise.txt" self.passwd_fname = self.secdir + "/pwdfile.txt" self.certdb_fname = self.secdir + "/cert8.db" self.keydb_fname = self.secdir + "/key3.db" self.secmod_fname = self.secdir + "/secmod.db" self.cacert_fname = self.secdir + "/cacert.asc" self.pk12_fname = self.secdir + "/cacert.p12" self.pin_fname = self.secdir + "/pin.txt" self.pwd_conf = "/etc/httpd/conf/password.conf" self.reqdir = None self.certreq_fname = None self.certder_fname = None self.host_name = host_name self.subject_base = subject_base try: self.cwd = os.getcwd() except OSError, e: raise RuntimeError("Unable to determine the current directory: %s" % str(e))
if not subject_base: self.subject_base = DN(('O', 'IPA'))
self.cacert_name = get_ca_nickname(self.realm) self.valid_months = "120" self.keysize = "1024"
# We are going to set the owner of all of the cert # files to the owner of the containing directory # instead of that of the process. This works when # this is called by root for a daemon that runs as # a normal user mode = os.stat(self.secdir) self.uid = mode[stat.ST_UID] self.gid = mode[stat.ST_GID]
if fstore: self.fstore = fstore else: self.fstore = sysrestore.FileStore('/var/lib/ipa/sysrestore')
if self.reqdir is not None: shutil.rmtree(self.reqdir, ignore_errors=True) try: os.chdir(self.cwd) except: pass
""" Create a temporary directory to store certificate requests and certificates. This should be called before requesting certificates.
This is set outside of __init__ to avoid creating a temporary directory every time we open a cert DB. """ if self.reqdir is not None: return
self.reqdir = tempfile.mkdtemp('', 'ipa-', '/var/lib/ipa') self.certreq_fname = self.reqdir + "/tmpcertreq" self.certder_fname = self.reqdir + "/tmpcert.der"
# When certutil makes a request it creates a file in the cwd, make # sure we are in a unique place when this happens os.chdir(self.reqdir)
if uid: pent = pwd.getpwnam(uid) os.chown(fname, pent.pw_uid, pent.pw_gid) else: os.chown(fname, self.uid, self.gid) perms = stat.S_IRUSR if write: perms |= stat.S_IWUSR os.chmod(fname, perms)
return sha1(ipautil.ipa_generate_password()).hexdigest()
return self.nssdb.run_certutil(args, stdin)
with open(self.passwd_fname, "r") as f: password = f.readline() new_args = ["/usr/bin/signtool", "-d", self.secdir, "-p", password]
new_args = new_args + args ipautil.run(new_args, stdin)
if ipautil.file_exists(self.noise_fname): os.remove(self.noise_fname) f = open(self.noise_fname, "w") f.write(self.gen_password()) self.set_perms(self.noise_fname)
ipautil.backup_file(self.passwd_fname) f = open(self.passwd_fname, "w") if passwd is not None: f.write("%s\n" % passwd) else: f.write(self.gen_password()) f.close() self.set_perms(self.passwd_fname)
ipautil.backup_file(self.certdb_fname) ipautil.backup_file(self.keydb_fname) ipautil.backup_file(self.secmod_fname) self.nssdb.create_db(self.passwd_fname) self.set_perms(self.passwd_fname, write=True)
""" Return a tuple of tuples containing (nickname, trust) """ return self.nssdb.list_certs()
""" Returns True if nickname exists in the certdb, False otherwise.
This could also be done directly with: certutil -L -d -n <nickname> ... """
certs = self.list_certs()
for cert in certs: if nickname == cert[0]: return True
return False
"""create_pkcs12 tells us whether we should create a PKCS#12 file of the CA or not. If we are running on a replica then we won't have the private key to make a PKCS#12 file so we don't need to do that step.""" # export the CA cert for use with other apps ipautil.backup_file(self.cacert_fname) root_nicknames = self.find_root_cert(nickname) fd = open(self.cacert_fname, "w") for root in root_nicknames: (cert, stderr, returncode) = self.run_certutil(["-L", "-n", root, "-a"]) fd.write(cert) fd.close() os.chmod(self.cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) if create_pkcs12: ipautil.backup_file(self.pk12_fname) ipautil.run(["/usr/bin/pk12util", "-d", self.secdir, "-o", self.pk12_fname, "-n", self.cacert_name, "-w", self.passwd_fname, "-k", self.passwd_fname]) self.set_perms(self.pk12_fname)
""" Load all the certificates from a given file. It is assumed that this file creates CA certificates. """ fd = open(cacert_fname) certs = fd.read() fd.close()
ca_dn = DN(('CN','Certificate Authority'), self.subject_base) st = 0 while True: try: (cert, st) = find_cert_from_txt(certs, st) (rdn, subject_dn) = get_cert_nickname(cert) if subject_dn == ca_dn: nick = get_ca_nickname(self.realm) else: nick = str(subject_dn) self.nssdb.add_single_pem_cert(nick, "CT,,C", cert) except RuntimeError: break
""" Retrieve a certificate from the current NSS database for nickname.
pem controls whether the value returned PEM or DER-encoded. The default is the data straight from certutil -a. """ try: args = ["-L", "-n", nickname, "-a"] (cert, err, returncode) = self.run_certutil(args) if pem: return cert else: (cert, start) = find_cert_from_txt(cert, start=0) cert = x509.strip_header(cert) dercert = base64.b64decode(cert) return dercert except ipautil.CalledProcessError: return ''
""" Tell certmonger to track the given certificate nickname.
If command is not a full path then it is prefixed with /usr/lib[64]/ipa/certmonger. """ if command is not None and not os.path.isabs(command): if sys.maxsize > 2**32: libpath = 'lib64' else: libpath = 'lib' command = '/usr/%s/ipa/certmonger/%s' % (libpath, command) cmonger = ipaservices.knownservices.certmonger cmonger.enable() ipaservices.knownservices.messagebus.start() cmonger.start() try: (stdout, stderr, rc) = certmonger.start_tracking(nickname, self.secdir, password_file, command) except (ipautil.CalledProcessError, RuntimeError), e: root_logger.error("certmonger failed starting to track certificate: %s" % str(e)) return
cmonger.stop() cert = self.get_cert_from_db(nickname) nsscert = x509.load_certificate(cert, dbdir=self.secdir) subject = str(nsscert.subject) m = re.match('New tracking request "(\d+)" added', stdout) if not m: root_logger.error('Didn\'t get new %s request, got %s' % (cmonger.service_name, stdout)) raise RuntimeError('%s did not issue new tracking request for \'%s\' in \'%s\'. Use \'ipa-getcert list\' to list existing certificates.' % (cmonger.service_name, nickname, self.secdir)) request_id = m.group(1)
certmonger.add_principal(request_id, principal) certmonger.add_subject(request_id, subject)
cmonger.start()
""" Tell certmonger to stop tracking the given certificate nickname. """
# Always start certmonger. We can't untrack something if it isn't # running cmonger = ipaservices.knownservices.certmonger ipaservices.knownservices.messagebus.start() cmonger.start() try: certmonger.stop_tracking(self.secdir, nickname=nickname) except (ipautil.CalledProcessError, RuntimeError), e: root_logger.error("certmonger failed to stop tracking certificate: %s" % str(e)) cmonger.stop()
""" If we are using a dogtag CA then other_certdb contains the RA agent key that will issue our cert.
You can override the certificate Subject by specifying a subject.
Returns a certificate in DER format. """ cdb = other_certdb if not cdb: cdb = self if subject is None: subject=DN(('CN', hostname), self.subject_base) self.request_cert(subject) cdb.issue_server_cert(self.certreq_fname, self.certder_fname) self.add_cert(self.certder_fname, nickname) fd = open(self.certder_fname, "r") dercert = fd.read() fd.close()
os.unlink(self.certreq_fname) os.unlink(self.certder_fname)
return dercert
cdb = other_certdb if not cdb: cdb = self if subject is None: subject=DN(('CN', hostname), self.subject_base) self.request_cert(subject) cdb.issue_signing_cert(self.certreq_fname, self.certder_fname) self.add_cert(self.certder_fname, nickname) os.unlink(self.certreq_fname) os.unlink(self.certder_fname)
assert isinstance(subject, DN) self.create_noise_file() self.setup_cert_request() args = ["-R", "-s", str(subject), "-o", self.certreq_fname, "-k", certtype, "-g", keysize, "-z", self.noise_fname, "-f", self.passwd_fname, "-a"] (stdout, stderr, returncode) = self.run_certutil(args) os.remove(self.noise_fname) return (stdout, stderr)
self.setup_cert_request()
if self.host_name is None: raise RuntimeError("CA Host is not set.")
f = open(certreq_fname, "r") csr = f.readlines() f.close() csr = "".join(csr)
# We just want the CSR bits, make sure there is nothing else csr = pkcs10.strip_header(csr)
params = {'profileId': 'caIPAserviceCert', 'cert_request_type': 'pkcs10', 'requestor_name': 'IPA Installer', 'cert_request': csr, 'xmlOutput': 'true'}
# Send the request to the CA f = open(self.passwd_fname, "r") password = f.readline() f.close() result = dogtag.https_request( self.host_name, api.env.ca_ee_install_port or dogtag.configured_constants().EE_SECURE_PORT, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) http_status, http_reason_phrase, http_headers, http_body = result
if http_status != 200: raise CertificateOperationError( error=_('Unable to communicate with CMS (%s)') % http_reason_phrase)
# The result is an XML blob. Pull the certificate out of that doc = xml.dom.minidom.parseString(http_body) item_node = doc.getElementsByTagName("b64") try: try: cert = item_node[0].childNodes[0].data except IndexError: raise RuntimeError("Certificate issuance failed") finally: doc.unlink()
# base64-decode the result for uniformity cert = base64.b64decode(cert)
# Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. f = open(cert_fname, "w") f.write(cert) f.close()
self.setup_cert_request()
if self.host_name is None: raise RuntimeError("CA Host is not set.")
f = open(certreq_fname, "r") csr = f.readlines() f.close() csr = "".join(csr)
# We just want the CSR bits, make sure there is nothing else csr = pkcs10.strip_header(csr)
params = {'profileId': 'caJarSigningCert', 'cert_request_type': 'pkcs10', 'requestor_name': 'IPA Installer', 'cert_request': csr, 'xmlOutput': 'true'}
# Send the request to the CA f = open(self.passwd_fname, "r") password = f.readline() f.close() result = dogtag.https_request( self.host_name, api.env.ca_ee_install_port or dogtag.configured_constants().EE_SECURE_PORT, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) http_status, http_reason_phrase, http_headers, http_body = result if http_status != 200: raise RuntimeError("Unable to submit cert request")
# The result is an XML blob. Pull the certificate out of that doc = xml.dom.minidom.parseString(http_body) item_node = doc.getElementsByTagName("b64") cert = item_node[0].childNodes[0].data doc.unlink()
# base64-decode the cert for uniformity cert = base64.b64decode(cert)
# Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. f = open(cert_fname, "w") f.write(cert) f.close()
""" Load a certificate from a PEM file and add minimal trust. """ args = ["-A", "-n", nickname, "-t", "u,u,u", "-i", cert_fname, "-f", self.passwd_fname] self.run_certutil(args)
""" This is the format of Directory Server pin files. """ ipautil.backup_file(self.pin_fname) f = open(self.pin_fname, "w") f.write("Internal (Software) Token:") pwdfile = open(self.passwd_fname) f.write(pwdfile.read()) f.close() pwdfile.close() self.set_perms(self.pin_fname)
""" This is the format of mod_nss pin files. """ ipautil.backup_file(self.pwd_conf) f = open(self.pwd_conf, "w") f.write("internal:") pwdfile = open(self.passwd_fname) f.write(pwdfile.read()) f.close() pwdfile.close() # TODO: replace explicit uid by a platform-specific one self.set_perms(self.pwd_conf, uid="apache")
""" Given a nickname, return a list of the certificates that make up the trust chain. """ root_nicknames = self.nssdb.get_trust_chain(nickname)
return root_nicknames
return self.nssdb.find_root_cert_from_pkcs12(pkcs12_fname, passwd_fname=passwd_fname)
if root_nickname is None: root_logger.debug("Unable to identify root certificate to trust. Continuing but things are likely to fail.") return
try: self.nssdb.trust_root_cert(root_nickname) except RuntimeError: pass
return self.nssdb.find_server_certs()
return self.nssdb.import_pkcs12(pkcs12_fname, self.passwd_fname, pkcs_password_filename=passwd_fname)
if nickname is None: nickname = get_ca_nickname(api.env.realm)
ipautil.run(["/usr/bin/pk12util", "-d", self.secdir, "-o", pkcs12_fname, "-n", nickname, "-k", self.passwd_fname, "-w", pkcs12_pwd_fname])
nickname, pem_fname): ipautil.run(["/usr/bin/openssl", "pkcs12", "-export", "-name", nickname, "-in", pem_fname, "-out", pkcs12_fname, "-passout", "file:" + pkcs12_pwd_fname])
if ipautil.file_exists(self.certdb_fname): # We already have a cert db, see if it is for the same CA. # If it is we leave things as they are. f = open(cacert_fname, "r") newca = f.readlines() f.close() newca = "".join(newca) (newca, st) = find_cert_from_txt(newca)
cacert = self.get_cert_from_db(self.cacert_name) if cacert != '': (cacert, st) = find_cert_from_txt(cacert)
if newca == cacert: return
# The CA certificates are different or something went wrong. Start with # a new certificate database. self.create_passwd_file(passwd) self.create_certdbs() self.load_cacert(cacert_fname)
ca_file=None): """Create a new NSS database using the certificates in a PKCS#12 file.
pkcs12_fname: the filename of the PKCS#12 file pkcs12_pwd_fname: the file containing the pin for the PKCS#12 file nickname: the nickname/friendly-name of the cert we are loading passwd: The password to use for the new NSS database we are creating
The global CA may be added as well in case it wasn't included in the PKCS#12 file. Extra certs won't hurt in any case.
The global CA may be specified in ca_file, as a PEM filename. """ self.create_noise_file() self.create_passwd_file(passwd) self.create_certdbs() self.import_pkcs12(pkcs12_fname, pkcs12_pwd_fname) server_certs = self.find_server_certs() if len(server_certs) == 0: raise RuntimeError("Could not find a suitable server cert in import in %s" % pkcs12_fname)
if ca_file: self.nssdb.import_pem_cert('CA', 'CT,CT,', ca_file)
# We only handle one server cert nickname = server_certs[0][0]
ca_names = [name for name, flags in self.nssdb.list_certs() if 'u' not in flags] if len(ca_names) == 0: raise RuntimeError("Could not find a CA cert in %s" % pkcs12_fname)
self.cacert_name = ca_names[0] for ca in ca_names: self.trust_root_cert(ca)
self.create_pin_file() self.export_ca_cert(nickname, False)
ipautil.run(["/usr/bin/openssl", "pkcs12", "-nodes", "-in", p12_fname, "-out", pem_fname, "-passin", "file:" + p12_pwd_fname])
shutil.copy(self.cacert_fname, location) os.chmod(location, 0444)
return self.nssdb.export_pem_cert(nickname, location) |