From 1c7df154b80de2fe07270d7fb9c0e0b50d3c4874 Mon Sep 17 00:00:00 2001 From: Marshall Hallenbeck Date: Thu, 21 Sep 2023 23:42:54 -0400 Subject: [PATCH] more cleanup --- nxc/modules/handlekatz.py | 3 +- nxc/modules/laps.py | 1 + nxc/modules/pso.py | 50 ++++++------- nxc/modules/scan-network.py | 26 ++++--- nxc/modules/spider_plus.py | 4 +- nxc/modules/subnets.py | 39 +++++----- nxc/modules/trust.py | 89 ++++++++++++----------- nxc/protocols/ldap/laps.py | 135 +++++++++++++++++++---------------- nxc/protocols/smb/smbexec.py | 4 +- 9 files changed, 190 insertions(+), 161 deletions(-) diff --git a/nxc/modules/handlekatz.py b/nxc/modules/handlekatz.py index 1d4c62f7..552e5073 100644 --- a/nxc/modules/handlekatz.py +++ b/nxc/modules/handlekatz.py @@ -10,6 +10,7 @@ import re import sys from nxc.helpers.bloodhound import add_user_bh +import pypykatz class NXCModule: @@ -177,4 +178,4 @@ class NXCModule: if len(credz_bh) > 0: add_user_bh(credz_bh, None, context.log, connection.config) except Exception as e: - context.log.fail("Error opening dump file", str(e)) + context.log.fail(f"Error opening dump file: {e}") diff --git a/nxc/modules/laps.py b/nxc/modules/laps.py index a8b92d0f..35de9ffc 100644 --- a/nxc/modules/laps.py +++ b/nxc/modules/laps.py @@ -5,6 +5,7 @@ import json from impacket.ldap import ldapasn1 as ldapasn1_impacket from nxc.protocols.ldap.laps import LAPSv2Extract + class NXCModule: """ Module by technobro refactored by @mpgn (now compatible with LDAP protocol + filter by computer) diff --git a/nxc/modules/pso.py b/nxc/modules/pso.py index e5955589..02dc5d56 100644 --- a/nxc/modules/pso.py +++ b/nxc/modules/pso.py @@ -7,15 +7,15 @@ from math import fabs class NXCModule: - ''' + """ Created by fplazar and wanetty Module by @gm_eduard and @ferranplaza Based on: https://github.com/juliourena/CrackMapExec/blob/master/cme/modules/get_description.py - ''' + """ - name = 'pso' + name = "pso" description = "Query to get PSO from LDAP" - supported_protocols = ['ldap'] + supported_protocols = ["ldap"] opsec_safe = True multiple_hosts = True @@ -35,9 +35,9 @@ class NXCModule: ] def options(self, context, module_options): - ''' + """ No options available. - ''' + """ pass def convert_time_field(self, field, value): @@ -54,29 +54,31 @@ class NXCModule: return value def on_login(self, context, connection): - '''Concurrent. Required if on_admin_login is not present. This gets called on each authenticated connection''' + """Concurrent. Required if on_admin_login is not present. This gets called on each authenticated connection""" # Building the search filter - searchFilter = "(objectClass=msDS-PasswordSettings)" + search_filter = "(objectClass=msDS-PasswordSettings)" try: - context.log.debug('Search Filter=%s' % searchFilter) - resp = connection.ldapConnection.search(searchFilter=searchFilter, - attributes=self.pso_fields, - sizeLimit=0) + context.log.debug(f"Search Filter={search_filter}") + resp = connection.ldapConnection.search( + searchFilter=search_filter, + attributes=self.pso_fields, + sizeLimit=0 + ) except ldap_impacket.LDAPSearchError as e: - if e.getErrorString().find('sizeLimitExceeded') >= 0: - context.log.debug('sizeLimitExceeded exception caught, giving up and processing the data received') + if e.getErrorString().find("sizeLimitExceeded") >= 0: + context.log.debug("sizeLimitExceeded exception caught, giving up and processing the data received") # We reached the sizeLimit, process the answers we have already and that's it. Until we implement # paged queries resp = e.getAnswers() pass else: - logging.debug(e) + context.log.debug(e) return False pso_list = [] - context.log.debug('Total of records returned %d' % len(resp)) + context.log.debug(f"Total of records returned {len(resp)}") for item in resp: if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True: continue @@ -84,25 +86,25 @@ class NXCModule: pso_info = {} try: - for attribute in item['attributes']: - attr_name = str(attribute['type']) + for attribute in item["attributes"]: + attr_name = str(attribute["type"]) if attr_name in self.pso_fields: - pso_info[attr_name] = attribute['vals'][0]._value.decode('utf-8') + pso_info[attr_name] = attribute["vals"][0]._value.decode("utf-8") pso_list.append(pso_info) except Exception as e: context.log.debug("Exception:", exc_info=True) - context.log.debug('Skipping item, cannot process due to error %s' % str(e)) + context.log.debug(f"Skipping item, cannot process due to error {e}") pass if len(pso_list) > 0: - context.log.success('Password Settings Objects (PSO) found:') + context.log.success("Password Settings Objects (PSO) found:") for pso in pso_list: for field in self.pso_fields: if field in pso: value = self.convert_time_field(field, pso[field]) - context.log.highlight(u'{}: {}'.format(field, value)) - context.log.highlight('-----') + context.log.highlight(f"{field}: {value}") + context.log.highlight("-----") else: - context.log.info('No Password Settings Objects (PSO) found.') + context.log.info("No Password Settings Objects (PSO) found.") diff --git a/nxc/modules/scan-network.py b/nxc/modules/scan-network.py index e96fcd5e..7fd59357 100644 --- a/nxc/modules/scan-network.py +++ b/nxc/modules/scan-network.py @@ -1,7 +1,7 @@ # Credit to https://twitter.com/snovvcrash/status/1550518555438891009 # Credit to https://github.com/dirkjanm/adidnsdump @_dirkjan # module by @mpgn_x64 - +import re from os.path import expanduser import codecs import socket @@ -11,7 +11,9 @@ from struct import unpack import dns.name import dns.resolver +from impacket.ldap import ldap from impacket.structure import Structure +from impacket.ldap import ldapasn1 as ldapasn1_impacket from ldap3 import LEVEL @@ -43,7 +45,7 @@ def get_dns_resolver(server, context): def ldap2domain(ldap): - return re.sub(",DC=", ".", ldap[ldap.lower().find("dc=") :], flags=re.I)[3:] + return re.sub(",DC=", ".", ldap[ldap.lower().find("dc="):], flags=re.I)[3:] def new_record(rtype, serial): @@ -115,14 +117,14 @@ class NXCModule: def on_login(self, context, connection): zone = ldap2domain(connection.baseDN) - dnsroot = "CN=MicrosoftDNS,DC=DomainDnsZones,%s" % connection.baseDN - searchtarget = "DC=%s,%s" % (zone, dnsroot) + dns_root = f"CN=MicrosoftDNS,DC=DomainDnsZones,{connection.baseDN}" + search_target = f"DC={zone},{dns_root}" context.log.display("Querying zone for records") sfilter = "(DC=*)" try: list_sites = connection.ldapConnection.search( - searchBase=searchtarget, + searchBase=search_target, searchFilter=sfilter, attributes=["dnsRecord", "dNSTombstoned", "name"], sizeLimit=100000, @@ -160,7 +162,8 @@ class NXCModule: "value": address.formatCanonical(), } ) - if dr["Type"] in [a for a in RECORD_TYPE_MAPPING if RECORD_TYPE_MAPPING[a] in ["CNAME", "NS", "PTR"]]: + if dr["Type"] in [a for a in RECORD_TYPE_MAPPING if + RECORD_TYPE_MAPPING[a] in ["CNAME", "NS", "PTR"]]: address = DNS_RPC_RECORD_NODE_NAME(dr["Data"]) if str(recordname) != "DomainDnsZones" and str(recordname) != "ForestDnsZones": outdata.append( @@ -182,7 +185,8 @@ class NXCModule: ) context.log.highlight("Found %d records" % len(outdata)) - path = expanduser("~/.nxc/logs/{}_network_{}.log".format(connection.domain, datetime.now().strftime("%Y-%m-%d_%H%M%S"))) + path = expanduser( + "~/.nxc/logs/{}_network_{}.log".format(connection.domain, datetime.now().strftime("%Y-%m-%d_%H%M%S"))) with codecs.open(path, "w", "utf-8") as outfile: for row in outdata: if self.showhosts: @@ -193,7 +197,9 @@ class NXCModule: outfile.write("{}\n".format(row["value"])) context.log.success("Dumped {} records to {}".format(len(outdata), path)) if not self.showall and not self.showhosts: - context.log.display("To extract CIDR from the {} ip, run the following command: cat" " your_file | mapcidr -aa -silent | mapcidr -a -silent".format(len(outdata))) + context.log.display( + "To extract CIDR from the {} ip, run the following command: cat" " your_file | mapcidr -aa -silent | mapcidr -a -silent".format( + len(outdata))) class DNS_RECORD(Structure): @@ -250,8 +256,8 @@ class DNS_COUNT_NAME(Structure): ind = 0 labels = [] for i in range(self["LabelCount"]): - nextlen = unpack("B", self["RawName"][ind : ind + 1])[0] - labels.append(self["RawName"][ind + 1 : ind + 1 + nextlen].decode("utf-8")) + nextlen = unpack("B", self["RawName"][ind: ind + 1])[0] + labels.append(self["RawName"][ind + 1: ind + 1 + nextlen].decode("utf-8")) ind += nextlen + 1 # For the final dot labels.append("") diff --git a/nxc/modules/spider_plus.py b/nxc/modules/spider_plus.py index 54d062cf..ffc009a3 100755 --- a/nxc/modules/spider_plus.py +++ b/nxc/modules/spider_plus.py @@ -376,7 +376,7 @@ class SMBSpiderPlus: download_path = os.path.join(folder, filename) # Create the subdirectories based on the share name and file path. - self.logger.debug(f'Create folder "{folder}"') + self.logger.debug(f"Creating folder '{folder}'") make_dirs(folder) try: @@ -387,7 +387,7 @@ class SMBSpiderPlus: break fd.write(chunk) except Exception as e: - self.logger.fail(f'Error writing file "{remote_path}" from share "{share_name}": {e}') + self.logger.fail(f'Error writing file "{download_path}" from share "{share_name}": {e}') # Check if the file is empty and should not be. if os.path.getsize(download_path) == 0 and remote_file.get_filesize() > 0: diff --git a/nxc/modules/subnets.py b/nxc/modules/subnets.py index 761f0564..794e1a82 100644 --- a/nxc/modules/subnets.py +++ b/nxc/modules/subnets.py @@ -2,9 +2,10 @@ # -*- coding: utf-8 -*- from impacket.ldap import ldapasn1 as ldapasn1_impacket +from impacket.ldap.ldap import LDAPSearchError -def searchResEntry_to_dict(results): +def search_res_entry_to_dict(results): data = {} for attr in results["attributes"]: key = str(attr["type"]) @@ -52,7 +53,7 @@ class NXCModule: try: list_sites = connection.ldapConnection.search( - searchBase="CN=Configuration,%s" % dn, + searchBase=f"CN=Configuration,{dn}", searchFilter="(objectClass=site)", attributes=["distinguishedName", "name", "description"], sizeLimit=999, @@ -60,19 +61,21 @@ class NXCModule: except LDAPSearchError as e: context.log.fail(str(e)) exit() + for site in list_sites: if isinstance(site, ldapasn1_impacket.SearchResultEntry) is not True: continue - site = searchResEntry_to_dict(site) + site = search_res_entry_to_dict(site) site_dn = site["distinguishedName"] site_name = site["name"] site_description = "" if "description" in site.keys(): site_description = site["description"] + # Getting subnets of this site list_subnets = connection.ldapConnection.search( - searchBase="CN=Sites,CN=Configuration,%s" % dn, - searchFilter="(siteObject=%s)" % site_dn, + searchBase=f"CN=Sites,CN=Configuration,{dn}", + searchFilter=f"(siteObject={site_dn})", attributes=["distinguishedName", "name"], sizeLimit=999, ) @@ -82,7 +85,7 @@ class NXCModule: for subnet in list_subnets: if isinstance(subnet, ldapasn1_impacket.SearchResultEntry) is not True: continue - subnet = searchResEntry_to_dict(subnet) + subnet = search_res_entry_to_dict(subnet) subnet["distinguishedName"] subnet_name = subnet["name"] @@ -96,28 +99,24 @@ class NXCModule: ) if len([server for server in list_servers if isinstance(server, ldapasn1_impacket.SearchResultEntry)]) == 0: if len(site_description) != 0: - context.log.highlight('Site "%s" (Subnet:%s) (description:"%s")' % (site_name, subnet_name, site_description)) + context.log.highlight( + f'Site "{site_name}" (Subnet:{subnet_name}) (description:"{site_description}")') else: - context.log.highlight('Site "%s" (Subnet:%s)' % (site_name, subnet_name)) + context.log.highlight(f'Site "{site_name}" (Subnet:{subnet_name})') else: for server in list_servers: if isinstance(server, ldapasn1_impacket.SearchResultEntry) is not True: continue - server = searchResEntry_to_dict(server)["cn"] + server = search_res_entry_to_dict(server)["cn"] if len(site_description) != 0: context.log.highlight( - 'Site "%s" (Subnet:%s) (description:"%s") (Server:%s)' - % ( - site_name, - subnet_name, - site_description, - server, - ) - ) + f"Site: '{site_name}' (Subnet:{subnet_name}) (description:'{site_description}') (Server:'{server}')") else: - context.log.highlight('Site "%s" (Subnet:%s) (Server:%s)' % (site_name, subnet_name, server)) + context.log.highlight( + f'Site "{site_name}" (Subnet:{subnet_name}) (Server:{server})') else: if len(site_description) != 0: - context.log.highlight('Site "%s" (Subnet:%s) (description:"%s")' % (site_name, subnet_name, site_description)) + context.log.highlight( + f'Site "{site_name}" (Subnet:{subnet_name}) (description:"{site_description}")') else: - context.log.highlight('Site "%s" (Subnet:%s)' % (site_name, subnet_name)) + context.log.highlight(f'Site "{site_name}" (Subnet:{subnet_name})') diff --git a/nxc/modules/trust.py b/nxc/modules/trust.py index 075c8fb8..226a4203 100644 --- a/nxc/modules/trust.py +++ b/nxc/modules/trust.py @@ -1,14 +1,16 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +from impacket.ldap import ldapasn1 as ldapasn1_impacket + class NXCModule: - ''' + """ Extract all Trust Relationships, Trusting Direction, and Trust Transitivity Module by Brandon Fisher @shad0wcntr0ller - ''' - name = 'enum_trusts' - description = 'Extract all Trust Relationships, Trusting Direction, and Trust Transitivity' - supported_protocols = ['ldap'] + """ + name = "enum_trusts" + description = "Extract all Trust Relationships, Trusting Direction, and Trust Transitivity" + supported_protocols = ["ldap"] opsec_safe = True multiple_hosts = True @@ -16,15 +18,20 @@ class NXCModule: pass def on_login(self, context, connection): - domain_dn = ','.join(['DC=' + dc for dc in connection.domain.split('.')]) - search_filter = '(&(objectClass=trustedDomain))' - attributes = ['flatName', 'trustPartner', 'trustDirection', 'trustAttributes'] + domain_dn = ",".join(["DC=" + dc for dc in connection.domain.split(".")]) + search_filter = "(&(objectClass=trustedDomain))" + attributes = ["flatName", "trustPartner", "trustDirection", "trustAttributes"] - context.log.debug(f'Search Filter={search_filter}') - resp = connection.ldapConnection.search(searchBase=domain_dn, searchFilter=search_filter, attributes=attributes, sizeLimit=0) + context.log.debug(f"Search Filter={search_filter}") + resp = connection.ldapConnection.search( + searchBase=domain_dn, + searchFilter=search_filter, + attributes=attributes, + sizeLimit=0 + ) trusts = [] - context.log.debug(f'Total of records returned {len(resp)}') + context.log.debug(f"Total of records returned {len(resp)}") for item in resp: if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True: continue @@ -33,56 +40,56 @@ class NXCModule: trust_direction = '' trust_transitive = [] try: - for attribute in item['attributes']: - if str(attribute['type']) == 'flatName': - flat_name = str(attribute['vals'][0]) - elif str(attribute['type']) == 'trustPartner': - trust_partner = str(attribute['vals'][0]) - elif str(attribute['type']) == 'trustDirection': - if str(attribute['vals'][0]) == '1': - trust_direction = 'Inbound' - elif str(attribute['vals'][0]) == '2': - trust_direction = 'Outbound' - elif str(attribute['vals'][0]) == '3': - trust_direction = 'Bidirectional' - elif str(attribute['type']) == 'trustAttributes': - trust_attributes_value = int(attribute['vals'][0]) + for attribute in item["attributes"]: + if str(attribute["type"]) == "flatName": + flat_name = str(attribute["vals"][0]) + elif str(attribute["type"]) == "trustPartner": + trust_partner = str(attribute["vals"][0]) + elif str(attribute["type"]) == "trustDirection": + if str(attribute["vals"][0]) == "1": + trust_direction = "Inbound" + elif str(attribute["vals"][0]) == "2": + trust_direction = "Outbound" + elif str(attribute["vals"][0]) == "3": + trust_direction = "Bidirectional" + elif str(attribute["type"]) == "trustAttributes": + trust_attributes_value = int(attribute["vals"][0]) if trust_attributes_value & 0x1: - trust_transitive.append('Non-Transitive') + trust_transitive.append("Non-Transitive") if trust_attributes_value & 0x2: - trust_transitive.append('Uplevel-Only') + trust_transitive.append("Uplevel-Only") if trust_attributes_value & 0x4: - trust_transitive.append('Quarantined Domain') + trust_transitive.append("Quarantined Domain") if trust_attributes_value & 0x8: - trust_transitive.append('Forest Transitive') + trust_transitive.append("Forest Transitive") if trust_attributes_value & 0x10: - trust_transitive.append('Cross Organization') + trust_transitive.append("Cross Organization") if trust_attributes_value & 0x20: - trust_transitive.append('Within Forest') + trust_transitive.append("Within Forest") if trust_attributes_value & 0x40: - trust_transitive.append('Treat as External') + trust_transitive.append("Treat as External") if trust_attributes_value & 0x80: - trust_transitive.append('Uses RC4 Encryption') + trust_transitive.append("Uses RC4 Encryption") if trust_attributes_value & 0x100: - trust_transitive.append('Cross Organization No TGT Delegation') + trust_transitive.append("Cross Organization No TGT Delegation") if trust_attributes_value & 0x2000: - trust_transitive.append('PAM Trust') + trust_transitive.append("PAM Trust") if not trust_transitive: - trust_transitive.append('Other') - trust_transitive = ', '.join(trust_transitive) + trust_transitive.append("Other") + trust_transitive = ", ".join(trust_transitive) if flat_name and trust_partner and trust_direction and trust_transitive: trusts.append((flat_name, trust_partner, trust_direction, trust_transitive)) except Exception as e: - context.log.debug(f'Cannot process trust relationship due to error {e}') + context.log.debug(f"Cannot process trust relationship due to error {e}") pass if trusts: - context.log.success('Found the following trust relationships:') + context.log.success("Found the following trust relationships:") for trust in trusts: - context.log.highlight(f'{trust[1]} -> {trust[2]} -> {trust[3]}') + context.log.highlight(f"{trust[1]} -> {trust[2]} -> {trust[3]}") else: - context.log.display('No trust relationships found') + context.log.display("No trust relationships found") return True diff --git a/nxc/protocols/ldap/laps.py b/nxc/protocols/ldap/laps.py index ccc7eaf4..07a84c35 100644 --- a/nxc/protocols/ldap/laps.py +++ b/nxc/protocols/ldap/laps.py @@ -59,8 +59,8 @@ class LDAPConnect: baseDN = baseDN[:-1] try: - ldapConnection = ldap_impacket.LDAPConnection(f"ldap://{kdcHost}", baseDN) - ldapConnection.kerberosLogin( + ldap_connection = ldap_impacket.LDAPConnection(f"ldap://{kdcHost}", baseDN) + ldap_connection.kerberosLogin( username, password, domain, @@ -73,13 +73,13 @@ class LDAPConnect: # Connect to LDAP self.logger.extra["protocol"] = "LDAP" self.logger.extra["port"] = "389" - return ldapConnection + return ldap_connection except ldap_impacket.LDAPSessionError as e: if str(e).find("strongerAuthRequired") >= 0: # We need to try SSL try: - ldapConnection = ldap_impacket.LDAPConnection(f"ldaps://{kdcHost}", baseDN) - ldapConnection.login( + ldap_connection = ldap_impacket.LDAPConnection(f"ldaps://{kdcHost}", baseDN) + ldap_connection.login( username, password, domain, @@ -92,18 +92,18 @@ class LDAPConnect: self.logger.extra["protocol"] = "LDAPS" self.logger.extra["port"] = "636" # self.logger.success(out) - return ldapConnection + return ldap_connection except ldap_impacket.LDAPSessionError as e: - errorCode = str(e).split()[-2][:-1] + error_code = str(e).split()[-2][:-1] self.logger.fail( - f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[errorCode] if errorCode in ldap_error_status else ''}", - color="magenta" if errorCode in ldap_error_status else "red", + f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}", + color="magenta" if error_code in ldap_error_status else "red", ) else: - errorCode = str(e).split()[-2][:-1] + error_code = str(e).split()[-2][:-1] self.logger.fail( - f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[errorCode] if errorCode in ldap_error_status else ''}", - color="magenta" if errorCode in ldap_error_status else "red", + f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}", + color="magenta" if error_code in ldap_error_status else "red", ) return False @@ -128,45 +128,45 @@ class LDAPConnect: nthash = ntlm_hash # Create the baseDN - baseDN = "" - domainParts = domain.split(".") - for i in domainParts: - baseDN += f"dc={i}," + base_dn = "" + domain_parts = domain.split(".") + for i in domain_parts: + base_dn += f"dc={i}," # Remove last ',' - baseDN = baseDN[:-1] + base_dn = base_dn[:-1] try: - ldapConnection = ldap_impacket.LDAPConnection(f"ldap://{domain}", baseDN, domain) - ldapConnection.login(username, password, domain, lmhash, nthash) + ldap_connection = ldap_impacket.LDAPConnection(f"ldap://{domain}", base_dn, domain) + ldap_connection.login(username, password, domain, lmhash, nthash) # Connect to LDAP self.logger.extra["protocol"] = "LDAP" self.logger.extra["port"] = "389" # self.logger.success(out) - return ldapConnection + return ldap_connection except ldap_impacket.LDAPSessionError as e: if str(e).find("strongerAuthRequired") >= 0: # We need to try SSL try: - ldapConnection = ldap_impacket.LDAPConnection(f"ldaps://{domain}", baseDN, domain) - ldapConnection.login(username, password, domain, lmhash, nthash) + ldap_connection = ldap_impacket.LDAPConnection(f"ldaps://{domain}", base_dn, domain) + ldap_connection.login(username, password, domain, lmhash, nthash) self.logger.extra["protocol"] = "LDAPS" self.logger.extra["port"] = "636" # self.logger.success(out) - return ldapConnection + return ldap_connection except ldap_impacket.LDAPSessionError as e: - errorCode = str(e).split()[-2][:-1] + error_code = str(e).split()[-2][:-1] self.logger.fail( - f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[errorCode] if errorCode in ldap_error_status else ''}", - color="magenta" if errorCode in ldap_error_status else "red", + f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}", + color="magenta" if error_code in ldap_error_status else "red", ) else: - errorCode = str(e).split()[-2][:-1] + error_code = str(e).split()[-2][:-1] self.logger.fail( - f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[errorCode] if errorCode in ldap_error_status else ''}", - color="magenta" if errorCode in ldap_error_status else "red", + f"{domain}\\{username}:{password if password else ntlm_hash} {ldap_error_status[error_code] if error_code in ldap_error_status else ''}", + color="magenta" if error_code in ldap_error_status else "red", ) return False @@ -174,13 +174,14 @@ class LDAPConnect: self.logger.debug(f"{domain}\\{username}:{password if password else ntlm_hash} {'Error connecting to the domain, please add option --kdcHost with the FQDN of the domain controller'}") return False + class LAPSv2Extract: def __init__(self, data, username, password, domain, ntlm_hash, do_kerberos, kdcHost, port): if ntlm_hash.find(":") != -1: self.lmhash, self.nthash = ntlm_hash.split(":") else: self.nthash = ntlm_hash - self.lmhash = '' + self.lmhash = "" self.data = data self.username = username @@ -195,63 +196,75 @@ class LAPSv2Extract: self.logger = NXCAdapter(extra={"protocol": "LDAP", "host": host, "port": port, "hostname": hostname}) def run(self): - KDSCache = {} - self.logger.info('[-] Unpacking blob') + kds_cache = {} + self.logger.info("[-] Unpacking blob") try: - encryptedLAPSBlob = EncryptedPasswordBlob(self.data) - parsed_cms_data, remaining = decoder.decode(encryptedLAPSBlob['Blob'], asn1Spec=rfc5652.ContentInfo()) - enveloped_data_blob = parsed_cms_data['content'] + encrypted_laps_blob = EncryptedPasswordBlob(self.data) + parsed_cms_data, remaining = decoder.decode(encrypted_laps_blob["Blob"], asn1Spec=rfc5652.ContentInfo()) + enveloped_data_blob = parsed_cms_data["content"] parsed_enveloped_data, _ = decoder.decode(enveloped_data_blob, asn1Spec=rfc5652.EnvelopedData()) - recipient_infos = parsed_enveloped_data['recipientInfos'] - kek_recipient_info = recipient_infos[0]['kekri'] - kek_identifier = kek_recipient_info['kekid'] - key_id = KeyIdentifier(bytes(kek_identifier['keyIdentifier'])) - tmp,_ = decoder.decode(kek_identifier['other']['keyAttr']) - sid = tmp['field-1'][0][0][1].asOctets().decode("utf-8") + recipient_infos = parsed_enveloped_data["recipientInfos"] + kek_recipient_info = recipient_infos[0]["kekri"] + kek_identifier = kek_recipient_info["kekid"] + key_id = KeyIdentifier(bytes(kek_identifier["keyIdentifier"])) + tmp, _ = decoder.decode(kek_identifier["other"]["keyAttr"]) + sid = tmp["field-1"][0][0][1].asOctets().decode("utf-8") target_sd = create_sd(sid) except Exception as e: - logging.error('Cannot unpack msLAPS-EncryptedPassword blob due to error %s' % str(e)) + self.logger.error(f"Cannot unpack msLAPS-EncryptedPassword blob due to error {e}") return # Check if item is in cache - if key_id['RootKeyId'] in KDSCache: + if key_id["RootKeyId"] in kds_cache: self.logger.info("Got KDS from cache") - gke = KDSCache[key_id['RootKeyId']] + gke = kds_cache[key_id["RootKeyId"]] else: # Connect on RPC over TCP to MS-GKDI to call opnum 0 GetKey - stringBinding = hept_map(destHost=self.domain, remoteIf=MSRPC_UUID_GKDI, protocol='ncacn_ip_tcp') - rpctransport = transport.DCERPCTransportFactory(stringBinding) - if hasattr(rpctransport, 'set_credentials'): - rpctransport.set_credentials(username=self.username, password=self.password, domain=self.domain, lmhash=self.lmhash, nthash=self.nthash) + string_binding = hept_map(destHost=self.domain, remoteIf=MSRPC_UUID_GKDI, protocol="ncacn_ip_tcp") + rpc_transport = transport.DCERPCTransportFactory(string_binding) + if hasattr(rpc_transport, "set_credentials"): + rpc_transport.set_credentials( + username=self.username, + password=self.password, + domain=self.domain, + lmhash=self.lmhash, + nthash=self.nthash + ) if self.do_kerberos: self.logger.info("Connecting using kerberos") - rpctransport.set_kerberos(self.do_kerberos, kdcHost=self.kdcHost) + rpc_transport.set_kerberos(self.do_kerberos, kdcHost=self.kdcHost) - dce = rpctransport.get_dce_rpc() + dce = rpc_transport.get_dce_rpc() dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY) dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) - self.logger.info("Connecting to %s" % stringBinding) + self.logger.info(f"Connecting to {string_binding}") try: dce.connect() except Exception as e: - logging.error("Something went wrong, check error status => %s" % str(e)) + self.logger.error(f"Something went wrong, check error status => {e}") return False self.logger.info("Connected") try: dce.bind(MSRPC_UUID_GKDI) except Exception as e: - logging.error("Something went wrong, check error status => %s" % str(e)) + self.logger.error("Something went wrong, check error status => %s" % str(e)) return False self.logger.info("Successfully bound") - - self.logger.info("Calling MS-GKDI GetKey") - resp = GkdiGetKey(dce, target_sd=target_sd, l0=key_id['L0Index'], l1=key_id['L1Index'], l2=key_id['L2Index'], root_key_id=key_id['RootKeyId']) + + resp = GkdiGetKey( + dce, + target_sd=target_sd, + l0=key_id["L0Index"], + l1=key_id["L1Index"], + l2=key_id["L2Index"], + root_key_id=key_id["RootKeyId"] + ) self.logger.info("Decrypting password") # Unpack GroupKeyEnvelope - gke = GroupKeyEnvelope(b''.join(resp['pbbOut'])) - KDSCache[gke['RootKeyId']] = gke + gke = GroupKeyEnvelope(b''.join(resp["pbbOut"])) + kds_cache[gke["RootKeyId"]] = gke kek = compute_kek(gke, key_id) self.logger.info("KEK:\t%s" % kek) @@ -259,8 +272,8 @@ class LAPSv2Extract: iv, _ = decoder.decode(enc_content_parameter) iv = bytes(iv[0]) - cek = unwrap_cek(kek, bytes(kek_recipient_info['encryptedKey'])) + cek = unwrap_cek(kek, bytes(kek_recipient_info["encryptedKey"])) self.logger.info("CEK:\t%s" % cek) plaintext = decrypt_plaintext(cek, iv, remaining) - self.logger.info(plaintext[:-18].decode('utf-16le')) - return plaintext[:-18].decode('utf-16le') \ No newline at end of file + self.logger.info(plaintext[:-18].decode("utf-16le")) + return plaintext[:-18].decode("utf-16le") \ No newline at end of file diff --git a/nxc/protocols/smb/smbexec.py b/nxc/protocols/smb/smbexec.py index 6ff38683..81a70f92 100755 --- a/nxc/protocols/smb/smbexec.py +++ b/nxc/protocols/smb/smbexec.py @@ -170,9 +170,9 @@ class SMBEXEC: break except Exception as e: if tries >= self.__tries: - self.logger.fail(f"SMBEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") + self.logger.fail("SMBEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") break - if str(e).find("STATUS_BAD_NETWORK_NAME") >0 : + if str(e).find("STATUS_BAD_NETWORK_NAME") > 0: self.logger.fail(f"SMBEXEC: Getting the output file failed - target has blocked access to the share: {self.__share} (but the command may have executed!)") break if str(e).find("STATUS_SHARING_VIOLATION") >= 0 or str(e).find("STATUS_OBJECT_NAME_NOT_FOUND") >= 0: