Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

# Authors: 

#   Andrew Wnuk <awnuk@redhat.com> 

#   Jason Gerard DeRose <jderose@redhat.com> 

#   John Dennis <jdennis@redhat.com> 

# 

# Copyright (C) 2009  Red Hat 

# see file 'COPYING' for use and warranty information 

# 

# This program is free software; you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# This program is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with this program.  If not, see <http://www.gnu.org/licenses/>. 

 

from ipalib import api, SkipPluginModule 

if api.env.enable_ra is not True: 

    # In this case, abort loading this plugin module... 

    raise SkipPluginModule(reason='env.enable_ra is not True') 

import os 

from ipalib import Command, Str, Int, Bytes, Flag, File 

from ipalib import errors 

from ipalib import pkcs10 

from ipalib import x509 

from ipalib import util 

from ipalib.plugins.virtual import * 

from ipalib.plugins.service import split_principal 

import base64 

import traceback 

from ipalib.text import _ 

from ipalib.request import context 

from ipalib.output import Output 

from ipalib.plugins.service import validate_principal 

import nss.nss as nss 

from nss.error import NSPRError 

 

__doc__ = _(""" 

IPA certificate operations 

 

Implements a set of commands for managing server SSL certificates. 

 

Certificate requests exist in the form of a Certificate Signing Request (CSR) 

in PEM format. 

 

If using the selfsign back end then the subject in the CSR needs to match 

the subject configured in the server. The dogtag CA uses just the CN 

value of the CSR and forces the rest of the subject. 

 

A certificate is stored with a service principal and a service principal 

needs a host. 

 

In order to request a certificate: 

 

* The host must exist 

* The service must exist (or you use the --add option to automatically add it) 

 

EXAMPLES: 

 

Request a new certificate and add the principal: 

   ipa cert-request --add --principal=HTTP/lion.example.com example.csr 

 

Retrieve an existing certificate: 

   ipa cert-show 1032 

 

Revoke a certificate (see RFC 5280 for reason details): 

   ipa cert-revoke --revocation-reason=6 1032 

 

Remove a certificate from revocation hold status: 

   ipa cert-remove-hold 1032 

 

Check the status of a signing request: 

   ipa cert-status 10 

 

IPA currently immediately issues (or declines) all certificate requests so 

the status of a request is not normally useful. This is for future use 

or the case where a CA does not immediately issue a certificate. 

 

The following revocation reasons are supported: 

 

    * 0 - unspecified 

    * 1 - keyCompromise 

    * 2 - cACompromise 

    * 3 - affiliationChanged 

    * 4 - superseded 

    * 5 - cessationOfOperation 

    * 6 - certificateHold 

    * 8 - removeFromCRL 

    * 9 - privilegeWithdrawn 

    * 10 - aACompromise 

 

Note that reason code 7 is not used.  See RFC 5280 for more details: 

 

http://www.ietf.org/rfc/rfc5280.txt 

 

""") 

 

def get_csr_hostname(csr):
    """
    Return the value of CN in the subject of the request or None

    The request is parsed with pkcs10.load_certificate_request() and the
    common name is read from the subject returned by pkcs10.get_subject().

    Raises errors.CertificateOperationError when NSS (NSPRError) fails to
    decode the request; the underlying error text is included in the message.
    """
    try:
        request = pkcs10.load_certificate_request(csr)
        subject = pkcs10.get_subject(request)
        return subject.common_name
    except NSPRError as nsprerr:
        # Include the NSS error so the message does not stop at a bare colon.
        raise errors.CertificateOperationError(
            error=_('Failure decoding Certificate Signing Request: %s') % str(nsprerr))

 

def get_subjectaltname(csr):
    """
    Return the first value of the subject alt name, if any

    Walks the extensions of the parsed request and returns the first entry
    of the first subject-alt-name extension found, or None when the request
    has no such extension.

    Raises errors.CertificateOperationError when NSS (NSPRError) fails while
    decoding the request or its extensions.
    """
    try:
        request = pkcs10.load_certificate_request(csr)
        for extension in request.extensions:
            if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
                return nss.x509_alt_name(extension.value)[0]
    except NSPRError:
        # The NSS error object itself is not needed for the message.
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
    return None

 

