feat(smbdb): update remaining functions to proper sqlalchemy syntax

main
Marshall Hallenbeck 2023-03-08 01:24:44 -05:00
parent 1d33c58059
commit 121b25d243
1 changed files with 41 additions and 23 deletions

View File

@ -454,12 +454,20 @@ class database:
"""
Removes a credential ID from the database
"""
# for cred_id in creds_id:
# self.conn.query(self.UsersTable).filter(
# self.UsersTable.c.id == cred_id
# ).delete()
# self.conn.commit()
# self.conn.close()
#
del_hosts = []
for cred_id in creds_id:
self.conn.query(self.UsersTable).filter(
q = delete(self.UsersTable).filter(
self.UsersTable.c.id == cred_id
).delete()
self.conn.commit()
self.conn.close()
)
del_hosts.append(q)
asyncio.run(self.conn.execute(q))
def add_admin_user(self, credtype, domain, username, password, host, user_id=None):
domain = domain.split('.')[0].upper()
@ -545,30 +553,30 @@ class database:
return results
def remove_admin_relation(self, user_ids=None, host_ids=None):
q = delete(self.AdminRelationsTable)
if user_ids:
for user_id in user_ids:
self.conn.query(self.AdminRelationsTable).filter(
q.filter(
self.AdminRelationsTable.c.userid == user_id
).delete()
)
elif host_ids:
for host_id in host_ids:
self.conn.query(self.AdminRelationsTable).filter(
self.AdminRelationsTable.c.hostid == host_id
).delete()
self.conn.commit()
self.conn.close()
q.filter(
self.AdminRelationsTable.c.hostid == host_id
)
asyncio.run(self.conn.execute(q))
def remove_group_relations(self, user_id=None, group_id=None):
q = delete(self.GroupRelationsTable)
if user_id:
self.conn.query(self.GroupRelationsTable).filter(
q.filter(
self.GroupRelationsTable.c.userid == user_id
).delete()
)
elif group_id:
self.conn.query(self.GroupRelationsTable).filter(
q.filter(
self.GroupRelationsTable.c.groupid == group_id
).delete()
self.conn.commit()
self.conn.close()
)
asyncio.run(self.conn.execute(q))
def is_credential_valid(self, credential_id):
"""
@ -582,16 +590,26 @@ class database:
return len(results) > 0
def is_credential_local(self, credential_id):
user_domain = self.conn.query(self.UsersTable.c.domain).filter(
# user_domain = self.conn.query(self.UsersTable.c.domain).filter(
# self.UsersTable.c.id == credential_id
# ).all()
q = select(self.UsersTable.c.domain).filter(
self.UsersTable.c.id == credential_id
).all()
)
user_domain = asyncio.run(self.conn.execute(q)).all()
if user_domain:
results = self.conn.query(self.ComputersTable).filter(
q = select(self.ComputersTable).filter(
func.lower(self.ComputersTable.c.id) == func.lower(user_domain)
).all()
self.conn.commit()
self.conn.close()
)
results = asyncio.run(self.conn.execute(q)).all()
# results = self.conn.query(self.ComputersTable).filter(
# func.lower(self.ComputersTable.c.id) == func.lower(user_domain)
# ).all()
# self.conn.commit()
# self.conn.close()
return len(results) > 0
def get_credentials(self, filter_term=None, cred_type=None):