# NetExec/nxc/modules/spider_plus.py
#
# 524 lines
# 22 KiB
# Python
# Executable File

import json
import errno
import os
import time
import traceback
from nxc.protocols.smb.remotefile import RemoteFile
from impacket.smb3structs import FILE_READ_DATA
from impacket.smbconnection import SessionError
# Default number of bytes requested per call when a remote file is read in chunks
# (used as the default `chunk_size` of `SMBSpiderPlus.read_chunk`).
CHUNK_SIZE = 4096
def human_size(nbytes):
    """Render a byte count as a short human-readable string such as "1.5 KB".

    The value is divided by 1024 until it drops below 1024 or the largest known
    unit (YB) is reached, then printed with at most two decimals.
    """
    units = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]
    last_unit = len(units) - 1
    unit_index = 0
    value = nbytes
    # Climb one unit at a time; stop at the last unit even if the value is still large
    while not value < 1024 and unit_index < last_unit:
        value /= 1024.0
        unit_index += 1
    # Two decimals, then drop trailing zeros and a dangling decimal point
    rendered = f"{value:.2f}"
    rendered = rendered.rstrip("0")
    rendered = rendered.rstrip(".")
    return f"{rendered} {units[unit_index]}"
def human_time(timestamp):
    """Format an epoch timestamp (seconds) as "YYYY-MM-DD HH:MM:SS" in the local timezone."""
    local_struct = time.localtime(timestamp)
    return time.strftime("%Y-%m-%d %H:%M:%S", local_struct)