def validate_csr(ugettext, csr):
    """
    Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
    parser.

    Returns None when the CSR is acceptable (or when validation is skipped
    on the CLI because the value names an existing file).

    Raises errors.Base64DecodeError when the parser raises TypeError, and
    errors.CertificateOperationError for any other decoding failure.
    """
    # If we are passed in a pointer to a valid file on the client side
    # escape and let the load_files() handle things
    if api.env.context == 'cli' and csr and os.path.exists(csr):
        return

    try:
        # Only the ability to parse matters here; the parsed request is
        # deliberately discarded.
        pkcs10.load_certificate_request(csr)
    except TypeError as e:
        raise errors.Base64DecodeError(reason=str(e))
    except NSPRError:
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
    except Exception as e:
        raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))

 

def normalize_csr(csr):
    """
    Strip any leading and trailing cruft around the BEGIN/END block

    Both the "NEW CERTIFICATE REQUEST" and the plain "CERTIFICATE REQUEST"
    PEM labels are recognised, the "NEW" form being tried first.  The END
    marker is only looked for after the BEGIN marker, so a stray END line
    ahead of the real block no longer yields an empty or truncated result.

    If no BEGIN marker, or no END marker following it, is present the input
    is returned unchanged: we're normalizing here, not validating.
    """
    begin_markers = (
        '-----BEGIN NEW CERTIFICATE REQUEST-----',
        '-----BEGIN CERTIFICATE REQUEST-----',
    )
    end_markers = (
        '-----END NEW CERTIFICATE REQUEST-----',
        '-----END CERTIFICATE REQUEST-----',
    )

    start = -1
    for marker in begin_markers:
        start = csr.find(marker)
        if start != -1:
            break
    if start == -1:
        return csr

    for marker in end_markers:
        end = csr.find(marker, start)
        if end != -1:
            # Keep the END marker itself; its length depends on the label.
            return csr[start:end + len(marker)]

    return csr

 

def _convert_serial_number(num): 

    """ 

    Convert a SN given in decimal or hexadecimal. 

    Returns the number or None if conversion fails. 

    """ 

    # plain decimal or hexa with radix prefix 

    try: 

        num = int(num, 0) 

    except ValueError: 

        try: 

            # hexa without prefix 

            num = int(num, 16) 

        except ValueError: 

            num = None 

 

    return num 

 

def validate_serial_number(ugettext, num):
    """
    Validator for serial number parameters.

    Returns an error message when ``num`` cannot be converted by
    _convert_serial_number(), otherwise None.  ``ugettext`` is accepted as
    part of the validator signature but is not used here.
    """
    # Identity test: a successfully converted serial of 0 must not be
    # confused with the None failure marker.
    if _convert_serial_number(num) is None:
        return u"Decimal or hexadecimal number is required for serial number"
    return None

 

def normalize_serial_number(num):
    """
    Return the serial number as unicode text of its converted value.

    The value is expected to have been validated already; nothing is
    checked here, the result of _convert_serial_number() is passed straight
    to unicode().
    """
    converted = _convert_serial_number(num)
    return unicode(converted)

 

def get_host_from_principal(principal):
    """
    Extract the host portion of a principal.

    The principal is passed through validate_principal() first.  The host
    is the text after the first '/' and before the first '@'; when the
    principal carries no realm the host runs to the end of the string.
    """
    validate_principal(None, principal)

    host_end = principal.find('@')
    if host_end < 0:
        # No realm given: the host extends to the end of the principal
        host_end = len(principal)
    host_start = principal.find('/') + 1

    return principal[host_start:host_end]

 

