ruff: add flake8-return (RET) and auto-run

main
Marshall Hallenbeck 2023-10-14 14:16:28 -04:00
parent 781afc9c65
commit cdcde5a91b
39 changed files with 113 additions and 119 deletions

View File

@ -37,7 +37,7 @@ def gethost_addrinfo(hostname):
def requires_admin(func):
def _decorator(self, *args, **kwargs):
if self.admin_privs is False:
return
return None
return func(self, *args, **kwargs)
return wraps(func)(_decorator)
@ -412,6 +412,7 @@ class connection:
return self.hash_login(domain, username, secret)
elif cred_type == "aesKey":
return self.kerberos_login(domain, username, "", "", secret, self.kdcHost, False)
return None
def login(self):
"""Try to login using the credentials specified in the command line or in the database.
@ -460,6 +461,7 @@ class connection:
owned[user_index] = True
if not self.args.continue_on_success:
return True
return None
else:
if len(username) != len(secret):
self.logger.error("Number provided of usernames and passwords/hashes do not match!")
@ -469,6 +471,7 @@ class connection:
owned[user_index] = True
if not self.args.continue_on_success:
return True
return None
def mark_pwned(self):
return highlight(f"({pwned_label})" if self.admin_privs else "")

View File

@ -21,3 +21,4 @@ def get_desktop_uagent(uagent=None):
return desktop_uagents[random.choice(desktop_uagents.keys())]
elif uagent:
return desktop_uagents[uagent]
return None

View File

@ -15,3 +15,4 @@ def highlight(text, color="yellow"):
return f"{colored(text, 'yellow', attrs=['bold'])}"
elif color == "red":
return f"{colored(text, 'red', attrs=['bold'])}"
return None

View File

@ -104,9 +104,8 @@ def obfs_ps_script(path_to_script):
# strip block comments
stripped_code = re.sub(re.compile("<#.*?#>", re.DOTALL), "", script.read())
# strip blank lines, lines starting with #, and verbose/debug statements
stripped_code = "\n".join([line for line in stripped_code.split("\n") if ((line.strip() != "") and (not line.strip().startswith("#")) and (not line.strip().lower().startswith("write-verbose ")) and (not line.strip().lower().startswith("write-debug ")))])
return "\n".join([line for line in stripped_code.split("\n") if ((line.strip() != "") and (not line.strip().startswith("#")) and (not line.strip().lower().startswith("write-verbose ")) and (not line.strip().lower().startswith("write-debug ")))])
return stripped_code
def create_ps_command(ps_command, force_ps32=False, dont_obfs=False, custom_amsi=None):
@ -490,6 +489,5 @@ def invoke_obfuscation(script_string):
choice(["", " "]) + new_script + choice(["", " "]) + "|" + choice(["", " "]) + invoke_expression,
]
obfuscated_payload = choice(invoke_options)
return choice(invoke_options)
return obfuscated_payload

View File

@ -90,6 +90,7 @@ class ModuleLoader:
else:
self.logger.fail(f"Module {module.name.upper()} is not supported for protocol {self.args.protocol}")
sys.exit(1)
return None
def get_module_info(self, module_path):
"""Get the path, description, and options from a module"""

View File

@ -176,13 +176,12 @@ class NXCAdapter(logging.LoggerAdapter):
newpath = os.path.expanduser("~/.nxc") + "/logs/" + datetime.now().strftime("%Y-%m-%d")
if not os.path.exists(newpath):
os.makedirs(newpath)
log_filename = os.path.join(
return os.path.join(
os.path.expanduser("~/.nxc"),
"logs",
datetime.now().strftime("%Y-%m-%d"),
f"log_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.log",
)
return log_filename
class TermEscapeCodeFormatter(logging.Formatter):

View File

@ -408,12 +408,11 @@ class NXCModule:
searchFilter=f"(objectSid={sid})",
attributes=["sAMAccountName"],
)[0][0]
samname = self.ldap_session.search(
return self.ldap_session.search(
searchBase=self.baseDN,
searchFilter=f"(objectSid={sid})",
attributes=["sAMAccountName"],
)[0][1][0][1][0]
return samname
except Exception:
context.log.debug(f"SID not found in LDAP: {sid}")
return ""

View File

@ -121,12 +121,12 @@ class TriggerAuth:
dce.connect()
except Exception as e:
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
return
return None
try:
dce.bind(uuidtup_to_bin(("4FC742E0-4A10-11CF-8273-00AA004AE673", "3.0")))
except Exception as e:
nxc_logger.debug(f"Something went wrong, check error status => {str(e)}")
return
return None
nxc_logger.debug("[+] Successfully bound!")
return dce

View File

@ -202,8 +202,7 @@ class LsaLookupNames:
request["Names"].append(name1)
request["TranslatedSids"]["Sids"] = NULL
request["LookupLevel"] = lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta
resp = dce.request(request)
return resp
return dce.request(request)
conf = {

View File

@ -78,5 +78,7 @@ class NXCModule:
except socket.gaierror:
context.log.debug("Missing IP")
context.log.highlight(f"{answer[0]} ({answer[1]}) (No IP Found)")
return None
else:
context.log.success(f"Unable to find any computers with the text {self.TEXT}")
return None

View File

@ -82,6 +82,8 @@ class NXCModule:
context.log.success("Found following users: ")
for answer in answers:
context.log.highlight(f"User: {answer[0]} description: {answer[1]}")
return None
return None
def filter_answer(self, context, answers):
# No option to filter

View File

@ -62,6 +62,8 @@ class NXCModule:
context.log.success("Found the following members of the " + self.GROUP + " group:")
for answer in self.answers:
context.log.highlight(f"{answer[0]}")
return None
return None
# Carry out an LDAP search for the Group with the supplied Group name
@ -82,11 +84,9 @@ def do_search(self, context, connection, searchFilter, attributeName):
for attribute in item["attributes"]:
if str(attribute["type"]) == attributeName:
if attributeName == "objectSid":
attribute_value = bytes(attribute["vals"][0])
return attribute_value
return bytes(attribute["vals"][0])
elif attributeName == "distinguishedName":
attribute_value = bytes(attribute["vals"][0])
return attribute_value
return bytes(attribute["vals"][0])
else:
attribute_value = str(attribute["vals"][0])
if attribute_value is not None:

View File

@ -91,3 +91,5 @@ class NXCModule:
# print("Group name: %s" % group_name)
context.log.highlight(f"{group_name}")
return None
return None

View File

@ -42,8 +42,7 @@ def neo4j_local_admins(context, driver):
except Exception as e:
context.log.fail(f"Could not pull admins: {e}")
return None
results = list(admins.data())
return results
return list(admins.data())
def create_db(local_admins, dbconnection, cursor):
@ -233,6 +232,7 @@ class NXCModule:
self.save_credentials(context, connection, cred["domain"], cred["username"], cred["password"], cred["lmhash"], cred["nthash"])
global credentials_data
credentials_data = credentials_output
return None
def spider_pcs(self, context, connection, cursor, dbconnection, driver):
cursor.execute("SELECT * from admin_users WHERE hash is not NULL")

View File

@ -51,6 +51,7 @@ class NXCModule:
# LDAPS bind successful
# because channel binding is not enforced
return False
return None
# Conduct a bind to LDAPS with channel binding supported
# but intentionally miscalculated. In the case that and
@ -73,8 +74,10 @@ class NXCModule:
return False
elif err is not None:
context.log.fail("ERROR while connecting to " + str(connection.domain) + ": " + str(err))
return None
elif err is None:
return False
return None
# Domain Controllers do not have a certificate setup for
# LDAPS on port 636 by default. If this has not been setup,
@ -125,8 +128,10 @@ class NXCModule:
exit()
elif err is None:
return False
return None
else:
context.log.fail(str(err))
return None
# Run trough all our code blocks to determine LDAP signing and channel binding settings.
stype = asyauthSecret.PASS if not connection.nthash else asyauthSecret.NT

View File

@ -118,6 +118,7 @@ class NXCModule:
context.log.debug("Calling process_credentials")
self.process_credentials(context, connection, credentials_output)
return None
def process_credentials(self, context, connection, credentials):
if len(credentials) == 0:

View File

@ -91,9 +91,8 @@ def calculate_doublepulsar_xor_key(s):
temp = ((s & 0xFF00) | (s << 16)) << 8 | (((s >> 16) | s & 0xFF0000) >> 8)
# Multiply the temp value by 2 and perform a bitwise XOR with 0xFFFFFFFF
x = 2 * temp ^ 0xFFFFFFFF
return 2 * temp ^ 0xFFFFFFFF
return x
def negotiate_proto_request():

View File

@ -140,10 +140,10 @@ class NXCModule:
else:
self.context.log.display(f"{user.username} can impersonate: {grantor.username}")
return self.browse_path(context, initial_user, grantor)
return None
def query_and_get_output(self, query):
results = self.mssql_conn.sql_query(query)
return results
return self.mssql_conn.sql_query(query)
def sql_exec_as(self, grantors: list) -> str:
"""
@ -276,8 +276,7 @@ class NXCModule:
self.revert_context(exec_as)
self.context.log.debug(f"Response: {res}")
self.context.log.debug(f"Response Type: {type(res)}")
tables = [table["name"] for table in res]
return tables
return [table["name"] for table in res]
def is_db_owner(self, database, exec_as="") -> bool:
"""
@ -426,8 +425,7 @@ class NXCModule:
WHERE a.permission_name like 'IMPERSONATE%'"""
res = self.query_and_get_output(exec_as + query)
self.revert_context(exec_as)
users = [user["name"] for user in res]
return users
return [user["name"] for user in res]
def remove_sysadmin_priv(self) -> bool:
"""

View File

@ -92,6 +92,8 @@ class NXCModule:
value = self.convert_time_field(field, pso[field])
context.log.highlight(f"{field}: {value}")
context.log.highlight("-----")
return None
else:
context.log.info("No Password Settings Objects (PSO) found.")
return None

View File

@ -135,8 +135,7 @@ class SMBSpiderPlus:
def get_remote_file(self, share, path):
"""Checks if a path is readable in a SMB share."""
try:
remote_file = RemoteFile(self.smb.conn, path, share, access=FILE_READ_DATA)
return remote_file
return RemoteFile(self.smb.conn, path, share, access=FILE_READ_DATA)
except SessionError:
if self.reconnect():
return self.get_remote_file(share, path)

View File

@ -547,7 +547,7 @@ class HostChecker:
root_key, subkey = keyName.split("\\", 1)
except ValueError:
self.context.log.error(f"HostChecker.reg_query_value(): Could not split keyname {keyName}")
return
return None
ans = self._open_root_key(dce, connection, root_key)
if ans is None:

View File

@ -41,8 +41,7 @@ if platform != "win32":
def create_db_engine(db_path):
db_engine = sqlalchemy.create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
return db_engine
return sqlalchemy.create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
async def start_run(protocol_obj, args, db, targets):

View File

@ -25,8 +25,7 @@ class UserExitedProto(Exception):
def create_db_engine(db_path):
db_engine = create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
return db_engine
return create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
def print_table(data, title=None):

View File

@ -89,6 +89,7 @@ class Ftp(connection):
self.conn.close()
return True
self.conn.close()
return None
def list_directory_full(self):
# in the future we can use mlsd/nlst if we want, but this gives a full output like `ls -la`

View File

@ -139,6 +139,7 @@ class database:
if updated_ids:
nxc_logger.debug(f"add_host() - Host IDs Updated: {updated_ids}")
return updated_ids
return None
def add_credential(self, username, password):
"""Check if this credential has already been added to the database, if not add it in."""
@ -179,8 +180,7 @@ class database:
# hacky way to get cred_id since we can't use returning() yet
if len(credentials) == 1:
cred_id = self.get_credential(username, password)
return cred_id
return self.get_credential(username, password)
else:
return credentials
@ -225,8 +225,7 @@ class database:
else:
q = select(self.CredentialsTable)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def is_host_valid(self, host_id):
"""Check if this host ID is valid."""
@ -260,8 +259,7 @@ class database:
def get_user(self, username):
q = select(self.CredentialsTable).filter(func.lower(self.CredentialsTable.c.username) == func.lower(username))
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def get_users(self, filter_term=None):
q = select(self.CredentialsTable)
@ -272,8 +270,7 @@ class database:
elif filter_term and filter_term != "":
like_term = func.lower(f"%{filter_term}%")
q = q.filter(func.lower(self.CredentialsTable.c.username).like(like_term))
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def add_loggedin_relation(self, cred_id, host_id):
relation_query = select(self.LoggedinRelationsTable).filter(
@ -296,6 +293,7 @@ class database:
return inserted_id_results[0].id
except Exception as e:
nxc_logger.debug(f"Error inserting LoggedinRelation: {e}")
return None
def get_loggedin_relations(self, cred_id=None, host_id=None):
q = select(self.LoggedinRelationsTable) # .returning(self.LoggedinRelationsTable.c.id)
@ -303,8 +301,7 @@ class database:
q = q.filter(self.LoggedinRelationsTable.c.credid == cred_id)
if host_id:
q = q.filter(self.LoggedinRelationsTable.c.hostid == host_id)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def remove_loggedin_relations(self, cred_id=None, host_id=None):
q = delete(self.LoggedinRelationsTable)

View File

@ -699,8 +699,7 @@ class ldap(connection):
# loop over the count of small endians
sub_authority = "-" + "-".join([str(int.from_bytes(sid[8 + (i * 4) : 12 + (i * 4)], byteorder="little")) for i in range(sub_authorities)])
object_sid = "S-" + str(revision) + "-" + str(identifier_authority) + sub_authority
return object_sid
return "S-" + str(revision) + "-" + str(identifier_authority) + sub_authority
except Exception:
pass
return sid
@ -752,13 +751,12 @@ class ldap(connection):
# Microsoft Active Directory set an hard limit of 1000 entries returned by any search
paged_search_control = ldapasn1_impacket.SimplePagedResultsControl(criticality=True, size=1000)
resp = self.ldapConnection.search(
return self.ldapConnection.search(
searchFilter=searchFilter,
attributes=attributes,
sizeLimit=sizeLimit,
searchControls=[paged_search_control],
)
return resp
except ldap_impacket.LDAPSearchError as e:
if e.getErrorString().find("sizeLimitExceeded") >= 0:
# We should never reach this code as we use paged search now
@ -866,6 +864,7 @@ class ldap(connection):
resp = self.search(search_filter, attributes, 0)
if resp == []:
self.logger.highlight("No entries found!")
return None
elif resp:
answers = []
self.logger.display(f"Total of records returned {len(resp):d}")
@ -922,9 +921,10 @@ class ldap(connection):
return True
else:
self.logger.highlight("No entries found!")
return
return None
else:
self.logger.fail("Error with the LDAP account used")
return None
def kerberoasting(self):
# Building the search filter
@ -1038,8 +1038,9 @@ class ldap(connection):
return True
else:
self.logger.highlight("No entries found!")
return
return None
self.logger.fail("Error with the LDAP account used")
return None
def trusted_for_delegation(self):
# Building the search filter
@ -1186,7 +1187,7 @@ class ldap(connection):
self.logger.highlight(f"User: {value[0]} Status: {value[5]}")
else:
self.logger.fail("No entries found!")
return
return None
def admin_count(self):
# Building the search filter

View File

@ -205,7 +205,7 @@ class KerberosAttacks:
if domain == "":
nxc_logger.error("Empty Domain not allowed in Kerberos")
return
return None
req_body["realm"] = domain
now = datetime.utcnow() + timedelta(days=1)
@ -247,7 +247,7 @@ class KerberosAttacks:
else:
# The user doesn't have UF_DONT_REQUIRE_PREAUTH set
nxc_logger.debug(f"User {userName} doesn't have UF_DONT_REQUIRE_PREAUTH set")
return
return None
# Let's output the TGT enc-part/cipher in Hashcat format, in case somebody wants to use it.
if as_rep["enc-part"]["etype"] == 17 or as_rep["enc-part"]["etype"] == 18:

View File

@ -212,7 +212,7 @@ class LAPSv2Extract:
target_sd = create_sd(sid)
except Exception as e:
self.logger.error(f"Cannot unpack msLAPS-EncryptedPassword blob due to error {e}")
return
return None
# Check if item is in cache
if key_id["RootKeyId"] in kds_cache:

View File

@ -240,8 +240,7 @@ class database:
else:
q = select(self.AdminRelationsTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_admin_relation(self, user_ids=None, host_ids=None):
q = delete(self.AdminRelationsTable)
@ -277,8 +276,7 @@ class database:
else:
q = select(self.UsersTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def is_host_valid(self, host_id):
"""Check if this host ID is valid."""
@ -306,5 +304,4 @@ class database:
like_term = func.lower(f"%{filter_term}%")
q = select(self.HostsTable).filter(self.HostsTable.c.ip.like(like_term) | func.lower(self.HostsTable.c.hostname).like(like_term))
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()

View File

@ -320,7 +320,7 @@ class smb(connection):
data = d.run()
except Exception as e:
self.logger.fail(str(e))
return
return None
r = loads(data)
msMCSAdmPwd = r["p"]
username_laps = r["n"]
@ -1091,8 +1091,7 @@ class smb(connection):
def users(self):
self.logger.display("Trying to dump local users with SAMRPC protocol")
users = UserSamrDump(self).dump()
return users
return UserSamrDump(self).dump()
def hosts(self):
hosts = []
@ -1493,7 +1492,7 @@ class smb(connection):
conn.smb_session = self.conn
except Exception as e:
self.logger.debug(f"Could not upgrade connection: {e}")
return
return None
plaintexts = {username: password for _, _, username, password, _, _ in self.db.get_credentials(cred_type="plaintext")}
nthashes = {username: nt.split(":")[1] if ":" in nt else nt for _, _, username, nt, _, _ in self.db.get_credentials(cred_type="hash")}
@ -1521,7 +1520,7 @@ class smb(connection):
if len(masterkeys) == 0:
self.logger.fail("No masterkeys looted")
return
return None
self.logger.success(f"Got {highlight(len(masterkeys))} decrypted masterkeys. Looting secrets...")
@ -1626,6 +1625,7 @@ class smb(connection):
credential.password,
credential.url,
)
return None
@requires_admin
def lsa(self):

View File

@ -289,6 +289,7 @@ class database:
if updated_ids:
nxc_logger.debug(f"add_host() - Host IDs Updated: {updated_ids}")
return updated_ids
return None
def add_credential(self, credtype, domain, username, password, group_id=None, pillaged_from=None):
"""Check if this credential has already been added to the database, if not add it in."""
@ -405,8 +406,7 @@ class database:
else:
q = select(self.AdminRelationsTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_admin_relation(self, user_ids=None, host_ids=None):
q = delete(self.AdminRelationsTable)
@ -442,8 +442,7 @@ class database:
else:
q = select(self.UsersTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_credential(self, cred_type, domain, username, password):
q = select(self.UsersTable).filter(
@ -464,6 +463,7 @@ class database:
results = self.conn.execute(q).all()
return len(results) > 0
return None
def is_host_valid(self, host_id):
"""Check if this host ID is valid."""
@ -618,8 +618,7 @@ class database:
elif group_id:
q = select(self.GroupRelationsTable).filter(self.GroupRelationsTable.c.groupid == group_id)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_group_relations(self, user_id=None, group_id=None):
q = delete(self.GroupRelationsTable)
@ -644,16 +643,14 @@ class database:
elif filter_term and filter_term != "":
like_term = func.lower(f"%{filter_term}%")
q = q.filter(func.lower(self.UsersTable.c.username).like(like_term))
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_user(self, domain, username):
q = select(self.UsersTable).filter(
func.lower(self.UsersTable.c.domain) == func.lower(domain),
func.lower(self.UsersTable.c.username) == func.lower(username),
)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_domain_controllers(self, domain=None):
return self.get_hosts(filter_term="dc", domain=domain)
@ -689,8 +686,7 @@ class database:
q = select(self.SharesTable).filter(self.SharesTable.c.name.like(like_term))
else:
q = select(self.SharesTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_shares_by_access(self, permissions, share_id=None):
permissions = permissions.lower()
@ -701,8 +697,7 @@ class database:
q = q.filter(self.SharesTable.c.read == 1)
if "w" in permissions:
q = q.filter(self.SharesTable.c.write == 1)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_users_with_share_access(self, host_id, share_name, permissions):
permissions = permissions.lower()
@ -711,9 +706,8 @@ class database:
q = q.filter(self.SharesTable.c.read == 1)
if "w" in permissions:
q = q.filter(self.SharesTable.c.write == 1)
results = self.conn.execute(q).all()
return self.conn.execute(q).all()
return results
def add_domain_backupkey(self, domain: str, pvk: bytes):
"""
@ -849,6 +843,7 @@ class database:
return inserted_id_results[0].id
except Exception as e:
nxc_logger.debug(f"Error inserting LoggedinRelation: {e}")
return None
def get_loggedin_relations(self, user_id=None, host_id=None):
q = select(self.LoggedinRelationsTable) # .returning(self.LoggedinRelationsTable.c.id)
@ -856,8 +851,7 @@ class database:
q = q.filter(self.LoggedinRelationsTable.c.userid == user_id)
if host_id:
q = q.filter(self.LoggedinRelationsTable.c.hostid == host_id)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_loggedin_relations(self, user_id=None, host_id=None):
q = delete(self.LoggedinRelationsTable)
@ -920,6 +914,7 @@ class database:
if updated_ids:
nxc_logger.debug(f"add_check() - Checks IDs Updated: {updated_ids}")
return updated_ids
return None
def add_check_result(self, host_id, check_id, secure, reasons):
"""Check if this check result has already been added to the database, if not, add it in."""
@ -932,3 +927,4 @@ class database:
if updated_ids:
nxc_logger.debug(f"add_check_result() - Check Results IDs Updated: {updated_ids}")
return updated_ids
return None

