diff --git a/nxc/modules/rdp.py b/nxc/modules/rdp.py index 60bb6b3e..30669222 100644 --- a/nxc/modules/rdp.py +++ b/nxc/modules/rdp.py @@ -16,7 +16,7 @@ from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY class NXCModule: name = "rdp" description = "Enables/Disables RDP" - supported_protocols = ["smb" ,"wmi"] + supported_protocols = ["smb", "wmi"] opsec_safe = True multiple_hosts = True @@ -73,23 +73,23 @@ class NXCModule: if self.method == "smb": context.log.info("Executing over SMB(ncacn_np)") try: - smb_rdp = rdp_SMB(context, connection) + smb_rdp = RdpSmb(context, connection) if "ram" in self.action: - smb_rdp.rdp_RAMWrapper(self.action) + smb_rdp.rdp_ram_wrapper(self.action) else: - smb_rdp.rdp_Wrapper(self.action) + smb_rdp.rdp_wrapper(self.action) except Exception as e: context.log.fail(f"Enable RDP via smb error: {str(e)}") elif self.method == "wmi": context.log.info("Executing over WMI(ncacn_ip_tcp)") - wmi_rdp = rdp_WMI(context, connection, self.dcom_timeout) + wmi_rdp = RdpWmi(context, connection, self.dcom_timeout) if hasattr(wmi_rdp, '_rdp_WMI__iWbemLevel1Login'): if "ram" in self.action: # Nt version under 6 not support RAM. try: - wmi_rdp.rdp_RAMWrapper(self.action) + wmi_rdp.rdp_ram_wrapper(self.action) except Exception as e: if "WBEM_E_NOT_FOUND" in str(e): context.log.fail("System version under NT6 not support restricted admin mode") @@ -98,7 +98,7 @@ class NXCModule: pass else: try: - wmi_rdp.rdp_Wrapper(self.action, self.oldSystem) + wmi_rdp.rdp_wrapper(self.action, self.oldSystem) except Exception as e: if "WBEM_E_INVALID_NAMESPACE" in str(e): context.log.fail('Looks like target system version is under NT6, please add "OLD=true" in module options.') @@ -107,76 +107,77 @@ class NXCModule: pass wmi_rdp._rdp_WMI__dcom.disconnect() -class rdp_SMB: + +class RdpSmb: def __init__(self, context, connection): self.context = context self.__smbconnection = connection.conn self.__execute = connection.execute self.logger = context.log - def rdp_Wrapper(self, action): - remoteOps = RemoteOperations(self.__smbconnection, False) - remoteOps.enableRegistry() + def rdp_wrapper(self, action): + remote_ops = RemoteOperations(self.__smbconnection, False) + remote_ops.enableRegistry() - if remoteOps._RemoteOperations__rrp: - ans = rrp.hOpenLocalMachine(remoteOps._RemoteOperations__rrp) + if remote_ops._RemoteOperations__rrp: + ans = rrp.hOpenLocalMachine(remote_ops._RemoteOperations__rrp) regHandle = ans["phKey"] ans = rrp.hBaseRegOpenKey( - remoteOps._RemoteOperations__rrp, + remote_ops._RemoteOperations__rrp, regHandle, "SYSTEM\\CurrentControlSet\\Control\\Terminal Server", ) - keyHandle = ans["phkResult"] + key_handle = ans["phkResult"] ans = rrp.hBaseRegSetValue( - remoteOps._RemoteOperations__rrp, - keyHandle, + remote_ops._RemoteOperations__rrp, + key_handle, "fDenyTSConnections", rrp.REG_DWORD, 0 if action == "enable" else 1, ) - rtype, data = rrp.hBaseRegQueryValue(remoteOps._RemoteOperations__rrp, keyHandle, "fDenyTSConnections") + rtype, data = rrp.hBaseRegQueryValue(remote_ops._RemoteOperations__rrp, key_handle, "fDenyTSConnections") if int(data) == 0: self.logger.success("Enable RDP via SMB(ncacn_np) successfully") elif int(data) == 1: self.logger.success("Disable RDP via SMB(ncacn_np) successfully") - self.firewall_CMD(action) + self.firewall_cmd(action) if action == "enable": - self.query_RDPPort(remoteOps, regHandle) + self.query_rdp_port(remote_ops, regHandle) try: - remoteOps.finish() - except: + remote_ops.finish() + except Exception: pass - def rdp_RAMWrapper(self, action): - remoteOps = RemoteOperations(self.__smbconnection, False) - remoteOps.enableRegistry() + def rdp_ram_wrapper(self, action): + remote_ops = RemoteOperations(self.__smbconnection, False) + remote_ops.enableRegistry() - if remoteOps._RemoteOperations__rrp: - ans = rrp.hOpenLocalMachine(remoteOps._RemoteOperations__rrp) - regHandle = ans["phKey"] + if remote_ops._RemoteOperations__rrp: + ans = rrp.hOpenLocalMachine(remote_ops._RemoteOperations__rrp) + reg_handle = ans["phKey"] ans = rrp.hBaseRegOpenKey( - remoteOps._RemoteOperations__rrp, - regHandle, + remote_ops._RemoteOperations__rrp, + reg_handle, "System\\CurrentControlSet\\Control\\Lsa", ) - keyHandle = ans["phkResult"] + key_handle = ans["phkResult"] rrp.hBaseRegSetValue( - remoteOps._RemoteOperations__rrp, - keyHandle, + remote_ops._RemoteOperations__rrp, + key_handle, "DisableRestrictedAdmin", rrp.REG_DWORD, 0 if action == "enable-ram" else 1, ) - rtype, data = rrp.hBaseRegQueryValue(remoteOps._RemoteOperations__rrp, keyHandle, "DisableRestrictedAdmin") + rtype, data = rrp.hBaseRegQueryValue(remote_ops._RemoteOperations__rrp, key_handle, "DisableRestrictedAdmin") if int(data) == 0: self.logger.success("Enable RDP Restricted Admin Mode via SMB(ncacn_np) succeed") @@ -184,25 +185,25 @@ class rdp_SMB: self.logger.success("Disable RDP Restricted Admin Mode via SMB(ncacn_np) succeed") try: - remoteOps.finish() - except: + remote_ops.finish() + except Exception: pass - def query_RDPPort(self, remoteOps, regHandle): + def query_rdp_port(self, remoteOps, regHandle): if remoteOps: ans = rrp.hBaseRegOpenKey( remoteOps._RemoteOperations__rrp, regHandle, "SYSTEM\\CurrentControlSet\\Control\\Terminal Server\\WinStations\\RDP-Tcp", ) - keyHandle = ans["phkResult"] + key_handle = ans["phkResult"] - rtype, data = rrp.hBaseRegQueryValue(remoteOps._RemoteOperations__rrp, keyHandle, "PortNumber") + rtype, data = rrp.hBaseRegQueryValue(remoteOps._RemoteOperations__rrp, key_handle, "PortNumber") self.logger.success(f"RDP Port: {str(data)}") # https://github.com/rapid7/metasploit-framework/blob/master/modules/post/windows/manage/enable_rdp.rb - def firewall_CMD(self, action): + def firewall_cmd(self, action): cmd = f"netsh firewall set service type = remotedesktop mode = {action}" self.logger.info("Configure firewall via execute command.") output = self.__execute(cmd, True) @@ -211,7 +212,8 @@ class rdp_SMB: else: self.logger.fail(f"{action.capitalize()} RDP firewall rules via cmd failed, maybe got detected by AV software.") -class rdp_WMI: + +class RdpWmi: def __init__(self, context, connection, timeout): self.logger = context.log self.__currentprotocol = context.protocol @@ -241,9 +243,9 @@ class rdp_WMI: kdcHost=self.__kdcHost, ) - iInterface = self.__dcom.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login, wmi.IID_IWbemLevel1Login) + i_interface = self.__dcom.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login, wmi.IID_IWbemLevel1Login) if self.__currentprotocol == "smb": - flag, self.__stringBinding = dcom_FirewallChecker(iInterface, self.__timeout) + flag, self.__stringBinding = dcom_FirewallChecker(i_interface, self.__timeout) if not flag or not self.__stringBinding: error_msg = f'RDP-WMI: Dcom initialization failed on connection with stringbinding: "{self.__stringBinding}", please increase the timeout with the module option "DCOM-TIMEOUT=10". If it\'s still failing maybe something is blocking the RPC connection, please try to use "-o" with "METHOD=smb"' @@ -253,90 +255,98 @@ class rdp_WMI: self.logger.fail(error_msg) if not flag else self.logger.debug(error_msg) # Make it force break function self.__dcom.disconnect() - self.__iWbemLevel1Login = wmi.IWbemLevel1Login(iInterface) + self.__iWbemLevel1Login = wmi.IWbemLevel1Login(i_interface) except Exception as e: - self.logger.fail(f'Unexpected wmi error: {str(e)}, please try to use "-o" with "METHOD=smb"') + self.logger.fail(f'Unexpected wmi error: {e}, please try to use "-o" with "METHOD=smb"') if self.__iWbemLevel1Login in locals(): self.__dcom.disconnect() - def rdp_Wrapper(self, action, old=False): + def rdp_wrapper(self, action, old=False): if old is False: # According to this document: https://learn.microsoft.com/en-us/windows/win32/termserv/win32-tslogonsetting # Authentication level must set to RPC_C_AUTHN_LEVEL_PKT_PRIVACY when accessing namespace "//./root/cimv2/TerminalServices" - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL) - iWbemServices.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin( + "//./root/cimv2/TerminalServices", + NULL, + NULL + ) + i_wbem_services.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) self.__iWbemLevel1Login.RemRelease() - iEnumWbemClassObject = iWbemServices.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") - iWbemClassObject = iEnumWbemClassObject.Next(0xffffffff,1)[0] - if action == 'enable': + i_enum_wbem_class_object = i_wbem_services.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") + i_wbem_class_object = i_enum_wbem_class_object.Next(0xffffffff, 1)[0] + if action == "enable": self.logger.info("Enabled RDP services and setting up firewall.") - iWbemClassObject.SetAllowTSConnections(1,1) - elif action == 'disable': + i_wbem_class_object.SetAllowTSConnections(1, 1) + elif action == "disable": self.logger.info("Disabled RDP services and setting up firewall.") - iWbemClassObject.SetAllowTSConnections(0,0) + i_wbem_class_object.SetAllowTSConnections(0, 0) else: - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2', NULL, NULL) + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2', NULL, NULL) self.__iWbemLevel1Login.RemRelease() - iEnumWbemClassObject = iWbemServices.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") - iWbemClassObject = iEnumWbemClassObject.Next(0xffffffff,1)[0] - if action == 'enable': + i_enum_wbem_class_object = i_wbem_services.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") + i_wbem_class_object = i_enum_wbem_class_object.Next(0xffffffff, 1)[0] + if action == "enable": self.logger.info("Enabling RDP services (old system not support setting up firewall)") - iWbemClassObject.SetAllowTSConnections(1) - elif action == 'disable': + i_wbem_class_object.SetAllowTSConnections(1) + elif action == "disable": self.logger.info("Disabling RDP services (old system not support setting up firewall)") - iWbemClassObject.SetAllowTSConnections(0) + i_wbem_class_object.SetAllowTSConnections(0) - self.query_RDPResult(old) + self.query_rdp_result(old) if action == 'enable': - self.query_RDPPort() + self.query_rdp_port() # Need to create new iWbemServices interface in order to flush results - def query_RDPResult(self, old=False): + def query_rdp_result(self, old=False): if old is False: - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2/TerminalServices', NULL, NULL) - iWbemServices.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin("//./root/cimv2/TerminalServices", NULL, NULL) + i_wbem_services.get_dce_rpc().set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) self.__iWbemLevel1Login.RemRelease() - iEnumWbemClassObject = iWbemServices.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") - iWbemClassObject = iEnumWbemClassObject.Next(0xffffffff,1)[0] - result = dict(iWbemClassObject.getProperties()) - result = result['AllowTSConnections']['value'] + i_enum_wbem_class_object = i_wbem_services.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") + i_wbem_class_object = i_enum_wbem_class_object.Next(0xffffffff, 1)[0] + result = dict(i_wbem_class_object.getProperties()) + result = result["AllowTSConnections"]["value"] if result == 0: self.logger.success("Disable RDP via WMI(ncacn_ip_tcp) successfully") else: self.logger.success("Enable RDP via WMI(ncacn_ip_tcp) successfully") else: - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2', NULL, NULL) + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin("//./root/cimv2", NULL, NULL) self.__iWbemLevel1Login.RemRelease() - iEnumWbemClassObject = iWbemServices.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") - iWbemClassObject = iEnumWbemClassObject.Next(0xffffffff,1)[0] - result = dict(iWbemClassObject.getProperties()) - result = result['AllowTSConnections']['value'] + i_enum_wbem_class_object = i_wbem_services.ExecQuery("SELECT * FROM Win32_TerminalServiceSetting") + i_wbem_class_object = i_enum_wbem_class_object.Next(0xffffffff, 1)[0] + result = dict(i_wbem_class_object.getProperties()) + result = result["AllowTSConnections"]["value"] if result == 0: self.logger.success("Disable RDP via WMI(ncacn_ip_tcp) successfully (old system)") else: self.logger.success("Enable RDP via WMI(ncacn_ip_tcp) successfully (old system)") - def query_RDPPort(self): - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/DEFAULT', NULL, NULL) + def query_rdp_port(self): + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin("//./root/DEFAULT", NULL, NULL) self.__iWbemLevel1Login.RemRelease() - StdRegProv, resp = iWbemServices.GetObject("StdRegProv") - out = StdRegProv.GetDWORDValue(2147483650, 'SYSTEM\\CurrentControlSet\\Control\\Terminal Server\\WinStations\\RDP-Tcp', 'PortNumber') + std_reg_prov, resp = i_wbem_services.GetObject("StdRegProv") + out = std_reg_prov.GetDWORDValue( + 2147483650, + "SYSTEM\\CurrentControlSet\\Control\\Terminal Server\\WinStations\\RDP-Tcp", + "PortNumber" + ) self.logger.success(f"RDP Port: {str(out.uValue)}") # Nt version under 6 not support RAM. - def rdp_RAMWrapper(self, action): - iWbemServices = self.__iWbemLevel1Login.NTLMLogin('//./root/cimv2', NULL, NULL) + def rdp_ram_wrapper(self, action): + i_wbem_services = self.__iWbemLevel1Login.NTLMLogin("//./root/cimv2", NULL, NULL) self.__iWbemLevel1Login.RemRelease() - StdRegProv, resp = iWbemServices.GetObject("StdRegProv") - if action == 'enable-ram': + std_reg_prov, resp = i_wbem_services.GetObject("StdRegProv") + if action == "enable-ram": self.logger.info("Enabling Restricted Admin Mode.") - StdRegProv.SetDWORDValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', 'DisableRestrictedAdmin', 0) - elif action == 'disable-ram': + std_reg_prov.SetDWORDValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', "DisableRestrictedAdmin", 0) + elif action == "disable-ram": self.logger.info("Disabling Restricted Admin Mode (Clear).") - StdRegProv.DeleteValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', 'DisableRestrictedAdmin') - out = StdRegProv.GetDWORDValue(2147483650, 'System\\CurrentControlSet\\Control\\Lsa', 'DisableRestrictedAdmin') + std_reg_prov.DeleteValue(2147483650, "System\\CurrentControlSet\\Control\\Lsa", "DisableRestrictedAdmin") + out = std_reg_prov.GetDWORDValue(2147483650, "System\\CurrentControlSet\\Control\\Lsa", "DisableRestrictedAdmin") if out.uValue == 0: self.logger.success("Enable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully") elif out.uValue is None: - self.logger.success("Disable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully") \ No newline at end of file + self.logger.success("Disable RDP Restricted Admin Mode via WMI(ncacn_ip_tcp) successfully")