From a9294e7b361d14c75424127f4235ab1078e3eb94 Mon Sep 17 00:00:00 2001 From: Marshall Hallenbeck Date: Sat, 29 Apr 2023 16:33:16 -0400 Subject: [PATCH] feat(ssh): add shell access tracking to DB and display it in cmedb --- cme/protocols/ssh.py | 3 +- cme/protocols/ssh/database.py | 13 +++- cme/protocols/ssh/db_navigator.py | 101 +++++++++++++++++++----------- 3 files changed, 78 insertions(+), 39 deletions(-) diff --git a/cme/protocols/ssh.py b/cme/protocols/ssh.py index 0aa77b2a..2fef4b7b 100644 --- a/cme/protocols/ssh.py +++ b/cme/protocols/ssh.py @@ -157,7 +157,6 @@ class ssh(connection): shell_access = False host_id = self.db.get_hosts(self.host)[0].id - self.db.add_loggedin_relation(cred_id, host_id) if self.check_if_admin(): shell_access = True @@ -175,6 +174,8 @@ class ssh(connection): else: shell_access = True + self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access) + if self.args.key_file: password = f"{password} (keyfile: {self.args.key_file})" diff --git a/cme/protocols/ssh/database.py b/cme/protocols/ssh/database.py index a73e80a6..3332140d 100644 --- a/cme/protocols/ssh/database.py +++ b/cme/protocols/ssh/database.py @@ -58,9 +58,11 @@ class database: "id" integer PRIMARY KEY, "credid" integer, "hostid" integer, + "shell" boolean, FOREIGN KEY(credid) REFERENCES credentials(id), FOREIGN KEY(hostid) REFERENCES hosts(id) )''') + # "admin" access with SSH means we have root access, which implies shell access since we run commands to check db_conn.execute( '''CREATE TABLE "admin_relations" ( "id" integer PRIMARY KEY, @@ -475,7 +477,7 @@ class database: results = self.sess.execute(q).all() return results - def add_loggedin_relation(self, cred_id, host_id): + def add_loggedin_relation(self, cred_id, host_id, shell=False): relation_query = select(self.LoggedinRelationsTable).filter( self.LoggedinRelationsTable.c.credid == cred_id, self.LoggedinRelationsTable.c.hostid == host_id @@ -486,7 +488,8 @@ class database: if not results: relation = { "credid": cred_id, - "hostid": host_id + "hostid": host_id, + "shell": shell } try: cme_logger.debug(f"Inserting loggedin_relations: {relation}") @@ -503,7 +506,7 @@ class database: except Exception as e: cme_logger.debug(f"Error inserting LoggedinRelation: {e}") - def get_loggedin_relations(self, cred_id=None, host_id=None): + def get_loggedin_relations(self, cred_id=None, host_id=None, shell=None): q = select(self.LoggedinRelationsTable) # .returning(self.LoggedinRelationsTable.c.id) if cred_id: q = q.filter( @@ -513,6 +516,10 @@ class database: q = q.filter( self.LoggedinRelationsTable.c.hostid == host_id ) + if shell: + q = q.filter( + self.LoggedinRelationsTable.c.shell == shell + ) results = self.sess.execute(q).all() return results diff --git a/cme/protocols/ssh/db_navigator.py b/cme/protocols/ssh/db_navigator.py index 961d9f3c..83add008 100644 --- a/cme/protocols/ssh/db_navigator.py +++ b/cme/protocols/ssh/db_navigator.py @@ -7,7 +7,7 @@ from cme.cmedb import DatabaseNavigator, print_table, print_help class navigator(DatabaseNavigator): def display_creds(self, creds): - data = [['CredID', 'Admin On', 'Total Login', 'Username', 'Password', 'CredType']] + data = [["CredID", "Admin On", "Total Login", "Total Shell", "Username", "Password", "CredType"]] for cred in creds: cred_id = cred[0] @@ -17,25 +17,27 @@ class navigator(DatabaseNavigator): admin_links = self.db.get_admin_relations(cred_id=cred_id) total_users = self.db.get_loggedin_relations(cred_id=cred_id) + total_shell = total_users = self.db.get_loggedin_relations(cred_id=cred_id, shell=True) data.append([ cred_id, - str(len(admin_links)) + ' Host(s)', - str(len(total_users)) + ' Host(s)', + str(len(admin_links)) + " Host(s)", + str(len(total_users)) + " Host(s)", + str(len(total_shell)) + " Shells(s)", username, password, credtype ]) - print_table(data, title='Credentials') + print_table(data, title="Credentials") # pull/545 def display_hosts(self, hosts): data = [[ - 'HostID', - 'Admins', - 'Total Users', - 'Host', - 'Port', + "HostID", + "Admins", + "Total Users", + "Host", + "Port", 'Banner', 'OS' ]] @@ -52,15 +54,15 @@ class navigator(DatabaseNavigator): data.append( [ host_id, - str(len(admin_users)) + ' Cred(s)', - str(len(total_users)) + ' User(s)', + str(len(admin_users)) + " Cred(s)", + str(len(total_users)) + " User(s)", host, port, banner, os ] ) - print_table(data, title='Hosts') + print_table(data, title="Hosts") def do_hosts(self, line): filter_term = line.strip() @@ -75,11 +77,11 @@ class navigator(DatabaseNavigator): self.display_hosts(hosts) elif len(hosts) == 1: data = [[ - 'HostID', - 'Host', - 'Port', - 'Banner', - 'OS' + "HostID", + "Host", + "Port", + "Banner", + "OS" ]] host_id_list = [] @@ -100,24 +102,48 @@ class navigator(DatabaseNavigator): os ] ) - print_table(data, title='Host') + print_table(data, title="Host") - data = [['CredID', 'CredType', 'UserName', 'Password']] + admin_access_data = [["CredID", "CredType", "UserName", "Password", "Shell"]] + nonadmin_access_data = [["CredID", "CredType", "UserName", "Password", "Shell"]] for host_id in host_id_list: - links = self.db.get_admin_relations(host_id=host_id) + admin_links = self.db.get_admin_relations(host_id=host_id) + nonadmin_links = self.db.get_loggedin_relations(host_id=host_id) - for link in links: + for link in admin_links: link_id, cred_id, host_id = link creds = self.db.get_credentials(filter_term=cred_id) for cred in creds: cred_id = cred[0] - credtype = cred[1] - username = cred[2] - password = cred[3] - # pillaged_from = cred[5] - data.append([cred_id, credtype, username, password]) - print_table(data, title='Credential(s) with Admin Access') + username = cred[1] + password = cred[2] + credtype = cred[3] + shell = True + + admin_access_data.append([cred_id, credtype, username, password, shell]) + + # probably a better way to do this without looping through and requesting them all again, + # but I just want to get this working for now + for link in nonadmin_links: + link_id, cred_id, host_id, shell = link + creds = self.db.get_credentials(filter_term=cred_id) + for cred in creds: + cred_id = cred[0] + username = cred[1] + password = cred[2] + credtype = cred[3] + shell = shell + + cred_data = [cred_id, credtype, username, password, shell] + + if cred_data not in admin_access_data: + nonadmin_access_data.append(cred_data) + + if len(nonadmin_access_data) > 1: + print_table(nonadmin_access_data, title="Credential(s) with Non Admin Access") + if len(admin_access_data) > 1: + print_table(admin_access_data, title="Credential(s) with Admin Access") def help_hosts(self): help_string = """ @@ -167,7 +193,7 @@ class navigator(DatabaseNavigator): if len(creds) != 1: self.display_creds(creds) elif len(creds) == 1: - cred_data = [['CredID', 'UserName', 'Password', 'CredType']] + cred_data = [["CredID", "UserName", "Password", "CredType"]] cred_id_list = [] for cred in creds: @@ -180,8 +206,8 @@ class navigator(DatabaseNavigator): cred_data.append([cred_id, username, password, credtype]) print_table(cred_data, title='Credential(s)') - admin_access_data = [['HostID', 'Host', 'Port', 'Banner', 'OS']] - nonadmin_access_data = [['HostID', 'Host', 'Port', 'Banner', 'OS']] + admin_access_data = [["HostID", "Host", "Port", "Banner", "OS", "Shell"]] + nonadmin_access_data = [["HostID", "Host", "Port", "Banner", "OS", "Shell"]] for cred_id in cred_id_list: admin_links = self.db.get_admin_relations(cred_id=cred_id) @@ -196,13 +222,14 @@ class navigator(DatabaseNavigator): port = h[2] banner = h[3] os = h[4] + shell = True # if we have root via SSH, we know it's a shell - admin_access_data.append([host_id, host, port, banner, os]) + admin_access_data.append([host_id, host, port, banner, os, shell]) # probably a better way to do this without looping through and requesting them all again, # but I just want to get this working for now for link in nonadmin_links: - link_id, cred_id, host_id = link + link_id, cred_id, host_id, shell = link hosts = self.db.get_hosts(host_id) for h in hosts: host_id = h[0] @@ -210,12 +237,15 @@ class navigator(DatabaseNavigator): port = h[2] banner = h[3] os = h[4] - host_data = [host_id, host, port, banner, os] + host_data = [host_id, host, port, banner, os, shell] if host_data not in admin_access_data: nonadmin_access_data.append(host_data) - print_table(nonadmin_access_data, title='Non-Admin Access to Host(s)') - print_table(admin_access_data, title='Admin Access to Host(s)') + # we look if it's greater than one because the header row always exists + if len(nonadmin_access_data) > 1: + print_table(nonadmin_access_data, title="Non-Admin Access to Host(s)") + if len(admin_access_data) > 1: + print_table(admin_access_data, title="Admin Access to Host(s)") def help_creds(self): help_string = """ @@ -282,6 +312,7 @@ class navigator(DatabaseNavigator): """ print_help(help_string) + @staticmethod def complete_hosts(self, text, line): """ Tab-complete 'hosts' commands.