def make_dirs(path):
    """Create `path` and any missing parent directories.

    An `OSError` whose errno is `EEXIST` (the path is already there) is ignored;
    every other `OSError` is re-raised to the caller.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_there = err.errno == errno.EEXIST
        if already_there:
            return
        raise
def get_list_from_option(opt):
    """Split a comma-separated option string into a list of lowercase items.

    Empty items (e.g. from doubled or trailing commas) are dropped; items are
    not stripped of surrounding whitespace.
    """
    items = []
    for piece in opt.split(","):
        if piece:
            items.append(piece.lower())
    return items
class SMBSpiderPlus:
    """Spider the shares of an SMB host, record file metadata and optionally download files.

    File metadata is collected in `self.results` (share name -> file path -> metadata dict)
    and counters in `self.stats`; both are filled in while `spider_shares` runs.
    """

    def __init__(
        self,
        smb,
        logger,
        download_flag,
        stats_flag,
        exclude_exts,
        exclude_filter,
        max_file_size,
        output_folder,
    ):
        """Store the spidering options and make sure the output folder exists.

        Args:
            smb: Connection wrapper; this class uses `smb.conn`, `smb.shares()`,
                `smb.create_conn_obj()` and `smb.login()`.
            logger: Logger exposing `display`, `info`, `debug`, `fail` and `success`.
            download_flag: When truthy, eligible files are downloaded.
            stats_flag: When truthy, statistics are printed after spidering.
            exclude_exts: Lowercase file extensions that must not be downloaded.
            exclude_filter: Lowercase strings used to exclude shares, folders and files.
            max_file_size: Largest file size (in bytes) that is downloaded.
            output_folder: Local folder receiving the metadata file and downloads.
        """
        self.smb = smb
        self.host = self.smb.conn.getRemoteHost()
        self.max_connection_attempts = 5
        self.logger = logger
        self.results = {}
        self.stats = {
            "shares": [],
            "shares_readable": [],
            "shares_writable": [],
            "num_shares_filtered": 0,
            "num_folders": 0,
            "num_folders_filtered": 0,
            "num_files": 0,
            "file_sizes": [],
            "file_exts": set(),
            "num_get_success": 0,
            "num_get_fail": 0,
            "num_files_filtered": 0,
            "num_files_unmodified": 0,
            "num_files_updated": 0,
        }
        self.download_flag = download_flag
        self.stats_flag = stats_flag
        self.exclude_filter = exclude_filter
        self.exclude_exts = exclude_exts
        self.max_file_size = max_file_size
        self.output_folder = output_folder

        # Make sure the output_folder exists
        make_dirs(self.output_folder)

    def reconnect(self):
        """Try to re-establish the SMB session, up to `self.max_connection_attempts` times.

        Each attempt waits 3 seconds, creates a new connection object and logs in again.
        An attempt that raises is logged and followed by the next attempt.

        Returns:
            True as soon as one attempt completes without raising, False when every
            attempt failed.
        """
        for attempt in range(1, self.max_connection_attempts + 1):
            self.logger.display(f"Reconnection attempt #{attempt}/{self.max_connection_attempts} to server.")

            # Renegotiate the session
            time.sleep(3)
            try:
                self.smb.create_conn_obj()
                self.smb.login()
            except Exception as e:
                # Deliberately broad: any failure here just means "try again"
                self.logger.debug(f"Reconnection attempt #{attempt} failed: {e!s}")
                continue
            return True

        return False

    def list_path(self, share, subfolder):
        """Return the directory entries of `subfolder` on `share` (a `*` wildcard is appended).

        Access-denied and path-not-found errors yield an empty list. Any other
        `SessionError` triggers a reconnect followed by another listing attempt;
        the number of listing attempts is bounded by `self.max_connection_attempts`.
        """
        for _ in range(self.max_connection_attempts):
            try:
                # Get file list for the current folder
                return self.smb.conn.listPath(share, subfolder + "*")
            except SessionError as e:
                error_text = str(e)
                self.logger.debug(f'Failed listing files on share "{share}" in folder "{subfolder}".')
                self.logger.debug(error_text)

                if "STATUS_ACCESS_DENIED" in error_text:
                    self.logger.debug(f'Cannot list files in folder "{subfolder}".')
                    return []
                if "STATUS_OBJECT_PATH_NOT_FOUND" in error_text:
                    self.logger.debug(f"The folder {subfolder} does not exist.")
                    return []
                # Unknown session error: retry only if the session could be restored
                if not self.reconnect():
                    return []

        return []

    def get_remote_file(self, share, path):
        """Create a `RemoteFile` for `path` on `share` with read access.

        On `SessionError` the session is re-established and the creation retried,
        at most `self.max_connection_attempts` times.

        Returns:
            The `RemoteFile`, or None when it could not be created.
        """
        for _ in range(self.max_connection_attempts):
            try:
                return RemoteFile(self.smb.conn, path, share, access=FILE_READ_DATA)
            except SessionError as e:
                self.logger.debug(f'Failed to access remote file "{path}" on share "{share}": {e!s}')
                if not self.reconnect():
                    break

        return None

    def read_chunk(self, remote_file, chunk_size=CHUNK_SIZE):
        """Read the next chunk of at most `chunk_size` bytes from `remote_file`.

        A `SessionError` triggers a reconnect and another read, for up to 3 reads in total.
        When the retries are exhausted, the reconnect fails, or an unexpected exception
        occurs, an empty chunk is returned.
        """
        retry = 3
        while retry > 0:
            retry -= 1
            try:
                return remote_file.read(chunk_size)
            except SessionError:
                if not self.reconnect():
                    break
                # Point the remote file at the fresh connection. Writing
                # `remote_file.__smbConnection` inside this class would be name-mangled to
                # `_SMBSpiderPlus__smbConnection` and never reach the remote file, so the
                # mangled name is spelled out explicitly.
                # NOTE(review): assumes RemoteFile keeps its connection in a private
                # `__smbConnection` attribute - confirm against nxc.protocols.smb.remotefile.
                setattr(remote_file, "_RemoteFile__smbConnection", self.smb.conn)
            except Exception:
                traceback.print_exc()
                break

        return b""

    def get_file_save_path(self, remote_file):
        r"""Derive the local folder and filename under which `remote_file` is saved.

        The string form of the remote file, minus its first two characters, is used as a
        relative path: forward slashes (/) and backslashes (\) are converted to the local
        path separator, the result is split into folder and filename, and the folder is
        joined onto `self.output_folder`.

        Returns:
            A `(folder, filename)` tuple.
        """
        # Drop the two leading characters and replace slashes with the appropriate path separator
        # NOTE(review): presumably str(remote_file) starts with two backslashes (UNC style) - confirm.
        remote_file_path = str(remote_file)[2:].replace("/", os.path.sep).replace("\\", os.path.sep)

        # Split the path to obtain the folder path and the filename
        folder, filename = os.path.split(remote_file_path)

        # Join the output folder with the folder path to get the final local folder path
        folder = os.path.join(self.output_folder, folder)

        return folder, filename

    def spider_shares(self):
        """Enumerate the shares, spider every readable and non-excluded one, then save the metadata.

        The collected metadata is written to a JSON file and, when `self.stats_flag` is set,
        statistics are printed.

        Returns:
            `self.results`.
        """
        self.logger.info("Enumerating shares for spidering.")
        shares = self.smb.shares()

        try:
            # Get all available shares for the SMB connection
            for share in shares:
                share_perms = share["access"]
                share_name = share["name"]
                self.stats["shares"].append(share_name)

                self.logger.info(f'Share "{share_name}" has perms {share_perms}')
                if "WRITE" in share_perms:
                    self.stats["shares_writable"].append(share_name)
                if "READ" in share_perms:
                    self.stats["shares_readable"].append(share_name)
                else:
                    # We only want to spider readable shares
                    self.logger.debug(f'Share "{share_name}" not readable.')
                    continue

                # `exclude_filter` is applied to the shares name
                if share_name.lower() in self.exclude_filter:
                    self.logger.info(f'Share "{share_name}" has been excluded.')
                    self.stats["num_shares_filtered"] += 1
                    continue

                try:
                    # Start the spider at the root of the share folder
                    self.results[share_name] = {}
                    self.spider_folder(share_name, "")
                except SessionError:
                    traceback.print_exc()
                    self.logger.fail("Got a session error while spidering.")
                    self.reconnect()

        except Exception as e:
            traceback.print_exc()
            self.logger.fail(f"Error enumerating shares: {e!s}")

        # Save the metadata.
        self.dump_folder_metadata(self.results)

        # Print stats.
        if self.stats_flag:
            self.print_stats()

        return self.results

    def spider_folder(self, share_name, folder):
        """Recursively walk `folder` on `share_name`.

        Every entry is counted, checked against `self.exclude_filter`, and then either
        spidered (folders) or handed to `parse_file` (files).
        """
        self.logger.info(f'Spider share "{share_name}" in folder "{folder}".')

        # NOTE(review): `list_path` appends "*" as well, so the pattern sent ends in "**";
        # left unchanged here - confirm whether a single wildcard is intended.
        filelist = self.list_path(share_name, folder + "*")

        # For each entry:
        # - It's a folder then we spider it (skipping `.` and `..`)
        # - It's a file then we apply the checks
        for result in filelist:
            next_filedir = result.get_longname()
            if next_filedir in [".", ".."]:
                continue
            next_fullpath = folder + next_filedir
            result_type = "folder" if result.is_directory() else "file"
            self.stats[f"num_{result_type}s"] += 1

            # Check file-dir exclusion filter.
            if any(d in next_filedir.lower() for d in self.exclude_filter):
                self.logger.info(f'The {result_type} "{next_filedir}" has been excluded')
                # The counters are named `num_folders_filtered` / `num_files_filtered`
                self.stats[f"num_{result_type}s_filtered"] += 1
                continue

            if result_type == "folder":
                self.logger.info(f'Current folder in share "{share_name}": "{next_fullpath}"')
                self.spider_folder(share_name, next_fullpath + "/")
            else:
                self.logger.info(f'Current file in share "{share_name}": "{next_fullpath}"')
                self.parse_file(share_name, next_fullpath, result)

    def parse_file(self, share_name, file_path, file_info):
        """Record the metadata of one file and download it when it is eligible.

        A file is downloaded only when `self.download_flag` is set, its extension is not in
        `self.exclude_exts`, its size does not exceed `self.max_file_size`, and no local
        copy with the same size and an equal-or-newer modification time exists.
        """
        # Record the file metadata
        file_size = file_info.get_filesize()
        file_creation_time = file_info.get_ctime_epoch()
        file_modified_time = file_info.get_mtime_epoch()
        file_access_time = file_info.get_atime_epoch()
        self.results[share_name][file_path] = {
            "size": human_size(file_size),
            "ctime_epoch": human_time(file_creation_time),
            "mtime_epoch": human_time(file_modified_time),
            "atime_epoch": human_time(file_access_time),
        }
        self.stats["file_sizes"].append(file_size)

        # Check if proceeding with download attempt.
        if not self.download_flag:
            return

        # Check file extension filter.
        _, file_extension = os.path.splitext(file_path)
        if file_extension:
            self.stats["file_exts"].add(file_extension.lower())
            if file_extension.lower() in self.exclude_exts:
                self.logger.info(f'The file "{file_path}" has an excluded extension.')
                self.stats["num_files_filtered"] += 1
                return

        # Check file size limits.
        if file_size > self.max_file_size:
            self.logger.info(f"File {file_path} has size {human_size(file_size)} > max size {human_size(self.max_file_size)}.")
            self.stats["num_files_filtered"] += 1
            return

        # Check if the remote file is readable.
        remote_file = self.get_remote_file(share_name, file_path)
        if not remote_file:
            self.logger.fail(f'Cannot read remote file "{file_path}".')
            self.stats["num_get_fail"] += 1
            return

        # Check if the file is already downloaded and up-to-date.
        file_dir, file_name = self.get_file_save_path(remote_file)
        download_path = os.path.join(file_dir, file_name)
        needs_update_flag = False
        if os.path.exists(download_path):
            if file_modified_time <= os.stat(download_path).st_mtime and os.path.getsize(download_path) == file_size:
                self.logger.info(f'File already downloaded "{file_path}" => "{download_path}".')
                self.stats["num_files_unmodified"] += 1
                return
            needs_update_flag = True

        # Download file.
        download_success = False
        try:
            self.logger.info(f'Downloading file "{file_path}" => "{download_path}".')
            remote_file.open_file()
            # `save_file` reports its own failures through its return value
            download_success = self.save_file(remote_file, share_name)
            remote_file.close()
        except SessionError as e:
            if "STATUS_SHARING_VIOLATION" in str(e):
                # The file is locked by another process; counted as a failed download below
                self.logger.debug(f'Sharing violation while downloading file "{file_path}".')
            else:
                self.logger.fail(f'Failed to download file "{file_path}". Error: {e!s}')
        except Exception as e:
            self.logger.fail(f'Failed to download file "{file_path}". Error: {e!s}')

        # Increment stats counters
        if download_success:
            self.stats["num_get_success"] += 1
            if needs_update_flag:
                self.stats["num_files_updated"] += 1
        else:
            self.stats["num_get_fail"] += 1

    def save_file(self, remote_file, share_name):
        """Download `remote_file` chunk by chunk into its local save path.

        The local folder is created first. A local file that ends up empty although the
        remote file is not is removed again.

        Returns:
            True when the file was written without error, False when writing failed or
            the empty result had to be removed.
        """
        # Reset the remote_file to point to the beginning of the file.
        remote_file.seek(0, 0)

        folder, filename = self.get_file_save_path(remote_file)
        download_path = os.path.join(folder, filename)

        # Create the subdirectories based on the share name and file path.
        self.logger.debug(f"Creating folder '{folder}'")
        make_dirs(folder)

        written_ok = True
        try:
            with open(download_path, "wb") as fd:
                while True:
                    chunk = self.read_chunk(remote_file)
                    if not chunk:
                        break
                    fd.write(chunk)
        except Exception as e:
            written_ok = False
            self.logger.fail(f'Error writing file "{download_path}" from share "{share_name}": {e}')

        # Check if the file is empty and should not be.
        # The existence check covers the case where opening the local file itself failed.
        if os.path.exists(download_path) and os.path.getsize(download_path) == 0 and remote_file.get_filesize() > 0:
            os.remove(download_path)
            remote_path = str(remote_file)[2:]
            self.logger.fail(f'Unable to download file "{remote_path}".')
            return False

        return written_ok

    def dump_folder_metadata(self, results):
        """Write `results` as JSON to `<output_folder>/<host>.json`.

        The JSON is indented by 4 spaces with sorted keys. Failures are logged, not raised.
        """
        metadata_path = os.path.join(self.output_folder, f"{self.host}.json")
        try:
            with open(metadata_path, "w", encoding="utf-8") as fd:
                fd.write(json.dumps(results, indent=4, sort_keys=True))
            self.logger.success(f'Saved share-file metadata to "{metadata_path}".')
        except Exception as e:
            self.logger.fail(f"Failed to save share metadata: {e!s}")

    def print_stats(self):
        """Log the statistics gathered in `self.stats` during spidering."""
        # Share statistics.
        shares = self.stats.get("shares", [])
        if shares:
            num_shares = len(shares)
            shares_str = ", ".join(shares)
            self.logger.display(f"SMB Shares: {num_shares} ({shares_str})")
        shares_readable = self.stats.get("shares_readable", [])
        if shares_readable:
            num_readable_shares = len(shares_readable)
            # Only the first 10 names are listed when there are more
            shares_readable_str = ", ".join(shares_readable[:10]) + "..." if len(shares_readable) > 10 else ", ".join(shares_readable)
            self.logger.display(f"SMB Readable Shares: {num_readable_shares} ({shares_readable_str})")
        shares_writable = self.stats.get("shares_writable", [])
        if shares_writable:
            num_writable_shares = len(shares_writable)
            shares_writable_str = ", ".join(shares_writable[:10]) + "..." if len(shares_writable) > 10 else ", ".join(shares_writable)
            self.logger.display(f"SMB Writable Shares: {num_writable_shares} ({shares_writable_str})")
        num_shares_filtered = self.stats.get("num_shares_filtered", 0)
        if num_shares_filtered:
            self.logger.display(f"SMB Filtered Shares: {num_shares_filtered}")

        # Folder statistics.
        num_folders = self.stats.get("num_folders", 0)
        self.logger.display(f"Total folders found: {num_folders}")
        num_folders_filtered = self.stats.get("num_folders_filtered", 0)
        if num_folders_filtered:
            # `num_folders_filtered` is already a count, not a collection
            self.logger.display(f"Folders Filtered: {num_folders_filtered}")

        # File statistics.
        num_files = self.stats.get("num_files", 0)
        self.logger.display(f"Total files found: {num_files}")
        num_files_filtered = self.stats.get("num_files_filtered", 0)
        if num_files_filtered:
            self.logger.display(f"Files filtered: {num_files_filtered}")
        if num_files == 0:
            return

        # File sizing statistics.
        file_sizes = self.stats.get("file_sizes", [])
        if file_sizes:
            total_file_size = sum(file_sizes)
            min_file_size = min(file_sizes)
            max_file_size = max(file_sizes)
            average_file_size = total_file_size / num_files
            self.logger.display(f"File size average: {human_size(average_file_size)}")
            self.logger.display(f"File size min: {human_size(min_file_size)}")
            self.logger.display(f"File size max: {human_size(max_file_size)}")

        # Extension statistics.
        file_exts = list(self.stats.get("file_exts", []))
        if file_exts:
            num_unique_file_exts = len(file_exts)
            unique_exts_str = ", ".join(file_exts[:10]) + "..." if len(file_exts) > 10 else ", ".join(file_exts)
            self.logger.display(f"File unique exts: {num_unique_file_exts} ({unique_exts_str})")

        # Download statistics.
        if self.download_flag:
            num_get_success = self.stats.get("num_get_success", 0)
            if num_get_success:
                self.logger.display(f"Downloads successful: {num_get_success}")
            num_get_fail = self.stats.get("num_get_fail", 0)
            if num_get_fail:
                self.logger.display(f"Downloads failed: {num_get_fail}")
            num_files_unmodified = self.stats.get("num_files_unmodified", 0)
            if num_files_unmodified:
                self.logger.display(f"Unmodified files: {num_files_unmodified}")
            num_files_updated = self.stats.get("num_files_updated", 0)
            if num_files_updated:
                self.logger.display(f"Updated files: {num_files_updated}")
            if num_files_unmodified and not num_files_updated:
                self.logger.display("All files were not changed.")
            if num_files_filtered == num_files:
                self.logger.display("All files were ignored.")
            if num_get_fail == 0:
                self.logger.success("All files processed successfully.")
class NXCModule:
    """Spider Plus Module
    Module by @vincd
    Updated by @godylockz
    """

    name = "spider_plus"
    description = "List files recursively and save a JSON share-file metadata to the 'OUTPUT_FOLDER'. See module options for finer configuration."
    supported_protocols = ["smb"]
    opsec_safe = True  # Does the module touch disk?
    multiple_hosts = True  # Does the module support multiple hosts?

    # NOTE(review): the docstring of `options` is presumably shown to users as the module's
    # option help, so its wording is kept exactly as it was - confirm before rewording.
    def options(self, context, module_options):
        """
        List files recursively (excluding `EXCLUDE_FILTER` and `EXCLUDE_EXTS` extensions) and save JSON share-file metadata to the `OUTPUT_FOLDER`.
        If `DOWNLOAD_FLAG`=True, download files smaller then `MAX_FILE_SIZE` to the `OUTPUT_FOLDER`.
        DOWNLOAD_FLAG Download all share folders/files (Default: False)
        STATS_FLAG Disable file/download statistics (Default: True)
        EXCLUDE_EXTS Case-insensitive extension filter to exclude (Default: ico,lnk)
        EXCLUDE_FILTER Case-insensitive filter to exclude folders/files (Default: print$,ipc$)
        MAX_FILE_SIZE Max file size to download (Default: 51200)
        OUTPUT_FOLDER Path of the local folder to save files (Default: /tmp/nxc_spider_plus)
        """
        # Both flags are toggled by the mere presence of a matching option key;
        # the value given for the option is not looked at.
        self.download_flag = any("DOWNLOAD" in key for key in module_options)
        self.stats_flag = not any("STATS" in key for key in module_options)

        # Exclusion lists are lowercased so that later comparisons are case-insensitive
        raw_exts = module_options.get("EXCLUDE_EXTS", "ico,lnk")
        self.exclude_exts = [ext.lower() for ext in get_list_from_option(raw_exts)]
        raw_filter = module_options.get("EXCLUDE_FILTER", "print$,ipc$")
        self.exclude_filter = [item.lower() for item in get_list_from_option(raw_filter)]

        default_max_size = 50 * 1024
        self.max_file_size = int(module_options.get("MAX_FILE_SIZE", default_max_size))

        default_output = os.path.join("/tmp", "nxc_spider_plus")
        self.output_folder = module_options.get("OUTPUT_FOLDER", default_output)

    def on_login(self, context, connection):
        """Log the effective options, then spider the shares of the logged-in connection."""
        banner = (
            "Started module spidering_plus with the following options:",
            f" DOWNLOAD_FLAG: {self.download_flag}",
            f" STATS_FLAG: {self.stats_flag}",
            f"EXCLUDE_FILTER: {self.exclude_filter}",
            f" EXCLUDE_EXTS: {self.exclude_exts}",
            f" MAX_FILE_SIZE: {human_size(self.max_file_size)}",
            f" OUTPUT_FOLDER: {self.output_folder}",
        )
        for line in banner:
            context.log.display(line)

        spider = SMBSpiderPlus(
            smb=connection,
            logger=context.log,
            download_flag=self.download_flag,
            stats_flag=self.stats_flag,
            exclude_exts=self.exclude_exts,
            exclude_filter=self.exclude_filter,
            max_file_size=self.max_file_size,
            output_folder=self.output_folder,
        )
        spider.spider_shares()