formatting and defintion fixes

main
Marshall Hallenbeck 2023-03-30 22:05:45 -04:00
parent fec7a667d2
commit 5682b19bd5
2 changed files with 132 additions and 98 deletions

View File

@ -85,7 +85,7 @@ class connection(object):
def enum_host_info(self):
return
def print_host_info(info):
def print_host_info(self):
return
def create_conn_obj(self):

View File

@ -1,19 +1,15 @@
import binascii
import codecs
import json
import logging
import re
import traceback
import datetime
from enum import Enum
from impacket.ldap import ldaptypes
from impacket.uuid import string_to_bin, bin_to_string
from impacket.uuid import bin_to_string
from cme.helpers.msada_guids import SCHEMA_OBJECTS, EXTENDED_RIGHTS
from ldap3.protocol.formatters.formatters import format_sid
from ldap3.utils.conv import escape_filter_chars
from ldap3.protocol.microsoft import security_descriptor_control
from struct import *
OBJECT_TYPES_GUID = {}
OBJECT_TYPES_GUID.update(SCHEMA_OBJECTS)
@ -190,14 +186,14 @@ class ALLOWED_OBJECT_ACE_MASK_FLAGS(Enum):
WriteProperty = ldaptypes.ACCESS_ALLOWED_OBJECT_ACE.ADS_RIGHT_DS_WRITE_PROP
Self = ldaptypes.ACCESS_ALLOWED_OBJECT_ACE.ADS_RIGHT_DS_SELF
class CMEModule:
'''
Module to read and backup the Discretionary Access Control List of one or multiple objects.
class CMEModule:
"""
Module to read and backup the Discretionary Access Control List of one or multiple objects.
This module is essentially inspired from the dacledit.py script of Impacket that we have coauthored, @_nwodtuhs and me.
It has been converted to an LDAPConnection session, and improvements on the filtering and the ability to specify multiple targets have been added.
It could be interesting to implement the write/remove functions here, but a ldap3 session instead of a LDAPConnection one is required to write.
'''
"""
name = 'daclread'
description = 'Read and backup the Discretionary Access Control List of objects. Based on the work of @_nwodtuhs and @BlWasp_. Be carefull, this module cannot read the DACLS recursively, more explains in the options.'
supported_protocols = ['ldap']
@ -205,7 +201,7 @@ class CMEModule:
multiple_hosts = False
def options(self, context, module_options):
'''
"""
Be carefull, this module cannot read the DACLS recursively. For example, if an object has particular rights because it belongs to a group, the module will not be able to see it directly, you have to check the group rights manually.
TARGET The objects that we want to read or backup the DACLs, sepcified by its SamAccountName
TARGET_DN The object that we want to read or backup the DACL, specified by its DN (usefull to target the domain itself)
@ -214,8 +210,8 @@ class CMEModule:
ACE_TYPE The type of ACE to read (Allowed or Denied)
RIGHTS An interesting right to filter on ('FullControl', 'ResetPassword', 'WriteMembers', 'DCSync')
RIGHTS_GUID A right GUID that specify a particular rights to filter on
'''
"""
self.context = context
if module_options and 'TARGET' in module_options:
if re.search(r'^(.+)\/([^\/]+)$', module_options['TARGET']) is not None:
try:
@ -237,8 +233,7 @@ class CMEModule:
self.principal_sAMAccountName = module_options['PRINCIPAL']
else:
self.principal_sAMAccountName = None
self.principal_SID = None
self.principal_sid = None
if module_options and 'ACTION' in module_options:
self.action = module_options['ACTION']
@ -259,9 +254,9 @@ class CMEModule:
self.filename = None
def on_login(self, context, connection):
'''
"""
On a successful LDAP login we perform a search for the targets' SID, their Security Decriptors and the principal's SID if there is one specified
'''
"""
context.log.highlight("Be carefull, this module cannot read the DACLS recursively.")
self.baseDN = connection.ldapConnection._baseDN
@ -271,8 +266,14 @@ class CMEModule:
if self.principal_sAMAccountName is not None:
_lookedup_principal = self.principal_sAMAccountName
try:
self.principal_SID = format_sid(self.ldap_session.search(searchBase=self.baseDN, searchFilter='(sAMAccountName=%s)' % escape_filter_chars(_lookedup_principal), attributes=['objectSid'])[0][1][0][1][0])
context.log.highlight("Found principal SID to filter on: %s" % self.principal_SID)
self.principal_sid = format_sid(
self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(sAMAccountName=%s)' % escape_filter_chars(_lookedup_principal),
attributes=['objectSid']
)[0][1][0][1][0]
)
context.log.highlight("Found principal SID to filter on: %s" % self.principal_sid)
except Exception as e:
context.log.error('Principal SID not found in LDAP (%s)' % _lookedup_principal)
exit(1)
@ -286,7 +287,8 @@ class CMEModule:
# Extract security descriptor data
self.target_principal_dn = self.target_principal[0]
self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode('latin-1')
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor)
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(
data=self.principal_raw_security_descriptor)
context.log.highlight('Target principal found in LDAP (%s)' % self.target_principal[0])
except Exception as e:
context.log.error('Target SID not found in LDAP (%s)' % self.target_sAMAccountName)
@ -308,7 +310,8 @@ class CMEModule:
# Extract security descriptor data
self.target_principal_dn = self.target_principal[0]
self.principal_raw_security_descriptor = str(self.target_principal[1][0][1][0]).encode('latin-1')
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(data=self.principal_raw_security_descriptor)
self.principal_security_descriptor = ldaptypes.SR_SECURITY_DESCRIPTOR(
data=self.principal_raw_security_descriptor)
context.log.highlight('Target principal found in LDAP (%s)' % self.target_sAMAccountName)
except Exception as e:
context.log.error('Target SID not found in LDAP (%s)' % self.target_sAMAccountName)
@ -319,15 +322,13 @@ class CMEModule:
if self.action == 'backup':
self.backup(context)
# Main read funtion
# Prints the parsed DACL
def read(self, context):
parsed_dacl = self.parseDACL(context, self.principal_security_descriptor['Dacl'])
self.printparsedDACL(context, parsed_dacl)
parsed_dacl = self.parse_dacl(context, self.principal_security_descriptor['Dacl'])
self.print_parsed_dacl(context, parsed_dacl)
return
# Permits to export the DACL of the targets
# This function is called before any writing action (write, remove or restore)
def backup(self, context):
@ -335,13 +336,13 @@ class CMEModule:
backup["sd"] = binascii.hexlify(self.principal_raw_security_descriptor).decode('latin-1')
backup["dn"] = str(self.target_principal_dn)
if not self.filename:
self.filename = 'dacledit-%s-%s.bak' % (datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),self.target_sAMAccountName)
self.filename = 'dacledit-%s-%s.bak' % (
datetime.datetime.now().strftime("%Y%m%d-%H%M%S"), self.target_sAMAccountName)
with codecs.open(self.filename, 'w', 'latin-1') as outfile:
json.dump(backup, outfile)
context.log.highlight('DACL backed up to %s', self.filename)
self.filename = None
# Attempts to retrieve the DACL in the Security Descriptor of the specified target
def search_target_principal_security_descriptor(self, context, connection):
_lookedup_principal = ""
@ -349,22 +350,35 @@ class CMEModule:
controls = security_descriptor_control(sdflags=0x04)
if self.target_sAMAccountName is not None:
_lookedup_principal = self.target_sAMAccountName
target = self.ldap_session.search(searchBase=self.baseDN, searchFilter='(sAMAccountName=%s)' % escape_filter_chars(_lookedup_principal), attributes=['nTSecurityDescriptor'], searchControls=controls)
target = self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(sAMAccountName=%s)' % escape_filter_chars(_lookedup_principal),
attributes=['nTSecurityDescriptor'],
searchControls=controls
)
if self.target_DN is not None:
_lookedup_principal = self.target_DN
target = self.ldap_session.search(searchBase=self.baseDN, searchFilter='(distinguishedName=%s)' % _lookedup_principal, attributes=['nTSecurityDescriptor'], searchControls=controls)
target = self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(distinguishedName=%s)' % _lookedup_principal,
attributes=['nTSecurityDescriptor'],
searchControls=controls
)
try:
self.target_principal = target[0]
except Exception as e:
context.log.error('Principal not found in LDAP (%s), probably an LDAP session issue.' % _lookedup_principal)
exit(0)
# Attempts to retieve the SID and Distinguisehd Name from the sAMAccountName
# Not used for the moment
# - samname : a sAMAccountName
def get_user_info(self, context, samname):
self.ldap_session.search(searchBase=self.baseDN, searchFilter='(sAMAccountName=%s)' % escape_filter_chars(samname), attributes=['objectSid'])
self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(sAMAccountName=%s)' % escape_filter_chars(samname),
attributes=['objectSid']
)
try:
dn = self.ldap_session.entries[0].entry_dn
sid = format_sid(self.ldap_session.entries[0]['objectSid'].raw_values[0])
@ -373,7 +387,6 @@ class CMEModule:
context.log.error('User not found in LDAP: %s' % samname)
return False
# Attempts to resolve a SID and return the corresponding samaccountname
# - sid : the SID to resolve
def resolveSID(self, context, sid):
@ -383,31 +396,37 @@ class CMEModule:
# Tries to resolve the SID from the LDAP domain dump
else:
try:
dn = self.ldap_session.search(searchBase=self.baseDN, searchFilter='(objectSid=%s)' % sid, attributes=['sAMAccountName'])[0][0]
samname = self.ldap_session.search(searchBase=self.baseDN, searchFilter='(objectSid=%s)' % sid, attributes=['sAMAccountName'])[0][1][0][1][0]
dn = self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(objectSid=%s)' % sid,
attributes=['sAMAccountName']
)[0][0]
samname = self.ldap_session.search(
searchBase=self.baseDN,
searchFilter='(objectSid=%s)' % sid,
attributes=['sAMAccountName']
)[0][1][0][1][0]
return samname
except Exception as e:
context.log.debug('SID not found in LDAP: %s' % sid)
return ""
# Parses a full DACL
# - dacl : the DACL to parse, submitted in a Security Desciptor format
def parseDACL(self, context, dacl):
def parse_dacl(self, context, dacl):
parsed_dacl = []
context.log.debug("Parsing DACL")
i = 0
for ace in dacl['Data']:
parsed_ace = self.parseACE(context, ace)
parsed_ace = self.parse_ace(context, ace)
parsed_dacl.append(parsed_ace)
i += 1
return parsed_dacl
# Parses an access mask to extract the different values from a simple permission
# https://stackoverflow.com/questions/28029872/retrieving-security-descriptor-and-getting-number-for-filesystemrights
# - fsr : the access mask to parse
def parsePerms(self, fsr):
def parse_perms(self, fsr):
_perms = []
for PERM in SIMPLE_PERMISSIONS:
if (fsr & PERM.value) == PERM.value:
@ -418,12 +437,12 @@ class CMEModule:
_perms.append(PERM.name)
return _perms
# Parses a specified ACE and extract the different values (Flags, Access Mask, Trustee, ObjectType, InheritedObjectType)
# - ace : the ACE to parse
def parseACE(self, context, ace):
def parse_ace(self, context, ace):
# For the moment, only the Allowed and Denied Access ACE are supported
if ace['TypeName'] in [ "ACCESS_ALLOWED_ACE", "ACCESS_ALLOWED_OBJECT_ACE", "ACCESS_DENIED_ACE", "ACCESS_DENIED_OBJECT_ACE" ]:
if ace['TypeName'] in ["ACCESS_ALLOWED_ACE", "ACCESS_ALLOWED_OBJECT_ACE", "ACCESS_DENIED_ACE",
"ACCESS_DENIED_OBJECT_ACE"]:
parsed_ace = {}
parsed_ace['ACE Type'] = ace['TypeName']
# Retrieves ACE's flags
@ -436,8 +455,11 @@ class CMEModule:
# For standard ACE
# Extracts the access mask (by parsing the simple permissions) and the principal's SID
if ace['TypeName'] in ["ACCESS_ALLOWED_ACE", "ACCESS_DENIED_ACE"]:
parsed_ace['Access mask'] = "%s (0x%x)" % (", ".join(self.parsePerms(ace['Ace']['Mask']['Mask'])), ace['Ace']['Mask']['Mask'])
parsed_ace['Trustee (SID)'] = "%s (%s)" % (self.resolveSID(context, ace['Ace']['Sid'].formatCanonical()) or "UNKNOWN", ace['Ace']['Sid'].formatCanonical())
parsed_ace['Access mask'] = "%s (0x%x)" % (
", ".join(self.parse_perms(ace['Ace']['Mask']['Mask'])), ace['Ace']['Mask']['Mask'])
parsed_ace['Trustee (SID)'] = "%s (%s)" % (
self.resolveSID(context, ace['Ace']['Sid'].formatCanonical()) or "UNKNOWN",
ace['Ace']['Sid'].formatCanonical())
# For object-specific ACE
elif ace['TypeName'] in ["ACCESS_ALLOWED_OBJECT_ACE", "ACCESS_DENIED_OBJECT_ACE"]:
@ -464,11 +486,14 @@ class CMEModule:
if ace['Ace']['InheritedObjectTypeLen'] != 0:
inh_obj_type = bin_to_string(ace['Ace']['InheritedObjectType']).lower()
try:
parsed_ace['Inherited type (GUID)'] = "%s (%s)" % (OBJECT_TYPES_GUID[inh_obj_type], inh_obj_type)
parsed_ace['Inherited type (GUID)'] = "%s (%s)" % (
OBJECT_TYPES_GUID[inh_obj_type], inh_obj_type)
except KeyError:
parsed_ace['Inherited type (GUID)'] = "UNKNOWN (%s)" % inh_obj_type
# Extract the Trustee SID (the object that has the right over the DACL bearer)
parsed_ace['Trustee (SID)'] = "%s (%s)" % (self.resolveSID(context, ace['Ace']['Sid'].formatCanonical()) or "UNKNOWN", ace['Ace']['Sid'].formatCanonical())
parsed_ace['Trustee (SID)'] = "%s (%s)" % (
self.resolveSID(context, ace['Ace']['Sid'].formatCanonical()) or "UNKNOWN",
ace['Ace']['Sid'].formatCanonical())
else:
# If the ACE is not an access allowed
@ -483,10 +508,9 @@ class CMEModule:
parsed_ace['DEBUG'] = "ACE type not supported for parsing by dacleditor.py, feel free to contribute"
return parsed_ace
# Prints a full DACL by printing each parsed ACE
# - parsed_dacl : a parsed DACL from parseDACL()
def printparsedDACL(self, context, parsed_dacl):
# - parsed_dacl : a parsed DACL from parse_dacl()
def print_parsed_dacl(self, context, parsed_dacl):
context.log.debug("Printing parsed DACL")
i = 0
# If a specific right or a specific GUID has been specified, only the ACE with this right will be printed
@ -499,58 +523,67 @@ class CMEModule:
try:
if (self.rights == 'FullControl') and (self.rights not in parsed_ace['Access mask']):
print_ace = False
if (self.rights == 'DCSync') and (('Object type (GUID)' not in parsed_ace) or (RIGHTS_GUID.DS_Replication_Get_Changes_All.value not in parsed_ace['Object type (GUID)'])):
if (self.rights == 'DCSync') and (('Object type (GUID)' not in parsed_ace) or (
RIGHTS_GUID.DS_Replication_Get_Changes_All.value not in parsed_ace['Object type (GUID)'])):
print_ace = False
if (self.rights == 'WriteMembers') and (('Object type (GUID)' not in parsed_ace) or (RIGHTS_GUID.WriteMembers.value not in parsed_ace['Object type (GUID)'])):
if (self.rights == 'WriteMembers') and (('Object type (GUID)' not in parsed_ace) or (
RIGHTS_GUID.WriteMembers.value not in parsed_ace['Object type (GUID)'])):
print_ace = False
if (self.rights == 'ResetPassword') and (('Object type (GUID)' not in parsed_ace) or (RIGHTS_GUID.ResetPassword.value not in parsed_ace['Object type (GUID)'])):
if (self.rights == 'ResetPassword') and (('Object type (GUID)' not in parsed_ace) or (
RIGHTS_GUID.ResetPassword.value not in parsed_ace['Object type (GUID)'])):
print_ace = False
except Exception as e:
context.log.error("Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
context.log.error(
"Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
# Filter on specific right GUID
if self.rights_guid is not None:
try:
if ('Object type (GUID)' not in parsed_ace) or (self.rights_guid not in parsed_ace['Object type (GUID)']):
if ('Object type (GUID)' not in parsed_ace) or (
self.rights_guid not in parsed_ace['Object type (GUID)']):
print_ace = False
except Exception as e:
context.log.error("Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
context.log.error(
"Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
# Filter on ACE type
if self.ace_type == 'allowed':
try:
if ('ACCESS_ALLOWED_OBJECT_ACE' not in parsed_ace['ACE Type']) and ('ACCESS_ALLOWED_ACE' not in parsed_ace['ACE Type']):
if ('ACCESS_ALLOWED_OBJECT_ACE' not in parsed_ace['ACE Type']) and (
'ACCESS_ALLOWED_ACE' not in parsed_ace['ACE Type']):
print_ace = False
except Exception as e:
context.log.error("Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
context.log.error(
"Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
else:
try:
if ('ACCESS_DENIED_OBJECT_ACE' not in parsed_ace['ACE Type']) and ('ACCESS_DENIED_ACE' not in parsed_ace['ACE Type']):
if ('ACCESS_DENIED_OBJECT_ACE' not in parsed_ace['ACE Type']) and (
'ACCESS_DENIED_ACE' not in parsed_ace['ACE Type']):
print_ace = False
except Exception as e:
context.log.error("Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
context.log.error(
"Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
# Filter on trusted principal
if self.principal_SID is not None:
if self.principal_sid is not None:
try:
if self.principal_SID not in parsed_ace['Trustee (SID)']:
if self.principal_sid not in parsed_ace['Trustee (SID)']:
print_ace = False
except Exception as e:
context.log.error("Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
context.log.error(
"Error filtering ACE, probably because of ACE type unsupported for parsing yet (%s)" % e)
if print_ace:
print("[*] %-28s" % "ACE[%d] info" % i)
self.printparsedACE(parsed_ace)
self.print_parsed_ace(parsed_ace)
i += 1
# Prints properly a parsed ACE
# - parsed_ace : a parsed ACE from parseACE()
def printparsedACE(self, parsed_ace):
# - parsed_ace : a parsed ACE from parse_ace()
def print_parsed_ace(self, parsed_ace):
elements_name = list(parsed_ace.keys())
for attribute in elements_name:
print("[*] %-26s: %s" % (attribute, parsed_ace[attribute]))
# Retrieves the GUIDs for the specified rights
def build_guids_for_rights(self):
_rights_guids = []
@ -561,6 +594,7 @@ class CMEModule:
elif self.rights == "ResetPassword":
_rights_guids = [RIGHTS_GUID.ResetPassword.value]
elif self.rights == "DCSync":
_rights_guids = [RIGHTS_GUID.DS_Replication_Get_Changes.value, RIGHTS_GUID.DS_Replication_Get_Changes_All.value]
context.log.highlight('Built GUID: %s', _rights_guids)
_rights_guids = [RIGHTS_GUID.DS_Replication_Get_Changes.value,
RIGHTS_GUID.DS_Replication_Get_Changes_All.value]
self.context.log.highlight('Built GUID: %s', _rights_guids)
return _rights_guids