NetExec/nxc/protocols/smb/samrfunc.py

254 lines
9.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# Majorly stolen from https://gist.github.com/ropnop/7a41da7aabb8455d0898db362335e139
# Which in turn stole from Impacket :)
# Code refactored and added to by @mjhallenbeck (Marshall-Hallenbeck on GitHub)
import logging
2023-05-07 22:51:01 +00:00
from impacket.dcerpc.v5 import transport, lsat, lsad, samr
from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE
2023-05-07 22:51:01 +00:00
from impacket.nmb import NetBIOSError
from impacket.smbconnection import SessionError
from nxc.logger import nxc_logger
2023-04-14 19:15:23 +00:00
class SamrFunc:
    """Enumerate local groups and their members through SAMR and LSA.

    Builds one ``SAMRQuery`` (domain/alias enumeration) and one ``LSAQuery``
    (SID to name translation) from the credentials carried by ``connection``.
    """

    def __init__(self, connection):
        """Copy credentials from ``connection`` and create the SAMR and LSA helpers.

        Args:
            connection: object exposing ``logger``, ``host``, ``hostname``,
                ``domain``, ``kerberos``, ``args.port``, ``username``,
                ``password``, ``hash`` and ``aesKey``.
        """
        self.logger = connection.logger
        # With Kerberos the target is addressed as "hostname.domain" instead of connection.host.
        self.addr = connection.host if not connection.kerberos else connection.hostname + "." + connection.domain
        self.protocol = connection.args.port
        self.username = connection.username
        self.password = connection.password
        self.domain = connection.domain
        self.hash = connection.hash
        self.lmhash = ""
        self.nthash = ""
        self.aesKey = connection.aesKey
        self.doKerberos = connection.kerberos

        # Accept either "LM:NT" or a bare NT hash.
        if self.hash is not None:
            if ":" in self.hash:
                self.lmhash, self.nthash = self.hash.split(":")
            else:
                self.nthash = self.hash

        if self.password is None:
            self.password = ""

        # NOTE(review): lmhash/nthash are parsed above but are not forwarded to the
        # query helpers created here - confirm whether pass-the-hash should be supported.
        self.samr_query = SAMRQuery(
            username=self.username,
            password=self.password,
            domain=self.domain,
            remote_name=self.addr,
            remote_host=self.addr,
            kerberos=self.doKerberos,
            aesKey=self.aesKey,
        )
        self.lsa_query = LSAQuery(
            username=self.username,
            password=self.password,
            domain=self.domain,
            remote_name=self.addr,
            remote_host=self.addr,
            kerberos=self.doKerberos,
            aesKey=self.aesKey,
            logger=self.logger,
        )

    def get_builtin_groups(self):
        """Return ``{alias name: RID}`` for the "Builtin" domain.

        Returns:
            The alias mapping, or ``None`` (after logging an error) when the
            server does not report a "Builtin" domain.
        """
        domains = self.samr_query.get_domains()
        if "Builtin" not in domains:
            logging.error("No Builtin group to query locally on")
            return None
        domain_handle = self.samr_query.get_domain_handle("Builtin")
        return self.samr_query.get_domain_aliases(domain_handle)

    def get_custom_groups(self):
        """Return ``{alias name: RID}`` merged over every domain except "Builtin"."""
        custom_groups = {}
        for domain in self.samr_query.get_domains():
            if domain == "Builtin":
                continue
            domain_handle = self.samr_query.get_domain_handle(domain)
            custom_groups.update(self.samr_query.get_domain_aliases(domain_handle))
        return custom_groups

    def get_local_groups(self):
        """Return builtin and custom aliases merged into one dict.

        Custom aliases win on a name clash because they are unpacked last.
        """
        # get_builtin_groups() returns None when there is no Builtin domain;
        # treat that as "no builtin aliases" instead of failing on ** unpacking.
        builtin_groups = self.get_builtin_groups() or {}
        custom_groups = self.get_custom_groups()
        return {**builtin_groups, **custom_groups}

    def get_local_users(self):
        """Placeholder; not implemented."""

    def get_local_administrators(self):
        """Print ``name - sid`` for each member of the Builtin "Administrators" alias.

        Does nothing when the Builtin domain or the alias is not found.
        """
        # Store the lookup result: the attribute is read below and was never
        # assigned before, which made this method fail with AttributeError.
        self.groups = self.get_builtin_groups() or {}
        if "Administrators" not in self.groups:
            return

        admin_rid = self.groups["Administrators"]
        self.logger.success(f"Found Local Administrators group: RID {admin_rid}")
        domain_handle = self.samr_query.get_domain_handle("Builtin")
        self.logger.debug("Querying group members")
        member_sids = self.samr_query.get_alias_members(domain_handle, admin_rid)
        member_names = self.lsa_query.lookup_sids(member_sids)
        for sid, name in zip(member_sids, member_names):
            print(f"{name} - {sid}")
