Implement several checks

Administrator's name
Spooler service
WSUS configuration
Guest account
Powershell execution policy
main
François REYNAUD 2023-05-02 20:35:56 +02:00
parent cddbd26119
commit dd08ae0d86
1 changed files with 120 additions and 14 deletions

View File

@ -3,7 +3,7 @@
from pprint import pprint
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.dcerpc.v5 import rrp
from impacket.dcerpc.v5 import rrp, samr, scmr
from impacket.dcerpc.v5.rrp import DCERPCSessionError
from impacket.smbconnection import SessionError as SMBSessionError
#from impacket.dcerpc.v5.dtypes import RPC_UNICODE_STRING, DWORD,
@ -44,6 +44,40 @@ class CMEModule:
print(f'\x1b[33m{msg}', *args, '\x1b[0m')
except BlockingIOError:
pass
except TypeError:
print('\x1b[33m', repr(msg), *args, '\x1b[0m')
def get_local_users(self, connection):
remoteOps = RemoteOperations(smbConnection=connection.conn, doKerberos=False)
remoteOps.connectSamr(remoteOps.getMachineNameAndDomain()[0])
users = remoteOps.getDomainUsers()
remoteOps.finish()
return dict([(user['RelativeId'], user['Name']) for user in users['Buffer']['Buffer']])
def get_service(self, service_name, connection):
remoteOps = RemoteOperations(smbConnection=connection.conn, doKerberos=False)
machine_name,_ = remoteOps.getMachineNameAndDomain()
remoteOps._RemoteOperations__connectSvcCtl()
dce = remoteOps._RemoteOperations__scmr
scm_handle = scmr.hROpenSCManagerW(dce, machine_name)['lpScHandle']
service_handle = scmr.hROpenServiceW(dce, scm_handle, service_name)['lpServiceHandle']
service_config = scmr.hRQueryServiceConfigW(dce, service_handle)['lpServiceConfig']
service_status = scmr.hRQueryServiceStatus(dce, service_handle)['lpServiceStatus']['dwCurrentState']
remoteOps.finish()
return service_config, service_status
def get_user_info(self, connection, rid=501):
remoteOps = RemoteOperations(smbConnection=connection.conn, doKerberos=False)
machine_name, _ = remoteOps.getMachineNameAndDomain()
remoteOps.connectSamr(machine_name)
dce = remoteOps._RemoteOperations__samr
domain_handle = remoteOps._RemoteOperations__domainHandle
user_handle = samr.hSamrOpenUser(dce, domain_handle, userId=rid)['UserHandle']
user_info = samr.hSamrQueryInformationUser2(dce, user_handle, samr.USER_INFORMATION_CLASS.UserAllInformation)
user_info = user_info['Buffer']['All']
remoteOps.finish()
return user_info
def on_admin_login(self, context, connection):
self.log = connection.logger
@ -89,7 +123,6 @@ class CMEModule:
return success, reasons
def check_last_successful_update(self, connection):
records = connection.wmi(wmi_query='Select TimeGenerated FROM Win32_ReliabilityRecords Where EventIdentifier=19', namespace='root\\cimv2')
most_recent_update_date = records[0]['TimeGenerated']['value']
@ -103,6 +136,45 @@ class CMEModule:
else:
return False, [f'Last update was {days_since_last_update} > {OUTDATED_THRESHOLD} days ago']
def check_administrator_name(self, connection):
users = self.get_local_users(connection)
ok = users[500] not in ('Administrator', 'Administrateur')
reasons = [f'Administrator name changed to {users[500]}' if ok else 'Administrator name unchanged']
return ok, reasons
def check_guest_account_disabled(self, connection):
user_info = self.get_user_info(connection)
uac = user_info['UserAccountControl']
disabled = bool(uac & samr.USER_ACCOUNT_DISABLED)
reasons = ['Guest account disabled' if disabled else 'Guest account enabled']
return disabled, reasons
def check_spooler_service(self, connection):
ok = False
service_config, service_status = self.get_service('Spooler', connection)
if service_config['dwStartType'] == scmr.SERVICE_DISABLED:
ok = True
reasons = ['Spooler service disabled']
else:
reasons = ['Spooler service enabled']
if service_status == scmr.SERVICE_RUNNING:
reasons.append('Spooler service running')
elif service_status == scmr.SERVICE_STOPPED:
ok = True
reasons.append('Spooler service not running')
return ok, reasons
def check_wsus_running(self, connection):
ok = True
reasons = []
service_config, service_status = self.get_service('wuauserv', connection)
if service_config['dwStartType'] == scmr.SERVICE_DISABLED:
reasons = ['WSUS service disabled']
elif service_status != scmr.SERVICE_RUNNING:
reasons = ['WSUS service not running']
return ok, reasons
def add_result(self, host, result):
self.results[host]['checks'].append({
"Check":result.name,
@ -118,23 +190,30 @@ class CMEModule:
"""
Class for performing the checks and holding the results
"""
def __init__(self, name, description=""):
def __init__(self, name, description="", options={}):
self.name = name
self.description = description
self.ok = False
self.reasons = []
self.options = {
'lastWins':False,
'stopOnOK':False,
'stopOnKO':False,
'KOIfMissing':True
}
self.options.update(options)
def check(self, *specs, op=operator.eq):
def check(self, *specs, missing_ok=True):
"""
Perform checks that only require to compare values in the registry with expected values, according to the specs
a spec may be either a 3-tuple: (key name, value name, expected value), or a 4-tuple (key name, value name, expected value, operation), where operation is a function that implements a comparison operator
"""
op = operator.eq
self.ok = True
for spec in specs:
if len(spec) == 3:
(key, value_name, expected_value) = spec
op = operator.eq
elif len(spec) == 4:
(key, value_name, expected_value, op) = spec
if op == operator.eq:
@ -167,7 +246,8 @@ class CMEModule:
module.debug(f'Got value {value}')
if type(value) == DCERPCSessionError:
self.ok = False
if missing_ok == False or self.options['KOIfMissing']:
self.ok = False
if value.error_code in (ERROR_NO_MORE_ITEMS, ERROR_FILE_NOT_FOUND):
self.reasons.append(f'{key}: Key not found')
elif value.error_code == ERROR_OBJECT_NOT_FOUND:
@ -175,10 +255,16 @@ class CMEModule:
continue
if op(value, expected_value):
if self.options['lastWins']:
self.ok = True
self.reasons.append(opstring.format(left=f'{key}\\{value_name} ({value})', right=expected_value))
else:
self.reasons.append(nopstring.format(left=f'{key}\\{value_name} ({value})', right=expected_value))
self.ok = False
if self.ok and self.options['stopOnOK']:
break
if not self.ok and self.options['stopOnKO']:
break
return self
@ -196,20 +282,16 @@ class CMEModule:
else:
module.log.warning(self.name + ': ' + '\x1b[1;31mKO\x1b[0m')
# TODO: check_administrator_name
# TODO: check_print_spooler_service
# TODO: check_wsus
# TODO: check_applocker
# TODO: check_powershell_v2_availability
# TODO: check_nbtns
# TODO: check_smb_encryption
# TODO: check_nbtns (should be similar to check_laps)
# TODO: check_smb_encryption (maybe check the characteristics of the connection we already have ?)
# TODO: check_bitlockerconf
# TODO: check_guest_account
# TODO: check_execution_policy
for result in (
ConfigCheck('Last successful update', 'Checks how old is the last successful update').wrap_check(self.check_last_successful_update, connection),
ConfigCheck('LAPS', 'Checks if LAPS is installed').wrap_check(self.check_laps, dce, connection),
ConfigCheck("Administrator's name", 'Checks if Administror user name has been changed').wrap_check(self.check_administrator_name, connection),
ConfigCheck('UAC configuration', 'Checks if UAC configuration is secure').check((
'HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Policies\\System',
'EnableLUA', 1
@ -233,11 +315,24 @@ class CMEModule:
'DisabledComponents', (32, 255), in_
)
),
ConfigCheck('Spooler service', 'Checks if the spooler service is disabled').wrap_check(self.check_spooler_service, connection),
ConfigCheck('WDigest authentication', 'Checks if WDigest authentication is disabled').check((
'HKLM\\SYSTEM\\CurrentControlSet\\Control\\SecurityProviders\\WDigest',
'UseLogonCredential', 0
)
),
ConfigCheck('WSUS configuration', 'Checks if WSUS configuration uses HTTPS')\
.wrap_check(self.check_wsus_running, connection)\
.check((
'HKLM\\Software\\Policies\\Microsoft\\Windows\\WindowsUpdate',
'WUServer', 'https://', startswith
)
)\
.check((
'HKLM\\Software\\Policies\\Microsoft\\Windows\\WindowsUpdate',
'UseWUServer', 0, operator.eq
)
),
ConfigCheck('LSA cache', 'Checks how many logons are kept in the LSA cache').check((
'HKLM\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\Winlogon',
'CachedLogonsCount', 2, le
@ -310,6 +405,7 @@ class CMEModule:
'RestrictedAdminMode', 1
)
),
ConfigCheck('Guest account disabled', 'Checks if the guest account is disabled').wrap_check(self.check_guest_account_disabled, connection),
ConfigCheck('Automatic session lock', 'Checks if the session is automatically locked on after a period of inactivity').check((
'HKCU\\Control Panel\\Desktop',
'ScreenSaverIsSecure', 1
@ -318,6 +414,14 @@ class CMEModule:
'ScreenSaveTimeOut', 300, le
)
),
ConfigCheck('Powershell Execution Policy', 'Checks if the Powershell execution policy is set to "Restricted"', options={'KOIfMissing':False, 'lastWins':True}).check((
'HKLM\\SOFTWARE\\Microsoft\\PowerShell\\1\ShellIds\Microsoft.Powershell',
'ExecutionPolicy', 'Restricted\x00'
),(
'HKCU\\SOFTWARE\\Microsoft\\PowerShell\\1\ShellIds\Microsoft.Powershell',
'ExecutionPolicy', 'Restricted\x00'
)
),
ConfigCheck('Legal notice banner', 'Checks if there is a legal notice banner set').check((
'HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Policies\\System',
'legalnoticecaption', "", operator.ne
@ -441,13 +545,15 @@ class CMEModule:
return data
def le(reg_sz_string, number):
return int(reg_sz_string[:-1]) <= number
def in_(obj, seq):
return obj in seq
def startswith(string, start):
return string.startswith(start)
def ls(smb, path='\\', share='C$'):
l = []
try: