refactor(nxcdb): move shared fdatabase functions to single file
parent
246cc6c0f2
commit
b4f3bacb99
|
@ -0,0 +1,90 @@
|
|||
import sys
|
||||
import configparser
|
||||
import shutil
|
||||
from sqlalchemy import create_engine
|
||||
from sqlite3 import connect
|
||||
from os import mkdir
|
||||
from os.path import exists
|
||||
from os.path import join as path_join
|
||||
|
||||
from nxc.loaders.protocolloader import ProtocolLoader
|
||||
from nxc.paths import WS_PATH, WORKSPACE_DIR
|
||||
|
||||
def create_db_engine(db_path):
|
||||
return create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
|
||||
|
||||
|
||||
def open_config(config_path):
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_path)
|
||||
except Exception as e:
|
||||
print(f"[-] Error reading nxc.conf: {e}")
|
||||
sys.exit(1)
|
||||
return config
|
||||
|
||||
|
||||
def get_workspace(config):
|
||||
return config.get("nxc", "workspace")
|
||||
|
||||
|
||||
def get_db(config):
|
||||
return config.get("nxc", "last_used_db")
|
||||
|
||||
|
||||
def write_configfile(config, config_path):
|
||||
with open(config_path, "w") as configfile:
|
||||
config.write(configfile)
|
||||
|
||||
|
||||
def create_workspace(workspace_name, p_loader, protocols):
|
||||
mkdir(path_join(WORKSPACE_DIR, workspace_name))
|
||||
|
||||
for protocol in protocols:
|
||||
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
|
||||
proto_db_path = path_join(WORKSPACE_DIR, workspace_name, f"{protocol}.db")
|
||||
|
||||
if not exists(proto_db_path):
|
||||
print(f"[*] Initializing {protocol.upper()} protocol database")
|
||||
conn = connect(proto_db_path)
|
||||
c = conn.cursor()
|
||||
|
||||
# try to prevent some weird sqlite I/O errors
|
||||
c.execute("PRAGMA journal_mode = OFF")
|
||||
c.execute("PRAGMA foreign_keys = 1")
|
||||
|
||||
protocol_object.database.db_schema(c)
|
||||
|
||||
# commit the changes and close everything off
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def delete_workspace(workspace_name):
|
||||
shutil.rmtree(path_join(WORKSPACE_DIR, workspace_name))
|
||||
|
||||
|
||||
def initialize_db(logger):
|
||||
if not exists(path_join(WS_PATH, "default")):
|
||||
logger.debug("Creating default workspace")
|
||||
mkdir(path_join(WS_PATH, "default"))
|
||||
|
||||
p_loader = ProtocolLoader()
|
||||
protocols = p_loader.get_protocols()
|
||||
for protocol in protocols:
|
||||
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
|
||||
proto_db_path = path_join(WS_PATH, "default", f"{protocol}.db")
|
||||
|
||||
if not exists(proto_db_path):
|
||||
logger.debug(f"Initializing {protocol.upper()} protocol database")
|
||||
conn = connect(proto_db_path)
|
||||
c = conn.cursor()
|
||||
# try to prevent some weird sqlite I/O errors
|
||||
c.execute("PRAGMA journal_mode = OFF") # could try setting to PERSIST if DB corruption starts occurring
|
||||
c.execute("PRAGMA foreign_keys = 1")
|
||||
# set a small timeout (5s) so if another thread is writing to the database, the entire program doesn't crash
|
||||
c.execute("PRAGMA busy_timeout = 5000")
|
||||
protocol_object.database.db_schema(c)
|
||||
# commit the changes and close everything off
|
||||
conn.commit()
|
||||
conn.close()
|
93
nxc/nxcdb.py
93
nxc/nxcdb.py
|
@ -1,31 +1,23 @@
|
|||
import cmd
|
||||
import configparser
|
||||
import csv
|
||||
import sys
|
||||
import os
|
||||
from os import listdir
|
||||
from os.path import exists
|
||||
from os.path import join as path_join
|
||||
import shutil
|
||||
from sqlite3 import connect
|
||||
import sys
|
||||
from textwrap import dedent
|
||||
|
||||
from requests import get, post, ConnectionError
|
||||
from sqlalchemy import create_engine
|
||||
from terminaltables import AsciiTable
|
||||
|
||||
from nxc.loaders.protocolloader import ProtocolLoader
|
||||
from nxc.paths import CONFIG_PATH, WS_PATH, WORKSPACE_DIR
|
||||
from nxc.paths import CONFIG_PATH, WORKSPACE_DIR
|
||||
from nxc.database import create_db_engine, open_config, get_workspace, get_db, write_configfile, create_workspace
|
||||
|
||||
|
||||
class UserExitedProto(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def create_db_engine(db_path):
|
||||
return create_engine(f"sqlite:///{db_path}", isolation_level="AUTOCOMMIT", future=True)
|
||||
|
||||
|
||||
def print_table(data, title=None):
|
||||
print()
|
||||
table = AsciiTable(data)
|
||||
|
@ -446,29 +438,15 @@ class NXCDBMenu(cmd.Cmd):
|
|||
def __init__(self, config_path):
|
||||
cmd.Cmd.__init__(self)
|
||||
self.config_path = config_path
|
||||
|
||||
try:
|
||||
self.config = configparser.ConfigParser()
|
||||
self.config.read(self.config_path)
|
||||
except Exception as e:
|
||||
print(f"[-] Error reading nxc.conf: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
self.conn = None
|
||||
self.p_loader = ProtocolLoader()
|
||||
self.protocols = self.p_loader.get_protocols()
|
||||
|
||||
self.workspace = self.config.get("nxc", "workspace")
|
||||
self.config = open_config(self.config_path)
|
||||
self.workspace = get_workspace(self.config)
|
||||
self.db = get_db(self.config)
|
||||
self.do_workspace(self.workspace)
|
||||
|
||||
self.db = self.config.get("nxc", "last_used_db")
|
||||
if self.db:
|
||||
self.do_proto(self.db)
|
||||
|
||||
def write_configfile(self):
|
||||
with open(self.config_path, "w") as configfile:
|
||||
self.config.write(configfile)
|
||||
|
||||
def do_proto(self, proto):
|
||||
if not proto:
|
||||
return
|
||||
|
@ -479,7 +457,7 @@ class NXCDBMenu(cmd.Cmd):
|
|||
db_nav_object = self.p_loader.load_protocol(self.protocols[proto]["nvpath"])
|
||||
db_object = self.p_loader.load_protocol(self.protocols[proto]["dbpath"])
|
||||
self.config.set("nxc", "last_used_db", proto)
|
||||
self.write_configfile()
|
||||
write_configfile(self.config, self.config_path)
|
||||
try:
|
||||
proto_menu = db_nav_object.navigator(self, db_object.database(self.conn), proto)
|
||||
proto_menu.cmdloop()
|
||||
|
@ -506,7 +484,7 @@ class NXCDBMenu(cmd.Cmd):
|
|||
if subcommand == "create":
|
||||
new_workspace = line.split()[1].strip()
|
||||
print(f"[*] Creating workspace '{new_workspace}'")
|
||||
self.create_workspace(new_workspace, self.p_loader, self.protocols)
|
||||
create_workspace(new_workspace, self.p_loader, self.protocols)
|
||||
self.do_workspace(new_workspace)
|
||||
elif subcommand == "list":
|
||||
print("[*] Enumerating Workspaces")
|
||||
|
@ -517,7 +495,7 @@ class NXCDBMenu(cmd.Cmd):
|
|||
print(workspace)
|
||||
elif exists(path_join(WORKSPACE_DIR, line)):
|
||||
self.config.set("nxc", "workspace", line)
|
||||
self.write_configfile()
|
||||
write_configfile(self.config, self.config_path)
|
||||
self.workspace = line
|
||||
self.prompt = f"nxcdb ({line}) > "
|
||||
|
||||
|
@ -539,59 +517,6 @@ class NXCDBMenu(cmd.Cmd):
|
|||
"""
|
||||
print_help(help_string)
|
||||
|
||||
@staticmethod
|
||||
def create_workspace(workspace_name, p_loader, protocols):
|
||||
os.mkdir(path_join(WORKSPACE_DIR, workspace_name))
|
||||
|
||||
for protocol in protocols:
|
||||
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
|
||||
proto_db_path = path_join(WORKSPACE_DIR, workspace_name, f"{protocol}.db")
|
||||
|
||||
if not exists(proto_db_path):
|
||||
print(f"[*] Initializing {protocol.upper()} protocol database")
|
||||
conn = connect(proto_db_path)
|
||||
c = conn.cursor()
|
||||
|
||||
# try to prevent some weird sqlite I/O errors
|
||||
c.execute("PRAGMA journal_mode = OFF")
|
||||
c.execute("PRAGMA foreign_keys = 1")
|
||||
|
||||
protocol_object.database.db_schema(c)
|
||||
|
||||
# commit the changes and close everything off
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def delete_workspace(workspace_name):
|
||||
shutil.rmtree(path_join(WORKSPACE_DIR, workspace_name))
|
||||
|
||||
|
||||
def initialize_db(logger):
|
||||
if not exists(path_join(WS_PATH, "default")):
|
||||
logger.debug("Creating default workspace")
|
||||
os.mkdir(path_join(WS_PATH, "default"))
|
||||
|
||||
p_loader = ProtocolLoader()
|
||||
protocols = p_loader.get_protocols()
|
||||
for protocol in protocols:
|
||||
protocol_object = p_loader.load_protocol(protocols[protocol]["dbpath"])
|
||||
proto_db_path = path_join(WS_PATH, "default", f"{protocol}.db")
|
||||
|
||||
if not exists(proto_db_path):
|
||||
logger.debug(f"Initializing {protocol.upper()} protocol database")
|
||||
conn = connect(proto_db_path)
|
||||
c = conn.cursor()
|
||||
# try to prevent some weird sqlite I/O errors
|
||||
c.execute("PRAGMA journal_mode = OFF") # could try setting to PERSIST if DB corruption starts occurring
|
||||
c.execute("PRAGMA foreign_keys = 1")
|
||||
# set a small timeout (5s) so if another thread is writing to the database, the entire program doesn't crash
|
||||
c.execute("PRAGMA busy_timeout = 5000")
|
||||
protocol_object.database.db_schema(c)
|
||||
# commit the changes and close everything off
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def main():
|
||||
if not exists(CONFIG_PATH):
|
||||
|
|
Loading…
Reference in New Issue