ruff: add LOG (no fixes) and RUF and auto-fix

main
Marshall Hallenbeck 2023-10-16 12:54:57 -04:00
parent b39389f122
commit 84d55aa264
22 changed files with 54 additions and 69 deletions

View File

@ -322,22 +322,7 @@ def invoke_obfuscation(script_string):
str: The obfuscated payload for execution.
"""
random_alphabet = "".join(random.choice([i.upper(), i]) for i in ascii_lowercase)
random_delimiters = [
"_",
"-",
",",
"{",
"}",
"~",
"!",
"@",
"%",
"&",
"<",
">",
";",
":",
] + list(random_alphabet)
random_delimiters = ["_", "-", ",", "{", "}", "~", "!", "@", "%", "&", "<", ">", ";", ":", *list(random_alphabet)]
# Only use a subset of current delimiters to randomize what you see in every iteration of this script's output.
random_delimiters = [choice(random_delimiters) for _ in range(int(len(random_delimiters) / 4))]

View File

@ -98,7 +98,7 @@ class NXCModule:
urls.append(match.group(1))
except Exception as e:
entry = host_name or "item"
self.context.log.fail(f"Skipping {entry}, cannot process LDAP entry due to error: '{str(e)}'")
self.context.log.fail(f"Skipping {entry}, cannot process LDAP entry due to error: '{e!s}'")
if host_name:
self.context.log.highlight(f"Found PKI Enrollment Server: {host_name}")

View File

@ -119,12 +119,12 @@ class TriggerAuth:
try:
dce.connect()
except Exception as e:
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
nxc_logger.debug(f"Something went wrong, check error status => {e!s}")
return None
try:
dce.bind(uuidtup_to_bin(("4FC742E0-4A10-11CF-8273-00AA004AE673", "3.0")))
except Exception as e:
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
nxc_logger.debug(f"Something went wrong, check error status => {e!s}")
return None
nxc_logger.debug("[+] Successfully bound!")
return dce

View File

@ -74,7 +74,7 @@ class NXCModule:
answers.append([sAMAccountName, description])
except Exception as e:
context.log.debug("Exception:", exc_info=True)
context.log.debug(f"Skipping item, cannot process due to error {str(e)}")
context.log.debug(f"Skipping item, cannot process due to error {e!s}")
answers = self.filter_answer(context, answers)
if len(answers) > 0:
context.log.success("Found following users: ")

View File

@ -74,7 +74,7 @@ class NXCModule:
memberOf += [str(group) for group in attribute["vals"] if isinstance(group._value, bytes)]
except Exception as e:
context.log.debug("Exception:", exc_info=True)
context.log.debug(f"Skipping item, cannot process due to error {str(e)}")
context.log.debug(f"Skipping item, cannot process due to error {e!s}")
if len(memberOf) > 0:
context.log.success(f"User: {self.user} is member of following groups: ")
for group in memberOf:

View File

@ -247,14 +247,14 @@ def coerce(
try:
dce.connect()
except Exception as e:
context.log.debug(f"Something went wrong, check error status => {str(e)}")
context.log.debug(f"Something went wrong, check error status => {e!s}")
sys.exit()
context.log.info("[+] Connected!")
context.log.info(f"[+] Binding to {binding_params[pipe]['MSRPC_UUID_EFSR'][0]}")
try:
dce.bind(uuidtup_to_bin(binding_params[pipe]["MSRPC_UUID_EFSR"]))
except Exception as e:
context.log.debug(f"Something went wrong, check error status => {str(e)}")
context.log.debug(f"Something went wrong, check error status => {e!s}")
sys.exit()
context.log.info("[+] Successfully bound!")
return dce
@ -286,6 +286,6 @@ def efs_rpc_open_file_raw(dce, listener, context=None):
context.log.info("[+] Attack worked!")
return True
else:
context.log.debug(f"Something went wrong, check error status => {str(e)}")
context.log.debug(f"Something went wrong, check error status => {e!s}")
else:
context.log.debug(f"Something went wrong, check error status => {str(e)}")
context.log.debug(f"Something went wrong, check error status => {e!s}")

View File

@ -43,7 +43,7 @@ class NXCModule:
# Connect and bind to MS-RPRN (https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-rprn/848b8334-134a-4d02-aea4-03b673d6c515)
stringbinding = r"ncacn_np:%s[\PIPE\spoolss]" % connection.host
context.log.info(f"Binding to {repr(stringbinding)}")
context.log.info(f"Binding to {stringbinding!r}")
rpctransport = transport.DCERPCTransportFactory(stringbinding)

View File

@ -79,7 +79,7 @@ class NXCModule:
else:
smb_rdp.rdp_wrapper(self.action)
except Exception as e:
context.log.fail(f"Enable RDP via smb error: {str(e)}")
context.log.fail(f"Enable RDP via smb error: {e!s}")
elif self.method == "wmi":
context.log.info("Executing over WMI(ncacn_ip_tcp)")
@ -194,7 +194,7 @@ class RdpSmb:
rtype, data = rrp.hBaseRegQueryValue(remoteOps._RemoteOperations__rrp, key_handle, "PortNumber")
self.logger.success(f"RDP Port: {str(data)}")
self.logger.success(f"RDP Port: {data!s}")
# https://github.com/rapid7/metasploit-framework/blob/master/modules/post/windows/manage/enable_rdp.rb
def firewall_cmd(self, action):
@ -318,7 +318,7 @@ class RdpWmi:
self.__iWbemLevel1Login.RemRelease()
std_reg_prov, resp = i_wbem_services.GetObject("StdRegProv")
out = std_reg_prov.GetDWORDValue(2147483650, "SYSTEM\\CurrentControlSet\\Control\\Terminal Server\\WinStations\\RDP-Tcp", "PortNumber")
self.logger.success(f"RDP Port: {str(out.uValue)}")
self.logger.success(f"RDP Port: {out.uValue!s}")
# Nt version under 6 not support RAM.
def rdp_ram_wrapper(self, action):

View File

@ -166,7 +166,7 @@ class NXCModule:
{
"name": recordname,
"type": RECORD_TYPE_MAPPING[dr["Type"]],
"value": address[list(address.fields)[0]].toFqdn(),
"value": address[next(iter(address.fields))].toFqdn(),
}
)
elif dr["Type"] == 28:

View File

@ -234,14 +234,14 @@ class CoerceAuth:
dce.disconnect()
return 1
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
nxc_logger.debug(f"Something went wrong, check error status => {e!s}")
nxc_logger.info("Connected!")
nxc_logger.info(f"Binding to {binding_params[pipe]['UUID'][0]}")
try:
dce.bind(uuidtup_to_bin(binding_params[pipe]["UUID"]))
except Exception as e:
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
nxc_logger.debug(f"Something went wrong, check error status => {e!s}")
nxc_logger.info("Successfully bound!")
return dce

View File

@ -223,7 +223,7 @@ class SMBSpiderPlus:
except Exception as e:
traceback.print_exc()
self.logger.fail(f"Error enumerating shares: {str(e)}")
self.logger.fail(f"Error enumerating shares: {e!s}")
# Save the metadata.
self.dump_folder_metadata(self.results)
@ -332,7 +332,7 @@ class SMBSpiderPlus:
if "STATUS_SHARING_VIOLATION" in str(e):
pass
except Exception as e:
self.logger.fail(f'Failed to download file "{file_path}". Error: {str(e)}')
self.logger.fail(f'Failed to download file "{file_path}". Error: {e!s}')
# Increment stats counters
if download_success:
@ -385,7 +385,7 @@ class SMBSpiderPlus:
fd.write(json.dumps(results, indent=4, sort_keys=True))
self.logger.success(f'Saved share-file metadata to "{metadata_path}".')
except Exception as e:
self.logger.fail(f"Failed to save share metadata: {str(e)}")
self.logger.fail(f"Failed to save share metadata: {e!s}")
def print_stats(self):
"""Prints the statistics during processing"""

View File

@ -86,7 +86,7 @@ class NXCModule:
perRecordCallback=self.process_record,
)
except LDAPSearchError as e:
context.log.fail(f"Obtained unexpected exception: {str(e)}")
context.log.fail(f"Obtained unexpected exception: {e!s}")
finally:
self.delete_log_file()
@ -140,7 +140,7 @@ class NXCModule:
description = attribute["vals"][0].asOctets().decode("utf-8")
except Exception as e:
entry = sAMAccountName or "item"
self.context.error(f"Skipping {entry}, cannot process LDAP entry due to error: '{str(e)}'")
self.context.error(f"Skipping {entry}, cannot process LDAP entry due to error: '{e!s}'")
if description and sAMAccountName not in self.account_names:
self.desc_count += 1

View File

@ -232,7 +232,7 @@ class ldap(connection):
dce.disconnect()
return 64
except Exception as e:
self.logger.fail(f"Error retrieving os arch of {self.host}: {str(e)}")
self.logger.fail(f"Error retrieving os arch of {self.host}: {e!s}")
return 0
@ -396,13 +396,13 @@ class ldap(connection):
error, desc = e.getErrorString()
used_ccache = " from ccache" if useCache else f":{process_secret(kerb_pass)}"
self.logger.fail(
f"{self.domain}\\{self.username}{used_ccache} {str(error)}",
f"{self.domain}\\{self.username}{used_ccache} {error!s}",
color="magenta" if error in ldap_error_status else "red",
)
return False
except (KeyError, KerberosException, OSError) as e:
self.logger.fail(
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {str(e)}",
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {e!s}",
color="red",
)
return False
@ -443,7 +443,7 @@ class ldap(connection):
except SessionError as e:
error, desc = e.getErrorString()
self.logger.fail(
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {str(error)}",
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {error!s}",
color="magenta" if error in ldap_error_status else "red",
)
return False
@ -457,7 +457,7 @@ class ldap(connection):
else:
error_code = str(e).split()[-2][:-1]
self.logger.fail(
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {str(error_code)}",
f"{self.domain}\\{self.username}{' from ccache' if useCache else ':%s' % (kerb_pass if not self.config.get('nxc', 'audit_mode') else self.config.get('nxc', 'audit_mode') * 8)} {error_code!s}",
color="magenta" if error_code in ldap_error_status else "red",
)
return False
@ -954,7 +954,7 @@ class ldap(connection):
else:
answers += [[spn, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation] for spn in SPNs]
except Exception as e:
nxc_logger.error(f"Skipping item, cannot process due to error {str(e)}")
nxc_logger.error(f"Skipping item, cannot process due to error {e!s}")
if len(answers) > 0:
self.logger.display(f"Total of records returned {len(answers):d}")
@ -1121,7 +1121,7 @@ class ldap(connection):
)
except Exception as e:
self.logger.debug("Exception:", exc_info=True)
self.logger.debug(f"Skipping item, cannot process due to error {str(e)}")
self.logger.debug(f"Skipping item, cannot process due to error {e!s}")
if len(answers) > 0:
self.logger.debug(answers)
for value in answers:
@ -1178,7 +1178,7 @@ class ldap(connection):
)
except Exception as e:
self.logger.debug("Exception:", exc_info=True)
self.logger.debug(f"Skipping item, cannot process due to error {str(e)}")
self.logger.debug(f"Skipping item, cannot process due to error {e!s}")
if len(answers) > 0:
self.logger.debug(answers)
for value in answers:

View File

@ -141,7 +141,7 @@ class KerberosAttacks:
kdcHost=self.kdcHost,
)
except Exception as e:
nxc_logger.debug(f"TGT: {str(e)}")
nxc_logger.debug(f"TGT: {e!s}")
tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(
user_name,
self.password,

View File

@ -110,7 +110,7 @@ class LDAPConnect:
return False
except KerberosError as e:
self.logger.fail(
f"{domain}\\{username}:{password if password else ntlm_hash} {str(e)}",
f"{domain}\\{username}:{password if password else ntlm_hash} {e!s}",
color="red",
)
return False
@ -238,7 +238,7 @@ class LAPSv2Extract:
try:
dce.bind(MSRPC_UUID_GKDI)
except Exception as e:
self.logger.error(f"Something went wrong, check error status => {str(e)}")
self.logger.error(f"Something went wrong, check error status => {e!s}")
return False
self.logger.info("Successfully bound")
self.logger.info("Calling MS-GKDI GetKey")

View File

@ -198,7 +198,7 @@ class smb(connection):
dce.disconnect()
return 64
except Exception as e:
self.logger.debug(f"Error retrieving os arch of {self.host}: {str(e)}")
self.logger.debug(f"Error retrieving os arch of {self.host}: {e!s}")
return 0

View File

@ -16,6 +16,7 @@ from sqlalchemy.orm import sessionmaker, scoped_session
from nxc.logger import nxc_logger
import sys
from typing import Optional
# if there is an issue with SQLAlchemy and a connection cannot be cleaned up properly it spews out annoying warnings
warnings.filterwarnings("ignore", category=SAWarning)
@ -716,7 +717,7 @@ class database:
except Exception as e:
nxc_logger.debug(f"Issue while inserting DPAPI Backup Key: {e}")
def get_domain_backupkey(self, domain: str = None):
def get_domain_backupkey(self, domain: Optional[str] = None):
"""
Get domain backupkey
:domain is the domain fqdn
@ -771,11 +772,11 @@ class database:
def get_dpapi_secrets(
self,
filter_term=None,
host: str = None,
dpapi_type: str = None,
windows_user: str = None,
username: str = None,
url: str = None,
host: Optional[str] = None,
dpapi_type: Optional[str] = None,
windows_user: Optional[str] = None,
username: Optional[str] = None,
url: Optional[str] = None,
):
"""Get dpapi secrets from nxcdb"""
q = select(self.DpapiSecrets)

