NetExec/nxc/modules/adcs.py

138 lines
5.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from impacket.ldap import ldap, ldapasn1
from impacket.ldap.ldap import LDAPSearchError
2023-03-30 04:11:29 +00:00
class NXCModule:
    """
    Find PKI Enrollment Services in Active Directory and Certificate Templates Names.

    Module by Tobias Neitzel (@qtc_de) and Sam Freeside (@snovvcrash)
    """

    name = "adcs"
    description = "Find PKI Enrollment Services in Active Directory and Certificate Templates Names"
    supported_protocols = ["ldap"]
    opsec_safe = True
    multiple_hosts = True

    def __init__(self, context=None, module_options=None):
        """Store the framework context/options and set safe defaults.

        Every attribute read later by ``on_login`` and the ``process_*``
        callbacks is initialised here, so the module does not depend on
        ``options`` having been called first.
        """
        self.context = context
        self.module_options = module_options
        self.server = None
        self.base_dn = None
        # Extracts the enrollment web-service URL from a
        # msPKI-Enrollment-Servers value.
        self.regex = re.compile(r"(https?://.+)")

    def options(self, context, module_options):
        """
        SERVER             PKI Enrollment Server to enumerate templates for. Default is None, use CN name
        BASE_DN            The base domain name for the LDAP query
        """
        self.context = context
        self.regex = re.compile(r"(https?://.+)")

        self.server = None
        self.base_dn = None

        # module_options may be None or empty; both mean "use the defaults".
        if module_options and "SERVER" in module_options:
            self.server = module_options["SERVER"]
        if module_options and "BASE_DN" in module_options:
            self.base_dn = module_options["BASE_DN"]

    def _search_filter(self, base_dn_root):
        """Return the complete LDAP search filter for the current options.

        Without SERVER every pKIEnrollmentService object is matched; with
        SERVER only the enrollment service whose distinguishedName is built
        from that CN and ``base_dn_root`` is matched.
        """
        if self.server is None:
            return "(objectClass=pKIEnrollmentService)"
        return (
            f"(distinguishedName=CN={self.server},CN=Enrollment Services,"
            f"CN=Public Key Services,CN=Services,CN=Configuration,{base_dn_root})"
        )

    def on_login(self, context, connection):
        """
        On a successful LDAP login we perform a search for all PKI Enrollment Server or Certificate Templates Names.

        Without SERVER the enrollment services are listed (handled by
        ``process_servers``); with SERVER the certificate templates of that
        service are listed (handled by ``process_templates``).
        An ``LDAPSearchError`` raised by the search is logged, not propagated.
        """
        base_dn_root = connection.ldapConnection._baseDN if self.base_dn is None else self.base_dn
        # Build the filter once so the logged filter is exactly the one sent.
        search_filter = self._search_filter(base_dn_root)

        if self.server is None:
            # An empty attribute list is passed through unchanged to search().
            attributes = []
            callback = self.process_servers
        else:
            attributes = ["certificateTemplates"]
            callback = self.process_templates
            self.context.log.highlight(f"Using PKI CN: {self.server}")

        context.log.display(f"Starting LDAP search with search filter '{search_filter}'")

        try:
            sc = ldap.SimplePagedResultsControl()
            # Results are consumed by the per-record callback; the return
            # value of search() is intentionally not used.
            connection.ldapConnection.search(
                searchFilter=search_filter,
                attributes=attributes,
                sizeLimit=0,
                searchControls=[sc],
                perRecordCallback=callback,
                searchBase="CN=Configuration," + base_dn_root,
            )
        except LDAPSearchError as e:
            context.log.fail(f"Obtained unexpected exception: {e!s}")

    def process_servers(self, item):
        """
        Function that is called to process the items obtained by the LDAP search when listing PKI Enrollment Servers.

        Items that are not ``SearchResultEntry`` instances are ignored. Any
        error while decoding an entry is logged; whatever was decoded before
        the error is still reported.
        """
        if not isinstance(item, ldapasn1.SearchResultEntry):
            return

        urls = []
        host_name = None
        cn = None

        try:
            for attribute in item["attributes"]:
                attr_type = str(attribute["type"])
                if attr_type == "dNSHostName":
                    host_name = attribute["vals"][0].asOctets().decode("utf-8")
                elif attr_type == "cn":
                    cn = attribute["vals"][0].asOctets().decode("utf-8")
                elif attr_type == "msPKI-Enrollment-Servers":
                    for value in attribute["vals"]:
                        match = self.regex.search(value.asOctets().decode("utf-8"))
                        if match:
                            urls.append(match.group(1))
        except Exception as e:
            # Best effort: one malformed entry must not abort the whole search.
            entry = host_name or "item"
            self.context.log.fail(f"Skipping {entry}, cannot process LDAP entry due to error: '{e!s}'")

        if host_name:
            self.context.log.highlight(f"Found PKI Enrollment Server: {host_name}")
        if cn:
            self.context.log.highlight(f"Found CN: {cn}")
        for url in urls:
            self.context.log.highlight(f"Found PKI Enrollment WebService: {url}")

    def process_templates(self, item):
        """
        Function that is called to process the items obtained by the LDAP search when listing Certificate Templates Names for a specific PKI Enrollment Server.

        Items that are not ``SearchResultEntry`` instances are ignored. Any
        error while decoding an entry is logged; templates decoded before the
        error are still reported.
        """
        if not isinstance(item, ldapasn1.SearchResultEntry):
            return

        templates = []
        template_name = None

        try:
            for attribute in item["attributes"]:
                if str(attribute["type"]) == "certificateTemplates":
                    for val in attribute["vals"]:
                        template_name = val.asOctets().decode("utf-8")
                        templates.append(template_name)
        except Exception as e:
            # Best effort: report the last template decoded before the failure.
            entry = template_name or "item"
            self.context.log.fail(f"Skipping {entry}, cannot process LDAP entry due to error: '{e}'")

        for template in templates:
            self.context.log.highlight(f"Found Certificate Template: {template}")