from pathlib import Path from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy import MetaData, Table from sqlalchemy.exc import ( IllegalStateChangeError, NoInspectionAvailable, NoSuchTableError, ) from nxc.logger import nxc_logger import sys class database: def __init__(self, db_engine): self.CredentialsTable = None self.HostsTable = None self.db_engine = db_engine self.db_path = self.db_engine.url.database self.protocol = Path(self.db_path).stem.upper() self.metadata = MetaData() self.reflect_tables() session_factory = sessionmaker(bind=self.db_engine, expire_on_commit=True) Session = scoped_session(session_factory) # this is still named "conn" when it is the session object; TODO: rename self.conn = Session() @staticmethod def db_schema(db_conn): db_conn.execute( """CREATE TABLE "credentials" ( "id" integer PRIMARY KEY, "username" text, "password" text )""" ) db_conn.execute( """CREATE TABLE "hosts" ( "id" integer PRIMARY KEY, "ip" text, "hostname" text, "port" integer )""" ) def reflect_tables(self): with self.db_engine.connect(): try: self.CredentialsTable = Table("credentials", self.metadata, autoload_with=self.db_engine) self.HostsTable = Table("hosts", self.metadata, autoload_with=self.db_engine) except (NoInspectionAvailable, NoSuchTableError): print( f""" [-] Error reflecting tables for the {self.protocol} protocol - this means there is a DB schema mismatch [-] This is probably because a newer version of nxc is being ran on an old DB schema [-] Optionally save the old DB data (`cp {self.db_path} ~/nxc_{self.protocol.lower()}.bak`) [-] Then remove the nxc {self.protocol} DB (`rm -f {self.db_path}`) and run nxc to initialize the new DB""" ) sys.exit() def shutdown_db(self): try: self.conn.close() # due to the async nature of nxc, sometimes session state is a bit messy and this will throw: # Method 'close()' can't be called here; method '_connection_for_bind()' is already in progress and # this would cause an unexpected state change to except IllegalStateChangeError as e: nxc_logger.debug(f"Error while closing session db object: {e}") def clear_database(self): for table in self.metadata.sorted_tables: self.conn.execute(table.delete())