View File

@ -112,7 +112,7 @@ class FirefoxTriage:
json_logins = json.loads(logins_data)
if "logins" not in json_logins:
return [] # No logins key in logins.json file
logins = [
return [
(
self.decode_login_data(row["encryptedUsername"]),
self.decode_login_data(row["encryptedPassword"]),
@ -120,7 +120,6 @@ class FirefoxTriage:
)
for row in json_logins["logins"]
]
return logins
def get_key(self, key4_data, master_password=b""):
fh = tempfile.NamedTemporaryFile()
@ -152,6 +151,7 @@ class FirefoxTriage:
fh.close()
return b""
fh.close()
return None
def is_master_password_correct(self, key_data, master_password=b""):
try:
@ -242,3 +242,4 @@ class FirefoxTriage:
return decrypted
else:
return None
return None

View File

@ -52,12 +52,11 @@ class SamrFunc:
if "Builtin" not in domains:
logging.error("No Builtin group to query locally on")
return
return None
domain_handle = self.samr_query.get_domain_handle("Builtin")
groups = self.samr_query.get_domain_aliases(domain_handle)
return self.samr_query.get_domain_aliases(domain_handle)
return groups
def get_custom_groups(self):
domains = self.samr_query.get_domains()
@ -120,7 +119,7 @@ class SAMRQuery:
string_binding = f"ncacn_np:{self.__port}[\pipe\samr]"
nxc_logger.debug(f"Binding to {string_binding}")
# using a direct SMBTransport instead of DCERPCTransportFactory since we need the filename to be '\samr'
rpc_transport = transport.SMBTransport(
return transport.SMBTransport(
self.__remote_host,
self.__port,
r"\samr",
@ -132,7 +131,6 @@ class SAMRQuery:
self.__aesKey,
doKerberos=self.__kerberos,
)
return rpc_transport
def get_dce(self):
rpc_transport = self.get_transport()
@ -142,10 +140,10 @@ class SAMRQuery:
dce.bind(samr.MSRPC_UUID_SAMR)
except NetBIOSError as e:
logging.error(f"NetBIOSError on Connection: {e}")
return
return None
except SessionError as e:
logging.error(f"SessionError on Connection: {e}")
return
return None
return dce
def get_server_handle(self):
@ -158,7 +156,7 @@ class SAMRQuery:
return resp["ServerHandle"]
else:
nxc_logger.debug("Error creating Samr handle")
return
return None
def get_domains(self):
resp = samr.hSamrEnumerateDomainsInSamServer(self.dce, self.server_handle)

View File

@ -139,6 +139,7 @@ class SMBEXEC:
pass
self.get_output_remote()
return None
def get_output_remote(self):
if self.__retOutput is False:

View File

@ -79,6 +79,7 @@ class ssh(connection):
self.logger.info("Determined user is root via `sudo -ln` command")
self.admin_privs = True
return True
return None
def plaintext_login(self, username, password, private_key=None):
try:
@ -184,3 +185,4 @@ class ssh(connection):
for line in stdout:
self.logger.highlight(line.strip())
return stdout
return None

View File

@ -167,6 +167,7 @@ class database:
if updated_ids:
nxc_logger.debug(f"add_host() - Host IDs Updated: {updated_ids}")
return updated_ids
return None
def add_credential(self, credtype, username, password, key=None):
"""Check if this credential has already been added to the database, if not add it in."""
@ -247,7 +248,7 @@ class database:
nxc_logger.debug(f"check_q: {check_q}")
if check_q:
nxc_logger.debug(f"Key already exists for cred_id {cred_id}")
return
return None
key_data = {"credid": cred_id, "data": key}
self.sess.execute(Insert(self.KeysTable), key_data)
@ -261,8 +262,7 @@ class database:
q = q.filter(self.KeysTable.c.id == key_id)
elif cred_id is not None:
q = q.filter(self.KeysTable.c.credid == cred_id)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def add_admin_user(self, credtype, username, secret, host_id=None, cred_id=None):
add_links = []
@ -306,8 +306,7 @@ class database:
else:
q = select(self.AdminRelationsTable)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def remove_admin_relation(self, cred_ids=None, host_ids=None):
q = delete(self.AdminRelationsTable)
@ -343,8 +342,7 @@ class database:
else:
q = select(self.CredentialsTable)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def get_credential(self, cred_type, username, password):
q = select(self.CredentialsTable).filter(
@ -397,13 +395,11 @@ class database:
elif filter_term and filter_term != "":
like_term = func.lower(f"%{filter_term}%")
q = q.filter(func.lower(self.CredentialsTable.c.username).like(like_term))
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def get_user(self, domain, username):
q = select(self.CredentialsTable).filter(func.lower(self.CredentialsTable.c.username) == func.lower(username))
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def add_loggedin_relation(self, cred_id, host_id, shell=False):
relation_query = select(self.LoggedinRelationsTable).filter(
@ -426,6 +422,7 @@ class database:
return inserted_id_results[0].id
except Exception as e:
nxc_logger.debug(f"Error inserting LoggedinRelation: {e}")
return None
def get_loggedin_relations(self, cred_id=None, host_id=None, shell=None):
q = select(self.LoggedinRelationsTable) # .returning(self.LoggedinRelationsTable.c.id)
@ -435,8 +432,7 @@ class database:
q = q.filter(self.LoggedinRelationsTable.c.hostid == host_id)
if shell:
q = q.filter(self.LoggedinRelationsTable.c.shell == shell)
results = self.sess.execute(q).all()
return results
return self.sess.execute(q).all()
def remove_loggedin_relations(self, cred_id=None, host_id=None):
q = delete(self.LoggedinRelationsTable)

View File

@ -262,8 +262,7 @@ class database:
else:
q = select(self.AdminRelationsTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_admin_relation(self, user_ids=None, host_ids=None):
q = delete(self.AdminRelationsTable)
@ -299,8 +298,7 @@ class database:
else:
q = select(self.UsersTable)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def is_credential_local(self, credential_id):
q = select(self.UsersTable.c.domain).filter(self.UsersTable.c.id == credential_id)
@ -311,6 +309,7 @@ class database:
results = self.conn.execute(q).all()
return len(results) > 0
return None
def is_host_valid(self, host_id):
"""Check if this host ID is valid."""
@ -356,16 +355,14 @@ class database:
elif filter_term and filter_term != "":
like_term = func.lower(f"%{filter_term}%")
q = q.filter(func.lower(self.UsersTable.c.username).like(like_term))
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def get_user(self, domain, username):
q = select(self.UsersTable).filter(
func.lower(self.UsersTable.c.domain) == func.lower(domain),
func.lower(self.UsersTable.c.username) == func.lower(username),
)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def add_loggedin_relation(self, user_id, host_id):
relation_query = select(self.LoggedinRelationsTable).filter(
@ -392,8 +389,7 @@ class database:
q = q.filter(self.LoggedinRelationsTable.c.userid == user_id)
if host_id:
q = q.filter(self.LoggedinRelationsTable.c.hostid == host_id)
results = self.conn.execute(q).all()
return results
return self.conn.execute(q).all()
def remove_loggedin_relations(self, user_id=None, host_id=None):
q = delete(self.LoggedinRelationsTable)

View File

@ -80,7 +80,7 @@ build-backend = "poetry.core.masonry.api"
# Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
# Other options: pep8-naming (N), flake8-annotations (ANN), flake8-blind-except (BLE), flake8-commas (COM), flake8-pyi (PYI), flake8-pytest-style (PT), etc
select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B", "A", "C4", "ISC", "ICN", "PIE", "PT", "Q", "RSE"]
select = ["E", "F", "D", "UP", "YTT", "ASYNC", "B", "A", "C4", "ISC", "ICN", "PIE", "PT", "Q", "RSE", "RET"]
ignore = [ "E501", "F405", "F841", "D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D204", "D205", "D212", "D213", "D400", "D401", "D415", "D417", "D419"]
# Allow autofix for all enabled rules (when `--fix`) is provided.

View File

@ -31,8 +31,7 @@ def get_cli_args():
help="Display errors from commands",
)
parsed_args = parser.parse_args()
return parsed_args
return parser.parse_args()
def generate_commands(args):