Merge pull request #191 from XiaoliChan/ntlm_parser

[lib] Improve ntlm_parser.py
main
Alex 2024-03-03 07:03:34 -05:00 committed by GitHub
commit 31b2a1f458
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 37 additions and 132 deletions

View File

@ -1,107 +1,34 @@
# Original from here: https://github.com/nopfor/ntlm_challenger # Original from here: https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py#L568
import datetime import struct
from impacket import ntlm
from impacket.smb3 import WIN_VERSIONS from impacket.smb3 import WIN_VERSIONS
import contextlib
def decoder(byte_string, decode_type): def parse_challenge(challange):
if decode_type == "byte":
return byte_string.decode("UTF-8").replace("\x00", "")
else:
return int.from_bytes(byte_string, "little")
def parse_version(version_bytes):
major_version = version_bytes[0]
minor_version = version_bytes[1]
product_build = decoder(version_bytes[2:4], "int")
if product_build in WIN_VERSIONS:
return f"{WIN_VERSIONS[product_build]} Build {product_build}"
else:
return f"Windows {major_version}.{minor_version} Build {product_build}"
def parse_target_info(target_info_bytes):
MsvAvEOL = 0x0000
MsvAvNbComputerName = 0x0001
MsvAvNbDomainName = 0x0002
MsvAvDnsComputerName = 0x0003
MsvAvDnsDomainName = 0x0004
MsvAvDnsTreeName = 0x0005
MsvAvFlags = 0x0006
MsvAvTimestamp = 0x0007
MsvAvSingleHost = 0x0008
MsvAvTargetName = 0x0009
MsvAvChannelBindings = 0x000A
target_info = { target_info = {
"MsvAvNbComputerName": None, "hostname": None,
"MsvAvDnsDomainName": None, "domain": None,
"os_version": None
} }
info_offset = 0 challange = ntlm.NTLMAuthChallenge(challange)
av_pairs = ntlm.AV_PAIRS(challange["TargetInfoFields"][:challange["TargetInfoFields_len"]])
while info_offset < len(target_info_bytes): if av_pairs[ntlm.NTLMSSP_AV_HOSTNAME] is not None:
av_id = decoder(target_info_bytes[info_offset:info_offset + 2], "int") with contextlib.suppress(Exception):
av_len = decoder(target_info_bytes[info_offset + 2:info_offset + 4], "int") target_info["hostname"] = av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1].decode("utf-16le")
av_value = target_info_bytes[info_offset + 4:info_offset + 4 + av_len] if av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME] is not None:
with contextlib.suppress(Exception):
info_offset = info_offset + 4 + av_len target_info["domain"] = av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1].decode("utf-16le")
if "Version" in challange.fields:
if av_id == MsvAvEOL: version = challange["Version"]
pass if len(version) >= 4:
elif av_id == MsvAvNbComputerName: major_version = version[0]
target_info["MsvAvNbComputerName"] = decoder(av_value, "byte") minor_version = version[1]
elif av_id == MsvAvNbDomainName: product_build = struct.unpack("<H", version[2:4])[0]
target_info["MsvAvNbDomainName"] = decoder(av_value, "byte") if product_build in WIN_VERSIONS:
elif av_id == MsvAvDnsComputerName: target_info["os_version"] = f"{WIN_VERSIONS[product_build]} Build {product_build}"
target_info["MsvAvDnsComputerName"] = decoder(av_value, "byte") else:
elif av_id == MsvAvDnsDomainName: target_info["os_version"] = f"{major_version}.{minor_version} Build {product_build}"
target_info["MsvAvDnsDomainName"] = decoder(av_value, "byte")
elif av_id == MsvAvDnsTreeName:
target_info["MsvAvDnsTreeName"] = decoder(av_value, "byte")
elif av_id == MsvAvFlags:
pass
elif av_id == MsvAvTimestamp:
filetime = decoder(av_value, "int")
microseconds = (filetime - 116444736000000000) / 10
time = datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=microseconds)
target_info["MsvAvTimestamp"] = time.strftime("%b %d, %Y %H:%M:%S.%f")
elif av_id == MsvAvSingleHost:
target_info["MsvAvSingleHost"] = decoder(av_value, "byte")
elif av_id == MsvAvTargetName:
target_info["MsvAvTargetName"] = decoder(av_value, "byte")
elif av_id == MsvAvChannelBindings:
target_info["MsvAvChannelBindings"] = av_value
return target_info return target_info
def parse_challenge(challenge_message):
# TargetNameFields
target_name_fields = challenge_message[12:20]
target_name_len = decoder(target_name_fields[0:2], "int")
target_name_offset = decoder(target_name_fields[4:8], "int")
# TargetInfoFields
target_info_fields = challenge_message[40:48]
target_info_len = decoder(target_info_fields[0:2], "int")
target_info_offset = decoder(target_info_fields[4:8], "int")
# Version
version = None
version_bytes = challenge_message[48:56]
version = parse_version(version_bytes)
# TargetName
target_name = challenge_message[target_name_offset:target_name_offset + target_name_len]
target_name = decoder(target_name, "byte")
# TargetInfo
target_info_bytes = challenge_message[target_info_offset:target_info_offset + target_info_len]
target_info = parse_target_info(target_info_bytes)
return {
"target_name": target_name,
"version": version,
"target_info": target_info
}