class SAMRQuery:
    """Wrapper around impacket's SAMR calls made over the samr named pipe.

    The constructor connects immediately: ``dce`` holds the bound DCE/RPC
    object and ``server_handle`` the SAMR server handle. Either is ``None``
    when the corresponding step failed.
    """

    def __init__(
        self,
        username="",
        password="",
        domain="",
        port=445,
        remote_name="",
        remote_host="",
        kerberos=None,
        aesKey="",
        lmhash="",
        nthash="",
    ):
        """Store the credentials and open the SAMR connection.

        Args:
            username: account name used to authenticate.
            password: account password.
            domain: authentication domain.
            port: port handed to the SMB transport.
            remote_name: target name (used in the debug binding string).
            remote_host: target handed to the SMB transport.
            kerberos: passed to the transport as ``doKerberos``.
            aesKey: Kerberos AES key handed to the transport.
            lmhash: optional LM hash handed to the transport (default empty).
            nthash: optional NT hash handed to the transport (default empty).
        """
        self.__username = username
        self.__password = password
        self.__domain = domain
        self.__lmhash = lmhash
        self.__nthash = nthash
        self.__aesKey = aesKey
        self.__port = port
        self.__remote_name = remote_name
        self.__remote_host = remote_host
        self.__kerberos = kerberos
        self.dce = self.get_dce()
        self.server_handle = self.get_server_handle()

    def get_transport(self):
        """Build the SMB transport for the samr pipe (does not connect)."""
        # Raw string: "\p" and "\s" are not valid escapes in a normal literal.
        string_binding = rf"ncacn_np:{self.__remote_name}[\pipe\samr]"
        nxc_logger.debug(f"Binding to {string_binding}")
        # using a direct SMBTransport instead of DCERPCTransportFactory since we need the filename to be '\samr'
        return transport.SMBTransport(
            self.__remote_host,
            self.__port,
            r"\samr",
            self.__username,
            self.__password,
            self.__domain,
            self.__lmhash,
            self.__nthash,
            self.__aesKey,
            doKerberos=self.__kerberos,
        )

    def get_dce(self):
        """Connect and bind to the SAMR interface.

        Returns:
            The bound DCE/RPC object, or ``None`` when a ``NetBIOSError`` or
            ``SessionError`` occurred (the error is logged).
        """
        rpc_transport = self.get_transport()
        try:
            dce = rpc_transport.get_dce_rpc()
            dce.connect()
            dce.bind(samr.MSRPC_UUID_SAMR)
        except NetBIOSError as e:
            logging.error(f"NetBIOSError on Connection: {e}")
            return None
        except SessionError as e:
            logging.error(f"SessionError on Connection: {e}")
            return None
        return dce

    def get_server_handle(self):
        """Return the SAMR server handle, or ``None`` when it cannot be obtained."""
        if not self.dce:
            nxc_logger.debug("Error creating Samr handle")
            return None
        try:
            resp = samr.hSamrConnect(self.dce)
        except samr.DCERPCException as e:
            nxc_logger.debug(f"Error while connecting with Samr: {e}")
            return None
        return resp["ServerHandle"]

    def get_domains(self):
        """Return the names of the domains hosted by the SAM server.

        Returns an empty list when no server handle is available; the
        underlying failure has already been logged while connecting.
        """
        if self.server_handle is None:
            return []
        resp = samr.hSamrEnumerateDomainsInSamServer(self.dce, self.server_handle)
        return [domain["Name"] for domain in resp["Buffer"]["Buffer"]]

    def get_domain_handle(self, domain_name):
        """Look up ``domain_name`` and return a handle to the opened domain."""
        lookup = samr.hSamrLookupDomainInSamServer(self.dce, self.server_handle, domain_name)
        opened = samr.hSamrOpenDomain(self.dce, serverHandle=self.server_handle, domainId=lookup["DomainId"])
        return opened["DomainHandle"]

    def get_domain_aliases(self, domain_handle):
        """Return ``{alias name: RID}`` for the aliases in the given domain."""
        resp = samr.hSamrEnumerateAliasesInDomain(self.dce, domain_handle)
        return {alias["Name"]: alias["RelativeId"] for alias in resp["Buffer"]["Buffer"]}

    def get_alias_handle(self, domain_handle, alias_id):
        """Open the alias with RID ``alias_id`` and return its handle."""
        resp = samr.hSamrOpenAlias(self.dce, domain_handle, desiredAccess=MAXIMUM_ALLOWED, aliasId=alias_id)
        return resp["AliasHandle"]

    def get_alias_members(self, domain_handle, alias_id):
        """Return the canonical SID string of every member of the alias."""
        alias_handle = self.get_alias_handle(domain_handle, alias_id)
        resp = samr.hSamrGetMembersInAlias(self.dce, alias_handle)
        return [member["SidPointer"].formatCanonical() for member in resp["Members"]["Sids"]]
