ruff: fix flake8-builtins (A) issues

main
Marshall Hallenbeck 2023-10-12 17:34:01 -04:00
parent 7edc28359c
commit 40f557b8e9
11 changed files with 21 additions and 21 deletions

View File

@ -74,12 +74,12 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
files = [cmd] files = [cmd]
seen = set() seen = set()
for dir in path: for p in path:
normdir = os.path.normcase(dir) normdir = os.path.normcase(p)
if normdir not in seen: if normdir not in seen:
seen.add(normdir) seen.add(normdir)
for thefile in files: for thefile in files:
name = os.path.join(dir, thefile) name = os.path.join(p, thefile)
if _access_check(name, mode): if _access_check(name, mode):
return name return name
return None return None

View File

@ -35,7 +35,7 @@ class NXCAdapter(logging.LoggerAdapter):
logging.getLogger("lsassy").disabled = True logging.getLogger("lsassy").disabled = True
# logging.getLogger("impacket").disabled = True # logging.getLogger("impacket").disabled = True
def format(self, msg, *args, **kwargs): def format(self, msg, *args, **kwargs): # noqa: A003
"""Format msg for output """Format msg for output
This is used instead of process() since process() applies to _all_ messages, including debug calls This is used instead of process() since process() applies to _all_ messages, including debug calls
@ -191,7 +191,7 @@ class TermEscapeCodeFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, style="%", validate=True): def __init__(self, fmt=None, datefmt=None, style="%", validate=True):
super().__init__(fmt, datefmt, style, validate) super().__init__(fmt, datefmt, style, validate)
def format(self, record): def format(self, record): # noqa: A003
escape_re = re.compile(r"\x1b\[[0-9;]*m") escape_re = re.compile(r"\x1b\[[0-9;]*m")
record.msg = re.sub(escape_re, "", str(record.msg)) record.msg = re.sub(escape_re, "", str(record.msg))
return super().format(record) return super().format(record)

View File

@ -121,13 +121,13 @@ class NXCModule:
context.log.highlight(ntds_hash) context.log.highlight(ntds_hash)
if ntds_hash.find("$") == -1: if ntds_hash.find("$") == -1:
if ntds_hash.find("\\") != -1: if ntds_hash.find("\\") != -1:
domain, hash = ntds_hash.split("\\") domain, clean_hash = ntds_hash.split("\\")
else: else:
domain = connection.domain domain = connection.domain
hash = ntds_hash clean_hash = ntds_hash
try: try:
username, _, lmhash, nthash, _, _, _ = hash.split(":") username, _, lmhash, nthash, _, _, _ = clean_hash.split(":")
parsed_hash = ":".join((lmhash, nthash)) parsed_hash = ":".join((lmhash, nthash))
if validate_ntlm(parsed_hash): if validate_ntlm(parsed_hash):
context.db.add_credential("hash", domain, username, parsed_hash, pillaged_from=host_id) context.db.add_credential("hash", domain, username, parsed_hash, pillaged_from=host_id)

View File

@ -326,7 +326,7 @@ class SMBSpiderPlus:
download_success = False download_success = False
try: try:
self.logger.info(f'Downloading file "{file_path}" => "{download_path}".') self.logger.info(f'Downloading file "{file_path}" => "{download_path}".')
remote_file.open() remote_file.open_file()
self.save_file(remote_file, share_name) self.save_file(remote_file, share_name)
remote_file.close() remote_file.close()
download_success = True download_success = True

View File

@ -35,7 +35,7 @@ class NXCModule:
try: try:
remote_file = RemoteFile(connection.conn, "DAV RPC Service", "IPC$", access=FILE_READ_DATA) remote_file = RemoteFile(connection.conn, "DAV RPC Service", "IPC$", access=FILE_READ_DATA)
remote_file.open() remote_file.open_file()
remote_file.close() remote_file.close()
context.log.highlight(self.output.format(connection.conn.getRemoteHost())) context.log.highlight(self.output.format(connection.conn.getRemoteHost()))

View File

