Merge pull request #172 from XiaoliChan/winrm-ntlm-info

[winrm] say goodbye to SMB
main
Alex 2024-02-27 19:00:06 -05:00 committed by GitHub
commit ac60614474
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 138 additions and 71 deletions

107
nxc/helpers/ntlm_parser.py Normal file
View File

@ -0,0 +1,107 @@
# Original from here: https://github.com/nopfor/ntlm_challenger
import datetime
from impacket.smb3 import WIN_VERSIONS
def decoder(byte_string, decode_type):
if decode_type == "byte":
return byte_string.decode("UTF-8").replace("\x00", "")
else:
return int.from_bytes(byte_string, "little")
def parse_version(version_bytes):
major_version = version_bytes[0]
minor_version = version_bytes[1]
product_build = decoder(version_bytes[2:4], "int")
if product_build in WIN_VERSIONS:
return f"{WIN_VERSIONS[product_build]} Build {product_build}"
else:
return f"Windows {major_version}.{minor_version} Build {product_build}"
def parse_target_info(target_info_bytes):
MsvAvEOL = 0x0000
MsvAvNbComputerName = 0x0001
MsvAvNbDomainName = 0x0002
MsvAvDnsComputerName = 0x0003
MsvAvDnsDomainName = 0x0004
MsvAvDnsTreeName = 0x0005
MsvAvFlags = 0x0006
MsvAvTimestamp = 0x0007
MsvAvSingleHost = 0x0008
MsvAvTargetName = 0x0009
MsvAvChannelBindings = 0x000A
target_info = {
"MsvAvNbComputerName": None,
"MsvAvDnsDomainName": None,
}
info_offset = 0
while info_offset < len(target_info_bytes):
av_id = decoder(target_info_bytes[info_offset:info_offset + 2], "int")
av_len = decoder(target_info_bytes[info_offset + 2:info_offset + 4], "int")
av_value = target_info_bytes[info_offset + 4:info_offset + 4 + av_len]
info_offset = info_offset + 4 + av_len
if av_id == MsvAvEOL:
pass
elif av_id == MsvAvNbComputerName:
target_info["MsvAvNbComputerName"] = decoder(av_value, "byte")
elif av_id == MsvAvNbDomainName:
target_info["MsvAvNbDomainName"] = decoder(av_value, "byte")
elif av_id == MsvAvDnsComputerName:
target_info["MsvAvDnsComputerName"] = decoder(av_value, "byte")
elif av_id == MsvAvDnsDomainName:
target_info["MsvAvDnsDomainName"] = decoder(av_value, "byte")
elif av_id == MsvAvDnsTreeName:
target_info["MsvAvDnsTreeName"] = decoder(av_value, "byte")
elif av_id == MsvAvFlags:
pass
elif av_id == MsvAvTimestamp:
filetime = decoder(av_value, "int")
microseconds = (filetime - 116444736000000000) / 10
time = datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=microseconds)
target_info["MsvAvTimestamp"] = time.strftime("%b %d, %Y %H:%M:%S.%f")
elif av_id == MsvAvSingleHost:
target_info["MsvAvSingleHost"] = decoder(av_value, "byte")
elif av_id == MsvAvTargetName:
target_info["MsvAvTargetName"] = decoder(av_value, "byte")
elif av_id == MsvAvChannelBindings:
target_info["MsvAvChannelBindings"] = av_value
return target_info
def parse_challenge(challenge_message):
# TargetNameFields
target_name_fields = challenge_message[12:20]
target_name_len = decoder(target_name_fields[0:2], "int")
target_name_offset = decoder(target_name_fields[4:8], "int")
# TargetInfoFields
target_info_fields = challenge_message[40:48]
target_info_len = decoder(target_info_fields[0:2], "int")
target_info_offset = decoder(target_info_fields[4:8], "int")
# Version
version = None
version_bytes = challenge_message[48:56]
version = parse_version(version_bytes)
# TargetName
target_name = challenge_message[target_name_offset:target_name_offset + target_name_len]
target_name = decoder(target_name, "byte")
# TargetInfo
target_info_bytes = challenge_message[target_info_offset:target_info_offset + target_info_len]
target_info = parse_target_info(target_info_bytes)
return {
"target_name": target_name,
"version": version,
"target_info": target_info
}

View File

