# NOTE(review): this file was recovered from a copy in which all line breaks had been
# collapsed and a span of text was lost (see the notes at the end of
# ``wmi.enum_host_info``). Block nesting was re-derived from syntax; the places where
# more than one nesting would be valid are marked NOTE(review) and should be confirmed
# against the upstream source.
import contextlib
import os
import struct
import logging
from io import StringIO
from datetime import datetime

from six import indexbytes

from nxc.config import process_secret
from nxc.connection import connection, dcom_FirewallChecker, requires_admin
from nxc.logger import NXCAdapter
from nxc.protocols.wmi import wmiexec, wmiexec_event

from impacket import ntlm
from impacket.uuid import uuidtup_to_bin
from impacket.krb5.ccache import CCache
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login

# Interface UUID/version that create_conn_obj binds to when probing the target.
MSRPC_UUID_PORTMAP = uuidtup_to_bin(("E1AF8308-5D1F-11C9-91A4-08002B14A0FA", "3.0"))


class wmi(connection):
    """Connection handler for the WMI protocol, built on the shared ``connection`` base."""

    def __init__(self, args, db, host):
        """Set the per-target defaults, then run the base ``connection`` initialiser.

        Args:
            args: Parsed command-line options (read as ``self.args`` by the methods below).
            db: Passed through unchanged to ``connection.__init__``.
            host: Target host, passed through unchanged to ``connection.__init__``.
        """
        # These defaults are assigned before the base initialiser is called; the order
        # is kept from the original (presumably the base class already relies on them
        # - confirm against nxc.connection).
        self.domain = None
        self.hash = ""
        self.lmhash = ""
        self.nthash = ""
        self.fqdn = ""
        self.remoteName = ""
        self.server_os = None
        self.doKerberos = False
        self.stringBinding = None
        # from: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-erref/18d8fbe8-a967-4f1c-ae50-99ca8e491d2d
        # Maps 8-digit hexadecimal error codes (as strings) to their symbolic names.
        self.rpc_error_status = {
            "0000052F": "STATUS_ACCOUNT_RESTRICTION",
            "00000533": "STATUS_ACCOUNT_DISABLED",
            "00000775": "STATUS_ACCOUNT_LOCKED_OUT",
            "00000701": "STATUS_ACCOUNT_EXPIRED",
            "00000532": "STATUS_PASSWORD_EXPIRED",
            "00000530": "STATUS_INVALID_LOGON_HOURS",
            "00000531": "STATUS_INVALID_WORKSTATION",
            "00000569": "STATUS_LOGON_TYPE_NOT_GRANTED",
            "00000773": "STATUS_PASSWORD_MUST_CHANGE",
            "00000005": "STATUS_ACCESS_DENIED",
            "0000052E": "STATUS_LOGON_FAILURE",
            "0000052B": "STATUS_WRONG_PASSWORD",
            "00000721": "RPC_S_SEC_PKG_ERROR",
        }

        connection.__init__(self, args, db, host)

    def proto_logger(self):
        """Create ``self.logger`` as an ``NXCAdapter`` tagged with this target's details."""
        self.logger = NXCAdapter(
            extra={
                "protocol": "WMI",
                "host": self.host,
                "port": self.args.port,
                "hostname": self.hostname,
            }
        )

    def create_conn_obj(self):
        """Probe the target over ``ncacn_ip_tcp`` and keep the transport if it answers.

        A transport with empty credentials is created, connected and bound to
        ``MSRPC_UUID_PORTMAP``; the DCE/RPC session is then closed again.

        Returns:
            True when connect and bind both succeeded (the transport is stored in
            ``self.conn``), False when any step raised (the error is logged at debug
            level).
        """
        if self.remoteName == "":
            self.remoteName = self.host

        try:
            rpctransport = transport.DCERPCTransportFactory(fr"ncacn_ip_tcp:{self.remoteName}[{self.args.port}]")
            rpctransport.set_credentials(username="", password="", domain="", lmhash="", nthash="", aesKey="")
            rpctransport.setRemoteHost(self.host)
            rpctransport.set_connect_timeout(self.args.rpc_timeout)
            dce = rpctransport.get_dce_rpc()
            dce.set_auth_type(RPC_C_AUTHN_WINNT)
            dce.connect()
            try:
                dce.bind(MSRPC_UUID_PORTMAP)
            finally:
                # Close the session even when the bind fails; previously a failed
                # bind skipped the disconnect and left the connection open.
                dce.disconnect()
        except Exception as e:
            self.logger.debug(str(e))
            return False

        self.conn = rpctransport
        return True

    def enum_host_info(self):
        """Collect host details from the NTLM challenge returned to a DCE/RPC bind.

        A bind PDU carrying an NTLMSSP type 1 message is sent over ``self.conn``. When
        a reply arrives, its NTLM challenge is parsed and ``self.hostname``,
        ``self.domain``, ``self.fqdn`` and ``self.server_os`` are filled from it.

        NOTE(review): the recovered text of this method is incomplete - see the note at
        the end of the body.
        """
        # All code pick from DumpNTLNInfo.py
        # https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py
        ntlmChallenge = None

        bind = MSRPCBind()
        item = CtxItem()
        item["AbstractSyntax"] = epm.MSRPC_UUID_PORTMAP
        item["TransferSyntax"] = uuidtup_to_bin(("8a885d04-1ceb-11c9-9fe8-08002b104860", "2.0"))
        item["ContextID"] = 0
        item["TransItems"] = 1
        bind.addCtxItem(item)

        packet = MSRPCHeader()
        packet["type"] = MSRPC_BIND
        packet["pduData"] = bind.getData()
        packet["call_id"] = 1

        auth = ntlm.getNTLMSSPType1("", "", signingRequired=True, use_ntlmv2=True)
        sec_trailer = SEC_TRAILER()
        sec_trailer["auth_type"] = RPC_C_AUTHN_WINNT
        sec_trailer["auth_level"] = RPC_C_AUTHN_LEVEL_PKT_INTEGRITY
        sec_trailer["auth_ctx_id"] = 0 + 79231  # expression kept exactly as in the original

        # Pad the PDU body with 0xFF bytes so the packet length is a multiple of 4,
        # and record the padding length in the security trailer.
        pad = (4 - (len(packet.get_packet()) % 4)) % 4
        if pad != 0:
            packet["pduData"] += b"\xFF" * pad
            sec_trailer["auth_pad_len"] = pad

        packet["sec_trailer"] = sec_trailer
        packet["auth_data"] = auth

        try:
            self.conn.connect()
            self.conn.send(packet.get_packet())
            buffer = self.conn.recv()
        except Exception:
            # Best effort: 0 marks "no reply" for the check below.
            buffer = 0

        if buffer != 0:
            response = MSRPCHeader(buffer)
            bindResp = MSRPCBindAck(response.getData())

            ntlmChallenge = ntlm.NTLMAuthChallenge(bindResp["auth_data"])

            if ntlmChallenge["TargetInfoFields_len"] > 0:
                av_pairs = ntlm.AV_PAIRS(ntlmChallenge["TargetInfoFields"][: ntlmChallenge["TargetInfoFields_len"]])
                if av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1] is not None:
                    try:
                        self.hostname = av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1].decode("utf-16le")
                    except Exception:
                        self.hostname = self.host
                if av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1] is not None:
                    try:
                        self.domain = av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1].decode("utf-16le")
                    except Exception:
                        self.domain = self.args.domain
                if av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1] is not None:
                    with contextlib.suppress(Exception):
                        self.fqdn = av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1].decode("utf-16le")
                # NOTE(review): in the collapsed text this check could equally sit one
                # level out (directly under ``if buffer != 0``) - confirm upstream.
                if "Version" in ntlmChallenge.fields:
                    version = ntlmChallenge["Version"]
                    if len(version) >= 4:
                        # NOTE(review): the recovered text breaks off right after
                        # ``struct.unpack("``. The remaining arguments below follow the
                        # DumpNTLMInfo.py example referenced at the top of this method
                        # (little-endian unsigned short taken from bytes 2-3) - confirm.
                        self.server_os = "Windows NT %d.%d Build %d" % (indexbytes(version, 0), indexbytes(version, 1), struct.unpack("<H", version[2:4])[0])

        # NOTE(review): MISSING SOURCE. Everything between the statement above and the
        # tail of a later method was lost from the recovered text: the remainder of
        # enum_host_info, any methods that followed it, and the beginning (signature
        # and setup of ``dcom`` / ``records``) of the method whose tail is quoted below.
        # None of it has been reconstructed here; restore it from the upstream file.
        # Surviving tail, verbatim and in original token order (nesting unknown):
        #
        #     {v['value']}")
        #     except Exception as e:
        #     if str(e).find("S_FALSE") < 0:
        #     self.logger.debug(e)
        #     dcom.disconnect()
        #     return records

    @requires_admin
    def execute(self, command=None, get_output=False):
        """Run a command on the target through the configured WMI execution method.

        Args:
            command: Command line to run. When empty, ``self.args.execute`` is used.
            get_output: Whether the command output should be retrieved.

        Returns:
            The output returned by the execution method, or False when a
            pre-check fails, the execution method is not supported, or output was
            requested but came back empty.
        """
        output = ""
        if not command:
            command = self.args.execute

        # NOTE(review): in the collapsed text this check could equally be nested
        # inside ``if not command`` above - confirm upstream.
        if not self.args.no_output:
            get_output = True

        if "systeminfo" in command and self.args.exec_timeout < 10:
            self.logger.fail("Execute 'systeminfo' must set the interval time higher than 10 seconds")
            return False

        if self.server_os is not None and "NT 5" in self.server_os:
            self.logger.fail("Execute command failed, not support current server os (version < NT 6)")
            return False

        # Both executors are constructed with the same arguments, so pick the class
        # from a table instead of duplicating the call.
        exec_classes = {
            "wmiexec": wmiexec.WMIEXEC,
            "wmiexec-event": wmiexec_event.WMIEXEC_EVENT,
        }
        exec_class = exec_classes.get(self.args.exec_method)
        if exec_class is None:
            # Previously an unrecognised method ran nothing yet could still be
            # reported as a successful execution.
            self.logger.fail(f'Execute command failed, unsupported exec method: "{self.args.exec_method}"')
            return False

        exec_method = exec_class(self.conn.getRemoteName(), self.username, self.password, self.domain, self.lmhash, self.nthash, self.doKerberos, self.kdcHost, self.aesKey, self.logger, self.args.exec_timeout, self.args.codec)
        output = exec_method.execute(command, get_output)

        self.conn.disconnect()

        if output == "" and get_output:
            self.logger.fail("Execute command failed, probabaly got detection by AV.")
            return False

        self.logger.success(f'Executed command: "{command}" via {self.args.exec_method}')
        for line in StringIO(output).readlines():
            self.logger.highlight(line.strip())
        return output