Merge pull request #214 from Pennyw0rth/issue/203

Fix SMB users lookup and return last password set date
main
Alex 2024-03-22 00:48:01 +01:00 committed by GitHub
commit 0608628aff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 46 deletions

View File

@ -1000,8 +1000,12 @@ class smb(connection):
return groups
def users(self):
self.logger.display("Trying to dump local users with SAMRPC protocol")
return UserSamrDump(self).dump()
if len(self.args.users) > 1:
self.logger.display(f"Dumping users: {', '.join(self.args.users)}")
else:
self.logger.info("Trying to dump local users with SAMRPC protocol")
return UserSamrDump(self).dump(self.args.users)
def hosts(self):
hosts = []

View File

@ -38,7 +38,7 @@ def proto_args(parser, std_parser, module_parser):
egroup.add_argument("--disks", action="store_true", help="enumerate disks")
egroup.add_argument("--loggedon-users-filter", action="store", help="only search for specific user, works with regex")
egroup.add_argument("--loggedon-users", action="store_true", help="enumerate logged on users")
egroup.add_argument("--users", nargs="?", const="", metavar="USER", help="enumerate domain users, if a user is specified than only its information is queried.")
egroup.add_argument("--users", nargs="*", metavar="USER", help="enumerate domain users, if a user is specified than only its information is queried.")
egroup.add_argument("--groups", nargs="?", const="", metavar="GROUP", help="enumerate domain groups, if a group is specified than its members are enumerated")
egroup.add_argument("--computers", nargs="?", const="", metavar="COMPUTER", help="enumerate computer users")
egroup.add_argument("--local-groups", nargs="?", const="", metavar="GROUP", help="enumerate local groups, if a group is specified then its members are enumerated")

View File

