Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

# Authors: Rob Crittenden <rcritten@redhat.com> 

# 

# Copyright (C) 2009    Red Hat 

# see file 'COPYING' for use and warranty information 

# 

# This program is free software; you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# This program is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with this program.  If not, see <http://www.gnu.org/licenses/>. 

# 

 

from ipalib import api, errors 

import httplib 

import xml.dom.minidom 

from ipapython import nsslib, ipautil 

import nss.nss as nss 

from nss.error import NSPRError 

from ipalib.errors import NetworkError, CertificateOperationError 

from urllib import urlencode 

from ipapython.ipa_log_manager import * 

 

def get_ca_certchain(ca_host=None): 

    """ 

    Retrieve the CA Certificate chain from the configured Dogtag server. 

    """ 

    if ca_host is None: 

        ca_host = api.env.ca_host 

    chain = None 

    conn = httplib.HTTPConnection(ca_host, api.env.ca_install_port) 

    conn.request("GET", "/ca/ee/ca/getCertChain") 

    res = conn.getresponse() 

    doc = None 

    if res.status == 200: 

        data = res.read() 

        conn.close() 

        try: 

            doc = xml.dom.minidom.parseString(data) 

            try: 

                item_node = doc.getElementsByTagName("ChainBase64") 

                chain = item_node[0].childNodes[0].data 

            except IndexError: 

                try: 

                    item_node = doc.getElementsByTagName("Error") 

                    reason = item_node[0].childNodes[0].data 

                    raise errors.RemoteRetrieveError(reason=reason) 

                except Exception, e: 

                    raise errors.RemoteRetrieveError(reason="Retrieving CA cert chain failed: %s" % str(e)) 

        finally: 

            if doc: 

                doc.unlink() 

    else: 

        raise errors.RemoteRetrieveError(reason="request failed with HTTP status %d" % res.status) 

 

    return chain 

 

def https_request(host, port, url, secdir, password, nickname, **kw): 

    """ 

    :param url: The URL to post to. 

    :param kw:  Keyword arguments to encode into POST body. 

    :return:   (http_status, http_reason_phrase, http_headers, http_body) 

               as (integer, unicode, dict, str) 

 

    Perform a client authenticated HTTPS request 

    """ 

    if isinstance(host, unicode): 

        host = host.encode('utf-8') 

    uri = 'https://%s%s' % (ipautil.format_netloc(host, port), url) 

    post = urlencode(kw) 

    root_logger.debug('https_request %r', uri) 

    root_logger.debug('https_request post %r', post) 

    request_headers = {"Content-type": "application/x-www-form-urlencoded", 

                       "Accept": "text/plain"} 

    try: 

        conn = nsslib.NSSConnection(host, port, dbdir=secdir) 

        conn.set_debuglevel(0) 

        conn.connect() 

        conn.sock.set_client_auth_data_callback(nsslib.client_auth_data_callback, 

                                                nickname, 

                                                password, nss.get_default_certdb()) 

        conn.request("POST", url, post, request_headers) 

 

        res = conn.getresponse() 

 

        http_status = res.status 

        http_reason_phrase = unicode(res.reason, 'utf-8') 

        http_headers = res.msg.dict 

        http_body = res.read() 

        conn.close() 

    except Exception, e: 

        raise NetworkError(uri=uri, error=str(e)) 

 

    return http_status, http_reason_phrase, http_headers, http_body 

 

def http_request(host, port, url, **kw): 

        """ 

        :param url: The URL to post to. 

        :param kw: Keyword arguments to encode into POST body. 

        :return:   (http_status, http_reason_phrase, http_headers, http_body) 

                   as (integer, unicode, dict, str) 

 

        Perform an HTTP request. 

        """ 

        if isinstance(host, unicode): 

            host = host.encode('utf-8') 

        uri = 'http://%s%s' % (ipautil.format_netloc(host, port), url) 

        post = urlencode(kw) 

        root_logger.info('request %r', uri) 

        root_logger.debug('request post %r', post) 

        conn = httplib.HTTPConnection(host, port) 

        try: 

            conn.request('POST', url, 

                body=post, 

                headers={'Content-type': 'application/x-www-form-urlencoded'}, 

            ) 

            res = conn.getresponse() 

 

            http_status = res.status 

            http_reason_phrase = unicode(res.reason, 'utf-8') 

            http_headers = res.msg.dict 

            http_body = res.read() 

            conn.close() 

        except NSPRError, e: 

            raise NetworkError(uri=uri, error=str(e)) 

 

        root_logger.debug('request status %d',        http_status) 

        root_logger.debug('request reason_phrase %r', http_reason_phrase) 

        root_logger.debug('request headers %s',       http_headers) 

        root_logger.debug('request body %r',          http_body) 

 

        return http_status, http_reason_phrase, http_headers, http_body