"""
A Python client for the Microsoft AD Certificate Services web page.
https://github.com/magnuswatn/certsrv
"""
import os
import re
import base64
import logging
import warnings
import requests
__version__ = "2.1.0"
logger = logging.getLogger(__name__)
TIMEOUT = 30
[docs]class RequestDeniedException(Exception):
"""Signifies that the request was denied by the ADCS server."""
def __init__(self, message, response):
Exception.__init__(self, message)
self.response = response
[docs]class CouldNotRetrieveCertificateException(Exception):
"""Signifies that the certificate could not be retrieved."""
def __init__(self, message, response):
Exception.__init__(self, message)
self.response = response
[docs]class CertificatePendingException(Exception):
"""Signifies that the request needs to be approved by a CA admin."""
def __init__(self, req_id):
Exception.__init__(
self,
"Your certificate request has been received. "
"However, you must wait for an administrator to issue the "
"certificate you requested. Your Request Id is {0}.".format(req_id),
)
self.req_id = req_id
[docs]class Certsrv(object):
"""
Represents a Microsoft AD Certificate Services web server.
Args:
server: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
username: The username for authentication.
password: The password for authentication.
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (SSL client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
timeout: The timeout to use against the CA server, in seconds.
The default is 30.
Note:
If you use a client certificate for authentication (auth_method=cert),
the username parameter should be the path to a certificate, and
the password parameter the path to a (unencrypted) private key.
"""
def __init__(self, server, username, password, auth_method="basic",
cafile=None, timeout=TIMEOUT):
self.server = server
self.timeout = timeout
self.auth_method = auth_method
self.session = requests.Session()
if cafile:
self.session.verify = cafile
else:
# requests uses it's own CA bundle by default
# but ADCS servers often have certificates
# from private CAs that are locally trusted,
# so we try to find, and use, the system bundle
# instead. Fallback to requests own.
self.session.verify = _get_ca_bundle()
self._set_credentials(username, password)
# We need certsrv to think we are a browser,
# or otherwise the Content-Type of the retrieved
# certificate will be wrong (for some reason).
self.session.headers = {
"User-agent": "Mozilla/5.0 certsrv (https://github.com/magnuswatn/certsrv)"
}
def _set_credentials(self, username, password):
if self.auth_method == "ntlm":
from requests_ntlm import HttpNtlmAuth
self.session.auth = HttpNtlmAuth(username, password)
elif self.auth_method == "cert":
self.session.cert = (username, password)
else:
self.session.auth = (username, password)
def _post(self, url, **kwargs):
response = self.session.post(url, timeout=self.timeout, **kwargs)
return self._handle_response(response)
def _get(self, url, **kwargs):
response = self.session.get(url, timeout=self.timeout, **kwargs)
return self._handle_response(response)
@staticmethod
def _handle_response(response):
logger.debug(
"Sent %s request to %s, with headers:\n%s\n\nand body:\n%s",
response.request.method,
response.request.url,
"\n".join(
["{0}: {1}".format(k, v) for k, v in response.request.headers.items()]
),
response.request.body,
)
try:
debug_content = response.content.decode()
except UnicodeDecodeError:
debug_content = base64.b64encode(response.content)
logger.debug(
"Recieved response:\nHTTP %s\n%s\n\n%s",
response.status_code,
"\n".join(["{0}: {1}".format(k, v) for k, v in response.headers.items()]),
debug_content,
)
response.raise_for_status()
return response
[docs] def get_cert(self, csr, template, encoding="b64", attributes=None):
"""
Gets a certificate from the ADCS server.
Args:
csr: The certificate request to submit.
template: The certificate template the cert should be issued from.
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
attributes: Additional Attributes (request attibutes) to be sent along with
the request.
Returns:
The issued certificate.
Raises:
RequestDeniedException: If the request was denied by the ADCS server.
CertificatePendingException: If the request needs to be approved
by a CA admin.
CouldNotRetrieveCertificateException: If something went wrong while
fetching the cert.
"""
cert_attrib = "CertificateTemplate:{0}\r\n".format(template)
if attributes:
cert_attrib += attributes
data = {
"Mode": "newreq",
"CertRequest": csr,
"CertAttrib": cert_attrib,
"FriendlyType": "Saved-Request Certificate",
"TargetStoreFlags": "0",
"SaveCert": "yes",
}
url = "https://{0}/certsrv/certfnsh.asp".format(self.server)
response = self._post(url, data=data)
# We need to parse the Request ID from the returning HTML page
try:
req_id = re.search(r"certnew.cer\?ReqID=(\d+)&", response.text).group(1)
except AttributeError:
# We didn't find any request ID in the response. It may need approval.
if re.search(r"Certificate Pending", response.text):
req_id = re.search(r"Your Request Id is (\d+).", response.text).group(1)
raise CertificatePendingException(req_id)
else:
# Must have failed. Lets find the error message
# and raise a RequestDeniedException.
try:
error = re.search(
r'The disposition message is "([^"]+)', response.text
).group(1)
except AttributeError:
error = "An unknown error occured"
raise RequestDeniedException(error, response.text)
return self.get_existing_cert(req_id, encoding)
[docs] def get_existing_cert(self, req_id, encoding="b64"):
"""
Gets a certificate that has already been created from the ADCS server.
Args:
req_id: The request ID to retrieve.
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
Returns:
The issued certificate.
Raises:
CouldNotRetrieveCertificateException: If something went wrong
while fetching the cert.
"""
cert_url = "https://{0}/certsrv/certnew.cer".format(self.server)
params = {"ReqID": req_id, "Enc": encoding}
response = self._get(cert_url, params=params)
if response.headers["Content-Type"] != "application/pkix-cert":
# The response was not a cert. Something must have gone wrong
try:
error = re.search(
"Disposition message:[^\t]+\t\t([^\r\n]+)", response.text
).group(1)
except AttributeError:
error = "An unknown error occured"
raise CouldNotRetrieveCertificateException(error, response.text)
else:
return response.content
[docs] def get_ca_cert(self, encoding="b64"):
"""
Gets the (newest) CA certificate from the ADCS server.
Args:
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
Returns:
The newest CA certificate from the server.
"""
url = "https://{0}/certsrv/certcarc.asp".format(self.server)
response = self._get(url)
# We have to check how many renewals this server has had,
# so that we get the newest CA cert.
renewals = re.search(r"var nRenewals=(\d+);", response.text).group(1)
cert_url = "https://{0}/certsrv/certnew.cer".format(self.server)
params = {"ReqID": "CACert", "Enc": encoding, "Renewal": renewals}
response = self._get(cert_url, params=params)
if response.headers["Content-Type"] != "application/pkix-cert":
raise CouldNotRetrieveCertificateException(
"An unknown error occured", response.content
)
return response.content
[docs] def get_chain(self, encoding="bin"):
"""
Gets the CA chain from the ADCS server.
Args:
encoding: The desired encoding for the returned certificates.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
Returns:
The CA chain from the server, in PKCS#7 format.
"""
url = "https://{0}/certsrv/certcarc.asp".format(self.server)
response = self._get(url)
# We have to check how many renewals this server has had, so that we get the newest chain
renewals = re.search(r"var nRenewals=(\d+);", response.text).group(1)
chain_url = "https://{0}/certsrv/certnew.p7b".format(self.server)
params = {"ReqID": "CACert", "Renewal": renewals, "Enc": encoding}
chain_response = self._get(chain_url, params=params)
if chain_response.headers["Content-Type"] != "application/x-pkcs7-certificates":
raise CouldNotRetrieveCertificateException(
"An unknown error occured", chain_response.content
)
return chain_response.content
[docs] def check_credentials(self):
"""
Checks the specified credentials against the ADCS server.
Returns:
True if authentication succeeded, False if it failed.
"""
url = "https://{0}/certsrv/".format(self.server)
try:
self._get(url)
except requests.exceptions.HTTPError as error:
if error.response.status_code == 401:
return False
else:
raise
return True
[docs] def update_credentials(self, username, password):
"""
Updates the credentials used against the ADCS server.
Args:
username: The username for authentication.
password: The password for authentication.
"""
if self.auth_method in ("ntlm", "cert"):
# NTLM and SSL is connection based,
# so we need to close the connection
# to be able to re-authenticate
self.session.close()
self._set_credentials(username, password)
def _get_ca_bundle():
"""Tries to find the platform ca bundle for the system (on linux systems)"""
ca_bundles = [
# list taken from https://golang.org/src/crypto/x509/root_linux.go
"/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6
"/etc/ssl/ca-bundle.pem", # OpenSUSE
"/etc/pki/tls/cacert.pem", # OpenELEC
"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7
]
for ca_bundle in ca_bundles:
if os.path.isfile(ca_bundle):
return ca_bundle
# if the bundle was not found, we revert back to requests own
return True
[docs]def get_cert(server, csr, template, username, password, encoding="b64", **kwargs):
"""
Gets a certificate from a Microsoft AD Certificate Services web page.
Args:
server: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
csr: The certificate request to submit.
template: The certificate template the cert should be issued from.
username: The username for authentication.
pasword: The password for authentication.
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (ssl client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
Returns:
The issued certificate.
Raises:
RequestDeniedException: If the request was denied by the ADCS server.
CertificatePendingException: If the request needs to be approved by a CA admin.
CouldNotRetrieveCertificateException: If something went wrong while
fetching the cert.
Note:
This method is deprecated.
"""
warnings.warn(
"This function is deprecated. Use the method on the Certsrv class instead",
DeprecationWarning,
)
certsrv = Certsrv(server, username, password, **kwargs)
return certsrv.get_cert(csr, template, encoding)
[docs]def get_existing_cert(server, req_id, username, password, encoding="b64", **kwargs):
"""
Gets a certificate that has already been created from a
Microsoft AD Certificate Services web page.
Args:
server: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
req_id: The request ID to retrieve.
username: The username for authentication.
pasword: The password for authentication.
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (ssl client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
Returns:
The issued certificate.
Raises:
CouldNotRetrieveCertificateException: If something went wrong while
fetching the cert.
Note:
This method is deprecated.
"""
warnings.warn(
"This function is deprecated. Use the method on the Certsrv class instead",
DeprecationWarning,
)
certsrv = Certsrv(server, username, password, **kwargs)
return certsrv.get_existing_cert(req_id, encoding)
[docs]def get_ca_cert(server, username, password, encoding="b64", **kwargs):
"""
Gets the (newest) CA certificate from a Microsoft AD Certificate Services web page.
Args:
server: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
username: The username for authentication.
pasword: The password for authentication.
encoding: The desired encoding for the returned certificate.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (ssl client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
Returns:
The newest CA certificate from the server.
Note:
This method is deprecated.
"""
warnings.warn(
"This function is deprecated. Use the method on the Certsrv class instead",
DeprecationWarning,
)
certsrv = Certsrv(server, username, password, **kwargs)
return certsrv.get_ca_cert(encoding)
[docs]def get_chain(server, username, password, encoding="bin", **kwargs):
"""
Gets the chain from a Microsoft AD Certificate Services web page.
Args:
server: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
username: The username for authentication.
pasword: The password for authentication.
encoding: The desired encoding for the returned certificates.
Possible values are 'bin' for binary and 'b64' for Base64 (PEM).
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (ssl client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
Returns:
The CA chain from the server, in PKCS#7 format.
Note:
This method is deprecated.
"""
warnings.warn(
"This function is deprecated. Use the method on the Certsrv class instead",
DeprecationWarning,
)
certsrv = Certsrv(server, username, password, **kwargs)
return certsrv.get_chain(encoding)
[docs]def check_credentials(server, username, password, **kwargs):
"""
Checks the specified credentials against the specified ADCS server.
Args:
ca: The FQDN to a server running the Certification Authority
Web Enrollment role (must be listening on https).
username: The username for authentication.
pasword: The password for authentication.
auth_method: The chosen authentication method. Either 'basic' (the default),
'ntlm' or 'cert' (ssl client certificate).
cafile: A PEM file containing the CA certificates that should be trusted.
Returns:
True if authentication succeeded, False if it failed.
Note:
This method is deprecated.
"""
warnings.warn(
"This function is deprecated. Use the method on the Certsrv class instead",
DeprecationWarning,
)
certsrv = Certsrv(server, username, password, **kwargs)
return certsrv.check_credentials()