@ -4,6 +4,7 @@ from impacket.dcerpc.v5 import transport, samr
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.dcerpc.v5.rpcrt import DCERPC_v5
from impacket.nt_errors import STATUS_MORE_ENTRIES
from datetime import datetime, timedelta
class UserSamrDump:
@ -26,6 +27,8 @@ class UserSamrDump:
self.doKerberos = connection.kerberos
self.protocols = UserSamrDump.KNOWN_PROTOCOLS.keys()
self.users = []
self.rpc_transport = None
self.dce = None
if self.hash is not None:
if self.hash.find(":") != -1:
@ -36,46 +39,37 @@ class UserSamrDump:
if self.password is None:
self.password = ""
def dump(self):
def dump(self, requested_users=None):
# Try all requested protocols until one works.
for protocol in self.protocols:
try:
protodef = UserSamrDump.KNOWN_PROTOCOLS[protocol]
port = protodef[1]
except KeyError:
self.logger.debug(f"Invalid Protocol '{protocol}'")
self.logger.debug(f"Invalid Protocol: {protocol}")
self.logger.debug(f"Trying protocol {protocol}")
rpctransport = transport.SMBTransport(
self.addr,
port,
r"\samr",
self.username,
self.password,
self.domain,
self.lmhash,
self.nthash,
self.aesKey,
doKerberos=self.doKerberos,
)
self.rpc_transport = transport.SMBTransport(self.addr, port, r"\samr", self.username, self.password, self.domain, self.lmhash, self.nthash, self.aesKey, doKerberos=self.doKerberos)
try:
self.fetchList(rpctransport)
self.fetch_users(requested_users)
break
except Exception as e:
self.logger.debug(f"Protocol failed: {e}")
self.logger.debug(f"Connection with protocol {protocol} failed: {e}")
return self.users
def fetchList(self, rpctransport):
dce = DCERPC_v5(rpctransport)
dce.connect()
dce.bind(samr.MSRPC_UUID_SAMR)
def fetch_users(self, requested_users):
self.dce = DCERPC_v5(self.rpc_transport)
self.dce.connect()
self.dce.bind(samr.MSRPC_UUID_SAMR)
# Setup Connection
resp = samr.hSamrConnect2(dce)
resp = samr.hSamrConnect2(self.dce)
if resp["ErrorCode"] != 0:
raise Exception("Connect error")
resp2 = samr.hSamrEnumerateDomainsInSamServer(
dce,
self.dce,
serverHandle=resp["ServerHandle"],
enumerationContext=0,
preferedMaximumLength=500,
@ -84,7 +78,7 @@ class UserSamrDump:
raise Exception("Connect error")
resp3 = samr.hSamrLookupDomainInSamServer(
dce,
self.dce,
serverHandle=resp["ServerHandle"],
name=resp2["Buffer"]["Buffer"][0]["Name"],
)
@ -92,7 +86,7 @@ class UserSamrDump:
raise Exception("Connect error")
resp4 = samr.hSamrOpenDomain(
dce,
self.dce,
serverHandle=resp["ServerHandle"],
desiredAccess=samr.MAXIMUM_ALLOWED,
domainId=resp3["DomainId"],
@ -101,28 +95,84 @@ class UserSamrDump:
raise Exception("Connect error")
self.__domains = resp2["Buffer"]["Buffer"]
domainHandle = resp4["DomainHandle"]
domain_handle = resp4["DomainHandle"]
# End Setup
status = STATUS_MORE_ENTRIES
enumerationContext = 0
while status == STATUS_MORE_ENTRIES:
if requested_users:
self.logger.debug(f"Looping through users requested and looking up their information: {requested_users}")
try:
resp = samr.hSamrEnumerateUsersInDomain(dce, domainHandle, enumerationContext=enumerationContext)
names_lookup_resp = samr.hSamrLookupNamesInDomain(self.dce, domain_handle, requested_users)
rids = [r["Data"] for r in names_lookup_resp["RelativeIds"]["Element"]]
self.logger.debug(f"Specific RIDs retrieved: {rids}")
users = self.get_user_info(domain_handle, rids)
except DCERPCException as e:
if str(e).find("STATUS_MORE_ENTRIES") < 0:
self.logger.fail("Error enumerating domain user(s)")
break
resp = e.get_packet()
self.logger.success("Enumerated domain user(s)")
for user in resp["Buffer"]["Buffer"]:
r = samr.hSamrOpenUser(dce, domainHandle, samr.MAXIMUM_ALLOWED, user["RelativeId"])
info_user = samr.hSamrQueryInformationUser2(dce, r["UserHandle"], samr.USER_INFORMATION_CLASS.UserAllInformation)["Buffer"]["All"]["AdminComment"]
self.logger.highlight(f"{self.domain}\\{user['Name']:<30} {info_user}")
self.users.append(user["Name"])
samr.hSamrCloseHandle(dce, r["UserHandle"])
self.logger.debug(f"Exception while requesting users in domain: {e}")
if "STATUS_SOME_NOT_MAPPED" in str(e):
# which user is not translated correctly isn't returned so we can't tell the user which is failing, which is very annoying
self.logger.fail("One of the users requested does not exist in the domain, causing a critical failure during translation, re-check the users and try again")
else:
self.logger.fail(f"Error occurred when looking up users in domain: {e}")
else:
status = STATUS_MORE_ENTRIES
enumerationContext = 0
while status == STATUS_MORE_ENTRIES:
try:
enumerate_users_resp = samr.hSamrEnumerateUsersInDomain(self.dce, domain_handle, enumerationContext=enumerationContext)
except DCERPCException as e:
if str(e).find("STATUS_MORE_ENTRIES") < 0:
self.logger.fail("Error enumerating domain user(s)")
break
enumerate_users_resp = e.get_packet()
enumerationContext = resp["EnumerationContext"]
status = resp["ErrorCode"]
rids = [r["RelativeId"] for r in enumerate_users_resp["Buffer"]["Buffer"]]
self.logger.debug(f"Full domain RIDs retrieved: {rids}")
users = self.get_user_info(domain_handle, rids)
dce.disconnect()
# set these for the while loop
enumerationContext = enumerate_users_resp["EnumerationContext"]
status = enumerate_users_resp["ErrorCode"]
self.print_user_info(users)
self.dce.disconnect()
def get_user_info(self, domain_handle, user_ids):
self.logger.debug(f"Getting user info for users: {user_ids}")
users = []
for user in user_ids:
self.logger.debug(f"Calling hSamrOpenUser for RID {user}")
open_user_resp = samr.hSamrOpenUser(
self.dce,
domain_handle,
samr.MAXIMUM_ALLOWED,
user
)
info_user_resp = samr.hSamrQueryInformationUser2(
self.dce,
open_user_resp["UserHandle"],
samr.USER_INFORMATION_CLASS.UserAllInformation
)["Buffer"]
user_info = info_user_resp["All"]
user_name = user_info["UserName"]
bad_pwd_count = user_info["BadPasswordCount"]
user_description = user_info["AdminComment"]
last_pw_set = old_large_int_to_datetime(user_info["PasswordLastSet"])
if last_pw_set == "1601-01-01 00:00:00":
last_pw_set = "<never>"
users.append({"name": user_name, "description": user_description, "bad_pwd_count": bad_pwd_count, "last_pw_set": last_pw_set})
samr.hSamrCloseHandle(self.dce, open_user_resp["UserHandle"])
return users
def print_user_info(self, users):
self.logger.highlight(f"{'-Username-':<30}{'-Last PW Set-':<20}{'-BadPW-':<8}{'-Description-':<60}")
for user in users:
self.logger.debug(f"Full user info: {user}")
self.logger.highlight(f"{user['name']:<30}{user['last_pw_set']:<20}{user['bad_pwd_count']:<8}{user['description']} ")
def old_large_int_to_datetime(large_int):
combined = (large_int["HighPart"] << 32) | large_int["LowPart"]
timestamp_seconds = combined / 10**7
start_date = datetime(1601, 1, 1)
return (start_date + timedelta(seconds=timestamp_seconds)).replace(microsecond=0).strftime("%Y-%m-%d %H:%M:%S")