[lib] Improve ntlm_parser.py
Signed-off-by: XiaoliChan <30458572+XiaoliChan@users.noreply.github.com>main
parent
ac60614474
commit
423b70bcfb
|
@ -1,107 +1,34 @@
|
||||||
# Original from here: https://github.com/nopfor/ntlm_challenger
|
# Original from here: https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py#L568
|
||||||
|
|
||||||
import datetime
|
import struct
|
||||||
|
|
||||||
|
from impacket import ntlm
|
||||||
from impacket.smb3 import WIN_VERSIONS
|
from impacket.smb3 import WIN_VERSIONS
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
def decoder(byte_string, decode_type):
|
def parse_challenge(challange):
|
||||||
if decode_type == "byte":
|
|
||||||
return byte_string.decode("UTF-8").replace("\x00", "")
|
|
||||||
else:
|
|
||||||
return int.from_bytes(byte_string, "little")
|
|
||||||
|
|
||||||
|
|
||||||
def parse_version(version_bytes):
|
|
||||||
major_version = version_bytes[0]
|
|
||||||
minor_version = version_bytes[1]
|
|
||||||
product_build = decoder(version_bytes[2:4], "int")
|
|
||||||
if product_build in WIN_VERSIONS:
|
|
||||||
return f"{WIN_VERSIONS[product_build]} Build {product_build}"
|
|
||||||
else:
|
|
||||||
return f"Windows {major_version}.{minor_version} Build {product_build}"
|
|
||||||
|
|
||||||
|
|
||||||
def parse_target_info(target_info_bytes):
|
|
||||||
MsvAvEOL = 0x0000
|
|
||||||
MsvAvNbComputerName = 0x0001
|
|
||||||
MsvAvNbDomainName = 0x0002
|
|
||||||
MsvAvDnsComputerName = 0x0003
|
|
||||||
MsvAvDnsDomainName = 0x0004
|
|
||||||
MsvAvDnsTreeName = 0x0005
|
|
||||||
MsvAvFlags = 0x0006
|
|
||||||
MsvAvTimestamp = 0x0007
|
|
||||||
MsvAvSingleHost = 0x0008
|
|
||||||
MsvAvTargetName = 0x0009
|
|
||||||
MsvAvChannelBindings = 0x000A
|
|
||||||
|
|
||||||
target_info = {
|
target_info = {
|
||||||
"MsvAvNbComputerName": None,
|
"hostname": None,
|
||||||
"MsvAvDnsDomainName": None,
|
"domain": None,
|
||||||
}
|
"os_version": None
|
||||||
info_offset = 0
|
|
||||||
|
|
||||||
while info_offset < len(target_info_bytes):
|
|
||||||
av_id = decoder(target_info_bytes[info_offset:info_offset + 2], "int")
|
|
||||||
av_len = decoder(target_info_bytes[info_offset + 2:info_offset + 4], "int")
|
|
||||||
av_value = target_info_bytes[info_offset + 4:info_offset + 4 + av_len]
|
|
||||||
|
|
||||||
info_offset = info_offset + 4 + av_len
|
|
||||||
|
|
||||||
if av_id == MsvAvEOL:
|
|
||||||
pass
|
|
||||||
elif av_id == MsvAvNbComputerName:
|
|
||||||
target_info["MsvAvNbComputerName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvNbDomainName:
|
|
||||||
target_info["MsvAvNbDomainName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvDnsComputerName:
|
|
||||||
target_info["MsvAvDnsComputerName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvDnsDomainName:
|
|
||||||
target_info["MsvAvDnsDomainName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvDnsTreeName:
|
|
||||||
target_info["MsvAvDnsTreeName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvFlags:
|
|
||||||
pass
|
|
||||||
elif av_id == MsvAvTimestamp:
|
|
||||||
filetime = decoder(av_value, "int")
|
|
||||||
microseconds = (filetime - 116444736000000000) / 10
|
|
||||||
time = datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=microseconds)
|
|
||||||
target_info["MsvAvTimestamp"] = time.strftime("%b %d, %Y %H:%M:%S.%f")
|
|
||||||
elif av_id == MsvAvSingleHost:
|
|
||||||
target_info["MsvAvSingleHost"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvTargetName:
|
|
||||||
target_info["MsvAvTargetName"] = decoder(av_value, "byte")
|
|
||||||
elif av_id == MsvAvChannelBindings:
|
|
||||||
target_info["MsvAvChannelBindings"] = av_value
|
|
||||||
return target_info
|
|
||||||
|
|
||||||
|
|
||||||
def parse_challenge(challenge_message):
|
|
||||||
# TargetNameFields
|
|
||||||
target_name_fields = challenge_message[12:20]
|
|
||||||
target_name_len = decoder(target_name_fields[0:2], "int")
|
|
||||||
target_name_offset = decoder(target_name_fields[4:8], "int")
|
|
||||||
|
|
||||||
# TargetInfoFields
|
|
||||||
target_info_fields = challenge_message[40:48]
|
|
||||||
target_info_len = decoder(target_info_fields[0:2], "int")
|
|
||||||
target_info_offset = decoder(target_info_fields[4:8], "int")
|
|
||||||
|
|
||||||
# Version
|
|
||||||
version = None
|
|
||||||
version_bytes = challenge_message[48:56]
|
|
||||||
version = parse_version(version_bytes)
|
|
||||||
|
|
||||||
# TargetName
|
|
||||||
target_name = challenge_message[target_name_offset:target_name_offset + target_name_len]
|
|
||||||
target_name = decoder(target_name, "byte")
|
|
||||||
|
|
||||||
# TargetInfo
|
|
||||||
target_info_bytes = challenge_message[target_info_offset:target_info_offset + target_info_len]
|
|
||||||
target_info = parse_target_info(target_info_bytes)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"target_name": target_name,
|
|
||||||
"version": version,
|
|
||||||
"target_info": target_info
|
|
||||||
}
|
}
|
||||||
|
challange = ntlm.NTLMAuthChallenge(challange)
|
||||||
|
av_pairs = ntlm.AV_PAIRS(challange["TargetInfoFields"][:challange["TargetInfoFields_len"]])
|
||||||
|
if av_pairs[ntlm.NTLMSSP_AV_HOSTNAME] is not None:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
target_info["hostname"] = av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1].decode("utf-16le")
|
||||||
|
if av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME] is not None:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
target_info["domain"] = av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1].decode("utf-16le")
|
||||||
|
if "Version" in challange.fields:
|
||||||
|
version = challange["Version"]
|
||||||
|
if len(version) >= 4:
|
||||||
|
major_version = version[0]
|
||||||
|
minor_version = version[1]
|
||||||
|
product_build = struct.unpack("<H", version[2:4])[0]
|
||||||
|
if product_build in WIN_VERSIONS:
|
||||||
|
target_info["os_version"] = f"{WIN_VERSIONS[product_build]} Build {product_build}"
|
||||||
|
else:
|
||||||
|
target_info["os_version"] = f"{major_version}.{minor_version} Build {product_build}"
|
||||||
|
return target_info
|
|
@ -52,9 +52,9 @@ class winrm(connection):
|
||||||
|
|
||||||
def enum_host_info(self):
|
def enum_host_info(self):
|
||||||
ntlm_info = parse_challenge(base64.b64decode(self.challenge_header.split(" ")[1].replace(",", "")))
|
ntlm_info = parse_challenge(base64.b64decode(self.challenge_header.split(" ")[1].replace(",", "")))
|
||||||
self.domain = ntlm_info["target_info"]["MsvAvDnsDomainName"]
|
self.domain = ntlm_info["domain"]
|
||||||
self.hostname = ntlm_info["target_info"]["MsvAvNbComputerName"]
|
self.hostname = ntlm_info["hostname"]
|
||||||
self.server_os = ntlm_info["version"]
|
self.server_os = ntlm_info["os_version"]
|
||||||
self.logger.extra["hostname"] = self.hostname
|
self.logger.extra["hostname"] = self.hostname
|
||||||
|
|
||||||
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}")
|
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}")
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
import os
|
import os
|
||||||
import struct
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
from six import indexbytes
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from nxc.helpers.ntlm_parser import parse_challenge
|
||||||
from nxc.config import process_secret
|
from nxc.config import process_secret
|
||||||
from nxc.connection import connection, dcom_FirewallChecker, requires_admin
|
from nxc.connection import connection, dcom_FirewallChecker, requires_admin
|
||||||
from nxc.logger import NXCAdapter
|
from nxc.logger import NXCAdapter
|
||||||
|
@ -18,7 +18,6 @@ from impacket.dcerpc.v5 import transport, epm
|
||||||
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck
|
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck
|
||||||
from impacket.dcerpc.v5.dcomrt import DCOMConnection
|
from impacket.dcerpc.v5.dcomrt import DCOMConnection
|
||||||
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
|
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
|
||||||
import contextlib
|
|
||||||
|
|
||||||
MSRPC_UUID_PORTMAP = uuidtup_to_bin(("E1AF8308-5D1F-11C9-91A4-08002B14A0FA", "3.0"))
|
MSRPC_UUID_PORTMAP = uuidtup_to_bin(("E1AF8308-5D1F-11C9-91A4-08002B14A0FA", "3.0"))
|
||||||
|
|
||||||
|
@ -86,7 +85,6 @@ class wmi(connection):
|
||||||
def enum_host_info(self):
|
def enum_host_info(self):
|
||||||
# All code pick from DumpNTLNInfo.py
|
# All code pick from DumpNTLNInfo.py
|
||||||
# https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py
|
# https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py
|
||||||
ntlmChallenge = None
|
|
||||||
|
|
||||||
bind = MSRPCBind()
|
bind = MSRPCBind()
|
||||||
item = CtxItem()
|
item = CtxItem()
|
||||||
|
@ -123,39 +121,19 @@ class wmi(connection):
|
||||||
if buffer != 0:
|
if buffer != 0:
|
||||||
response = MSRPCHeader(buffer)
|
response = MSRPCHeader(buffer)
|
||||||
bindResp = MSRPCBindAck(response.getData())
|
bindResp = MSRPCBindAck(response.getData())
|
||||||
|
ntlm_info = parse_challenge(bindResp["auth_data"])
|
||||||
ntlmChallenge = ntlm.NTLMAuthChallenge(bindResp["auth_data"])
|
self.domain = ntlm_info["domain"]
|
||||||
|
self.hostname = ntlm_info["hostname"]
|
||||||
if ntlmChallenge["TargetInfoFields_len"] > 0:
|
self.server_os = ntlm_info["os_version"]
|
||||||
av_pairs = ntlm.AV_PAIRS(ntlmChallenge["TargetInfoFields"][: ntlmChallenge["TargetInfoFields_len"]])
|
self.logger.extra["hostname"] = self.hostname
|
||||||
if av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1] is not None:
|
|
||||||
try:
|
|
||||||
self.hostname = av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1].decode("utf-16le")
|
|
||||||
except Exception:
|
|
||||||
self.hostname = self.host
|
|
||||||
if av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1] is not None:
|
|
||||||
try:
|
|
||||||
self.domain = av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1].decode("utf-16le")
|
|
||||||
except Exception:
|
|
||||||
self.domain = self.args.domain
|
|
||||||
if av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1] is not None:
|
|
||||||
with contextlib.suppress(Exception):
|
|
||||||
self.fqdn = av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1].decode("utf-16le")
|
|
||||||
if "Version" in ntlmChallenge.fields:
|
|
||||||
version = ntlmChallenge["Version"]
|
|
||||||
if len(version) >= 4:
|
|
||||||
self.server_os = "Windows NT %d.%d Build %d" % (indexbytes(version, 0), indexbytes(version, 1), struct.unpack("<H", version[2:4])[0])
|
|
||||||
else:
|
else:
|
||||||
self.hostname = self.host
|
self.hostname = self.host
|
||||||
|
|
||||||
if self.args.local_auth:
|
if self.args.local_auth:
|
||||||
self.domain = self.hostname
|
self.domain = self.hostname
|
||||||
if self.args.domain:
|
if self.args.domain:
|
||||||
self.domain = self.args.domain
|
self.domain = self.args.domain
|
||||||
self.fqdn = f"{self.hostname}.{self.domain}"
|
self.fqdn = f"{self.hostname}.{self.domain}"
|
||||||
|
|
||||||
self.logger.extra["hostname"] = self.hostname
|
|
||||||
|
|
||||||
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-"))
|
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-"))
|
||||||
|
|
||||||
def print_host_info(self):
|
def print_host_info(self):
|
||||||
|
|
Loading…
Reference in New Issue