Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

# Authors: Rob Crittenden <rcritten@redhat.com> 

# 

# Copyright (C) 2010  Red Hat 

# see file 'COPYING' for use and warranty information 

# 

# This program is free software; you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# This program is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with this program.  If not, see <http://www.gnu.org/licenses/>. 

# 

 

# Some certmonger functions, mostly around updating the request file. 

# This is used so we can add tracking to the Apache and 389-ds 

# server certificates created during the IPA server installation. 

 

import os 

import sys 

import re 

import time 

from ipapython import ipautil 

from ipapython import dogtag 

 

REQUEST_DIR='/var/lib/certmonger/requests/' 

CA_DIR='/var/lib/certmonger/cas/' 

 

# Normalizer types for critera in get_request_id() 

NPATH = 1 

 

def find_request_value(filename, directive): 

    """ 

    Return a value from a certmonger request file for the requested directive 

 

    It tries to do this a number of times because sometimes there is a delay 

    when ipa-getcert returns and the file is fully updated, particularly 

    when doing a request. Generating a CSR is fast but not instantaneous. 

    """ 

    tries = 1 

    value = None 

    found = False 

    while value is None and tries <= 5: 

        tries=tries + 1 

        time.sleep(1) 

        fp = open(filename, 'r') 

        lines = fp.readlines() 

        fp.close() 

 

        for line in lines: 

            if found: 

                # A value can span multiple lines. If it does then it has a 

                # leading space. 

                if not line.startswith(' '): 

                    # We hit the next directive, return now 

                    return value 

                else: 

                    value = value + line[1:] 

            else: 

                if line.startswith(directive + '='): 

                    found = True 

                    value = line[len(directive)+1:] 

 

    return value 

 

def get_request_value(request_id, directive): 

    """ 

    There is no guarantee that the request_id will match the filename 

    in the certmonger requests directory, so open each one to find the 

    request_id. 

    """ 

    fileList=os.listdir(REQUEST_DIR) 

    for file in fileList: 

        value = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id') 

        if value is not None and value.rstrip() == request_id: 

            return find_request_value('%s/%s' % (REQUEST_DIR, file), directive) 

 

    return None 

 

def get_request_id(criteria): 

    """ 

    If you don't know the certmonger request_id then try to find it by looking 

    through all the request files. An alternative would be to parse the 

    ipa-getcert list output but this seems cleaner. 

 

    criteria is a tuple of key/value/type to search for. The more specific 

    the better. An error is raised if multiple request_ids are returned for 

    the same criteria. 

 

    None is returned if none of the criteria match. 

    """ 

    assert type(criteria) is tuple 

 

    reqid=None 

    fileList=os.listdir(REQUEST_DIR) 

    for file in fileList: 

        match = True 

        for (key, value, valtype) in criteria: 

            rv = find_request_value('%s/%s' % (REQUEST_DIR, file), key) 

            if rv and valtype == NPATH: 

                rv = os.path.abspath(rv) 

            if rv is None or rv.rstrip() != value: 

                match = False 

                break 

        if match and reqid is not None: 

            raise RuntimeError('multiple certmonger requests match the criteria') 

        if match: 

            reqid = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id').rstrip() 

 

    return reqid 

 

def get_requests_for_dir(dir): 

    """ 

    Return a list containing the request ids for a given NSS database 

    directory. 

    """ 

    reqid=[] 

    fileList=os.listdir(REQUEST_DIR) 

    for file in fileList: 

        rv = find_request_value(os.path.join(REQUEST_DIR, file), 

                                'cert_storage_location') 

        if rv is None: 

            continue 

        rv = os.path.abspath(rv).rstrip() 

        if rv != dir: 

            continue 

        id = find_request_value(os.path.join(REQUEST_DIR, file), 'id') 

        if id is not None: 

            reqid.append(id.rstrip()) 

 

    return reqid 

 

def add_request_value(request_id, directive, value): 

    """ 

    Add a new directive to a certmonger request file. 

 

    The certmonger service MUST be stopped in order for this to work. 

    """ 

    fileList=os.listdir(REQUEST_DIR) 

    for file in fileList: 

        id = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id') 

        if id is not None and id.rstrip() == request_id: 

            current_value = find_request_value('%s/%s' % (REQUEST_DIR, file), directive) 

            if not current_value: 

                fp = open('%s/%s' % (REQUEST_DIR, file), 'a') 

                fp.write('%s=%s\n' % (directive, value)) 

                fp.close() 

 

    return 

 

def add_principal(request_id, principal): 

    """ 

    In order for a certmonger request to be renewable it needs a principal. 

 

    When an existing certificate is added via start-tracking it won't have 

    a principal. 

    """ 

    return add_request_value(request_id, 'template_principal', principal) 

 

def add_subject(request_id, subject): 

    """ 

    In order for a certmonger request to be renwable it needs the subject 

    set in the request file. 

 

    When an existing certificate is added via start-tracking it won't have 

    a subject_template set. 

    """ 

    return add_request_value(request_id, 'template_subject', subject) 

 

def request_cert(nssdb, nickname, subject, principal, passwd_fname=None): 

    """ 

    Execute certmonger to request a server certificate 

    """ 

    args = ['/usr/bin/ipa-getcert', 

            'request', 

            '-d', nssdb, 

            '-n', nickname, 

            '-N', subject, 

            '-K', principal, 

    ] 

    if passwd_fname: 

        args.append('-p') 

        args.append(os.path.abspath(passwd_fname)) 

    (stdout, stderr, returncode) = ipautil.run(args) 

    # FIXME: should be some error handling around this 

    m = re.match('New signing request "(\d+)" added', stdout) 

    request_id = m.group(1) 

    return request_id 

 

def cert_exists(nickname, secdir): 

    """ 

    See if a nickname exists in an NSS database. 

 

    Returns True/False 

 

    This isn't very sophisticated in that it doesn't differentiate between 

    a database that doesn't exist and a nickname that doesn't exist within 

    the database. 

    """ 

    args = ["/usr/bin/certutil", "-L", 

           "-d", os.path.abspath(secdir), 

           "-n", nickname 

          ] 

    (stdout, stderr, rc) = ipautil.run(args, raiseonerr=False) 

    if rc == 0: 

        return True 

    else: 

        return False 

 

def start_tracking(nickname, secdir, password_file=None, command=None): 

    """ 

    Tell certmonger to track the given certificate nickname in NSS 

    database in secdir protected by optional password file password_file. 

 

    command is an optional parameter which specifies a command for 

    certmonger to run when it renews a certificate. This command must 

    reside in /usr/lib/ipa/certmonger to work with SELinux. 

 

    Returns the stdout, stderr and returncode from running ipa-getcert 

 

    This assumes that certmonger is already running. 

    """ 

    if not cert_exists(nickname, os.path.abspath(secdir)): 

        raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir)) 

    args = ["/usr/bin/ipa-getcert", "start-tracking", 

            "-d", os.path.abspath(secdir), 

            "-n", nickname] 

    if password_file: 

        args.append("-p") 

        args.append(os.path.abspath(password_file)) 

    if command: 

        args.append("-C") 

        args.append(command) 

 

    (stdout, stderr, returncode) = ipautil.run(args) 

 

    return (stdout, stderr, returncode) 

 

def stop_tracking(secdir, request_id=None, nickname=None): 

    """ 

    Stop tracking the current request using either the request_id or nickname. 

 

    This assumes that the certmonger service is running. 

    """ 

    if request_id is None and nickname is None: 

        raise RuntimeError('Both request_id and nickname are missing.') 

    if nickname: 

        # Using the nickname find the certmonger request_id 

        criteria = (('cert_storage_location', os.path.abspath(secdir), NPATH),('cert_nickname', nickname, None)) 

        try: 

            request_id = get_request_id(criteria) 

            if request_id is None: 

                return ('', '', 0) 

        except RuntimeError: 

            # This means that multiple requests matched, skip it for now 

            # Fall back to trying to stop tracking using nickname 

            pass 

 

    args = ['/usr/bin/getcert', 

            'stop-tracking', 

    ] 

    if request_id: 

        args.append('-i') 

        args.append(request_id) 

    else: 

        args.append('-n') 

        args.append(nickname) 

        args.append('-d') 

        args.append(os.path.abspath(secdir)) 

 

    (stdout, stderr, returncode) = ipautil.run(args) 

 

    return (stdout, stderr, returncode) 

 

def _find_IPA_ca(): 

    """ 

    Look through all the certmonger CA files to find the one that 

    has id=IPA 

 

    We can use find_request_value because the ca files have the 

    same file format. 

    """ 

    fileList=os.listdir(CA_DIR) 

    for file in fileList: 

        value = find_request_value('%s/%s' % (CA_DIR, file), 'id') 

        if value is not None and value.strip() == 'IPA': 

            return '%s/%s' % (CA_DIR, file) 

 

    return None 

 