class LSAQuery:
    """Wrapper around impacket's LSA calls made over the lsarpc named pipe.

    The constructor connects immediately: ``dce`` holds the bound DCE/RPC
    object and ``policy_handle`` the LSA policy handle. Both are ``None``
    when the connection failed with a ``NetBIOSError``.
    """

    def __init__(self, username="", password="", domain="", port=445, remote_name="", remote_host="", aesKey="", kerberos=None, logger=None, lmhash="", nthash=""):
        """Store the credentials and open the LSA connection.

        Args:
            username: account name used to authenticate.
            password: account password.
            domain: authentication domain.
            port: destination port set on the transport.
            remote_name: target name used in the string binding.
            remote_host: remote host set on the transport.
            aesKey: Kerberos AES key handed to the transport.
            kerberos: when truthy, Kerberos is enabled on the transport.
            logger: object with a ``fail`` method used to report connection
                errors; when ``None`` the ``logging`` module is used instead.
            lmhash: optional LM hash handed to the transport (default empty).
            nthash: optional NT hash handed to the transport (default empty).
        """
        # Must be assigned before get_dce(), which reports failures through it.
        self.logger = logger
        self.__username = username
        self.__password = password
        self.__domain = domain
        self.__lmhash = lmhash
        self.__nthash = nthash
        self.__aesKey = aesKey
        self.__port = port
        self.__remote_name = remote_name
        self.__remote_host = remote_host
        self.__kerberos = kerberos
        self.dce = self.get_dce()
        self.policy_handle = self.get_policy_handle()

    def _log_failure(self, message):
        """Report ``message`` through the supplied logger, or ``logging`` if none was given."""
        if self.logger is not None:
            self.logger.fail(message)
        else:
            logging.error(message)

    def get_transport(self):
        """Build the DCE/RPC transport for the lsarpc pipe (does not connect)."""
        string_binding = f"ncacn_np:{self.__remote_name}[\\pipe\\lsarpc]"
        rpc_transport = transport.DCERPCTransportFactory(string_binding)
        rpc_transport.set_dport(self.__port)
        rpc_transport.setRemoteHost(self.__remote_host)
        if self.__kerberos:
            rpc_transport.set_kerberos(True, None)
        if hasattr(rpc_transport, "set_credentials"):
            # This method exists only for selected protocol sequences.
            rpc_transport.set_credentials(
                self.__username,
                self.__password,
                self.__domain,
                self.__lmhash,
                self.__nthash,
                self.__aesKey,
            )
        return rpc_transport

    def get_dce(self):
        """Connect and bind to the LSAT interface.

        Returns:
            The bound DCE/RPC object, or ``None`` when a ``NetBIOSError``
            occurred (the error is reported).
        """
        rpc_transport = self.get_transport()
        try:
            dce = rpc_transport.get_dce_rpc()
            if self.__kerberos:
                dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
            dce.connect()
            dce.bind(lsat.MSRPC_UUID_LSAT)
        except NetBIOSError as e:
            self._log_failure(f"NetBIOSError on Connection: {e}")
            return None
        return dce

    def get_policy_handle(self):
        """Open the LSA policy and return its handle, or ``None`` without a connection."""
        if self.dce is None:
            # get_dce() already reported why the connection is missing.
            return None
        resp = lsad.hLsarOpenPolicy2(self.dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)
        return resp["PolicyHandle"]

    def lookup_sids(self, sids):
        """Translate ``sids`` to names.

        Returns:
            The translated names in the order returned by the server, or an
            empty list when no policy handle is available.
        """
        if self.policy_handle is None:
            return []
        resp = lsat.hLsarLookupSids(self.dce, self.policy_handle, sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)
        return [translated["Name"] for translated in resp["TranslatedNames"]["Names"]]