class cert_request(VirtualCommand):
    __doc__ = _('Submit a certificate signing request.')

    takes_args = (
        File('csr', validate_csr,
            label=_('CSR'),
            cli_name='csr_file',
            normalizer=normalize_csr,
        ),
    )
    operation="request certificate"

    takes_options = (
        Str('principal',
            label=_('Principal'),
            doc=_('Service principal for this certificate (e.g. HTTP/test.example.com)'),
        ),
        Str('request_type',
            default=u'pkcs10',
            autofill=True,
        ),
        Flag('add',
            doc=_("automatically add the principal if it doesn't exist"),
            default=False,
            autofill=True
        ),
    )

    has_output_params = (
        Str('certificate',
            label=_('Certificate'),
        ),
        Str('subject',
            label=_('Subject'),
        ),
        Str('issuer',
            label=_('Issuer'),
        ),
        Str('valid_not_before',
            label=_('Not Before'),
        ),
        Str('valid_not_after',
            label=_('Not After'),
        ),
        Str('md5_fingerprint',
            label=_('Fingerprint (MD5)'),
        ),
        Str('sha1_fingerprint',
            label=_('Fingerprint (SHA1)'),
        ),
        Str('serial_number',
            label=_('Serial number'),
        ),
        Str('serial_number_hex',
            label=_('Serial number (hex)'),
        ),
    )

    has_output = (
        Output('result',
            type=dict,
            doc=_('Dictionary mapping variable name to value'),
        ),
    )

    def execute(self, csr, **kw):
        # Steps, in order:
        #   1. access check for the bound principal
        #   2. CSR subject hostname must match the principal's hostname
        #   3. look up (or, with --add, create) the service/host entry
        #   4. check write access to its userCertificate attribute
        #   5. every subject alt name must be a known host
        #   6. revoke and detach any certificate already on the entry
        #   7. request the new certificate and store it on the entry
        ldap = self.api.Backend.ldap2
        # 'principal' and 'add' are consumed here; the remaining options
        # are passed through to the RA backend.
        principal = kw.pop('principal')
        add = kw.pop('add')
        is_host_principal = principal.startswith('host/')

        # Access control is partially handled by the ACI titled
        # 'Hosts can modify service userCertificate'. This is for the case
        # where a machine binds using a host/ principal. It can only do the
        # request if the target hostname is in the managedBy attribute which
        # is managed using the add/del member commands.
        #
        # Binding with a user principal one needs to be in the request_certs
        # taskgroup (directly or indirectly via role membership).
        bind_principal = getattr(context, 'principal')
        bound_as_host = bind_principal.startswith('host/')
        # Can this user request certs?
        if not bound_as_host:
            self.check_access()

        # FIXME: add support for subject alt name

        # Ensure that the hostname in the CSR matches the principal
        subject_host = get_csr_hostname(csr)
        if not subject_host:
            raise errors.ValidationError(name='csr',
                error=_("No hostname was found in subject of request."))

        (servicename, hostname, realm) = split_principal(principal)
        if subject_host.lower() != hostname.lower():
            raise errors.ACIError(
                info=_("hostname in subject of request '%(subject_host)s' "
                    "does not match principal hostname '%(hostname)s'") % dict(
                        subject_host=subject_host, hostname=hostname))

        dn = None
        service = None
        # See if the service exists and punt if it doesn't and we aren't
        # going to add it
        try:
            if not is_host_principal:
                service = api.Command['service_show'](principal, all=True, raw=True)['result']
            else:
                hostname = get_host_from_principal(principal)
                service = api.Command['host_show'](hostname, all=True, raw=True)['result']
            dn = service['dn']
        except errors.NotFound:
            if not add:
                raise errors.NotFound(reason=_("The service principal for "
                    "this request doesn't exist."))
            try:
                service = api.Command['service_add'](principal, force=True)['result']
                dn = service['dn']
            except errors.ACIError:
                raise errors.ACIError(info=_('You need to be a member of '
                    'the serviceadmin role to add services'))

        # We got this far so the service entry exists, can we write it?
        if not ldap.can_write(dn, "usercertificate"):
            raise errors.ACIError(info=_("Insufficient 'write' privilege "
                "to the 'userCertificate' attribute of entry '%s'.") % dn)

        # Validate the subject alt name, if any
        request = pkcs10.load_certificate_request(csr)
        subjectaltname = pkcs10.get_subjectaltname(request)
        if subjectaltname is not None:
            for name in subjectaltname:
                name = unicode(name)
                try:
                    hostentry = api.Command['host_show'](name, all=True, raw=True)['result']
                    hostdn = hostentry['dn']
                except errors.NotFound:
                    # We don't want to issue any certificates referencing
                    # machines we don't know about. Nothing is stored in this
                    # host record related to this certificate.
                    raise errors.NotFound(reason=_('no host record for '
                        'subject alt name %s in certificate request') % name)
                # A host-bound requester may only name hosts that manage
                # the target entry.
                if bound_as_host and hostdn not in service.get('managedby', []):
                    raise errors.ACIError(info=_(
                        "Insufficient privilege to create a certificate "
                        "with subject alt name '%s'.") % name)

        if 'usercertificate' in service:
            serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
            # revoke the certificate and remove it from the service
            # entry before proceeding. First we retrieve the certificate to
            # see if it is already revoked, if not then we revoke it.
            try:
                result = api.Command['cert_show'](unicode(serial))['result']
                if 'revocation_reason' not in result:
                    try:
                        # reason 4 is "superseded"
                        api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
                    except errors.NotImplementedError:
                        # some CA's might not implement revoke
                        pass
            except errors.NotImplementedError:
                # some CA's might not implement get
                pass
            if not is_host_principal:
                api.Command['service_mod'](principal, usercertificate=None)
            else:
                hostname = get_host_from_principal(principal)
                api.Command['host_mod'](hostname, usercertificate=None)

        # Request the certificate
        result = self.Backend.ra.request_certificate(csr, **kw)
        cert = x509.load_certificate(result['certificate'])
        result['issuer'] = unicode(cert.issuer)
        result['valid_not_before'] = unicode(cert.valid_not_before_str)
        result['valid_not_after'] = unicode(cert.valid_not_after_str)
        result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
        result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])

        # Success? Then add it to the service entry.
        # NOTE(review): result['certificate'] has already been read above,
        # so this membership test looks like it can never fail here - confirm
        # before relying on it.
        if 'certificate' in result:
            skw = {"usercertificate": str(result.get('certificate'))}
            if not is_host_principal:
                api.Command['service_mod'](principal, **skw)
            else:
                hostname = get_host_from_principal(principal)
                api.Command['host_mod'](hostname, **skw)

        return dict(
            result=result
        )

 

api.register(cert_request) 

 

 

class cert_status(VirtualCommand):
    __doc__ = _('Check the status of a certificate signing request.')

    operation = "certificate status"

    takes_args = (
        Str('request_id',
            label=_('Request id'),
            flags=['no_create', 'no_update', 'no_search'],
        ),
    )

    has_output_params = (
        Str('cert_request_status',
            label=_('Request status'),
        ),
    )

    def execute(self, request_id, **kw):
        # The access check comes first; the RA backend is then asked for
        # the status of the given request and its answer is returned under
        # the 'result' key.
        self.check_access()
        status = self.Backend.ra.check_request_status(request_id)
        return {'result': status}

 

api.register(cert_status) 

 

 

# Serial number argument shared by cert_show, cert_revoke and
# cert_remove_hold (each uses it as its takes_args).
# validate_serial_number rejects values that _convert_serial_number cannot
# parse; note that the converter also accepts hexadecimal without the 0x
# prefix.  normalize_serial_number rewrites the value as the unicode text of
# the converted integer.
_serial_number = Str('serial_number', 

    validate_serial_number, 

    label=_('Serial number'), 

    doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'), 

    normalizer=normalize_serial_number, 

) 

 

class cert_show(VirtualCommand):
    __doc__ = _('Retrieve an existing certificate.')

    operation="retrieve certificate"

    takes_args = _serial_number

    takes_options = (
        Str('out?',
            label=_('Output filename'),
            doc=_('File to store the certificate in.'),
            exclude='webui',
        ),
    )

    has_output_params = (
        Str('certificate',
            label=_('Certificate'),
        ),
        Str('subject',
            label=_('Subject'),
        ),
        Str('issuer',
            label=_('Issuer'),
        ),
        Str('valid_not_before',
            label=_('Not Before'),
        ),
        Str('valid_not_after',
            label=_('Not After'),
        ),
        Str('md5_fingerprint',
            label=_('Fingerprint (MD5)'),
        ),
        Str('sha1_fingerprint',
            label=_('Fingerprint (SHA1)'),
        ),
        Str('revocation_reason',
            label=_('Revocation reason'),
        ),
        Str('serial_number_hex',
            label=_('Serial number (hex)'),
        ),
    )

    def execute(self, serial_number, **options):
        # When the ACI check refuses access, a host/ principal is still
        # allowed to fetch a certificate whose subject CN equals its own
        # hostname.  The refusal is remembered so it can be raised later if
        # the subject turns out not to match.
        access_denied = None
        bound_host = None
        try:
            self.check_access()
        except errors.ACIError as acierr:
            self.debug("Not granted by ACI to retrieve certificate, looking at principal")
            bind_principal = getattr(context, 'principal')
            if not bind_principal.startswith('host/'):
                raise acierr
            access_denied = acierr
            bound_host = get_host_from_principal(bind_principal)

        result = self.Backend.ra.get_certificate(serial_number)
        cert = x509.load_certificate(result['certificate'])

        # Decorate the backend result with details read from the certificate
        details = (
            ('subject', cert.subject),
            ('issuer', cert.issuer),
            ('valid_not_before', cert.valid_not_before_str),
            ('valid_not_after', cert.valid_not_after_str),
        )
        for key, value in details:
            result[key] = unicode(value)
        md5_hex = nss.data_to_hex(nss.md5_digest(cert.der_data), 64)
        result['md5_fingerprint'] = unicode(md5_hex[0])
        sha1_hex = nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)
        result['sha1_fingerprint'] = unicode(sha1_hex[0])

        # If we have a hostname we want to verify that the subject
        # of the certificate matches it, otherwise raise an error
        if bound_host and bound_host != cert.subject.common_name:    #pylint: disable=E1101
            raise access_denied

        return dict(result=result)

    def forward(self, *keys, **options):
        # Without an output file there is nothing extra to do client-side
        if 'out' not in options:
            return super(cert_show, self).forward(*keys, **options)

        out_file = options['out']
        # Check the target is writable before the request is forwarded
        util.check_writable_file(out_file)
        result = super(cert_show, self).forward(*keys, **options)
        if 'certificate' not in result['result']:
            raise errors.NoCertificateError(entry=keys[-1])
        x509.write_certificate(result['result']['certificate'], out_file)
        return result

 

 