def add_principal_to_cas(principal): 

    """ 

    If the hostname we were passed to use in ipa-client-install doesn't 

    match the value of gethostname() then we need to append 

    -k host/HOSTNAME@REALM to the ca helper defined for 

    /usr/libexec/certmonger/ipa-submit. 

 

    We also need to restore this on uninstall. 

 

    The certmonger service MUST be stopped in order for this to work. 

    """ 

    cafile = _find_IPA_ca() 

    if cafile is None: 

        return 

 

    update = False 

    fp = open(cafile, 'r') 

    lines = fp.readlines() 

    fp.close() 

 

    for i in xrange(len(lines)): 

        if lines[i].startswith('ca_external_helper') and \ 

            lines[i].find('-k') == -1: 

            lines[i] = '%s -k %s\n' % (lines[i].strip(), principal) 

            update = True 

 

    if update: 

        fp = open(cafile, 'w') 

        for line in lines: 

            fp.write(line) 

        fp.close() 

 

def remove_principal_from_cas(): 

    """ 

    Remove any -k principal options from the ipa_submit helper. 

 

    The certmonger service MUST be stopped in order for this to work. 

    """ 

    cafile = _find_IPA_ca() 

    if cafile is None: 

        return 

 

    update = False 

    fp = open(cafile, 'r') 

    lines = fp.readlines() 

    fp.close() 

 

    for i in xrange(len(lines)): 

        if lines[i].startswith('ca_external_helper') and \ 

            lines[i].find('-k') > 0: 

            lines[i] = lines[i].strip().split(' ')[0] + '\n' 

            update = True 

 

    if update: 

        fp = open(cafile, 'w') 

        for line in lines: 

            fp.write(line) 

        fp.close() 

 

# Routines specific to renewing dogtag CA certificates 

def get_pin(token, dogtag_constants=None): 

    """ 

    Dogtag stores its NSS pin in a file formatted as token:PIN. 

 

    The caller is expected to handle any exceptions raised. 

    """ 

    if dogtag_constants is None: 

        dogtag_constants = dogtag.configured_constants() 

    with open(dogtag_constants.PASSWORD_CONF_PATH, 'r') as f: 

        for line in f: 

            (tok, pin) = line.split('=', 1) 

            if token == tok: 

                return pin.strip() 

    return None 

 

def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command, 

                          post_command): 

    """ 

    Tell certmonger to start tracking a dogtag CA certificate. These 

    are handled differently because their renewal must be done directly 

    and not through IPA. 

 

    This uses the generic certmonger command getcert so we can specify 

    a different helper. 

 

    pre_command is the script to execute before a renewal is done. 

    post_command is the script to execute after a renewal is done. 

 

    Both commands can be None. 

 

    Returns the stdout, stderr and returncode from running ipa-getcert 

 

    This assumes that certmonger is already running. 

    """ 

    if not cert_exists(nickname, os.path.abspath(secdir)): 

        raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir)) 

 

    args = ["/usr/bin/getcert", "start-tracking", 

            "-d", os.path.abspath(secdir), 

            "-n", nickname, 

            "-c", ca, 

           ] 

 

    if pre_command is not None: 

        if not os.path.isabs(pre_command): 

            if sys.maxsize > 2**32: 

                libpath = 'lib64' 

            else: 

                libpath = 'lib' 

            pre_command = '/usr/%s/ipa/certmonger/%s' % (libpath, pre_command) 

        args.append("-B") 

        args.append(pre_command) 

 

    if post_command is not None: 

        if not os.path.isabs(post_command): 

            if sys.maxsize > 2**32: 

                libpath = 'lib64' 

            else: 

                libpath = 'lib' 

            post_command = '/usr/%s/ipa/certmonger/%s' % (libpath, post_command) 

        args.append("-C") 

        args.append(post_command) 

 

    if pinfile: 

        args.append("-p") 

        args.append(pinfile) 

    else: 

        args.append("-P") 

        args.append(pin) 

 

    if ca == 'dogtag-ipa-retrieve-agent-submit': 

        # We cheat and pass in the nickname as the profile when 

        # renewing on a clone. The submit otherwise doesn't pass in the 

        # nickname and we need some way to find the right entry in LDAP. 

        args.append("-T") 

        args.append(nickname) 

 

    (stdout, stderr, returncode) = ipautil.run(args, nolog=[pin]) 

 

def check_state(dirs): 

    """ 

    Given a set of directories and nicknames verify that we are no longer 

    tracking certificates. 

 

    dirs is a list of directories to test for. We will return a tuple 

    of nicknames for any tracked certificates found. 

 

    This can only check for NSS-based certificates. 

    """ 

    reqids = [] 

    for dir in dirs: 

        reqids.extend(get_requests_for_dir(dir)) 

 

    return reqids 

 

if __name__ == '__main__': 

    request_id = request_cert("/etc/httpd/alias", "Test", "cn=tiger.example.com,O=IPA", "HTTP/tiger.example.com@EXAMPLE.COM") 

    csr = get_request_value(request_id, 'csr') 

    print csr 

    stop_tracking(request_id)