View File

@ -52,9 +52,9 @@ class winrm(connection):
def enum_host_info(self): def enum_host_info(self):
ntlm_info = parse_challenge(base64.b64decode(self.challenge_header.split(" ")[1].replace(",", ""))) ntlm_info = parse_challenge(base64.b64decode(self.challenge_header.split(" ")[1].replace(",", "")))
self.domain = ntlm_info["target_info"]["MsvAvDnsDomainName"] self.domain = ntlm_info["domain"]
self.hostname = ntlm_info["target_info"]["MsvAvNbComputerName"] self.hostname = ntlm_info["hostname"]
self.server_os = ntlm_info["version"] self.server_os = ntlm_info["os_version"]
self.logger.extra["hostname"] = self.hostname self.logger.extra["hostname"] = self.hostname
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}") self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}")

View File

@ -1,10 +1,10 @@
import os import os
import struct
import logging import logging
from io import StringIO from io import StringIO
from six import indexbytes
from datetime import datetime from datetime import datetime
from nxc.helpers.ntlm_parser import parse_challenge
from nxc.config import process_secret from nxc.config import process_secret
from nxc.connection import connection, dcom_FirewallChecker, requires_admin from nxc.connection import connection, dcom_FirewallChecker, requires_admin
from nxc.logger import NXCAdapter from nxc.logger import NXCAdapter
@ -18,7 +18,6 @@ from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_WINNT, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, MSRPC_BIND, MSRPCBind, CtxItem, MSRPCHeader, SEC_TRAILER, MSRPCBindAck
from impacket.dcerpc.v5.dcomrt import DCOMConnection from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login
import contextlib
MSRPC_UUID_PORTMAP = uuidtup_to_bin(("E1AF8308-5D1F-11C9-91A4-08002B14A0FA", "3.0")) MSRPC_UUID_PORTMAP = uuidtup_to_bin(("E1AF8308-5D1F-11C9-91A4-08002B14A0FA", "3.0"))
@ -86,7 +85,6 @@ class wmi(connection):
def enum_host_info(self): def enum_host_info(self):
# All code pick from DumpNTLNInfo.py # All code pick from DumpNTLNInfo.py
# https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py # https://github.com/fortra/impacket/blob/master/examples/DumpNTLMInfo.py
ntlmChallenge = None
bind = MSRPCBind() bind = MSRPCBind()
item = CtxItem() item = CtxItem()
@ -123,39 +121,19 @@ class wmi(connection):
if buffer != 0: if buffer != 0:
response = MSRPCHeader(buffer) response = MSRPCHeader(buffer)
bindResp = MSRPCBindAck(response.getData()) bindResp = MSRPCBindAck(response.getData())
ntlm_info = parse_challenge(bindResp["auth_data"])
ntlmChallenge = ntlm.NTLMAuthChallenge(bindResp["auth_data"]) self.domain = ntlm_info["domain"]
self.hostname = ntlm_info["hostname"]
if ntlmChallenge["TargetInfoFields_len"] > 0: self.server_os = ntlm_info["os_version"]
av_pairs = ntlm.AV_PAIRS(ntlmChallenge["TargetInfoFields"][: ntlmChallenge["TargetInfoFields_len"]]) self.logger.extra["hostname"] = self.hostname
if av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1] is not None:
try:
self.hostname = av_pairs[ntlm.NTLMSSP_AV_HOSTNAME][1].decode("utf-16le")
except Exception:
self.hostname = self.host
if av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1] is not None:
try:
self.domain = av_pairs[ntlm.NTLMSSP_AV_DNS_DOMAINNAME][1].decode("utf-16le")
except Exception:
self.domain = self.args.domain
if av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1] is not None:
with contextlib.suppress(Exception):
self.fqdn = av_pairs[ntlm.NTLMSSP_AV_DNS_HOSTNAME][1].decode("utf-16le")
if "Version" in ntlmChallenge.fields:
version = ntlmChallenge["Version"]
if len(version) >= 4:
self.server_os = "Windows NT %d.%d Build %d" % (indexbytes(version, 0), indexbytes(version, 1), struct.unpack("<H", version[2:4])[0])
else: else:
self.hostname = self.host self.hostname = self.host
if self.args.local_auth: if self.args.local_auth:
self.domain = self.hostname self.domain = self.hostname
if self.args.domain: if self.args.domain:
self.domain = self.args.domain self.domain = self.args.domain
self.fqdn = f"{self.hostname}.{self.domain}" self.fqdn = f"{self.hostname}.{self.domain}"
self.logger.extra["hostname"] = self.hostname
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-")) self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-"))
def print_host_info(self): def print_host_info(self):