@ -1,7 +1,7 @@
import os import os
import base64
import requests import requests
import urllib3 import urllib3
import contextlib
import logging import logging
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -10,13 +10,13 @@ from datetime import datetime
from pypsrp.wsman import NAMESPACES from pypsrp.wsman import NAMESPACES
from pypsrp.client import Client from pypsrp.client import Client
from impacket.smbconnection import SMBConnection
from impacket.examples.secretsdump import LocalOperations, LSASecrets, SAMHashes from impacket.examples.secretsdump import LocalOperations, LSASecrets, SAMHashes
from nxc.config import process_secret from nxc.config import process_secret
from nxc.connection import connection from nxc.connection import connection
from nxc.helpers.bloodhound import add_user_bh from nxc.helpers.bloodhound import add_user_bh
from nxc.helpers.misc import gen_random_string from nxc.helpers.misc import gen_random_string
from nxc.helpers.ntlm_parser import parse_challenge
from nxc.logger import NXCAdapter from nxc.logger import NXCAdapter
@ -33,58 +33,33 @@ class winrm(connection):
self.lmhash = "" self.lmhash = ""
self.nthash = "" self.nthash = ""
self.ssl = False self.ssl = False
self.auth_type = None self.challenge_header = None
connection.__init__(self, args, db, host) connection.__init__(self, args, db, host)
def proto_logger(self): def proto_logger(self):
# Reason why default is SMB/445, because default is enumerate over SMB. # For more details, please check the function "print_host_info"
# For more details, please check the function "print_host_info"
logging.getLogger("pypsrp").disabled = True logging.getLogger("pypsrp").disabled = True
logging.getLogger("pypsrp.wsman").disabled = True logging.getLogger("pypsrp.wsman").disabled = True
self.logger = NXCAdapter( self.logger = NXCAdapter(
extra={ extra={
"protocol": "SMB", "protocol": "WINRM",
"host": self.host, "host": self.host,
"port": "445", "port": "5985",
"hostname": self.hostname, "hostname": self.hostname,
} }
) )
def enum_host_info(self): def enum_host_info(self):
# smb no open, specify the domain ntlm_info = parse_challenge(base64.b64decode(self.challenge_header.split(" ")[1].replace(",", "")))
if self.args.no_smb: self.domain = ntlm_info["target_info"]["MsvAvDnsDomainName"]
self.domain = self.args.domain self.hostname = ntlm_info["target_info"]["MsvAvNbComputerName"]
else: self.server_os = ntlm_info["version"]
try: self.logger.extra["hostname"] = self.hostname
smb_conn = SMBConnection(self.host, self.host, None, timeout=5)
no_ntlm = False
except Exception as e:
self.logger.fail(f"Error retrieving host domain: {e} specify one manually with the '-d' flag")
else:
try:
smb_conn.login("", "")
except BrokenPipeError:
self.logger.fail("Broken Pipe Error while attempting to login")
except Exception as e:
if "STATUS_NOT_SUPPORTED" in str(e):
# no ntlm supported
no_ntlm = True
self.domain = smb_conn.getServerDNSDomainName() if not no_ntlm else self.args.domain self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}")
self.hostname = smb_conn.getServerName() if not no_ntlm else self.host
self.server_os = smb_conn.getServerOS()
if isinstance(self.server_os.lower(), bytes):
self.server_os = self.server_os.decode("utf-8")
self.logger.extra["hostname"] = self.hostname self.db.add_host(self.host, self.port, self.hostname, self.domain, self.server_os)
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}")
with contextlib.suppress(Exception):
smb_conn.logoff()
self.db.add_host(self.host, self.port, self.hostname, self.domain, self.server_os)
if self.args.domain: if self.args.domain:
self.domain = self.args.domain self.domain = self.args.domain
@ -98,16 +73,10 @@ class winrm(connection):
self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-")) self.output_filename = os.path.expanduser(f"~/.nxc/logs/{self.hostname}_{self.host}_{datetime.now().strftime('%Y-%m-%d_%H%M%S')}".replace(":", "-"))
def print_host_info(self): def print_host_info(self):
if self.args.no_smb: self.logger.extra["protocol"] = "WINRM-SSL" if self.ssl else "WINRM"
self.logger.extra["protocol"] = "WINRM-SSL" if self.ssl else "WINRM" self.logger.extra["port"] = self.port
self.logger.extra["port"] = self.port self.logger.display(f"{self.server_os} (name:{self.hostname}) (domain:{self.domain})")
self.logger.display(f"{self.server_os} (name:{self.hostname}) (domain:{self.domain})")
else:
self.logger.display(f"{self.server_os} (name:{self.hostname}) (domain:{self.domain})")
self.logger.extra["protocol"] = "WINRM-SSL" if self.ssl else "WINRM"
self.logger.extra["port"] = self.port
self.logger.info(f"Connection information: {self.endpoint} (auth type:{self.auth_type}) (domain:{self.domain if self.args.domain else ''})")
return True return True
def create_conn_obj(self): def create_conn_obj(self):
@ -117,6 +86,14 @@ class winrm(connection):
endpoints = {} endpoints = {}
headers = {
"Content-Length": "0",
"Keep-Alive": "true",
"Content-Type": "application/soap+xml;charset=UTF-8",
"User-Agent": "Microsoft WinRM Client",
"Authorization": "Negotiate TlRMTVNTUAABAAAAB4IIogAAAAAAAAAAAAAAAAAAAAAGAbEdAAAADw=="
}
for protocol in self.args.check_proto: for protocol in self.args.check_proto:
endpoints[protocol] = {} endpoints[protocol] = {}
endpoints[protocol]["port"] = self.port[self.args.check_proto.index(protocol)] if len(self.port) == 2 else self.port[0] endpoints[protocol]["port"] = self.port[self.args.check_proto.index(protocol)] if len(self.port) == 2 else self.port[0]
@ -131,9 +108,12 @@ class winrm(connection):
self.port = endpoints[protocol]["port"] self.port = endpoints[protocol]["port"]
try: try:
self.logger.debug(f"Requesting URL: {endpoints[protocol]['url']}") self.logger.debug(f"Requesting URL: {endpoints[protocol]['url']}")
res = requests.post(endpoints[protocol]["url"], verify=False, timeout=self.args.http_timeout) res = requests.post(endpoints[protocol]["url"], headers=headers, verify=False, timeout=self.args.http_timeout)
self.logger.debug(f"Received response code: {res.status_code}") self.logger.debug(f"Received response code: {res.status_code}")
self.auth_type = res.headers["WWW-Authenticate"] if "WWW-Authenticate" in res.headers else "NOAUTH" self.challenge_header = res.headers["WWW-Authenticate"]
if (not self.challenge_header) or ("Negotiate" not in self.challenge_header):
self.logger.info('Failed to get NTLM challenge from target "/wsman" endpoint, maybe isn\'t winrm service.')
return False
self.endpoint = endpoints[protocol]["url"] self.endpoint = endpoints[protocol]["url"]
self.ssl = endpoints[protocol]["ssl"] self.ssl = endpoints[protocol]["ssl"]
return True return True

