252 lines
8.0 KiB
Python
Executable File
252 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# KillerBee Metasploit relay server
|
|
|
|
import re
|
|
import os
|
|
import sys
|
|
import cmd
|
|
import time
|
|
import json
|
|
import base64
|
|
import socket
|
|
import threading
|
|
import pkg_resources # Used to get killerbee version
|
|
|
|
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
|
|
from urlparse import parse_qs,urlparse
|
|
from killerbee import *
|
|
|
|
# Shared relay state, read and written by MSFHandler and the __main__ section.
#
# last_errors  -- value reported as "last_10_errors" by /status
# starttime    -- epoch seconds used by /statistics to compute uptime
# packets_sent -- count of successfully injected packets
# last_sent    -- value reported as "last_request" by /statistics
last_errors = starttime = packets_sent = last_sent = 0

# HTTP basic-auth credentials (a password of None disables the auth check)
# and the KillerBee device instance; all three are filled in by __main__.
username = password = kb = None
|
|
|
|
class MSFHandler(BaseHTTPRequestHandler):
    """HTTP GET handler that exposes a KillerBee device as a JSON relay.

    Every route returns a JSON document.  The handler itself is stateless;
    it reads and updates the module-level globals ``kb`` (the KillerBee
    device), ``username`` / ``password`` (HTTP basic-auth credentials),
    ``starttime``, ``packets_sent``, ``last_sent`` and ``last_errors``.
    """

    # Matches "/zigbee/<device>/<command>".  Compiled once instead of on
    # every request.  The device component is validated but otherwise
    # unused: all commands are sent to the single global ``kb`` instance.
    _ZIGBEE_ROUTE = re.compile(r"/zigbee/([\d\w:]+)/(\w+)")

    def status(self):
        """Return the ``/status`` document describing attached devices."""
        devices = kbutils.devlist()
        hw_versions = [dev[2] for dev in devices]
        device_names = [dev[1] for dev in devices]

        status = {
            "operational": 1 if hw_versions else 0,
            "hw_specialty": {"zigbee": True},
            # TODO: We should check firmware before reporting transmit capabilities
            "hw_capabilities": {"transmit": True},
            "last_10_errors": last_errors,
            "api_version": "0.0.3",
            "fw_version": pkg_resources.get_distribution("killerbee").version,
        }
        if len(hw_versions) == 1:
            status["hw_version"] = hw_versions[0]
            status["device_name"] = device_names[0]
        elif len(hw_versions) > 1:
            status["hw_version"] = ', '.join(hw_versions)
            status["device_name"] = ', '.join(device_names)
        else:
            # No device attached: no "device_name" key is reported.
            status["hw_version"] = "Not Supported"
        return status

    def statistics(self):
        """Return the ``/statistics`` document (uptime and packet counters).

        NOTE(review): ``last_sent`` is never updated anywhere in the visible
        code, so "last_request" stays at its initial value.
        """
        return {
            "uptime": int(time.time()) - starttime,
            "packet_stats": packets_sent,
            "last_request": last_sent,
            "voltage": "0.0v",
        }

    def datetime(self):
        """Return the current time as epoch seconds under "system_datetime"."""
        # The key was previously misspelled "sytem_datetime"; it now matches
        # the "system_" prefix used by timezone().
        return {"system_datetime": int(time.time())}

    def timezone(self):
        """Return the local timezone name under "system_timezone"."""
        return {"system_timezone": time.strftime("%Z")}

    def set_channel(self, args):
        """Tune the device to the channel given by the ``chan`` query argument.

        ``args`` is a ``parse_qs``-style mapping of name -> list of values.
        Returns the "not supported" document when ``chan`` is missing,
        ``{"success": False}`` when the value is not an integer or the device
        rejects it, and ``{"success": True}`` otherwise.
        """
        if "chan" not in args:
            return self.not_supported()
        try:
            chan = int(args["chan"][0])
        except ValueError:
            # A malformed value must still produce a JSON reply rather than
            # an unhandled exception inside the request handler.
            print("ERROR: Invalid channel: {0}".format(args["chan"][0]))
            return {"success": False}
        try:
            kb.set_channel(chan)
        except Exception as e:
            # Same best-effort policy as inject(): report failure to the client.
            print("ERROR: Unable to set channel: {0}".format(e))
            return {"success": False}
        return {"success": True}

    def inject(self, args):
        """Transmit the URL-safe base64 packet given by the ``data`` argument.

        Returns the "not supported" document when ``data`` is missing,
        otherwise ``{"success": bool}``.  ``packets_sent`` is incremented
        only when the injection succeeds.
        """
        global packets_sent
        if "data" not in args:
            return self.not_supported()
        try:
            kb.inject(base64.urlsafe_b64decode(args["data"][0]))
            packets_sent += 1
        except Exception as e:
            print("ERROR: Unable to inject packet: {0}".format(e))
            return {"success": False}
        return {"success": True}

    def recv(self):
        """Return the next sniffed packet, or ``{}`` if none is usable.

        A packet is only returned when ``kb.pnext()`` yields one whose
        second element (reported as "valid_crc") is truthy.
        """
        pkt = kb.pnext()
        if pkt is not None and pkt[1]:
            return {
                "data": base64.urlsafe_b64encode(pkt[0]),
                "valid_crc": pkt[1],
                "rssi": pkt[2],
            }
        return {}

    def sniffer_off(self):
        """Turn the device sniffer off."""
        kb.sniffer_off()
        return {"success": True}

    def sniffer_on(self):
        """Turn the device sniffer on."""
        kb.sniffer_on()
        return {"success": True}

    def supported_devices(self):
        """Return the first field of every entry in ``kbutils.devlist()``."""
        return {"devices": [dev[0] for dev in kbutils.devlist()]}

    def not_supported(self):
        """Return the generic "not supported" document."""
        return {"status": "not supported"}

    def send(self, data, resp=200):
        """Write ``data`` as a JSON response with HTTP status ``resp``."""
        self.send_response(resp)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data))

    def do_AUTHHEAD(self):
        """Reply 401 and ask the client for HTTP basic authentication."""
        self.send_response(401)
        self.send_header('WWW-Authenticate', 'Basic realm="Killerbee MSF Relay"')
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write("Please Authenticate")

    def do_GET(self):
        """Authenticate the request (if enabled) and dispatch it by path."""
        if password is not None:
            auth = self.headers.getheader('Authorization')
            if auth is None:
                print("Did not authenticate")
                self.do_AUTHHEAD()
                return
            if auth != 'Basic ' + base64.b64encode(username + ":" + password):
                print("Bad Authentication")
                self.do_AUTHHEAD()
                return

        url = urlparse(self.path)
        args = parse_qs(url.query)
        # Route on the parsed path so a query string never breaks matching.
        path = url.path

        fixed_routes = {
            "/status": self.status,
            "/statistics": self.statistics,
            "/settings/datetime": self.datetime,
            "/settings/timezone": self.timezone,
            "/zigbee/supported_devices": self.supported_devices,
        }
        handler = fixed_routes.get(path)
        if handler is not None:
            self.send(handler())
            return

        m = self._ZIGBEE_ROUTE.match(path)
        if m is None:
            self.send(self.not_supported(), 404)
            return

        command = m.group(2)
        if command == "set_channel":
            self.send(self.set_channel(args))
        elif command == "inject":
            self.send(self.inject(args))
        elif command == "recv":
            self.send(self.recv())
        elif command == "sniffer_off":
            self.send(self.sniffer_off())
        elif command == "sniffer_on":
            self.send(self.sniffer_on())
        else:
            self.send(self.not_supported(), 404)
