NetExec/nxc/helpers/bloodhound.py

104 lines
4.5 KiB
Python
Raw Normal View History

2021-11-19 21:16:37 +00:00
def add_user_bh(user, domain, logger, config):
    """Marks one or more accounts as owned in the BloodHound graph database.

    Nothing is sent to Neo4J when the ``bh_enabled`` option of the
    ``BloodHound`` config section is the string ``"False"``.

    Args:
    ----
        user (str or list): A single username, or a list of dictionaries that
            each carry a ``"username"`` and a ``"domain"`` key.
        domain (str): The domain of the user. Only used when ``user`` is a str;
            both values are upper-cased in that case.
        logger (Logger): The logger object for logging messages.
        config (ConfigParser): The configuration object for accessing BloodHound
            settings (``bh_enabled``, ``bh_uri``, ``bh_port``, ``bh_user``, ``bh_pass``).

    Returns:
    -------
        None

    Raises:
    ------
        SystemExit: If the Neo4J credentials are rejected (``AuthError``) or Neo4J
            is unreachable on the configured URI (``ServiceUnavailable``). Both
            conditions are logged before exiting. Any other exception raised
            while talking to Neo4J is logged and swallowed.
    """
    if isinstance(user, str):
        users_owned = [{"username": user.upper(), "domain": domain.upper()}]
    else:
        users_owned = user

    if config.get("BloodHound", "bh_enabled") == "False":
        return

    # exit() is injected by the `site` module for interactive use and is not
    # guaranteed to exist at runtime; sys.exit() raises the same SystemExit.
    import sys

    # we do a conditional import here to avoid loading these if BH isn't enabled
    from neo4j import GraphDatabase
    from neo4j.exceptions import AuthError, ServiceUnavailable

    uri = f"bolt://{config.get('BloodHound', 'bh_uri')}:{config.get('BloodHound', 'bh_port')}"
    driver = GraphDatabase.driver(
        uri,
        auth=(
            config.get("BloodHound", "bh_user"),
            config.get("BloodHound", "bh_pass"),
        ),
        encrypted=False,
    )

    try:
        # Manage the session as a context manager too, so it is released even
        # when a query fails part-way through the loop.
        with driver.session() as session, session.begin_transaction() as tx:
            for user_info in users_owned:
                # "corp.local" -> "DC=corp,DC=local"
                distinguished_name = ",".join(f"DC={dc}" for dc in user_info["domain"].split("."))
                # Pass the value as a query parameter instead of splicing it
                # into the Cypher text, so quotes in it cannot alter the query.
                domain_query = tx.run(
                    "MATCH (d:Domain) WHERE d.distinguishedname STARTS WITH $dn RETURN d",
                    {"dn": distinguished_name},
                ).data()
                if not domain_query:
                    logger.debug(f"Domain {user_info['domain']} not found in BloodHound. Falling back to domainless query.")
                    _add_without_domain(user_info, tx, logger)
                else:
                    # Use the domain name exactly as BloodHound stores it.
                    bh_domain = domain_query[0]["d"].get("name")
                    _add_with_domain(user_info, bh_domain, tx, logger)
    except AuthError:
        logger.fail(f"Provided Neo4J credentials ({config.get('BloodHound', 'bh_user')}:{config.get('BloodHound', 'bh_pass')}) are not valid.")
        sys.exit()
    except ServiceUnavailable:
        logger.fail(f"Neo4J does not seem to be available on {uri}.")
        sys.exit()
    except Exception as e:
        logger.fail(f"Unexpected error with Neo4J: {e}")
    finally:
        driver.close()
def _add_with_domain(user_info, domain, tx, logger):
    """Marks an account as owned, looking it up by its full BloodHound node name.

    A username ending in ``$`` is treated as a computer account and looked up
    as ``<username without $>.<domain>`` on ``Computer`` nodes; any other
    username is looked up as ``<username>@<domain>`` on ``User`` nodes.

    Args:
    ----
        user_info (dict): Dictionary with at least a ``"username"`` key.
        domain (str): The domain name used to build the node name.
        tx: The open Neo4J transaction the queries are run on.
        logger (Logger): The logger object for logging messages.

    Returns:
    -------
        None
    """
    username = user_info["username"]
    # endswith() also copes with an empty username, where [-1] would raise.
    if username.endswith("$"):
        user_owned = f"{username[:-1]}.{domain}"
        account_type = "Computer"
    else:
        user_owned = f"{username}@{domain}"
        account_type = "User"

    # The label cannot be a Cypher parameter, but account_type is one of the
    # two literals chosen above. The account name is passed as a parameter so
    # that quotes or other special characters in it cannot alter the query.
    params = {"name": user_owned}
    result = tx.run(f"MATCH (c:{account_type} {{name: $name}}) RETURN c", params).data()
    if not result:
        logger.fail("Account not found in the BloodHound database.")
        return

    # Only write when the node is not already flagged as owned.
    if result[0]["c"].get("owned") in (False, None):
        set_query = f"MATCH (c:{account_type} {{name: $name}}) SET c.owned=True RETURN c.name AS name"
        logger.debug(f"{set_query} (name={user_owned!r})")
        updated = tx.run(set_query, params).data()[0]
        logger.highlight(f"Node {updated['name']} successfully set as owned in BloodHound")
def _add_without_domain(user_info, tx, logger):
    """Marks an account as owned, looking it up by name prefix only.

    Used when no matching domain is known: the node is searched with a
    ``STARTS WITH`` match on its name. A username ending in ``$`` is searched
    (without the ``$``) among ``Computer`` nodes, anything else among ``User``
    nodes. The node is only updated when exactly one node matches.

    Args:
    ----
        user_info (dict): Dictionary with at least a ``"username"`` key.
        tx: The open Neo4J transaction the queries are run on.
        logger (Logger): The logger object for logging messages.

    Returns:
    -------
        None
    """
    username = user_info["username"]
    # endswith() also copes with an empty username, where [-1] would raise.
    if username.endswith("$"):
        user_owned = username[:-1]
        account_type = "Computer"
    else:
        user_owned = username
        account_type = "User"

    # An empty prefix would match every node of this type, so a database with
    # a single such node could get it flagged by mistake; refuse instead.
    if not user_owned:
        logger.fail("Account not found in the BloodHound database.")
        return

    # The label cannot be a Cypher parameter, but account_type is one of the
    # two literals chosen above. The prefix is passed as a parameter so that
    # quotes or other special characters in it cannot alter the query.
    # NOTE(review): a bare prefix match also hits longer names sharing the
    # prefix (e.g. "ADMIN" vs "ADMINISTRATOR@..."); confirm this is intended.
    result = tx.run(
        f"MATCH (c:{account_type}) WHERE c.name STARTS WITH $prefix RETURN c",
        {"prefix": user_owned},
    ).data()
    if not result:
        logger.fail("Account not found in the BloodHound database.")
        return
    if len(result) >= 2:
        logger.fail(f"Multiple accounts found with the name '{user_info['username']}' in the BloodHound database. Please specify the FQDN ex:domain.local")
        return

    node = result[0]["c"]
    # Only write when the node is not already flagged as owned.
    if node.get("owned") in (False, None):
        node_name = node["name"]
        set_query = f"MATCH (c:{account_type} {{name: $name}}) SET c.owned=True RETURN c.name AS name"
        logger.debug(f"{set_query} (name={node_name!r})")
        updated = tx.run(set_query, {"name": node_name}).data()[0]
        logger.highlight(f"Node {updated['name']} successfully set as owned in BloodHound")