fixed a bug in analyze mode where headers weren't parsed; changed the default body type to text

main
Tonabrix1 2024-08-21 06:40:15 -04:00
parent 2bdf6fbd81
commit 3e8fce9594
1 changed file with 48 additions and 22 deletions
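A note on the first half of the commit message: before this change the raw -H string was only stored in settings['header-string'], so the headers dict used by the grequests calls in analyze mode never received those headers. The new line in configure_session() splits each "Name: value" line and merges it into that dict. A minimal standalone sketch of the same parsing, with a placeholder header string that is not taken from the repo:

# Sketch only: mirrors the parsing added in configure_session();
# maxsplit=1 is an extra safeguard for values that themselves contain ': '.
raw_headers = "X-Api-Key: abc123\nAccept: application/json"
parsed = {
    k.strip(): v.strip()
    for k, v in (line.split(': ', 1) for line in raw_headers.splitlines())
}
print(parsed)  # {'X-Api-Key': 'abc123', 'Accept': 'application/json'}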

moment.py (70 lines changed) Normal file → Executable file

@@ -1,8 +1,12 @@
-import grequests, argparse, json
+#! /usr/bin/python3
+import grequests, argparse, json, requests
 from h2spacex import h2_frames, H2OnTlsConnection
 from time import sleep
 from urllib.parse import urlparse
 from ast import literal_eval
+from colorama import Fore, Style
 
 headers = {}
 settings = {}
@ -23,13 +27,15 @@ def parser():
parser.add_argument("-d", "--delimiter", default='=') parser.add_argument("-d", "--delimiter", default='=')
parser.add_argument("-H", "--headers", type=str, default='') parser.add_argument("-H", "--headers", type=str, default='')
parser.add_argument("-p", "--port", default=443, type=int) parser.add_argument("-p", "--port", default=443, type=int)
parser.add_argument("-t", "--type", default='json', choices=['text']) parser.add_argument("-t", "--type", default='text', choices=['json'])
parser.add_argument("-sp", "--singlepacket", default=0, type=int)
return parser.parse_args() return parser.parse_args()
def configure_session(): def configure_session():
args = parser() args = parser()
headers.update({k: v for k,v in (*[('Cookie',c) for c in args.cookie], ('User-Agent', args.uagent)) if v}) headers.update({k: v for k,v in (*[('Cookie',c) for c in args.cookie], ('User-Agent', args.uagent)) if v})
if args.headers: headers.update({k.strip():v.strip() for k, v in [x.split(': ') for x in args.headers.splitlines()]})
settings.update({ settings.update({
'url': args.url, 'url': args.url,
'delimiter': args.delimiter, 'delimiter': args.delimiter,
@@ -40,7 +46,8 @@ def configure_session():
         'output': args.output,
         'header-string': args.headers,
         'port': args.port,
-        'type': args.type
+        'type': args.type,
+        'single-packet': args.singlepacket,
     })
     if args.body:
         body.update({
@@ -72,27 +79,40 @@ def get_body(key, sub):
     elif t == 'text': return '&'.join(f"{k}={v}" for k, v in [*body.items(), (key, sub)])
 
 def append_seq(seq):
-    return f"{u}?seq={seq}" if '?' not in (u:=settings['url']) else f"{u}&seq={c}"
+    return f"{u}?seq={seq}" if '?' not in (u:=settings['url']) else f"{u}&seq={seq}"
+
+def format_prepped_request(prepped):
+    # prepped has .method, .path_url, .headers and .body attribute to view the request
+    body = prepped.body
+    if body: body.encode()
+    headers = '\n'.join([f'{k}: {v}' for k, v in prepped.headers.items()])
+    return f"""{prepped.method} {prepped.path_url} HTTP/1.1\n{headers}\n\n{body}"""
 
 #does quick analysis with a known password to see if timing attacks might be possible
 def start():
     outp = {}
     k, v = next(iter(settings['sequence'].items()))
     seq = get_sequence(v)
     for ch in chunk(seq, settings['rate-limit'], len(v)):
         if (t:=settings['type']) == 'json': rs = (grequests.post(append_seq(c), json=get_body(k, c), headers=headers) for c in ch)
-        elif t == 'text': rs = (grequests.post(append_seq(c), body=get_body(k,c), headers=headers) for c in ch)
+        elif t == 'text': rs = (grequests.post(append_seq(c), data=get_body(k,c), headers=headers) for c in ch)
         for resp in grequests.map(rs):
             if resp is None:
-                print("failed...")
+                print("failed to get response...")
                 continue
-            outp[resp.url] = {
-                'status_code': resp.status_code,
-                'response_time': resp.elapsed.total_seconds(),
-            }
+
+            resps = [resp] + resp.history
+            for r in resps:
+                outp[r.url] = {
+                    'status_code': r.status_code,
+                    'response_time': r.elapsed.total_seconds(),
+                }
+                if settings['verbose']: outp[r.url] |= {'request': format_prepped_request(r.request), 'response': r.text}
     return seq, outp
 
 def create_single_packets(bodies):
+    #single packet attack using h2spacex as demonstrated in the docs
     parsed = urlparse(settings['url'])
@@ -142,7 +162,7 @@ def create_single_packets(bodies):
     h2_conn.send_bytes(temp_headers_bytes)
 
     # wait some time
-    sleep(0.5)
+    sleep(0.1)
 
     # send ping frame to warm up connection
     h2_conn.send_ping_frame()
@@ -165,23 +185,29 @@ def create_single_packets(bodies):
 def analyze(sequence, outp):
     outp = dict(sorted(outp.items(), key=lambda x: x[1]['response_time']))
     print(json.dumps(outp, indent=2))
-    # TODO: compare sequence to the order of the responses when sorted by time, if we see a pattern then a timing attack could be possible
+    keylist = [*outp]
+    possible = True and outp
+    for i in range(len(keylist)):
+        if len(keylist[i-1]) > len(keylist[i]): possible = False
+    if possible: print(Fore.RED,"TIMING ATTACK POSSIBLE!")
 
 def build_header_string():
     #removes trailing newlines and content length headers automatically
     outp = h.rstrip('\n')+'\n' if (h:='\n'.join([x for x in settings['header-string'].splitlines() if 'Content-Length' not in x])) else ''
     outp += '\n'.join(f"{k}: {v}" for k,v in headers.items())
-    print(outp)
     return outp
 
 if __name__ == "__main__":
     configure_session()
-    #resp = start()
-    #analyze(*resp)
-    k, v = next(iter(settings['sequence'].items()))
-    seq = get_sequence(v)
-    #bodies = [json.dumps(body | {k : c}) for c in seq][-3:]
-    bodies = [get_body(k, v) for _ in range(50)]
-    print(bodies)
-    create_single_packets(bodies)
+    resp = start()
+    analyze(*resp)
+
+    if sp:=settings['single-packet']:
+        k, v = next(iter(settings['sequence'].items()))
+        seq = get_sequence(v)
+        #bodies = [json.dumps(body | {k : c}) for c in seq][-3:]
+        bodies = [get_body(k, v) for _ in range(sp)]
+        create_single_packets(bodies)
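A note on the second half of the commit message: with the default type now 'text', the text branch in start() matters more, and it previously passed body= to grequests.post, a keyword that requests does not accept; the diff switches it to data=. A quick illustration of the two body styles outside of moment.py, where the URL and payload are placeholders:

import requests

# JSON body: the dict is serialized and Content-Type: application/json is set automatically.
requests.post("https://example.com/login", json={"user": "admin", "password": "x"}, timeout=5)

# Text body: a pre-built "k=v&k2=v2" string, like the one get_body() returns, is sent verbatim via data=.
requests.post("https://example.com/login", data="user=admin&password=x", timeout=5)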