ruff: auto-fix flake8-bugbear; see B006 mutable-argument-default for the function parameter changes

Branch: main
Author: Marshall Hallenbeck, 2023-10-12 17:17:20 -04:00
Parent commit: 2d20b220ce
Commit: 4045bebd77
16 changed files with 55 additions and 31 deletions
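
For context on the B006 rule referenced in the commit message: a mutable default such as a list or dict is created once, when the function is defined, and is then shared by every call, so state can leak between calls. The sketch below is an illustrative example of the pitfall and of the None-sentinel rewrite that ruff's autofix applies in the hunks that follow; the function and variable names are made up, not taken from this repository.

# Illustrative sketch only; names are hypothetical, not from NetExec.

def add_target_buggy(target, targets=[]):
    # The same list object is reused across calls, so results accumulate unexpectedly.
    targets.append(target)
    return targets

add_target_buggy("10.0.0.1")  # -> ["10.0.0.1"]
add_target_buggy("10.0.0.2")  # -> ["10.0.0.1", "10.0.0.2"], state leaked from the first call

def add_target_fixed(target, targets=None):
    # None sentinel plus an in-function default: a fresh list per call, as in the diffs below.
    if targets is None:
        targets = []
    targets.append(target)
    return targets

add_target_fixed("10.0.0.1")  # -> ["10.0.0.1"]
add_target_fixed("10.0.0.2")  # -> ["10.0.0.2"]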

@@ -109,7 +109,7 @@ def process_creds(context, connection, credentials_data, dbconnection, cursor, d
for path in paths:
if path:
- for key, value in path.items():
+ for _key, value in path.items():
for item in value:
if isinstance(item, dict):
if {item["name"]} not in reported_da:
@@ -173,7 +173,7 @@ class NXCModule:
# lsassy also removes all other handlers and overwrites the formatter which is bad (we want ours)
# so what we do is define "success" as a logging level, then do nothing with the output
logging.addLevelName(25, "SUCCESS")
- setattr(logging, "success", lambda message, *args: ())
+ logging.success = lambda message, *args: ()
host = connection.host
domain_name = connection.domain
@@ -247,10 +247,10 @@ class NXCModule:
if len(more_to_dump) > 0:
context.log.display(f"User {user[0]} has more access to {pc[0]}. Attempting to dump.")
connection.domain = user[0].split("@")[1]
- setattr(connection, "host", pc[0].split(".")[0])
- setattr(connection, "username", user[0].split("@")[0])
- setattr(connection, "nthash", user[1])
- setattr(connection, "nthash", user[1])
+ connection.host = pc[0].split(".")[0]
+ connection.username = user[0].split("@")[0]
+ connection.nthash = user[1]
+ connection.nthash = user[1]
try:
self.run_lsassy(context, connection, cursor)
cursor.execute("UPDATE pc_and_admins SET dumped = 'TRUE' WHERE pc_name LIKE '" + pc[0] + "%'")

@@ -162,7 +162,7 @@ class NXCModule:
exec_as.append(f"EXECUTE AS LOGIN = '{grantor}';")
return "".join(exec_as)
- def perform_impersonation_check(self, user: User, grantors=[]):
+ def perform_impersonation_check(self, user: User, grantors=None):
"""
Performs an impersonation check for a given user.
@@ -187,6 +187,8 @@ class NXCModule:
"""
# build EXECUTE AS if any grantors is specified
+ if grantors is None:
+ grantors = []
exec_as = self.sql_exec_as(grantors)
# do we have any privilege ?
if self.update_priv(user, exec_as):

@@ -249,7 +249,7 @@ class DNS_COUNT_NAME(Structure):
def toFqdn(self):
ind = 0
labels = []
- for i in range(self["LabelCount"]):
+ for _i in range(self["LabelCount"]):
nextlen = unpack("B", self["RawName"][ind : ind + 1])[0]
labels.append(self["RawName"][ind + 1 : ind + 1 + nextlen].decode("utf-8"))
ind += nextlen + 1
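
The loop-variable renames in this and several later hunks are B007 (unused-loop-control-variable): when the loop body never reads the variable, prefixing it with an underscore documents that only the iteration itself matters. A small illustrative example with made-up data:

# Illustrative sketch only.
attempts = 3
results = []

# Before: "attempt" is bound but never read inside the body, so B007 flags it.
# for attempt in range(attempts):
#     results.append(try_once())

# After: the underscore prefix marks the variable as intentionally unused.
for _attempt in range(attempts):
    results.append("tried")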

@@ -47,7 +47,13 @@ class ConfigCheck:
module = None
- def __init__(self, name, description="", checkers=[None], checker_args=[[]], checker_kwargs=[{}]):
+ def __init__(self, name, description="", checkers=None, checker_args=None, checker_kwargs=None):
+ if checker_kwargs is None:
+ checker_kwargs = [{}]
+ if checker_args is None:
+ checker_args = [[]]
+ if checkers is None:
+ checkers = [None]
self.check_id = None
self.name = name
self.description = description
@@ -214,11 +220,13 @@ class HostChecker:
if host_id is not None:
self.connection.db.add_check_result(host_id, check.check_id, check.ok, ", ".join(check.reasons).replace("\x00", ""))
- def check_registry(self, *specs, options={}):
+ def check_registry(self, *specs, options=None):
"""
Perform checks that only require to compare values in the registry with expected values, according to the specs
a spec may be either a 3-tuple: (key name, value name, expected value), or a 4-tuple (key name, value name, expected value, operation), where operation is a function that implements a comparison operator
"""
+ if options is None:
+ options = {}
default_options = {"lastWins": False, "stopOnOK": False, "stopOnKO": False, "KOIfMissing": True}
default_options.update(options)
options = default_options
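
To make the check_registry docstring above concrete: each spec is a tuple of registry key name, value name, and expected value, optionally followed by a comparison callable. The key path and invocation below are hypothetical illustrations of those two shapes, not values taken from this module:

import operator

# Hypothetical specs following the documented 3-tuple / 4-tuple shapes.
specs = [
    # 3-tuple: (key name, value name, expected value) -- equality is implied
    ("HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\Lsa", "LmCompatibilityLevel", 5),
    # 4-tuple adds an explicit operation used to compare the actual value against the expected one
    ("HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\Lsa", "LmCompatibilityLevel", 3, operator.ge),
]

# Roughly how a HostChecker instance might consume them (illustrative call, not from the repo):
# checker.check_registry(*specs, options={"lastWins": True})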

@@ -53,7 +53,7 @@ class NXCModule:
rpc_con = transport.DCERPCTransportFactory(binding).get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
- for attempt in range(0, MAX_ATTEMPTS):
+ for _attempt in range(0, MAX_ATTEMPTS):
result = try_zero_authenticate(rpc_con, dc_handle, dc_ip, target_computer)
if result:
return True

@@ -154,7 +154,7 @@ def main():
protocol_object = getattr(p_loader.load_protocol(protocol_path), args.protocol)
nxc_logger.debug(f"Protocol Object: {protocol_object}")
- protocol_db_object = getattr(p_loader.load_protocol(protocol_db_path), "database")
+ protocol_db_object = p_loader.load_protocol(protocol_db_path).database
nxc_logger.debug(f"Protocol DB Object: {protocol_db_object}")
db_path = path_join(NXC_PATH, "workspaces", nxc_workspace, f"{args.protocol}.db")
@@ -165,7 +165,7 @@ def main():
db = protocol_db_object(db_engine)
# with the new nxc/config.py this can be eventually removed, as it can be imported anywhere
- setattr(protocol_object, "config", nxc_config)
+ protocol_object.config = nxc_config
if args.module or args.list_modules:
loader = ModuleLoader(args, db, nxc_logger)
@@ -242,7 +242,7 @@ def main():
# get currently set modules, otherwise default to empty list
current_modules = getattr(protocol_object, "module", [])
current_modules.append(module)
- setattr(protocol_object, "module", current_modules)
+ protocol_object.module = current_modules
nxc_logger.debug(f"proto object module after adding: {protocol_object.module}")
if hasattr(args, "ntds") and args.ntds and not args.userntds:

@@ -492,7 +492,7 @@ class NXCDBMenu(cmd.Cmd):
self.config.set("nxc", "last_used_db", proto)
self.write_configfile()
try:
- proto_menu = getattr(db_nav_object, "navigator")(self, getattr(db_object, "database")(self.conn), proto)
+ proto_menu = db_nav_object.navigator(self, db_object.database(self.conn), proto)
proto_menu.cmdloop()
except UserExitedProto:
pass
@@ -567,7 +567,7 @@ class NXCDBMenu(cmd.Cmd):
c.execute("PRAGMA journal_mode = OFF")
c.execute("PRAGMA foreign_keys = 1")
- getattr(protocol_object, "database").db_schema(c)
+ protocol_object.database.db_schema(c)
# commit the changes and close everything off
conn.commit()
@@ -598,7 +598,7 @@ def initialize_db(logger):
c.execute("PRAGMA foreign_keys = 1")
# set a small timeout (5s) so if another thread is writing to the database, the entire program doesn't crash
c.execute("PRAGMA busy_timeout = 5000")
- getattr(protocol_object, "database").db_schema(c)
+ protocol_object.database.db_schema(c)
# commit the changes and close everything off
conn.commit()
conn.close()

@@ -21,7 +21,7 @@ def parse_nmap_xml(nmap_output_file, protocol):
targets = []
for host in nmap_report.hosts:
- for port, proto in host.get_open_ports():
+ for port, _proto in host.get_open_ports():
if port in protocol_dict[protocol]["ports"]:
targets.append(host.ipv4)
break

@@ -1004,7 +1004,7 @@ class ldap(connection):
self.logger.display(f"Total of records returned {len(answers):d}")
TGT = KerberosAttacks(self).get_tgt_kerberoasting()
dejavue = []
- for (SPN, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation,) in answers:
+ for (_SPN, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation,) in answers:
if sAMAccountName not in dejavue:
downLevelLogonName = self.targetDomain + "\\" + sAMAccountName

@@ -404,7 +404,7 @@ class mssql(connection):
# The whole tds library in impacket needs a good overhaul to preserve my sanity
def handle_mssql_reply(self):
for keys in self.conn.replies.keys():
- for i, key in enumerate(self.conn.replies[keys]):
+ for _i, key in enumerate(self.conn.replies[keys]):
if key["TokenType"] == TDS_ERROR_TOKEN:
error = f"ERROR({key['ServerName'].decode('utf-16le')}): Line {key['LineNumber']:d}: {key['MsgText'].decode('utf-16le')}"
self.conn.lastError = SQLErrorException(f"ERROR: Line {key['LineNumber']:d}: {key['MsgText'].decode('utf-16le')}")

@@ -1197,13 +1197,19 @@ class smb(connection):
self,
share=None,
folder=".",
- pattern=[],
- regex=[],
- exclude_dirs=[],
+ pattern=None,
+ regex=None,
+ exclude_dirs=None,
depth=None,
content=False,
only_files=True,
):
+ if exclude_dirs is None:
+ exclude_dirs = []
+ if regex is None:
+ regex = []
+ if pattern is None:
+ pattern = []
spider = SMBSpider(self.conn, self.logger)
self.logger.display("Started spidering")
@@ -1288,7 +1294,7 @@ class smb(connection):
so_far = 0
simultaneous = 1000
- for j in range(max_rid // simultaneous + 1):
+ for _j in range(max_rid // simultaneous + 1):
if (max_rid - so_far) // simultaneous == 0:
sids_to_check = (max_rid - so_far) % simultaneous
else:

@@ -875,11 +875,13 @@ class database:
q = select(self.ConfChecksResultsTable)
return self.conn.execute(q).all()
- def insert_data(self, table, select_results=[], **new_row):
+ def insert_data(self, table, select_results=None, **new_row):
"""
Insert a new row in the given table.
Basically it's just a more generic version of add_host
"""
+ if select_results is None:
+ select_results = []
results = []
updated_ids = []

@@ -385,7 +385,7 @@ class navigator(DatabaseNavigator):
check = check._asdict()
checks_dict[check["id"]] = check
- for result_id, host_id, check_id, secure, reasons in results:
+ for _result_id, host_id, check_id, secure, reasons in results:
status = "OK" if secure else "KO"
host = self.db.get_hosts(host_id)[0]._asdict()
check = checks_dict[check_id]

@@ -15,7 +15,7 @@ def d2b(a):
t2bin = tbin[::-1]
if len(t2bin) != 8:
- for x in range(6 - len(t2bin)):
+ for _x in range(6 - len(t2bin)):
t2bin.insert(0, 0)
return "".join([str(g) for g in t2bin])

@@ -25,13 +25,19 @@ class SMBSpider:
self,
share,
folder=".",
- pattern=[],
- regex=[],
- exclude_dirs=[],
+ pattern=None,
+ regex=None,
+ exclude_dirs=None,
depth=None,
content=False,
onlyfiles=True,
):
+ if exclude_dirs is None:
+ exclude_dirs = []
+ if regex is None:
+ regex = []
+ if pattern is None:
+ pattern = []
if regex:
try:
self.regex = [re.compile(bytes(rx, "utf8")) for rx in regex]

@@ -32,7 +32,7 @@ def db_setup(db_engine):
NXCDBMenu.create_workspace("test", p_loader, protocols)
protocol_db_path = p_loader.get_protocols()[proto]["dbpath"]
- protocol_db_object = getattr(p_loader.load_protocol(protocol_db_path), "database")
+ protocol_db_object = p_loader.load_protocol(protocol_db_path).database
database_obj = protocol_db_object(db_engine)
database_obj.reflect_tables()