feat(ssh): add shell access tracking to DB and display it in cmedb
parent
d4ac3aded1
commit
a9294e7b36
|
@ -157,7 +157,6 @@ class ssh(connection):
|
|||
|
||||
shell_access = False
|
||||
host_id = self.db.get_hosts(self.host)[0].id
|
||||
self.db.add_loggedin_relation(cred_id, host_id)
|
||||
|
||||
if self.check_if_admin():
|
||||
shell_access = True
|
||||
|
@ -175,6 +174,8 @@ class ssh(connection):
|
|||
else:
|
||||
shell_access = True
|
||||
|
||||
self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)
|
||||
|
||||
if self.args.key_file:
|
||||
password = f"{password} (keyfile: {self.args.key_file})"
|
||||
|
||||
|
|
|
@ -58,9 +58,11 @@ class database:
|
|||
"id" integer PRIMARY KEY,
|
||||
"credid" integer,
|
||||
"hostid" integer,
|
||||
"shell" boolean,
|
||||
FOREIGN KEY(credid) REFERENCES credentials(id),
|
||||
FOREIGN KEY(hostid) REFERENCES hosts(id)
|
||||
)''')
|
||||
# "admin" access with SSH means we have root access, which implies shell access since we run commands to check
|
||||
db_conn.execute(
|
||||
'''CREATE TABLE "admin_relations" (
|
||||
"id" integer PRIMARY KEY,
|
||||
|
@ -475,7 +477,7 @@ class database:
|
|||
results = self.sess.execute(q).all()
|
||||
return results
|
||||
|
||||
def add_loggedin_relation(self, cred_id, host_id):
|
||||
def add_loggedin_relation(self, cred_id, host_id, shell=False):
|
||||
relation_query = select(self.LoggedinRelationsTable).filter(
|
||||
self.LoggedinRelationsTable.c.credid == cred_id,
|
||||
self.LoggedinRelationsTable.c.hostid == host_id
|
||||
|
@ -486,7 +488,8 @@ class database:
|
|||
if not results:
|
||||
relation = {
|
||||
"credid": cred_id,
|
||||
"hostid": host_id
|
||||
"hostid": host_id,
|
||||
"shell": shell
|
||||
}
|
||||
try:
|
||||
cme_logger.debug(f"Inserting loggedin_relations: {relation}")
|
||||
|
@ -503,7 +506,7 @@ class database:
|
|||
except Exception as e:
|
||||
cme_logger.debug(f"Error inserting LoggedinRelation: {e}")
|
||||
|
||||
def get_loggedin_relations(self, cred_id=None, host_id=None):
|
||||
def get_loggedin_relations(self, cred_id=None, host_id=None, shell=None):
|
||||
q = select(self.LoggedinRelationsTable) # .returning(self.LoggedinRelationsTable.c.id)
|
||||
if cred_id:
|
||||
q = q.filter(
|
||||
|
@ -513,6 +516,10 @@ class database:
|
|||
q = q.filter(
|
||||
self.LoggedinRelationsTable.c.hostid == host_id
|
||||
)
|
||||
if shell:
|
||||
q = q.filter(
|
||||
self.LoggedinRelationsTable.c.shell == shell
|
||||
)
|
||||
results = self.sess.execute(q).all()
|
||||
return results
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ from cme.cmedb import DatabaseNavigator, print_table, print_help
|
|||
|
||||
class navigator(DatabaseNavigator):
|
||||
def display_creds(self, creds):
|
||||
data = [['CredID', 'Admin On', 'Total Login', 'Username', 'Password', 'CredType']]
|
||||
data = [["CredID", "Admin On", "Total Login", "Total Shell", "Username", "Password", "CredType"]]
|
||||
|
||||
for cred in creds:
|
||||
cred_id = cred[0]
|
||||
|
@ -17,25 +17,27 @@ class navigator(DatabaseNavigator):
|
|||
|
||||
admin_links = self.db.get_admin_relations(cred_id=cred_id)
|
||||
total_users = self.db.get_loggedin_relations(cred_id=cred_id)
|
||||
total_shell = total_users = self.db.get_loggedin_relations(cred_id=cred_id, shell=True)
|
||||
|
||||
data.append([
|
||||
cred_id,
|
||||
str(len(admin_links)) + ' Host(s)',
|
||||
str(len(total_users)) + ' Host(s)',
|
||||
str(len(admin_links)) + " Host(s)",
|
||||
str(len(total_users)) + " Host(s)",
|
||||
str(len(total_shell)) + " Shells(s)",
|
||||
username,
|
||||
password,
|
||||
credtype
|
||||
])
|
||||
print_table(data, title='Credentials')
|
||||
print_table(data, title="Credentials")
|
||||
|
||||
# pull/545
|
||||
def display_hosts(self, hosts):
|
||||
data = [[
|
||||
'HostID',
|
||||
'Admins',
|
||||
'Total Users',
|
||||
'Host',
|
||||
'Port',
|
||||
"HostID",
|
||||
"Admins",
|
||||
"Total Users",
|
||||
"Host",
|
||||
"Port",
|
||||
'Banner',
|
||||
'OS'
|
||||
]]
|
||||
|
@ -52,15 +54,15 @@ class navigator(DatabaseNavigator):
|
|||
data.append(
|
||||
[
|
||||
host_id,
|
||||
str(len(admin_users)) + ' Cred(s)',
|
||||
str(len(total_users)) + ' User(s)',
|
||||
str(len(admin_users)) + " Cred(s)",
|
||||
str(len(total_users)) + " User(s)",
|
||||
host,
|
||||
port,
|
||||
banner,
|
||||
os
|
||||
]
|
||||
)
|
||||
print_table(data, title='Hosts')
|
||||
print_table(data, title="Hosts")
|
||||
|
||||
def do_hosts(self, line):
|
||||
filter_term = line.strip()
|
||||
|
@ -75,11 +77,11 @@ class navigator(DatabaseNavigator):
|
|||
self.display_hosts(hosts)
|
||||
elif len(hosts) == 1:
|
||||
data = [[
|
||||
'HostID',
|
||||
'Host',
|
||||
'Port',
|
||||
'Banner',
|
||||
'OS'
|
||||
"HostID",
|
||||
"Host",
|
||||
"Port",
|
||||
"Banner",
|
||||
"OS"
|
||||
]]
|
||||
host_id_list = []
|
||||
|
||||
|
@ -100,24 +102,48 @@ class navigator(DatabaseNavigator):
|
|||
os
|
||||
]
|
||||
)
|
||||
print_table(data, title='Host')
|
||||
print_table(data, title="Host")
|
||||
|
||||
data = [['CredID', 'CredType', 'UserName', 'Password']]
|
||||
admin_access_data = [["CredID", "CredType", "UserName", "Password", "Shell"]]
|
||||
nonadmin_access_data = [["CredID", "CredType", "UserName", "Password", "Shell"]]
|
||||
for host_id in host_id_list:
|
||||
links = self.db.get_admin_relations(host_id=host_id)
|
||||
admin_links = self.db.get_admin_relations(host_id=host_id)
|
||||
nonadmin_links = self.db.get_loggedin_relations(host_id=host_id)
|
||||
|
||||
for link in links:
|
||||
for link in admin_links:
|
||||
link_id, cred_id, host_id = link
|
||||
creds = self.db.get_credentials(filter_term=cred_id)
|
||||
|
||||
for cred in creds:
|
||||
cred_id = cred[0]
|
||||
credtype = cred[1]
|
||||
username = cred[2]
|
||||
password = cred[3]
|
||||
# pillaged_from = cred[5]
|
||||
data.append([cred_id, credtype, username, password])
|
||||
print_table(data, title='Credential(s) with Admin Access')
|
||||
username = cred[1]
|
||||
password = cred[2]
|
||||
credtype = cred[3]
|
||||
shell = True
|
||||
|
||||
admin_access_data.append([cred_id, credtype, username, password, shell])
|
||||
|
||||
# probably a better way to do this without looping through and requesting them all again,
|
||||
# but I just want to get this working for now
|
||||
for link in nonadmin_links:
|
||||
link_id, cred_id, host_id, shell = link
|
||||
creds = self.db.get_credentials(filter_term=cred_id)
|
||||
for cred in creds:
|
||||
cred_id = cred[0]
|
||||
username = cred[1]
|
||||
password = cred[2]
|
||||
credtype = cred[3]
|
||||
shell = shell
|
||||
|
||||
cred_data = [cred_id, credtype, username, password, shell]
|
||||
|
||||
if cred_data not in admin_access_data:
|
||||
nonadmin_access_data.append(cred_data)
|
||||
|
||||
if len(nonadmin_access_data) > 1:
|
||||
print_table(nonadmin_access_data, title="Credential(s) with Non Admin Access")
|
||||
if len(admin_access_data) > 1:
|
||||
print_table(admin_access_data, title="Credential(s) with Admin Access")
|
||||
|
||||
def help_hosts(self):
|
||||
help_string = """
|
||||
|
@ -167,7 +193,7 @@ class navigator(DatabaseNavigator):
|
|||
if len(creds) != 1:
|
||||
self.display_creds(creds)
|
||||
elif len(creds) == 1:
|
||||
cred_data = [['CredID', 'UserName', 'Password', 'CredType']]
|
||||
cred_data = [["CredID", "UserName", "Password", "CredType"]]
|
||||
cred_id_list = []
|
||||
|
||||
for cred in creds:
|
||||
|
@ -180,8 +206,8 @@ class navigator(DatabaseNavigator):
|
|||
cred_data.append([cred_id, username, password, credtype])
|
||||
print_table(cred_data, title='Credential(s)')
|
||||
|
||||
admin_access_data = [['HostID', 'Host', 'Port', 'Banner', 'OS']]
|
||||
nonadmin_access_data = [['HostID', 'Host', 'Port', 'Banner', 'OS']]
|
||||
admin_access_data = [["HostID", "Host", "Port", "Banner", "OS", "Shell"]]
|
||||
nonadmin_access_data = [["HostID", "Host", "Port", "Banner", "OS", "Shell"]]
|
||||
|
||||
for cred_id in cred_id_list:
|
||||
admin_links = self.db.get_admin_relations(cred_id=cred_id)
|
||||
|
@ -196,13 +222,14 @@ class navigator(DatabaseNavigator):
|
|||
port = h[2]
|
||||
banner = h[3]
|
||||
os = h[4]
|
||||
shell = True # if we have root via SSH, we know it's a shell
|
||||
|
||||
admin_access_data.append([host_id, host, port, banner, os])
|
||||
admin_access_data.append([host_id, host, port, banner, os, shell])
|
||||
|
||||
# probably a better way to do this without looping through and requesting them all again,
|
||||
# but I just want to get this working for now
|
||||
for link in nonadmin_links:
|
||||
link_id, cred_id, host_id = link
|
||||
link_id, cred_id, host_id, shell = link
|
||||
hosts = self.db.get_hosts(host_id)
|
||||
for h in hosts:
|
||||
host_id = h[0]
|
||||
|
@ -210,12 +237,15 @@ class navigator(DatabaseNavigator):
|
|||
port = h[2]
|
||||
banner = h[3]
|
||||
os = h[4]
|
||||
host_data = [host_id, host, port, banner, os]
|
||||
host_data = [host_id, host, port, banner, os, shell]
|
||||
if host_data not in admin_access_data:
|
||||
nonadmin_access_data.append(host_data)
|
||||
|
||||
print_table(nonadmin_access_data, title='Non-Admin Access to Host(s)')
|
||||
print_table(admin_access_data, title='Admin Access to Host(s)')
|
||||
# we look if it's greater than one because the header row always exists
|
||||
if len(nonadmin_access_data) > 1:
|
||||
print_table(nonadmin_access_data, title="Non-Admin Access to Host(s)")
|
||||
if len(admin_access_data) > 1:
|
||||
print_table(admin_access_data, title="Admin Access to Host(s)")
|
||||
|
||||
def help_creds(self):
|
||||
help_string = """
|
||||
|
@ -282,6 +312,7 @@ class navigator(DatabaseNavigator):
|
|||
"""
|
||||
print_help(help_string)
|
||||
|
||||
@staticmethod
|
||||
def complete_hosts(self, text, line):
|
||||
"""
|
||||
Tab-complete 'hosts' commands.
|
||||
|
|
Loading…
Reference in New Issue