api.register(cert_show) 

 

 

class cert_revoke(VirtualCommand):
    __doc__ = _('Revoke a certificate.')

    takes_args = _serial_number

    has_output_params = (
        Flag('revoked',
            label=_('Revoked'),
        ),
    )
    operation = "revoke certificate"

    # FIXME: The default is 0.  Is this really an Int param?
    takes_options = (
        Int('revocation_reason',
            label=_('Reason'),
            doc=_('Reason for revoking the certificate (0-10)'),
            minvalue=0,
            maxvalue=10,
            default=0,
            autofill=True
        ),
    )

    def execute(self, serial_number, **kw):
        # Revoke the certificate through the RA backend.  Raises
        # errors.CertificateOperationError for reason code 7; any ACIError
        # raised by the cert_show fallback below propagates to the caller.
        try:
            self.check_access()
        except errors.ACIError:
            self.debug("Not granted by ACI to revoke certificate, looking at principal")
            try:
                # Let cert_show() handle verifying that the subject of the
                # cert we're dealing with matches the hostname in the principal
                # Only its access decision matters; the returned data is
                # not needed.
                api.Command['cert_show'](unicode(serial_number))
            except errors.NotImplementedError:
                pass
        # 7 lies inside the declared 0-10 range but is not a usable reason
        if kw['revocation_reason'] == 7:
            raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason'))
        return dict(
            result=self.Backend.ra.revoke_certificate(serial_number, **kw)
        )

 

api.register(cert_revoke) 

 

 

class cert_remove_hold(VirtualCommand):
    __doc__ = _('Take a revoked certificate off hold.')

    operation = "certificate remove hold"

    takes_args = _serial_number

    has_output_params = (
        Flag('unrevoked',
            label=_('Unrevoked'),
        ),
        Str('error_string',
            label=_('Error'),
        ),
    )

    def execute(self, serial_number, **kw):
        # After the access check, the RA backend is asked to take the
        # certificate off hold; its answer is returned under 'result'.
        self.check_access()
        outcome = self.Backend.ra.take_certificate_off_hold(serial_number)
        return {'result': outcome}

 

api.register(cert_remove_hold)