diff --git a/moment.py b/moment.py old mode 100644 new mode 100755 index 58a3681..902737e --- a/moment.py +++ b/moment.py @@ -1,8 +1,12 @@ -import grequests, argparse, json +#! /usr/bin/python3 + +import grequests, argparse, json, requests from h2spacex import h2_frames, H2OnTlsConnection from time import sleep from urllib.parse import urlparse from ast import literal_eval +from colorama import Fore, Style + headers = {} settings = {} @@ -23,13 +27,15 @@ def parser(): parser.add_argument("-d", "--delimiter", default='=') parser.add_argument("-H", "--headers", type=str, default='') parser.add_argument("-p", "--port", default=443, type=int) - parser.add_argument("-t", "--type", default='json', choices=['text']) + parser.add_argument("-t", "--type", default='text', choices=['json']) + parser.add_argument("-sp", "--singlepacket", default=0, type=int) return parser.parse_args() def configure_session(): args = parser() headers.update({k: v for k,v in (*[('Cookie',c) for c in args.cookie], ('User-Agent', args.uagent)) if v}) + if args.headers: headers.update({k.strip():v.strip() for k, v in [x.split(': ') for x in args.headers.splitlines()]}) settings.update({ 'url': args.url, 'delimiter': args.delimiter, @@ -40,7 +46,8 @@ def configure_session(): 'output': args.output, 'header-string': args.headers, 'port': args.port, - 'type': args.type + 'type': args.type, + 'single-packet': args.singlepacket, }) if args.body: body.update({ @@ -72,27 +79,40 @@ def get_body(key, sub): elif t == 'text': return '&'.join(f"{k}={v}" for k, v in [*body.items(), (key, sub)]) def append_seq(seq): - return f"{u}?seq={seq}" if '?' not in (u:=settings['url']) else f"{u}&seq={c}" + return f"{u}?seq={seq}" if '?' not in (u:=settings['url']) else f"{u}&seq={seq}" + +def format_prepped_request(prepped): + # prepped has .method, .path_url, .headers and .body attribute to view the request + body = prepped.body + if body: body.encode() + headers = '\n'.join([f'{k}: {v}' for k, v in prepped.headers.items()]) + return f"""{prepped.method} {prepped.path_url} HTTP/1.1\n{headers}\n\n{body}""" #does quick analysis with a known password to see if timing attacks might be possible def start(): outp = {} k, v = next(iter(settings['sequence'].items())) seq = get_sequence(v) + for ch in chunk(seq, settings['rate-limit'], len(v)): if (t:=settings['type']) == 'json': rs = (grequests.post(append_seq(c), json=get_body(k, c), headers=headers) for c in ch) - elif t == 'text': rs = (grequests.post(append_seq(c), body=get_body(k,c), headers=headers) for c in ch) + elif t == 'text': rs = (grequests.post(append_seq(c), data=get_body(k,c), headers=headers) for c in ch) for resp in grequests.map(rs): if resp is None: - print("failed...") + print("failed to get response...") continue - outp[resp.url] = { - 'status_code': resp.status_code, - 'response_time': resp.elapsed.total_seconds(), - } + + resps = [resp] + resp.history + for r in resps: + outp[r.url] = { + 'status_code': r.status_code, + 'response_time': r.elapsed.total_seconds(), + } + if settings['verbose']: outp[r.url] |= {'request': format_prepped_request(r.request), 'response': r.text} return seq, outp def create_single_packets(bodies): + #single packet attack using h2spacex as demonstrated in the docs parsed = urlparse(settings['url']) @@ -142,7 +162,7 @@ def create_single_packets(bodies): h2_conn.send_bytes(temp_headers_bytes) # wait some time - sleep(0.5) + sleep(0.1) # send ping frame to warm up connection h2_conn.send_ping_frame() @@ -165,23 +185,29 @@ def create_single_packets(bodies): def analyze(sequence, outp): outp = dict(sorted(outp.items(), key=lambda x: x[1]['response_time'])) print(json.dumps(outp, indent=2)) - # TODO: compare sequence to the order of the responses when sorted by time, if we see a pattern then a timing attack could be possible - + keylist = [*outp] + possible = True and outp + for i in range(len(keylist)): + if len(keylist[i-1]) > len(keylist[i]): possible = False + if possible: print(Fore.RED,"TIMING ATTACK POSSIBLE!") + def build_header_string(): #removes trailing newlines and content length headers automatically outp = h.rstrip('\n')+'\n' if (h:='\n'.join([x for x in settings['header-string'].splitlines() if 'Content-Length' not in x])) else '' outp += '\n'.join(f"{k}: {v}" for k,v in headers.items()) - print(outp) return outp if __name__ == "__main__": configure_session() - #resp = start() - #analyze(*resp) - k, v = next(iter(settings['sequence'].items())) - seq = get_sequence(v) - #bodies = [json.dumps(body | {k : c}) for c in seq][-3:] - bodies = [get_body(k, v) for _ in range(50)] - print(bodies) - create_single_packets(bodies) \ No newline at end of file + resp = start() + analyze(*resp) + + if sp:=settings['single-packet']: + + k, v = next(iter(settings['sequence'].items())) + seq = get_sequence(v) + + #bodies = [json.dumps(body | {k : c}) for c in seq][-3:] + bodies = [get_body(k, v) for _ in range(sp)] + create_single_packets(bodies) \ No newline at end of file