Merge pull request #123 from Pennyw0rth/nxcdb-marshall

nxcdb: refactor shared database/workspace setup code & allow for creation/setting of workspaces outside of nxcdb interactive console
main
Alex 2024-02-12 19:25:56 +01:00 committed by GitHub
commit ae121c566a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 154 additions and 98 deletions

97
nxc/database.py Normal file
View File

@ -0,0 +1,97 @@
import sys
import configparser
import shutil
from sqlalchemy import create_engine
from sqlite3 import connect
from os import mkdir
from os.path import exists
from os.path import join as path_join
from nxc.loaders.protocolloader import ProtocolLoader
from nxc.paths import WORKSPACE_DIR
def create_db_engine(db_path):
return create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
def open_config(config_path):
try:
config = configparser.ConfigParser()
config.read(config_path)
except Exception as e:
print(f"[-] Error reading nxc.conf: {e}")
sys.exit(1)
return config
def get_workspace(config):
return config.get("nxc", "workspace")
def set_workspace(config_path, workspace_name):
config = open_config(config_path)
config.set("nxc", "workspace", workspace_name)
write_configfile(config, config_path)
print(f"[*] Workspace set to {workspace_name}")
def get_db(config):
return config.get("nxc", "last_used_db")
def write_configfile(config, config_path):
with open(config_path, "w") as configfile:
config.write(configfile)
def create_workspace(workspace_name, p_loader=None):
"""
Create a new workspace with the given name.
Args:
----
workspace_name (str): The name of the workspace.
Returns:
-------
None
"""
if exists(path_join(WORKSPACE_DIR, workspace_name)):
print(f"[-] Workspace {workspace_name} already exists")
else:
print(f"[*] Creating {workspace_name} workspace")
mkdir(path_join(WORKSPACE_DIR, workspace_name))
if p_loader is None:
p_loader = ProtocolLoader()
protocols = p_loader.get_protocols()
for protocol in protocols:
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
proto_db_path = path_join(WORKSPACE_DIR, workspace_name, f"{protocol}.db")
if not exists(proto_db_path):
print(f"[*] Initializing {protocol.upper()} protocol database")
conn = connect(proto_db_path)
c = conn.cursor()
# try to prevent some weird sqlite I/O errors
c.execute("PRAGMA journal_mode = OFF")
c.execute("PRAGMA foreign_keys = 1")
protocol_object.database.db_schema(c)
# commit the changes and close everything off
conn.commit()
conn.close()
def delete_workspace(workspace_name):
shutil.rmtree(path_join(WORKSPACE_DIR, workspace_name))
print(f"[*] Workspace {workspace_name} deleted")
def initialize_db():
if not exists(path_join(WORKSPACE_DIR, "default")):
create_workspace("default")

View File

@ -3,7 +3,7 @@ from os.path import exists
from os.path import join as path_join
import shutil
from nxc.paths import NXC_PATH, CONFIG_PATH, TMP_PATH, DATA_PATH
from nxc.nxcdb import initialize_db
from nxc.database import initialize_db
from nxc.logger import nxc_logger
@ -29,7 +29,7 @@ def first_run_setup(logger=nxc_logger):
logger.display(f"Creating missing folder {folder}")
mkdir(path_join(NXC_PATH, folder))
initialize_db(logger)
initialize_db()
if not exists(CONFIG_PATH):
logger.display("Copying default configuration file")

View File

@ -12,6 +12,7 @@ from nxc.paths import NXC_PATH
from nxc.console import nxc_console
from nxc.logger import nxc_logger
from nxc.config import nxc_config, nxc_workspace, config_log, ignore_opsec
from nxc.database import create_db_engine
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from nxc.helpers import powershell
@ -21,7 +22,6 @@ from os.path import exists
from os.path import join as path_join
from sys import exit
import logging
import sqlalchemy
from rich.progress import Progress
import platform
@ -38,11 +38,6 @@ if platform.system() != "Windows":
resource.setrlimit(resource.RLIMIT_NOFILE, file_limit)
def create_db_engine(db_path):
return sqlalchemy.create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
async def start_run(protocol_obj, args, db, targets):
nxc_logger.debug("Creating ThreadPoolExecutor")
if args.no_progress or len(targets) == 1:

