100 lines
2.3 KiB
Python
100 lines
2.3 KiB
Python
#imports
|
|
from sqlalchemy import create_engine, Column, String, Integer
|
|
from sqlalchemy.orm import sessionmaker, declarative_base
|
|
from os.path import exists
|
|
from os import mkdir
|
|
from contextlib import contextmanager
|
|
|
|
#sql alchemy base for tables to inherit from
|
|
Base = declarative_base()
|
|
|
|
# Ensure the directory for the database exists
|
|
# Create the database directory up front. Attempting the mkdir and tolerating
# "already exists" avoids the race inherent in checking first and creating
# afterwards (another process could create the directory in between).
try:
    mkdir("db")
except FileExistsError:
    # Something named "db" is already there; nothing to create.
    pass
|
|
|
|
# Define the Password model
|
|
class Password(Base):
    """ORM model for one stored credential, mapped to the "Password" table."""

    __tablename__ = "Password"

    # The domain is the primary key, so a domain maps to at most one row.
    domain = Column("domain", String, primary_key=True)
    # Credential string kept for the domain.
    # NOTE(review): the name suggests an encoded/encrypted value - confirm.
    ccred = Column("ccred", String)

    def __init__(self, domain, ccred):
        """Populate both columns from positional arguments."""
        self.domain, self.ccred = domain, ccred
|
|
|
|
|
|
# Database setup
|
|
# The SQLite file lives inside the "db" directory.
_DATABASE_URL = "sqlite:///db/creds.db"
engine = create_engine(_DATABASE_URL)

# Emit CREATE TABLE for the models declared on Base.
Base.metadata.create_all(bind=engine)

# Factory producing sessions bound to the engine above.
Session = sessionmaker(bind=engine)
|
|
|
|
#manage context
|
|
@contextmanager
def get_session():
    """Yield a new session, committing on success and rolling back on error.

    Any ``Exception`` raised inside the ``with`` body, or by the commit
    itself, is rolled back, printed, and NOT re-raised. The session is
    closed in every case.
    """
    db = Session()
    try:
        try:
            yield db
            # Reached only when the with-body finished without raising.
            db.commit()
        except Exception as err:
            db.rollback()
            print(f"Database error: {err}")
    finally:
        db.close()
|
|
|
|
#store credential
|
|
def store(domain, ccred):
    """Insert a new credential row for ``domain``.

    Returns "Stored credential" when the row was written, or an
    "Error storing password: ..." message when the database rejects it
    (for example because ``domain``, the primary key, already exists).
    """
    entry = Password(domain, ccred)
    with get_session() as session:
        try:
            session.add(entry)
            # add() only queues the row. Flushing emits the INSERT now, so a
            # failure is caught here rather than surfacing during the commit
            # on context exit, after a success message was already returned.
            session.flush()
        except Exception as e:
            # Discard the failed transaction so the session is clean again.
            session.rollback()
            return f"Error storing password: {e}"
        return "Stored credential"
|
|
|
|
#load credential
|
|
def fetch(domain):
    """Look up the credential stored for ``domain``.

    Returns the row's ``ccred`` value when a matching row exists, otherwise
    the message "No cred found for '<domain>'".
    """
    with get_session() as session:
        # Narrow the query to the requested domain and take the first hit.
        matches = session.query(Password).filter(Password.domain == domain)
        record = matches.first()

        # Nothing stored for this domain.
        if not record:
            return f"No cred found for '{domain}'"

        return record.ccred
|
|
def fetchall():
    """Return every stored ``Password`` row.

    Returns a list of ``Password`` objects, or the string "No creds found"
    when the table is empty.
    """
    with get_session() as session:
        # Load all rows; there is no filtering here.
        rows = session.query(Password).all()

        if not rows:
            return "No creds found"

        # Detach the loaded rows before the session commits and closes.
        # Otherwise the commit expires their attributes, and reading them
        # after the session is closed raises DetachedInstanceError.
        session.expunge_all()
        return rows
|
|
#update credential
|
|
def update(domain, ccred):
    """Replace the credential stored for an existing ``domain``.

    Returns "updated credential" when a row for ``domain`` was found and
    modified, otherwise the message "No cred found for '<domain>'".
    """
    with get_session() as session:
        # Find the row whose primary key matches the domain.
        matches = session.query(Password).filter(Password.domain == domain)
        record = matches.first()

        # No such domain: report it and leave the table untouched.
        if not record:
            return f"No cred found for '{domain}'"

        # Overwrite the stored value on the loaded row.
        record.ccred = ccred
        return "updated credential"
|