View File

@ -238,7 +238,7 @@ class PassPolDump:
self.logger.highlight(f"Password Complexity Flags: {self.__pass_prop or 'None'}")
for i, a in enumerate(self.__pass_prop):
self.logger.highlight(f"\t{PASSCOMPLEX[i]} {str(a)}")
self.logger.highlight(f"\t{PASSCOMPLEX[i]} {a!s}")
self.logger.highlight("")
self.logger.highlight(f"Minimum password age: {self.__min_pass_age}")

View File

@ -67,7 +67,7 @@ class wmi(connection):
if self.remoteName == "":
self.remoteName = self.host
try:
rpctansport = transport.DCERPCTransportFactory(fr"ncacn_ip_tcp:{self.remoteName}[{str(self.args.port)}]")
rpctansport = transport.DCERPCTransportFactory(fr"ncacn_ip_tcp:{self.remoteName}[{self.args.port!s}]")
rpctansport.set_credentials(username="", password="", domain="", lmhash="", nthash="", aesKey="")
rpctansport.setRemoteHost(self.host)
rpctansport.set_connect_timeout(self.args.rpc_timeout)
@ -241,11 +241,11 @@ class wmi(connection):
out = f"{self.domain}\\{self.username}{used_ccache} {error_msg}"
self.logger.fail(out)
elif "kerberos sessionerror" in str(e).lower():
out = f"{self.domain}\\{self.username}{used_ccache} {list(e.getErrorString())[0]}"
out = f"{self.domain}\\{self.username}{used_ccache} {next(iter(e.getErrorString()))}"
self.logger.fail(out, color="magenta")
return False
else:
out = f"{self.domain}\\{self.username}{used_ccache} {str(e)}"
out = f"{self.domain}\\{self.username}{used_ccache} {e!s}"
self.logger.fail(out, color="red")
return False
else:
@ -292,7 +292,7 @@ class wmi(connection):
except Exception as e:
dce.disconnect()
self.logger.debug(str(e))
out = f"{self.domain}\\{self.username}:{process_secret(self.password)} {str(e)}"
out = f"{self.domain}\\{self.username}:{process_secret(self.password)} {e!s}"
self.logger.fail(out, color="red")
else:
try:
@ -347,7 +347,7 @@ class wmi(connection):
except Exception as e:
dce.disconnect()
self.logger.debug(str(e))
out = f"{domain}\\{self.username}:{process_secret(self.nthash)} {str(e)}"
out = f"{domain}\\{self.username}:{process_secret(self.nthash)} {e!s}"
self.logger.fail(out, color="red")
else:
try:

View File

@ -11,7 +11,6 @@
# cmd.exe /Q /c {command} > C:\windows\temp\{random}.txt (aka command results)
# powershell convert the command results into base64, and save it into C:\windows\temp\{random2}.txt (now the command results was base64 encoded)
# Create registry path: HKLM:\Software\Classes\hello, then add C:\windows\temp\{random2}.txt into HKLM:\Software\Classes\hello\{NewKey}
# Remove anythings which in C:\windows\temp\ # noqa: ERA001
# Stage 2:
# WQL query the HKLM:\Software\Classes\hello\{NewKey} and get results, after the results(base64 strings) retrieved, removed
@ -73,8 +72,8 @@ class WMIEXEC:
self.logger.error(str(e))
def execute_WithOutput(self, command):
result_output = f"C:\\windows\\temp\\{str(uuid.uuid4())}.txt"
result_output_b64 = f"C:\\windows\\temp\\{str(uuid.uuid4())}.txt"
result_output = f"C:\\windows\\temp\\{uuid.uuid4()!s}.txt"
result_output_b64 = f"C:\\windows\\temp\\{uuid.uuid4()!s}.txt"
keyName = str(uuid.uuid4())
self.__registry_Path = f"Software\\Classes\\{gen_random_string(6)}"
@ -100,4 +99,4 @@ class WMIEXEC:
self.logger.debug(f"Removing temporary registry path: HKLM\\{self.__registry_Path}")
retVal = descriptor.DeleteKey(2147483650, self.__registry_Path)
except Exception as e:
self.logger.debug(f"Target: {self.__host} removing temporary registry path error: {str(e)}")
self.logger.debug(f"Target: {self.__host} removing temporary registry path error: {e!s}")

