#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""SSH protocol handler: connects with paramiko, authenticates, and probes privileges."""

import base64
import logging
import re
import shlex
import time
import uuid
from io import StringIO

import paramiko

from nxc.config import process_secret
from nxc.connection import *
from nxc.logger import NXCAdapter

# Kept after the star import above, as in the original file, so these names
# always refer to paramiko's exception classes.
from paramiko.ssh_exception import (
    AuthenticationException,
    NoValidConnectionsError,
    SSHException,
)


class ssh(connection):
    """SSH connection object built on ``paramiko.SSHClient``."""

    def __init__(self, args, db, host):
        """Set protocol defaults, then hand over to the base ``connection`` initialiser.

        The defaults are assigned before ``super().__init__`` is called, so they
        are already in place for whatever the base initialiser does.
        """
        self.protocol = "SSH"
        self.remote_version = "Unknown SSH Version"
        self.server_os_platform = "Linux"
        self.user_principal = "root"

        super().__init__(args, db, host)

    def proto_flow(self):
        """Run the per-host workflow: connect, fingerprint, log in, run modules/commands."""
        self.logger.debug("Kicking off proto_flow")
        self.proto_logger()
        if not self.create_conn_obj():
            return

        self.enum_host_info()
        self.print_host_info()
        if self.remote_version == "Unknown SSH Version":
            # No banner was obtained, so there is nothing to authenticate against.
            self.conn.close()
            return

        if self.login():
            if hasattr(self.args, "module") and self.args.module:
                self.call_modules()
            else:
                self.call_cmd_args()
        # Closing is done on every path so a failed login cannot leave the
        # client open.
        self.conn.close()

    def proto_logger(self):
        """Silence paramiko's own loggers and install the protocol log adapter."""
        logging.getLogger("paramiko").disabled = True
        logging.getLogger("paramiko.transport").disabled = True
        self.logger = NXCAdapter(
            extra={
                "protocol": "SSH",
                "host": self.host,
                "port": self.args.port,
                "hostname": self.hostname,
            }
        )

    def print_host_info(self):
        """Display the remote SSH version (or a "skipping" notice) and return True."""
        if self.remote_version != "Unknown SSH Version":
            banner = self.remote_version
        else:
            banner = f"{self.remote_version}, skipping..."
        self.logger.display(banner)
        return True

    def enum_host_info(self):
        """Record the server's version string and store the host in the database."""
        transport = self.conn.get_transport()
        # The transport is None when the connection attempt failed before a
        # transport was created; keep the "Unknown SSH Version" default then.
        if transport is not None and transport.remote_version:
            self.remote_version = transport.remote_version
        self.logger.debug(f"Remote version: {self.remote_version}")
        self.db.add_host(self.host, self.args.port, self.remote_version)

    def create_conn_obj(self):
        """Create the paramiko client and attempt a credential-less connection.

        Returns:
            True when the server was reached: either the connection succeeded or
            it failed with an SSH-level error (``AuthenticationException`` is a
            subclass of ``SSHException``). False when the host could not be
            reached at the socket level.
        """
        self.conn = paramiko.SSHClient()
        self.conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            self.conn.connect(self.host, port=self.args.port, timeout=self.args.ssh_timeout)
        except (AuthenticationException, SSHException):
            return True
        except (NoValidConnectionsError, OSError):
            # socket.error is an alias of OSError, so this covers the same errors
            # without depending on `socket` being brought in by a star import.
            return False
        # Previously this fell through and returned None (falsy), which made
        # proto_flow skip hosts that accepted the connection outright.
        return True

    def check_if_admin(self):
        """Set ``self.admin_privs`` from the output of ``id ; sudo -ln``.

        When ``args.sudo_check`` is set the decision is delegated to
        ``check_if_admin_sudo`` instead.
        """
        self.admin_privs = False

        if self.args.sudo_check:
            self.check_if_admin_sudo()
            return

        # we could add in another method to check by piping in the password to sudo
        # but that might be too much of an opsec concern - maybe add in a flag to do more checks?
        self.logger.info("Determined user is root via `id ; sudo -ln` command")
        _, stdout, _ = self.conn.exec_command("id ; sudo -ln 2>&1")
        output = stdout.read().decode("utf-8", errors="ignore")

        # keyword -> (grants admin, hint to show when it does not)
        admin_flag = {
            "(root)": [True, None],
            "NOPASSWD: ALL": [True, None],
            "(ALL : ALL) ALL": [True, None],
            "(sudo)": [False, f'Current user: "{self.username}" was in "sudo" group, please try "--sudo-check" to check if user can run sudo shell'],
        }
        tips = None
        for keyword, (is_admin, hint) in admin_flag.items():
            if keyword not in output:
                continue
            self.logger.info(f'User: "{self.username}" matched keyword: {keyword}')
            self.admin_privs = is_admin
            if is_admin:
                break
            tips = hint

        if not self.admin_privs and tips:
            self.logger.display(tips)

    def check_if_admin_sudo(self):
        """Check sudo rights by trying to copy /etc/shadow with the login password.

        Uses the "sudo-stdin" method when ``args.sudo_check_method`` says so, and
        the mkfifo method otherwise. Sets ``self.admin_privs`` to True on success.
        """
        if not self.password:
            self.logger.error("Check admin with sudo not support private key.")
            return

        # Always bind `method`; it used to be unbound when the option was falsy.
        method = self.args.sudo_check_method
        if method:
            self.logger.info(f"Doing sudo check with method: {method}")

        if method == "sudo-stdin":
            self._sudo_check_via_stdin()
        else:
            self._sudo_check_via_mkfifo()

    def _sudo_check_via_stdin(self):
        """Pipe the password into ``sudo -S`` and look for the resulting shadow copy."""
        _, stdout, _ = self.conn.exec_command("sudo --help")
        help_text = stdout.read().decode("utf-8", errors="ignore")
        if "stdin" not in help_text:
            self.logger.error("Command: 'sudo' not support stdin mode, running command with 'sudo' failed")
            return

        shadow_backup = f"/tmp/{uuid.uuid4()}"
        # Quote the password so shell metacharacters cannot break or extend the command.
        password = shlex.quote(self.password)
        self.conn.exec_command(f"echo {password} | sudo -S cp /etc/shadow {shadow_backup} >/dev/null 2>&1 &")
        self.conn.exec_command(f"echo {password} | sudo -S chmod 777 {shadow_backup} >/dev/null 2>&1 &")

        if self._wait_for_shadow_copy(shadow_backup):
            self.admin_privs = True
        self.logger.info("Remove up temporary files")
        self.conn.exec_command(f"rm -rf {shadow_backup}")

    def _sudo_check_via_mkfifo(self):
        """Drive ``sudo -s`` through a named pipe and look for the resulting shadow copy."""
        _, stdout, _ = self.conn.exec_command("mkfifo --help")
        help_text = stdout.read().decode("utf-8", errors="ignore")
        # check if user can execute mkfifo
        if "Create named pipes" not in help_text:
            self.logger.error("Command: 'mkfifo' unavailable, running command with 'sudo' failed")
            return

        self.logger.info("Command: 'mkfifo' available")
        pipe_stdin = f"/tmp/systemd-{uuid.uuid4()}"
        pipe_stdout = f"/tmp/systemd-{uuid.uuid4()}"
        shadow_backup = f"/tmp/{uuid.uuid4()}"
        self.conn.exec_command(f"mkfifo {pipe_stdin}; tail -f {pipe_stdin} | /bin/sh 2>&1 > {pipe_stdout} >/dev/null 2>&1 &")
        # 'script -qc /bin/sh /dev/null' means "upgrade" the shell, like reverse shell from netcat
        self.conn.exec_command(f"echo 'script -qc /bin/sh /dev/null' > {pipe_stdin}")
        # Quote the password so an embedded quote cannot terminate the argument early.
        self.conn.exec_command(f"echo 'sudo -s' > {pipe_stdin} && echo {shlex.quote(self.password)} > {pipe_stdin}")
        # Sometimes the pipe hangs (only seen with paramiko): the output of
        # "whoami" or "id" never shows up in pipe_stdout even though the command
        # did run. So success is detected through a side effect (the copied
        # file) rather than by reading the pipe.
        self.logger.info(f"Copy /etc/shadow to {shadow_backup} if pass the sudo auth")
        copy_command = f"echo 'cp /etc/shadow {shadow_backup} && chmod 777 {shadow_backup}' > {pipe_stdin}"

        if self._wait_for_shadow_copy(shadow_backup, retry_command=copy_command):
            self.admin_privs = True
        self.logger.info("Remove up temporary files")
        self.conn.exec_command(f"rm -rf {shadow_backup} {pipe_stdin} {pipe_stdout}")

    def _wait_for_shadow_copy(self, shadow_backup, retry_command=None):
        """Poll with ``ls`` until ``shadow_backup`` exists or the tries run out.

        Args:
            shadow_backup: Remote path expected to appear when sudo worked.
            retry_command: Optional remote command issued after each failed probe.

        Returns:
            True if ``ls`` produced no stderr output (the file exists), False
            after ``args.get_output_tries`` failed probes.
        """
        tries = 1
        while True:
            self.logger.info(f"Checking {shadow_backup} if it existed")
            _, _, stderr = self.conn.exec_command(f"ls {shadow_backup}")
            # Inspect the probe before testing the limit, so the last attempt
            # is not thrown away.
            if not stderr.read().decode("utf-8", errors="ignore"):
                self.logger.info(f"{shadow_backup} existed")
                return True
            if tries >= self.args.get_output_tries:
                self.logger.info(f'{shadow_backup} not existed, maybe the pipe has been hanged over, please increase the number of tries with the option "--get-output-tries" or change other method with "--sudo-check-method". If it\'s still failing maybe sudo shell is not working with current user')
                return False
            time.sleep(2)
            if retry_command:
                self.conn.exec_command(retry_command)
            tries += 1

    def plaintext_login(self, username, password, private_key=None):
        """Authenticate with a password or private key and classify the session.

        Args:
            username: Account to log in as.
            password: Password, or the key passphrase when a key is used.
            private_key: Optional private key text; ``args.key_file`` takes
                precedence when set.

        Returns:
            True when authentication and the follow-up command succeeded, False
            when any exception was raised (the connection is closed in that case).
        """
        self.username = username
        self.password = password
        # Honour a caller-supplied key; it used to be overwritten with "".
        private_key = private_key or ""
        stdout = None

        try:
            if self.args.key_file or private_key:
                self.logger.debug("Logging in with key")

                if self.args.key_file:
                    with open(self.args.key_file, "r") as f:
                        private_key = f.read()

                pkey = paramiko.RSAKey.from_private_key(StringIO(private_key), password)
                self.conn.get_transport().auth_publickey(username, pkey)

                cred_id = self.db.add_credential(
                    "key",
                    username,
                    password,
                    key=private_key,
                )
            else:
                self.logger.debug(f"Logging {self.host} with username: {self.username}, password: {self.password}")
                self.conn.get_transport().auth_password(username, password, fallback=True)
                cred_id = self.db.add_credential("plaintext", username, password)

            # Some IOT devices will not raise exception in auth_password / auth_publickey,
            # so run a command to confirm the session is usable.
            _, stdout, _ = self.conn.exec_command("id")
            stdout = stdout.read().decode("utf-8", errors="ignore")
        except Exception as e:
            if self.args.key_file:
                password = f"{process_secret(password)} (keyfile: {self.args.key_file})"
            self.logger.fail(f"{username}:{password} {e}")
            self.conn.close()
            return False

        host_id = self.db.get_hosts(self.host)[0].id

        if not stdout:
            # `id` printed nothing: try the Windows equivalent.
            _, stdout, _ = self.conn.exec_command("whoami /priv")
            stdout = stdout.read().decode("utf-8", errors="ignore")
            self.server_os_platform = "Windows"
            self.user_principal = "admin"
            if "SeDebugPrivilege" in stdout:
                self.admin_privs = True
            elif "SeUndockPrivilege" in stdout:
                self.admin_privs = True
                self.user_principal = "admin (UAC)"
            else:
                # non admin (low priv)
                self.user_principal = "admin (low priv)"

        if not stdout:
            # Neither command printed anything.
            self.logger.debug(f"User: {self.username} can't get a basic shell")
            self.server_os_platform = "Network Devices"
            shell_access = False
        else:
            shell_access = True

        self.db.add_loggedin_relation(cred_id, host_id, shell=shell_access)

        if shell_access and self.server_os_platform == "Linux":
            self.check_if_admin()

        if self.admin_privs:
            self.logger.debug(f"User {username} logged in successfully and is root!")
            if self.args.key_file:
                self.db.add_admin_user("key", username, password, host_id=host_id, cred_id=cred_id)
            else:
                self.db.add_admin_user(
                    "plaintext",
                    username,
                    password,
                    host_id=host_id,
                    cred_id=cred_id,
                )

        if self.args.key_file:
            password = f"{process_secret(password)} (keyfile: {self.args.key_file})"

        display_shell_access = "- {} {} {}".format(
            f"({self.user_principal})" if self.admin_privs else f"(non {self.user_principal})",
            self.server_os_platform,
            '- Shell access!' if shell_access else ''
        )
        # Force show pwn3d label
        self.admin_privs = True
        self.logger.success(f"{username}:{password} {self.mark_pwned()} {highlight(display_shell_access)}")
        return True

    def execute(self, payload=None, get_output=False):
        """Run a command on the target, optionally logging each output line.

        Args:
            payload: Command to run; falls back to ``args.execute`` when empty.
            get_output: Log the command output line by line. Enabled
                automatically for ``args.execute`` unless ``args.no_output`` is set.

        Returns:
            The stdout stream returned by ``exec_command``, or ``""`` when
            ``exec_command`` raised ``AttributeError``.
        """
        if not payload and self.args.execute:
            payload = self.args.execute
            if not self.args.no_output:
                get_output = True

        try:
            _, stdout, _ = self.conn.exec_command(f"{payload} 2>&1")
        except AttributeError:
            return ""

        if get_output:
            self.logger.success("Executed command")
            for line in stdout:
                self.logger.highlight(line.strip())
        return stdout