View File

@ -1,6 +1,3 @@
from argparse import _StoreTrueAction
def proto_args(parser, std_parser, module_parser): def proto_args(parser, std_parser, module_parser):
winrm_parser = parser.add_parser("winrm", help="own stuff using WINRM", parents=[std_parser, module_parser]) winrm_parser = parser.add_parser("winrm", help="own stuff using WINRM", parents=[std_parser, module_parser])
winrm_parser.add_argument("-H", "--hash", metavar="HASH", dest="hash", nargs="+", default=[], help="NTLM hash(es) or file(s) containing NTLM hashes") winrm_parser.add_argument("-H", "--hash", metavar="HASH", dest="hash", nargs="+", default=[], help="NTLM hash(es) or file(s) containing NTLM hashes")
@ -9,12 +6,10 @@ def proto_args(parser, std_parser, module_parser):
winrm_parser.add_argument("--check-proto", nargs="+", default=["http", "https"], help="Choose what prorocol you want to check, default is %(default)s, format: 'http https'(with space separated) or 'single-protocol'") winrm_parser.add_argument("--check-proto", nargs="+", default=["http", "https"], help="Choose what prorocol you want to check, default is %(default)s, format: 'http https'(with space separated) or 'single-protocol'")
winrm_parser.add_argument("--laps", dest="laps", metavar="LAPS", type=str, help="LAPS authentification", nargs="?", const="administrator") winrm_parser.add_argument("--laps", dest="laps", metavar="LAPS", type=str, help="LAPS authentification", nargs="?", const="administrator")
winrm_parser.add_argument("--http-timeout", dest="http_timeout", type=int, default=10, help="HTTP timeout for WinRM connections") winrm_parser.add_argument("--http-timeout", dest="http_timeout", type=int, default=10, help="HTTP timeout for WinRM connections")
no_smb_arg = winrm_parser.add_argument("--no-smb", action=get_conditional_action(_StoreTrueAction), make_required=[], help="No smb connection")
dgroup = winrm_parser.add_mutually_exclusive_group() dgroup = winrm_parser.add_mutually_exclusive_group()
domain_arg = dgroup.add_argument("-d", metavar="DOMAIN", dest="domain", type=str, default=None, help="domain to authenticate to") dgroup.add_argument("-d", metavar="DOMAIN", dest="domain", type=str, default=None, help="domain to authenticate to")
dgroup.add_argument("--local-auth", action="store_true", help="authenticate locally to each target") dgroup.add_argument("--local-auth", action="store_true", help="authenticate locally to each target")
no_smb_arg.make_required = [domain_arg]
cgroup = winrm_parser.add_argument_group("Credential Gathering", "Options for gathering credentials") cgroup = winrm_parser.add_argument_group("Credential Gathering", "Options for gathering credentials")
cgroup.add_argument("--dump-method", action="store", default="cmd", choices={"cmd", "powershell"}, help="Select shell type in hashes dump") cgroup.add_argument("--dump-method", action="store", default="cmd", choices={"cmd", "powershell"}, help="Select shell type in hashes dump")
@ -29,18 +24,3 @@ def proto_args(parser, std_parser, module_parser):
cgroup.add_argument("-X", metavar="PS_COMMAND", dest="ps_execute", help="execute the specified PowerShell command") cgroup.add_argument("-X", metavar="PS_COMMAND", dest="ps_execute", help="execute the specified PowerShell command")
return parser return parser
def get_conditional_action(baseAction):
class ConditionalAction(baseAction):
def __init__(self, option_strings, dest, **kwargs):
x = kwargs.pop("make_required", [])
super().__init__(option_strings, dest, **kwargs)
self.make_required = x
def __call__(self, parser, namespace, values, option_string=None):
for x in self.make_required:
x.required = True
super().__call__(parser, namespace, values, option_string)
return ConditionalAction