View File

@ -1,31 +1,25 @@
import cmd
import configparser
import csv
import sys
import os
import argparse
from os import listdir
from os.path import exists
from os.path import join as path_join
import shutil
from sqlite3 import connect
import sys
from textwrap import dedent
from requests import get, post, ConnectionError
from sqlalchemy import create_engine
from terminaltables import AsciiTable
from termcolor import colored
from nxc.loaders.protocolloader import ProtocolLoader
from nxc.paths import CONFIG_PATH, WS_PATH, WORKSPACE_DIR
from nxc.paths import CONFIG_PATH, WORKSPACE_DIR
from nxc.database import create_db_engine, open_config, get_workspace, get_db, write_configfile, create_workspace, set_workspace
class UserExitedProto(Exception):
pass
def create_db_engine(db_path):
return create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
def print_table(data, title=None):
print()
table = AsciiTable(data)
@ -446,29 +440,15 @@ class NXCDBMenu(cmd.Cmd):
def __init__(self, config_path):
cmd.Cmd.__init__(self)
self.config_path = config_path
try:
self.config = configparser.ConfigParser()
self.config.read(self.config_path)
except Exception as e:
print(f"[-] Error reading nxc.conf: {e}")
sys.exit(1)
self.conn = None
self.p_loader = ProtocolLoader()
self.protocols = self.p_loader.get_protocols()
self.workspace = self.config.get("nxc", "workspace")
self.config = open_config(self.config_path)
self.workspace = get_workspace(self.config)
self.db = get_db(self.config)
self.do_workspace(self.workspace)
self.db = self.config.get("nxc", "last_used_db")
if self.db:
self.do_proto(self.db)
def write_configfile(self):
with open(self.config_path, "w") as configfile:
self.config.write(configfile)
def do_proto(self, proto):
if not proto:
return
@ -479,7 +459,7 @@ class NXCDBMenu(cmd.Cmd):
db_nav_object = self.p_loader.load_protocol(self.protocols[proto]["nvpath"])
db_object = self.p_loader.load_protocol(self.protocols[proto]["dbpath"])
self.config.set("nxc", "last_used_db", proto)
self.write_configfile()
write_configfile(self.config, self.config_path)
try:
proto_menu = db_nav_object.navigator(self, db_object.database(self.conn), proto)
proto_menu.cmdloop()
@ -506,18 +486,18 @@ class NXCDBMenu(cmd.Cmd):
if subcommand == "create":
new_workspace = line.split()[1].strip()
print(f"[*] Creating workspace '{new_workspace}'")
self.create_workspace(new_workspace, self.p_loader, self.protocols)
create_workspace(new_workspace, self.p_loader)
self.do_workspace(new_workspace)
elif subcommand == "list":
print("[*] Enumerating Workspaces")
for workspace in listdir(path_join(WORKSPACE_DIR)):
if workspace == self.workspace:
print("==> " + workspace)
print(f" * {colored(workspace, 'green')}")
else:
print(workspace)
print(f" {workspace}")
elif exists(path_join(WORKSPACE_DIR, line)):
self.config.set("nxc", "workspace", line)
self.write_configfile()
write_configfile(self.config, self.config_path)
self.workspace = line
self.prompt = f"nxcdb ({line}) > "
@ -538,65 +518,49 @@ class NXCDBMenu(cmd.Cmd):
Exits
"""
print_help(help_string)
@staticmethod
def create_workspace(workspace_name, p_loader, protocols):
os.mkdir(path_join(WORKSPACE_DIR, workspace_name))
for protocol in protocols:
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
proto_db_path = path_join(WORKSPACE_DIR, workspace_name, f"{protocol}.db")
if not exists(proto_db_path):
print(f"[*] Initializing {protocol.upper()} protocol database")
conn = connect(proto_db_path)
c = conn.cursor()
# try to prevent some weird sqlite I/O errors
c.execute("PRAGMA journal_mode = OFF")
c.execute("PRAGMA foreign_keys = 1")
protocol_object.database.db_schema(c)
# commit the changes and close everything off
conn.commit()
conn.close()
def delete_workspace(workspace_name):
shutil.rmtree(path_join(WORKSPACE_DIR, workspace_name))
def initialize_db(logger):
if not exists(path_join(WS_PATH, "default")):
logger.debug("Creating default workspace")
os.mkdir(path_join(WS_PATH, "default"))
p_loader = ProtocolLoader()
protocols = p_loader.get_protocols()
for protocol in protocols:
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
proto_db_path = path_join(WS_PATH, "default", f"{protocol}.db")
if not exists(proto_db_path):
logger.debug(f"Initializing {protocol.upper()} protocol database")
conn = connect(proto_db_path)
c = conn.cursor()
# try to prevent some weird sqlite I/O errors
c.execute("PRAGMA journal_mode = OFF") # could try setting to PERSIST if DB corruption starts occurring
c.execute("PRAGMA foreign_keys = 1")
# set a small timeout (5s) so if another thread is writing to the database, the entire program doesn't crash
c.execute("PRAGMA busy_timeout = 5000")
protocol_object.database.db_schema(c)
# commit the changes and close everything off
conn.commit()
conn.close()
def main():
if not exists(CONFIG_PATH):
print("[-] Unable to find config file")
sys.exit(1)
parser = argparse.ArgumentParser(
description="NXCDB is a database navigator for NXC",
)
parser.add_argument(
"-gw",
"--get-workspace",
action="store_true",
help="get the current workspace",
)
parser.add_argument(
"-cw",
"--create-workspace",
help="create a new workspace",
)
parser.add_argument(
"-sw",
"--set-workspace",
help="set the current workspace",
)
args = parser.parse_args()
if args.create_workspace:
create_workspace(args.create_workspace)
sys.exit()
if args.set_workspace:
set_workspace(CONFIG_PATH, args.set_workspace)
sys.exit()
if args.get_workspace:
current_workspace = get_workspace(open_config(CONFIG_PATH))
for workspace in listdir(path_join(WORKSPACE_DIR)):
if workspace == current_workspace:
print(f" * {colored(workspace, 'green')}")
else:
print(f" {workspace}")
sys.exit()
try:
nxcdbnav = NXCDBMenu(CONFIG_PATH)
nxcdbnav.cmdloop()

View File

@ -8,7 +8,7 @@ if os.name == "nt":
TMP_PATH = os.getenv("LOCALAPPDATA") + "\\Temp\\nxc_hosted"
if hasattr(sys, "getandroidapilevel"):
TMP_PATH = os.path.join("/data", "data", "com.termux", "files", "usr", "tmp", "nxc_hosted")
WS_PATH = os.path.join(NXC_PATH, "workspaces")
CERT_PATH = os.path.join(NXC_PATH, "nxc.pem")
CONFIG_PATH = os.path.join(NXC_PATH, "nxc.conf")
WORKSPACE_DIR = os.path.join(NXC_PATH, "workspaces")

View File

@ -7,13 +7,13 @@ from nxc.nxcdb import delete_workspace, NXCDBMenu
from nxc.first_run import first_run_setup
from nxc.loaders.protocolloader import ProtocolLoader
from nxc.logger import NXCAdapter
from nxc.paths import WS_PATH
from nxc.paths import WORKSPACE_DIR
from sqlalchemy.dialects.sqlite import Insert
@pytest.fixture(scope="session")
def db_engine():
db_path = os.path.join(WS_PATH, "test/smb.db")
db_path = os.path.join(WORKSPACE_DIR, "test/smb.db")
db_engine = create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
yield db_engine
db_engine.dispose()