From a8b14cee7f0ff75c5c2ad1c0d59f04a2a297c56e Mon Sep 17 00:00:00 2001 From: Marshall Hallenbeck Date: Wed, 20 Sep 2023 12:24:42 -0400 Subject: [PATCH] refactor(add_computer): refactor and add docstrings to add_computer module --- nxc/modules/add_computer.py | 402 +++++++++++++++++++----------------- 1 file changed, 215 insertions(+), 187 deletions(-) diff --git a/nxc/modules/add_computer.py b/nxc/modules/add_computer.py index 312e90db..df159e4f 100644 --- a/nxc/modules/add_computer.py +++ b/nxc/modules/add_computer.py @@ -1,17 +1,18 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -* -# -*- coding: utf-8 -*- - +import ssl import ldap3 from impacket.dcerpc.v5 import samr, epm, transport + class NXCModule: - ''' + """ Module by CyberCelt: @Cyb3rC3lt Initial module: https://github.com/Cyb3rC3lt/CrackMapExec-Modules Thanks to the guys at impacket for the original code - ''' + """ name = 'add-computer' description = 'Adds or deletes a domain computer' @@ -20,7 +21,7 @@ class NXCModule: multiple_hosts = False def options(self, context, module_options): - ''' + """ add-computer: Specify add-computer to call the module using smb NAME: Specify the NAME option to name the Computer to be added PASSWORD: Specify the PASSWORD option to supply a password for the Computer to be added @@ -29,7 +30,7 @@ class NXCModule: Usage: nxc smb $DC-IP -u Username -p Password -M add-computer -o NAME="BADPC" PASSWORD="Password1" nxc smb $DC-IP -u Username -p Password -M add-computer -o NAME="BADPC" DELETE=True nxc smb $DC-IP -u Username -p Password -M add-computer -o NAME="BADPC" PASSWORD="Password2" CHANGEPW=True - ''' + """ self.__baseDN = None self.__computerGroup = None @@ -61,8 +62,6 @@ class NXCModule: exit(1) def on_login(self, context, connection): - - #Set some variables self.__domain = connection.domain self.__domainNetbios = connection.domain self.__kdcHost = connection.hostname + "." + connection.domain @@ -86,222 +85,251 @@ class NXCModule: self.__lmhash = "00000000000000000000000000000000" # First try to add via SAMR over SMB - self.doSAMRAdd(context) + self.do_samr_add(context) # If SAMR fails now try over LDAPS if not self.noLDAPRequired: - self.doLDAPSAdd(connection,context) + self.do_ldaps_add(connection, context) else: exit(1) - def doSAMRAdd(self,context): + def do_samr_add(self, context): + """ + Connects to a target server and performs various operations related to adding or deleting machine accounts. + + Args: + context (object): The context object. + + Returns: + None + """ + target = self.__targetIp or self.__target + string_binding = epm.hept_map(target, samr.MSRPC_UUID_SAMR, protocol="ncacn_np") + + rpc_transport = transport.DCERPCTransportFactory(string_binding) + rpc_transport.set_dport(self.__port) if self.__targetIp is not None: - stringBinding = epm.hept_map(self.__targetIp, samr.MSRPC_UUID_SAMR, protocol = 'ncacn_np') - else: - stringBinding = epm.hept_map(self.__target, samr.MSRPC_UUID_SAMR, protocol = 'ncacn_np') - rpctransport = transport.DCERPCTransportFactory(stringBinding) - rpctransport.set_dport(self.__port) + rpc_transport.setRemoteHost(self.__targetIp) + rpc_transport.setRemoteName(self.__target) - if self.__targetIp is not None: - rpctransport.setRemoteHost(self.__targetIp) - rpctransport.setRemoteName(self.__target) - - if hasattr(rpctransport, 'set_credentials'): + if hasattr(rpc_transport, 'set_credentials'): # This method exists only for selected protocol sequences. - rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, - self.__nthash, self.__aesKey) + rpc_transport.set_credentials( + self.__username, + self.__password, + self.__domain, + self.__lmhash, + self.__nthash, + self.__aesKey + ) - rpctransport.set_kerberos(self.__doKerberos, self.__kdcHost) + rpc_transport.set_kerberos(self.__doKerberos, self.__kdcHost) - dce = rpctransport.get_dce_rpc() - servHandle = None - domainHandle = None - userHandle = None - try: - dce.connect() - dce.bind(samr.MSRPC_UUID_SAMR) + dce = rpc_transport.get_dce_rpc() + dce.connect() + dce.bind(samr.MSRPC_UUID_SAMR) - samrConnectResponse = samr.hSamrConnect5(dce, '\\\\%s\x00' % self.__target, - samr.SAM_SERVER_ENUMERATE_DOMAINS | samr.SAM_SERVER_LOOKUP_DOMAIN ) - servHandle = samrConnectResponse['ServerHandle'] + samr_connect_response = samr.hSamrConnect5( + dce, + '\\\\%s\x00' % self.__target, + samr.SAM_SERVER_ENUMERATE_DOMAINS | samr.SAM_SERVER_LOOKUP_DOMAIN + ) + serv_handle = samr_connect_response['ServerHandle'] - samrEnumResponse = samr.hSamrEnumerateDomainsInSamServer(dce, servHandle) - domains = samrEnumResponse['Buffer']['Buffer'] - domainsWithoutBuiltin = list(filter(lambda x : x['Name'].lower() != 'builtin', domains)) + samr_enum_response = samr.hSamrEnumerateDomainsInSamServer(dce, serv_handle) + domains = samr_enum_response['Buffer']['Buffer'] + domains_without_builtin = [ + domain for domain in domains if domain['Name'].lower() != 'builtin' + ] + if len(domains_without_builtin) > 1: + domain = list(filter(lambda x: x['Name'].lower() == self.__domainNetbios, domains)) + if len(domain) != 1: + context.log.highlight(u'{}'.format('This domain does not exist: "' + self.__domainNetbios + '"')) + context.log.highlight("Available domain(s):") + for domain in domains: + context.log.highlight(f" * {domain['Name']}") + raise Exception() + else: + selected_domain = domain[0]["Name"] + else: + selected_domain = domains_without_builtin[0]["Name"] - if len(domainsWithoutBuiltin) > 1: - domain = list(filter(lambda x : x['Name'].lower() == self.__domainNetbios, domains)) - if len(domain) != 1: - context.log.highlight(u'{}'.format( - 'This domain does not exist: "' + self.__domainNetbios + '"')) - logging.critical("Available domain(s):") - for domain in domains: - logging.error(" * %s" % domain['Name']) + samr_lookup_domain_response = samr.hSamrLookupDomainInSamServer( + dce, serv_handle, selected_domain + ) + domain_sid = samr_lookup_domain_response["DomainId"] + + context.log.debug(f"Opening domain {selected_domain}...") + samr_open_domain_response = samr.hSamrOpenDomain( + dce, serv_handle, samr.DOMAIN_LOOKUP | samr.DOMAIN_CREATE_USER, domain_sid + ) + domain_handle = samr_open_domain_response["DomainHandle"] + + if self.__noAdd or self.__delete: + try: + check_for_user = samr.hSamrLookupNamesInDomain( + dce, domain_handle, [self.__computerName] + ) + except samr.DCERPCSessionError as e: + if e.error_code == 0xc0000073: + context.log.highlight( + f"{self.__computerName} not found in domain {selected_domain}" + ) + self.noLDAPRequired = True raise Exception() else: - selectedDomain = domain[0]['Name'] - else: - selectedDomain = domainsWithoutBuiltin[0]['Name'] - - samrLookupDomainResponse = samr.hSamrLookupDomainInSamServer(dce, servHandle, selectedDomain) - domainSID = samrLookupDomainResponse['DomainId'] - - if logging.getLogger().level == logging.DEBUG: - logging.info("Opening domain %s..." % selectedDomain) - samrOpenDomainResponse = samr.hSamrOpenDomain(dce, servHandle, samr.DOMAIN_LOOKUP | samr.DOMAIN_CREATE_USER , domainSID) - domainHandle = samrOpenDomainResponse['DomainHandle'] - - if self.__noAdd or self.__delete: - try: - checkForUser = samr.hSamrLookupNamesInDomain(dce, domainHandle, [self.__computerName]) - except samr.DCERPCSessionError as e: - if e.error_code == 0xc0000073: - context.log.highlight(u'{}'.format( - self.__computerName + ' not found in domain ' + selectedDomain)) - self.noLDAPRequired = True - raise Exception() - else: - raise - - userRID = checkForUser['RelativeIds']['Element'][0] - if self.__delete: - access = samr.DELETE - message = "delete" - else: - access = samr.USER_FORCE_PASSWORD_CHANGE - message = "set the password for" - try: - openUser = samr.hSamrOpenUser(dce, domainHandle, access, userRID) - userHandle = openUser['UserHandle'] - except samr.DCERPCSessionError as e: - if e.error_code == 0xc0000022: - context.log.highlight(u'{}'.format( - self.__username + ' does not have the right to ' + message + " " + self.__computerName)) - self.noLDAPRequired = True - raise Exception() - else: - raise - else: - if self.__computerName is not None: - try: - checkForUser = samr.hSamrLookupNamesInDomain(dce, domainHandle, [self.__computerName]) - self.noLDAPRequired = True - context.log.highlight(u'{}'.format( - 'Computer account already exists with the name: "' + self.__computerName + '"')) - raise Exception() - except samr.DCERPCSessionError as e: - if e.error_code != 0xc0000073: - raise - else: - foundUnused = False - while not foundUnused: - self.__computerName = self.generateComputerName() - try: - checkForUser = samr.hSamrLookupNamesInDomain(dce, domainHandle, [self.__computerName]) - except samr.DCERPCSessionError as e: - if e.error_code == 0xc0000073: - foundUnused = True - else: - raise - try: - createUser = samr.hSamrCreateUser2InDomain(dce, domainHandle, self.__computerName, samr.USER_WORKSTATION_TRUST_ACCOUNT, samr.USER_FORCE_PASSWORD_CHANGE,) - self.noLDAPRequired = True - context.log.highlight('Successfully added the machine account: "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"') - except samr.DCERPCSessionError as e: - if e.error_code == 0xc0000022: - context.log.highlight(u'{}'.format( - 'The following user does not have the right to create a computer account: "' + self.__username + '"')) - raise Exception() - elif e.error_code == 0xc00002e7: - context.log.highlight(u'{}'.format( - 'The following user exceeded their machine account quota: "' + self.__username + '"')) - raise Exception() - else: - raise - userHandle = createUser['UserHandle'] + raise + user_rid = check_for_user['RelativeIds']['Element'][0] if self.__delete: - samr.hSamrDeleteUser(dce, userHandle) - context.log.highlight(u'{}'.format('Successfully deleted the "' + self.__computerName + '" Computer account')) - self.noLDAPRequired=True - userHandle = None + access = samr.DELETE + message = "delete" else: - samr.hSamrSetPasswordInternal4New(dce, userHandle, self.__computerPassword) - if self.__noAdd: + access = samr.USER_FORCE_PASSWORD_CHANGE + message = "set the password for" + try: + open_user = samr.hSamrOpenUser(dce, domain_handle, access, user_rid) + user_handle = open_user['UserHandle'] + except samr.DCERPCSessionError as e: + if e.error_code == 0xc0000022: context.log.highlight(u'{}'.format( - 'Successfully set the password of machine "' + self.__computerName + '" with password "' + self.__computerPassword + '"')) - self.noLDAPRequired=True - else: - checkForUser = samr.hSamrLookupNamesInDomain(dce, domainHandle, [self.__computerName]) - userRID = checkForUser['RelativeIds']['Element'][0] - openUser = samr.hSamrOpenUser(dce, domainHandle, samr.MAXIMUM_ALLOWED, userRID) - userHandle = openUser['UserHandle'] - req = samr.SAMPR_USER_INFO_BUFFER() - req['tag'] = samr.USER_INFORMATION_CLASS.UserControlInformation - req['Control']['UserAccountControl'] = samr.USER_WORKSTATION_TRUST_ACCOUNT - samr.hSamrSetInformationUser2(dce, userHandle, req) - if not self.noLDAPRequired: - context.log.highlight(u'{}'.format( - 'Successfully added the machine account "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"')) + self.__username + ' does not have the right to ' + message + " " + self.__computerName)) self.noLDAPRequired = True + raise Exception() + else: + raise + else: + if self.__computerName is not None: + try: + samr.hSamrLookupNamesInDomain(dce, domain_handle, [self.__computerName]) + self.noLDAPRequired = True + context.log.highlight(u'{}'.format( + 'Computer account already exists with the name: "' + self.__computerName + '"')) + raise Exception() + except samr.DCERPCSessionError as e: + if e.error_code != 0xc0000073: + raise + else: + found_unused = False + while not found_unused: + self.__computerName = self.generateComputerName() + try: + samr.hSamrLookupNamesInDomain(dce, domain_handle, [self.__computerName]) + except samr.DCERPCSessionError as e: + if e.error_code == 0xc0000073: + found_unused = True + else: + raise + try: + create_user = samr.hSamrCreateUser2InDomain(dce, domain_handle, self.__computerName, samr.USER_WORKSTATION_TRUST_ACCOUNT, samr.USER_FORCE_PASSWORD_CHANGE,) + self.noLDAPRequired = True + context.log.highlight('Successfully added the machine account: "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"') + except samr.DCERPCSessionError as e: + if e.error_code == 0xc0000022: + context.log.highlight(u'{}'.format( + 'The following user does not have the right to create a computer account: "' + self.__username + '"')) + raise Exception() + elif e.error_code == 0xc00002e7: + context.log.highlight(u'{}'.format( + 'The following user exceeded their machine account quota: "' + self.__username + '"')) + raise Exception() + else: + raise + user_handle = create_user['UserHandle'] - except Exception: - if logging.getLogger().level == logging.DEBUG: - import traceback - traceback.print_exc() - finally: - if userHandle is not None: - samr.hSamrCloseHandle(dce, userHandle) - if domainHandle is not None: - samr.hSamrCloseHandle(dce, domainHandle) - if servHandle is not None: - samr.hSamrCloseHandle(dce, servHandle) + if self.__delete: + samr.hSamrDeleteUser(dce, user_handle) + context.log.highlight(u'{}'.format('Successfully deleted the "' + self.__computerName + '" Computer account')) + self.noLDAPRequired=True + user_handle = None + else: + samr.hSamrSetPasswordInternal4New(dce, user_handle, self.__computerPassword) + if self.__noAdd: + context.log.highlight(u'{}'.format( + 'Successfully set the password of machine "' + self.__computerName + '" with password "' + self.__computerPassword + '"')) + self.noLDAPRequired=True + else: + check_for_user = samr.hSamrLookupNamesInDomain(dce, domain_handle, [self.__computerName]) + user_rid = check_for_user['RelativeIds']['Element'][0] + open_user = samr.hSamrOpenUser( + dce, domain_handle, access, user_rid + ) + user_handle = open_user['UserHandle'] + req = samr.SAMPR_USER_INFO_BUFFER() + req['tag'] = samr.USER_INFORMATION_CLASS.UserControlInformation + req['Control']['UserAccountControl'] = samr.USER_WORKSTATION_TRUST_ACCOUNT + samr.hSamrSetInformationUser2(dce, user_handle, req) + if not self.noLDAPRequired: + context.log.highlight(u'{}'.format( + 'Successfully added the machine account "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"')) + self.noLDAPRequired = True + + if user_handle is not None: + samr.hSamrCloseHandle(dce, user_handle) + if domain_handle is not None: + samr.hSamrCloseHandle(dce, domain_handle) + if serv_handle is not None: + samr.hSamrCloseHandle(dce, serv_handle) dce.disconnect() - def doLDAPSAdd(self, connection, context): + def do_ldaps_add(self, connection, context): + """ + Performs an LDAPS add operation. + + Args: + connection (Connection): The LDAP connection object. + context (Context): The context object. + + Returns: + None + + Raises: + None + """ ldap_domain = connection.domain.replace(".", ",dc=") spns = [ - 'HOST/%s' % self.__computerName, - 'HOST/%s.%s' % (self.__computerName, connection.domain), - 'RestrictedKrbHost/%s' % self.__computerName, - 'RestrictedKrbHost/%s.%s' % (self.__computerName, connection.domain), + f"HOST/{self.__computerName}", + f"HOST/{self.__computerName}.{connection.domain}", + f"RestrictedKrbHost/{self.__computerName}", + f"RestrictedKrbHost/{self.__computerName}.{connection.domain}", ] ucd = { - 'dnsHostName': '%s.%s' % (self.__computerName, connection.domain), - 'userAccountControl': 0x1000, - 'servicePrincipalName': spns, - 'sAMAccountName': self.__computerName, - 'unicodePwd': ('"%s"' % self.__computerPassword).encode('utf-16-le') + "dnsHostName": f"{self.__computerName}.{connection.domain}", + "userAccountControl": 0x1000, + "servicePrincipalName": spns, + "sAMAccountName": self.__computerName, + "unicodePwd": f'"{self.__computerPassword}"'.encode('utf-16-le') } - tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2, ciphers='ALL:@SECLEVEL=0') - ldapServer = ldap3.Server(connection.host, use_ssl=True, port=636, get_info=ldap3.ALL, tls=tls) - c = Connection(ldapServer, connection.username + '@' + connection.domain, connection.password) + tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2, ciphers="ALL:@SECLEVEL=0") + ldap_server = ldap3.Server(connection.host, use_ssl=True, port=636, get_info=ldap3.ALL, tls=tls) + c = ldap3.Connection(ldap_server, f"{connection.username}@{connection.domain}", connection.password) c.bind() - if (self.__delete): - result = c.delete("cn=" + self.__computerName + ",cn=Computers,dc=" + ldap_domain) + if self.__delete: + result = c.delete(f"cn={self.__computerName},cn=Computers,dc={ldap_domain}") if result: - context.log.highlight(u'{}'.format('Successfully deleted the "' + self.__computerName + '" Computer account')) + context.log.highlight(f'Successfully deleted the "{self.__computerName}" Computer account') elif result is False and c.last_error == "noSuchObject": - context.log.highlight(u'{}'.format('Computer named "' + self.__computerName + '" was not found')) + context.log.highlight(f'Computer named "{self.__computerName}" was not found') elif result is False and c.last_error == "insufficientAccessRights": - context.log.highlight( - u'{}'.format('Insufficient Access Rights to delete the Computer "' + self.__computerName + '"')) + context.log.highlight(f'Insufficient Access Rights to delete the Computer "{self.__computerName}"') else: - context.log.highlight(u'{}'.format( - 'Unable to delete the "' + self.__computerName + '" Computer account. The error was: ' + c.last_error)) + context.log.highlight( + f'Unable to delete the "{self.__computerName}" Computer account. The error was: {c.last_error}') else: - result = c.add("cn=" + self.__computerName + ",cn=Computers,dc=" + ldap_domain, - ['top', 'person', 'organizationalPerson', 'user', 'computer'], ucd) + result = c.add( + f"cn={self.__computerName},cn=Computers,dc={ldap_domain}", + ['top', 'person', 'organizationalPerson', 'user', 'computer'], + ucd + ) if result: - context.log.highlight('Successfully added the machine account: "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"') - context.log.highlight(u'{}'.format('You can try to verify this with the nxc command:')) - context.log.highlight(u'{}'.format( - 'nxc ldap ' + connection.host + ' -u ' + connection.username + ' -p ' + connection.password + ' -M group-mem -o GROUP="Domain Computers"')) + context.log.highlight( + f'Successfully added the machine account: "{self.__computerName}" with Password: "{self.__computerPassword}"') + context.log.highlight("You can try to verify this with the nxc command:") + context.log.highlight(f"nxc ldap {connection.host} -u {connection.username} -p {connection.password} -M group-mem -o GROUP='Domain Computers'") elif result is False and c.last_error == "entryAlreadyExists": - context.log.highlight(u'{}'.format('The Computer account "' + self.__computerName + '" already exists')) + context.log.highlight(f"The Computer account '{self.__computerName}' already exists") elif not result: - context.log.highlight(u'{}'.format( - 'Unable to add the "' + self.__computerName + '" Computer account. The error was: ' + c.last_error)) - c.unbind() + context.log.highlight(f"Unable to add the '{self.__computerName}' Computer account. The error was: {c.last_error}") + c.unbind()