@ -1691,13 +1691,13 @@ class smb(connection):
self.logger.highlight(ntds_hash) self.logger.highlight(ntds_hash)
if ntds_hash.find("$") == -1: if ntds_hash.find("$") == -1:
if ntds_hash.find("\\") != -1: if ntds_hash.find("\\") != -1:
domain, hash = ntds_hash.split("\\") domain, clean_hash = ntds_hash.split("\\")
else: else:
domain = self.domain domain = self.domain
hash = ntds_hash clean_hash = ntds_hash
try: try:
username, _, lmhash, nthash, _, _, _ = hash.split(":") username, _, lmhash, nthash, _, _, _ = clean_hash.split(":")
parsed_hash = ":".join((lmhash, nthash)) parsed_hash = ":".join((lmhash, nthash))
if validate_ntlm(parsed_hash): if validate_ntlm(parsed_hash):
self.db.add_credential("hash", domain, username, parsed_hash, pillaged_from=host_id) self.db.add_credential("hash", domain, username, parsed_hash, pillaged_from=host_id)

View File

@ -165,11 +165,11 @@ class MMCEXEC:
def execute(self, command, output=False): def execute(self, command, output=False):
self.__retOutput = output self.__retOutput = output
self.execute_remote(command) self.execute_remote(command)
self.exit() self.exit_mmc()
self.__dcom.disconnect() self.__dcom.disconnect()
return self.__outputBuffer return self.__outputBuffer
def exit(self): def exit_mmc(self):
try: try:
dispParams = DISPPARAMS(None, False) dispParams = DISPPARAMS(None, False)
dispParams["rgvarg"] = NULL dispParams["rgvarg"] = NULL
@ -179,7 +179,7 @@ class MMCEXEC:
self.__quit[0].Invoke(self.__quit[1], 0x409, DISPATCH_METHOD, dispParams, 0, [], []) self.__quit[0].Invoke(self.__quit[1], 0x409, DISPATCH_METHOD, dispParams, 0, [], [])
except Exception as e: except Exception as e:
self.logger.fail(f"Unexpect dcom error when doing exit() function in mmcexec: {str(e)}") self.logger.fail(f"Unexpected dcom error: {e}")
return True return True
def execute_remote(self, data): def execute_remote(self, data):

View File

@ -18,7 +18,7 @@ class RemoteFile:
self.__fid = None self.__fid = None
self.__currentOffset = 0 self.__currentOffset = 0
def open(self): def open_file(self):
self.__fid = self.__smbConnection.openFile(self.__tid, self.__fileName, desiredAccess=self.__access) self.__fid = self.__smbConnection.openFile(self.__tid, self.__fileName, desiredAccess=self.__access)
def seek(self, offset, whence): def seek(self, offset, whence):

View File

@ -169,7 +169,7 @@ class SMBSpider:
self.share, self.share,
access=FILE_READ_DATA, access=FILE_READ_DATA,
) )
rfile.open() rfile.open_file()
while True: while True:
try: try:

View File

@ -12,14 +12,14 @@ from nxc.logger import NXCAdapter, nxc_logger
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args): def log_message(self, display_format, *args):
server_logger = NXCAdapter( server_logger = NXCAdapter(
extra={ extra={
"module_name": self.server.module.name.upper(), "module_name": self.server.module.name.upper(),
"host": self.client_address[0], "host": self.client_address[0],
} }
) )
server_logger.display(f"- - {format % args}") server_logger.display(f"- - {display_format % args}")
def do_GET(self): def do_GET(self):
if hasattr(self.server.module, "on_request"): if hasattr(self.server.module, "on_request"):

View File

@ -80,7 +80,7 @@ build-backend = "poetry.core.masonry.api"
# Ruff doesn't enable pycodestyle warnings (`W`) or # Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default. # McCabe complexity (`C901`) by default.
# Other options: pep8-naming (N), flake8-annotations (ANN), flake8-blind-except (BLE) # Other options: pep8-naming (N), flake8-annotations (ANN), flake8-blind-except (BLE)
select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B"] select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B", "A"]
ignore = [ "E501", "F405", "F841", "D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D204", "D205", "D212", "D213", "D400", "D401", "D415", "D417", "D419"] ignore = [ "E501", "F405", "F841", "D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D204", "D205", "D212", "D213", "D400", "D401", "D415", "D417", "D419"]
# Allow autofix for all enabled rules (when `--fix`) is provided. # Allow autofix for all enabled rules (when `--fix`) is provided.