View File

@ -48,8 +48,8 @@ class WMIEXEC_EVENT:
self.logger = logger
self.__exec_timeout = exec_timeout
self.__codec = codec
self.__instanceID = f"windows-object-{str(uuid.uuid4())}"
self.__instanceID_StoreResult = f"windows-object-{str(uuid.uuid4())}"
self.__instanceID = f"windows-object-{uuid.uuid4()!s}"
self.__instanceID_StoreResult = f"windows-object-{uuid.uuid4()!s}"
self.__dcom = DCOMConnection(self.__host, self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash, oxidResolver=True, doKerberos=self.__doKerberos, kdcHost=self.__kdcHost, aesKey=self.__aesKey)
iInterface = self.__dcom.CoCreateInstanceEx(CLSID_WbemLevel1Login, IID_IWbemLevel1Login)
@ -97,7 +97,7 @@ class WMIEXEC_EVENT:
# when wmi doing put instance, it will throwing a exception about data type error (lantin-1),
# but we can base64 encode it and submit the data without spcial charters to avoid it.
if self.__retOutput:
output_file = f"{str(uuid.uuid4())}.txt"
output_file = f"{uuid.uuid4()!s}.txt"
with open(get_ps_script("wmiexec_event_vbscripts/Exec_Command_WithOutput.vbs")) as vbs_file:
vbs = vbs_file.read()
vbs = vbs.replace("REPLACE_ME_BASE64_COMMAND", base64.b64encode(command.encode()).decode())

View File

@ -81,7 +81,7 @@ build-backend = "poetry.core.masonry.api"
# McCabe complexity (`C901`) by default.
# Other options: pep8-naming (N), flake8-annotations (ANN), flake8-blind-except (BLE), flake8-commas (COM), flake8-pyi (PYI), flake8-pytest-style (PT), flake8-unused-arguments (ARG), etc
# Should tackle flake8-use-pathlib (PTH) at some point
select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B", "A", "C4", "ISC", "ICN", "PIE", "PT", "Q", "RSE", "RET", "SIM", "TID", "ERA", "FLY", "PERF", "FURB"]
select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B", "A", "C4", "ISC", "ICN", "PIE", "PT", "Q", "RSE", "RET", "SIM", "TID", "ERA", "FLY", "PERF", "FURB", "LOG", "RUF"]
ignore = [ "E501", "F405", "D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D204", "D205", "D212", "D213", "D400", "D401", "D415", "D417", "D419", "RET505", "RET506", "RET507", "RET508", "PERF203"]
# Allow autofix for all enabled rules (when `--fix`) is provided.