implemented single packet poc from h2spacex, still needs work relating to analysis
parent
abb699fe4c
commit
2bdf6fbd81
113
moment.py
113
moment.py
|
@ -1,4 +1,8 @@
|
|||
import grequests, argparse, json
|
||||
from h2spacex import h2_frames, H2OnTlsConnection
|
||||
from time import sleep
|
||||
from urllib.parse import urlparse
|
||||
from ast import literal_eval
|
||||
|
||||
headers = {}
|
||||
settings = {}
|
||||
|
@ -17,12 +21,15 @@ def parser():
|
|||
parser.add_argument("-r", "--rate", default=RATE_LIMIT, type=int)
|
||||
parser.add_argument("-o", "--output", default='output.json')
|
||||
parser.add_argument("-d", "--delimiter", default='=')
|
||||
parser.add_argument("-H", "--headers", type=str, default='')
|
||||
parser.add_argument("-p", "--port", default=443, type=int)
|
||||
parser.add_argument("-t", "--type", default='json', choices=['text'])
|
||||
return parser.parse_args()
|
||||
|
||||
def configure_session():
|
||||
args = parser()
|
||||
|
||||
headers.update({k: v for k,v in (('Cookie',args.cookie), ('User-Agent', args.uagent)) if v})
|
||||
headers.update({k: v for k,v in (*[('Cookie',c) for c in args.cookie], ('User-Agent', args.uagent)) if v})
|
||||
settings.update({
|
||||
'url': args.url,
|
||||
'delimiter': args.delimiter,
|
||||
|
@ -31,10 +38,13 @@ def configure_session():
|
|||
'rate-limit': args.rate,
|
||||
'body': args.body,
|
||||
'output': args.output,
|
||||
'header-string': args.headers,
|
||||
'port': args.port,
|
||||
'type': args.type
|
||||
})
|
||||
if args.body:
|
||||
body.update({
|
||||
k:v for k,v in [x.split(args.delimiter) for x in args.body.split('&')]
|
||||
k:v if v not in ['true','false'] else literal_eval(v.capitalize()) for k,v in [x.split(args.delimiter) for x in args.body.split('&')]
|
||||
})
|
||||
if args.verbose:
|
||||
pretty_print("Settings:", settings)
|
||||
|
@ -57,8 +67,12 @@ def get_sequence(s):
|
|||
for i in range(len(s)+1): yield s[:i]
|
||||
|
||||
#takes a substring from the sequence and returns a post body
|
||||
def get_body(sub):
|
||||
body = {}
|
||||
def get_body(key, sub):
|
||||
if (t:=settings['type']) == 'json': return body | {key: sub}
|
||||
elif t == 'text': return '&'.join(f"{k}={v}" for k, v in [*body.items(), (key, sub)])
|
||||
|
||||
def append_seq(seq):
|
||||
return f"{u}?seq={seq}" if '?' not in (u:=settings['url']) else f"{u}&seq={c}"
|
||||
|
||||
#does quick analysis with a known password to see if timing attacks might be possible
|
||||
def start():
|
||||
|
@ -66,7 +80,8 @@ def start():
|
|||
k, v = next(iter(settings['sequence'].items()))
|
||||
seq = get_sequence(v)
|
||||
for ch in chunk(seq, settings['rate-limit'], len(v)):
|
||||
rs = (grequests.post(f"{settings['url']}?seq={c}", json=body | {k: c}, headers=headers) for c in ch)
|
||||
if (t:=settings['type']) == 'json': rs = (grequests.post(append_seq(c), json=get_body(k, c), headers=headers) for c in ch)
|
||||
elif t == 'text': rs = (grequests.post(append_seq(c), body=get_body(k,c), headers=headers) for c in ch)
|
||||
for resp in grequests.map(rs):
|
||||
if resp is None:
|
||||
print("failed...")
|
||||
|
@ -77,12 +92,96 @@ def start():
|
|||
}
|
||||
return seq, outp
|
||||
|
||||
def create_single_packets(bodies):
|
||||
parsed = urlparse(settings['url'])
|
||||
|
||||
|
||||
h2_conn = H2OnTlsConnection(
|
||||
hostname=parsed.netloc,
|
||||
port_number=settings['port']
|
||||
)
|
||||
|
||||
h2_conn.setup_connection()
|
||||
|
||||
head = build_header_string()
|
||||
|
||||
try_num = len(bodies)
|
||||
|
||||
stream_ids_list = h2_conn.generate_stream_ids(number_of_streams=try_num)
|
||||
|
||||
all_headers_frames = [] # all headers frame + data frames which have not the last byte
|
||||
all_data_frames = [] # all data frames which contain the last byte
|
||||
|
||||
temp_string = ''
|
||||
|
||||
for i in range(0, try_num):
|
||||
header_frames_without_last_byte, last_data_frame_with_last_byte = h2_conn.create_single_packet_http2_post_request_frames(
|
||||
method='POST',
|
||||
headers_string=head+f'Content-Length: {len(bodies[i])}\n',
|
||||
scheme=parsed.scheme,
|
||||
stream_id=stream_ids_list[i],
|
||||
authority=parsed.netloc,
|
||||
body=bodies[i],
|
||||
path=parsed.path
|
||||
)
|
||||
|
||||
all_headers_frames.append(header_frames_without_last_byte)
|
||||
all_data_frames.append(last_data_frame_with_last_byte)
|
||||
|
||||
# concatenate all headers bytes
|
||||
temp_headers_bytes = b''
|
||||
for h in all_headers_frames: temp_headers_bytes += bytes(h)
|
||||
|
||||
|
||||
# concatenate all data frames which have last byte
|
||||
temp_data_bytes = b''
|
||||
for d in all_data_frames: temp_data_bytes += bytes(d)
|
||||
|
||||
#print(temp_headers_bytes, temp_data_bytes, sep='\n\n')
|
||||
|
||||
h2_conn.send_bytes(temp_headers_bytes)
|
||||
|
||||
# wait some time
|
||||
sleep(0.5)
|
||||
|
||||
# send ping frame to warm up connection
|
||||
h2_conn.send_ping_frame()
|
||||
|
||||
# send remaining data frames
|
||||
h2_conn.send_bytes(temp_data_bytes)
|
||||
|
||||
resp = h2_conn.read_response_from_socket(_timeout=5)
|
||||
frame_parser = h2_frames.FrameParser(h2_connection=h2_conn)
|
||||
frame_parser.add_frames(resp)
|
||||
frame_parser.show_response_of_sent_requests()
|
||||
|
||||
print('---')
|
||||
|
||||
print(temp_string)
|
||||
|
||||
sleep(3)
|
||||
h2_conn.close_connection()
|
||||
|
||||
def analyze(sequence, outp):
|
||||
outp = dict(sorted(outp.items(), key=lambda x: x[1]['response_time']))
|
||||
print(json.dumps(outp, indent=2))
|
||||
# TODO: compare sequence to the order of the responses when sorted by time, if we see a pattern then a timing attack could be possible
|
||||
|
||||
|
||||
def build_header_string():
|
||||
#removes trailing newlines and content length headers automatically
|
||||
outp = h.rstrip('\n')+'\n' if (h:='\n'.join([x for x in settings['header-string'].splitlines() if 'Content-Length' not in x])) else ''
|
||||
outp += '\n'.join(f"{k}: {v}" for k,v in headers.items())
|
||||
print(outp)
|
||||
return outp
|
||||
|
||||
if __name__ == "__main__":
|
||||
configure_session()
|
||||
resp = start()
|
||||
analyze(*resp)
|
||||
#resp = start()
|
||||
#analyze(*resp)
|
||||
k, v = next(iter(settings['sequence'].items()))
|
||||
seq = get_sequence(v)
|
||||
#bodies = [json.dumps(body | {k : c}) for c in seq][-3:]
|
||||
bodies = [get_body(k, v) for _ in range(50)]
|
||||
print(bodies)
|
||||
create_single_packets(bodies)
|
Loading…
Reference in New Issue