|
|
|
|
class Killerbee_MSFRelay(cmd.Cmd):
    """Runs the HTTP relay server that serves MSFHandler requests.

    Constructing an instance immediately starts serving and blocks until
    the server is interrupted with Ctrl-C.
    """

    intro = "\nKillerBee Metasploit Relay\n"

    def __init__(self, ip='0.0.0.0', port=8080):
        """Store the listen address and start serving (blocks).

        ip   -- address to bind the HTTP server to
        port -- TCP port to listen on
        """
        cmd.Cmd.__init__(self)

        self._ip = ip
        self._port = port
        self._sock = None
        self._pause = False

        self.start()

    def _close_server(self):
        """Release the listening socket of the current server, if any."""
        if self._sock is not None:
            self._sock.server_close()
            self._sock = None

    def start(self):
        """Serve HTTP requests until interrupted with Ctrl-C.

        Any other exception is reported through ``sys.excepthook`` and the
        server is restarted.
        """
        # starttime is the module-level value that MSFHandler.statistics()
        # reads; without this declaration the assignment below would only
        # create a local and the reported uptime would be wrong.
        global starttime

        self._go = True
        while self._go:
            # serve the NIC port
            try:
                self._sock = HTTPServer((self._ip, self._port), MSFHandler)
                starttime = int(time.time())
                print("KillerBee MSFRelay running.")
                self._sock.serve_forever()
            except KeyboardInterrupt:
                # _sock is still None if the interrupt arrived before the
                # server object was created.
                self._close_server()
                self._go = False
            except Exception:
                sys.excepthook(*sys.exc_info())
                # Release the old socket so the retry can bind again, and
                # pause briefly so a persistent failure (e.g. port already
                # in use) does not spin at full speed.
                self._close_server()
                try:
                    time.sleep(1)
                except KeyboardInterrupt:
                    self._go = False
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse options, open the KillerBee device, wait
    # until a device is enumerated, then run the relay until interrupted.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--iface', '--dev', action='store', dest='devstring')
    parser.add_argument('-u', '--user', default="msf_relay", help='HTTP Username', type=str)
    parser.add_argument('-p', '--password', default="rfcat_relaypass", help='HTTP Password', type=str)
    parser.add_argument('-P', '--Port', default=8080, type=int)
    parser.add_argument('--noauth', default=False, action="store_true", help='Do not require authentication')
    parser.add_argument('--localonly', default=False, action="store_true", help='Listen on localhost only')

    ifo = parser.parse_args()

    try:
        kb = KillerBee(device=ifo.devstring)
    except KBInterfaceError as e:
        print("Interface Error: {0}".format(e))
        sys.exit(-1)

    username = ifo.user
    password = ifo.password
    ip = "0.0.0.0"
    port = ifo.Port
    if ifo.noauth:
        # MSFHandler skips the auth check when password is None.
        username = None
        password = None
    if ifo.localonly:
        # This must update ``ip`` (the value handed to the relay below);
        # assigning any other name leaves the server bound to 0.0.0.0.
        ip = "127.0.0.1"

    wait_msg = False
    while True:
        try:
            try:
                dev_found = len(kbutils.devlist()) > 0
            except Exception:
                # Enumeration errors are treated like "no device yet",
                # so the user gets the same insert/permissions hint.
                dev_found = False
            if dev_found:
                break
            if not wait_msg:
                print("Insert KillerBee compatible ZigBee device. (You may need to add permissions)")
                wait_msg = True
            # Poll at a modest rate instead of busy-spinning.
            time.sleep(0.5)
        except KeyboardInterrupt:
            sys.exit()

    # Blocks until the relay is interrupted (the constructor starts serving).
    beerelay = Killerbee_MSFRelay(ip, port)

    import atexit
    # NOTE(review): cleanupInteractiveAtExit is not defined in this file; it
    # would have to come from ``from killerbee import *`` -- confirm it exists.
    atexit.register(cleanupInteractiveAtExit)
|