Add user-desc module
Add the user-desc module that obtains user descriptions from Active Directory.main
parent
f5c9bfdf10
commit
d3d077cb7a
|
@ -0,0 +1,172 @@
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from impacket.ldap import ldap, ldapasn1
|
||||||
|
from impacket.ldap.ldap import LDAPSearchError
|
||||||
|
|
||||||
|
|
||||||
|
class CMEModule:
|
||||||
|
'''
|
||||||
|
Get user descriptions stored in Active Directory.
|
||||||
|
|
||||||
|
Module by Tobias Neitzel (@qtc_de)
|
||||||
|
'''
|
||||||
|
name = 'user-desc'
|
||||||
|
description = 'Get user descriptions stored in Active Directory'
|
||||||
|
supported_protocols = ['ldap']
|
||||||
|
opsec_safe = True
|
||||||
|
multiple_hosts = True
|
||||||
|
|
||||||
|
def options(self, context, module_options):
|
||||||
|
'''
|
||||||
|
LDAP_FILTER Custom LDAP search filter (fully replaces the default search)
|
||||||
|
DESC_FILTER An additional seach filter for descriptions (supports wildcard *)
|
||||||
|
DESC_INVERT An additional seach filter for descriptions (shows non matching)
|
||||||
|
USER_FILTER An additional seach filter for usernames (supports wildcard *)
|
||||||
|
USER_INVERT An additional seach filter for usernames (shows non matching)
|
||||||
|
KEYWORDS Use a custom set of keywords (comma separated)
|
||||||
|
ADD_KEYWORDS Add additional keywords to the default set (comma separated)
|
||||||
|
'''
|
||||||
|
self.log_file = None
|
||||||
|
self.desc_count = 0
|
||||||
|
self.context = context
|
||||||
|
self.account_names = set()
|
||||||
|
self.keywords = {'pass', 'creds', 'creden', 'key', 'secret', 'default'}
|
||||||
|
|
||||||
|
if 'LDAP_FILTER' in module_options:
|
||||||
|
self.search_filter = module_options['LDAP_FILTER']
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.search_filter = '(&(objectclass=user)'
|
||||||
|
|
||||||
|
if 'DESC_FILTER' in module_options:
|
||||||
|
self.search_filter += '(description={})'.format(module_options['DESC_FILTER'])
|
||||||
|
|
||||||
|
if 'DESC_INVERT' in module_options:
|
||||||
|
self.search_filter += '(!(description={}))'.format(module_options['DESC_INVERT'])
|
||||||
|
|
||||||
|
if 'USER_FILTER' in module_options:
|
||||||
|
self.search_filter += '(sAMAccountName={})'.format(module_options['USER_FILTER'])
|
||||||
|
|
||||||
|
if 'USER_INVERT' in module_options:
|
||||||
|
self.search_filter += '(!(sAMAccountName={}))'.format(module_options['USER_INVERT'])
|
||||||
|
|
||||||
|
self.search_filter += ')'
|
||||||
|
|
||||||
|
if 'KEYWORDS' in module_options:
|
||||||
|
self.keywords = set(module_options['KEYWORDS'].split(','))
|
||||||
|
|
||||||
|
elif 'ADD_KEYWORDS' in module_options:
|
||||||
|
add_keywords = set(module_options['ADD_KEYWORDS'].split(','))
|
||||||
|
self.keywords = self.keywords.union(add_keywords)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
'''
|
||||||
|
Destructor - closes the log file.
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
self.log_file.close()
|
||||||
|
|
||||||
|
info = 'Saved {} user descriptions to {}'.format(self.desc_count, self.log_file.name)
|
||||||
|
self.context.log.highlight(info)
|
||||||
|
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def on_login(self, context, connection):
|
||||||
|
'''
|
||||||
|
On successful LDAP login we perform a search for all user objects that have a description.
|
||||||
|
Users can specify additional LDAP filters that are applied to the query.
|
||||||
|
'''
|
||||||
|
self.create_log_file(connection.conn.getRemoteHost(), datetime.now().strftime("%Y%m%d_%H%M%S"))
|
||||||
|
context.log.debug("Starting LDAP search with search filter '{}'".format(self.search_filter))
|
||||||
|
|
||||||
|
try:
|
||||||
|
sc = ldap.SimplePagedResultsControl()
|
||||||
|
connection.ldapConnection.search(searchFilter=self.search_filter,
|
||||||
|
attributes=['sAMAccountName', 'description'],
|
||||||
|
sizeLimit=0, searchControls=[sc],
|
||||||
|
perRecordCallback=self.process_record)
|
||||||
|
|
||||||
|
except LDAPSearchError as e:
|
||||||
|
context.log.error('Obtained unexpected exception: {}'.format(str(e)))
|
||||||
|
|
||||||
|
def create_log_file(self, host, time):
|
||||||
|
'''
|
||||||
|
Create a log file for dumping user descriptions.
|
||||||
|
'''
|
||||||
|
logfile = 'UserDesc-{}-{}.log'.format(host, time)
|
||||||
|
logfile = Path.home().joinpath('.cme').joinpath('logs').joinpath(logfile)
|
||||||
|
|
||||||
|
self.context.log.debug("Creating log file '{}'".format(logfile))
|
||||||
|
|
||||||
|
self.log_file = open(logfile, 'w')
|
||||||
|
self.append_to_log("User:", "Description:")
|
||||||
|
|
||||||
|
def append_to_log(self, user, description):
|
||||||
|
'''
|
||||||
|
Append a new entry to the log file. Helper function that is only used to have an
|
||||||
|
unified padding on the user field.
|
||||||
|
'''
|
||||||
|
print(user.ljust(25), description, file=self.log_file)
|
||||||
|
|
||||||
|
def process_record(self, item):
|
||||||
|
'''
|
||||||
|
Function that is called to process the items obtained by the LDAP search. All items are
|
||||||
|
written to the log file per default. Items that contain one of the keywords configured
|
||||||
|
within this module are also printed to stdout.
|
||||||
|
|
||||||
|
On large Active Directories there seems to be a problem with duplicate user entries. For
|
||||||
|
some reason the process_record function is called multiple times with the same user entry.
|
||||||
|
Not sure whether this is a fault by this module or by impacket. As a workaround, this
|
||||||
|
function adds each new account name to a set and skips accounts that have already been added.
|
||||||
|
'''
|
||||||
|
if not isinstance(item, ldapasn1.SearchResultEntry):
|
||||||
|
return
|
||||||
|
|
||||||
|
sAMAccountName = ''
|
||||||
|
description = ''
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
for attribute in item['attributes']:
|
||||||
|
|
||||||
|
if str(attribute['type']) == 'sAMAccountName':
|
||||||
|
sAMAccountName = attribute['vals'][0].asOctets().decode('utf-8')
|
||||||
|
|
||||||
|
elif str(attribute['type']) == 'description':
|
||||||
|
description = attribute['vals'][0].asOctets().decode('utf-8')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
|
||||||
|
entry = sAMAccountName or 'item'
|
||||||
|
self.context.error("Skipping {}, cannot process LDAP entry due to error: '{}'".format(entry, str(e)))
|
||||||
|
|
||||||
|
if description and sAMAccountName not in self.account_names:
|
||||||
|
|
||||||
|
self.desc_count += 1
|
||||||
|
self.append_to_log(sAMAccountName, description)
|
||||||
|
|
||||||
|
if self.highlight(description):
|
||||||
|
self.context.log.highlight('User: {} - Description: {}'.format(sAMAccountName, description))
|
||||||
|
|
||||||
|
self.account_names.add(sAMAccountName)
|
||||||
|
|
||||||
|
def highlight(self, description):
|
||||||
|
'''
|
||||||
|
Check for interesting entries. Just checks whether certain keywords are contained within the
|
||||||
|
user description. Keywords are configured at the top of this class within the options function.
|
||||||
|
|
||||||
|
It is tempting to implement more logic here (e.g. catch all strings that are longer than seven
|
||||||
|
characters and contain 3 different character classes). Such functionality is nice when playing
|
||||||
|
CTF in small AD environments. When facing a real AD, such functionality gets annoying, because
|
||||||
|
it generates to much output with 99% of it being false positives.
|
||||||
|
|
||||||
|
The recommended way when targeting user descriptions is to use the keyword filter to catch low
|
||||||
|
hanging fruites. More dedicated searches for sensitive information should be done using the logfile.
|
||||||
|
This allows you to refine your search query at any time without having to pull data from AD again.
|
||||||
|
'''
|
||||||
|
for keyword in self.keywords:
|
||||||
|
if keyword.lower() in description.lower():
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
Loading…
Reference in New Issue