HELK/scripts/sigma2esnotebooks

163 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
# Author: Roberto Rodriguez (@Cyb3rWard0g)
# License: GPL-3.0
import nbformat as nbf
import glob
import yaml
from os import path
import subprocess
import time
import argparse
# Banner: ASCII-art splash printed on every run.
# Raw string (r"""...""") so the art's backslashes are rendered literally.
print(r"""
_________.__ ________ ___________ _________
/ _____/|__| ____ _____ _____ \_____ \ \_ _____// _____/
\_____ \ | |/ ___\ / \\__ \ / ____/ | __)_ \_____ \
/ \| / /_/ > Y Y \/ __ \_/ \ | \/ \
/_______ /|__\___ /|__|_| (____ /\_______ \/_______ /_______ /
\/ /_____/ \/ \/ \/ \/ \/
_______ __ ___. __
\ \ _____/ |_ ____\_ |__ ____ ____ | | __ ______
/ | \ / _ \ __\/ __ \| __ \ / _ \ / _ \| |/ / / ___/
/ | ( <_> ) | \ ___/| \_\ ( <_> | <_> ) < \___ \
\____|__ /\____/|__| \___ >___ /\____/ \____/|__|_ \/____ >
\/ \/ \/ \/ \/ V0.1
Creator: Roberto Rodriguez @Cyb3rWard0g
License: GPL-3.0
""")
# Short synopsis shown at the top of the --help output.
text = "This script allows you to automate the creation of Jupyter notebooks with sigma rules converted to Elasticsearch query strings by sigmac that you could use to run against an Elasticsearch database"

# Build the command-line interface.  -v/--version is a boolean flag; every
# other option takes a string value and is declared optional.
parser = argparse.ArgumentParser(description=text)
parser.add_argument("-v", "--version", help="shows version of script", action="store_true")

# (short flag, long flag, help text) for each string-valued option, in the
# order they should appear in --help.
_string_options = (
    ("-r", "--rules-directory", "sigma rules directory. i.e sigma/rules/"),
    ("-c", "--field-mappings-config", "path including file name to field mappings config"),
    ("-e", "--es-url", "elasticsearch url. i.e. http://helk-elasticsearch:9200"),
    ("-i", "--es-default-index", "default elasticsearch index. i.e. logs-*"),
    ("-u", "--es-user", "elasticsearch user name. i.e elastic"),
    ("-p", "--es-password", "elasticsearch user password"),
    ("-o", "--output-notebooks-path", "path to where to write the notebooks. i.e. notebooks/sigma/"),
)
for _short, _long, _help in _string_options:
    parser.add_argument(_short, _long, help=_help, type=str, required=False)

args = parser.parse_args()
# ---- Handle --version, validate options, and load the field-mappings config ----
if args.version:
    print("script version 0.1")
    # --version is informational only; exit here instead of falling through
    # and crashing below on options that were never supplied.
    raise SystemExit(0)

# These options are declared optional in argparse but every run needs them.
# Fail with a proper usage error instead of an opaque NameError later on.
_missing = [flag for flag, value in (
    ("-r/--rules-directory", args.rules_directory),
    ("-c/--field-mappings-config", args.field_mappings_config),
    ("-e/--es-url", args.es_url),
    ("-i/--es-default-index", args.es_default_index),
    ("-o/--output-notebooks-path", args.output_notebooks_path),
) if not value]
if _missing:
    parser.error("the following arguments are required: {}".format(", ".join(_missing)))

# Recursive glob pattern matching every .yml rule under the rules directory.
SIGMARULES = "{}**/*.yml".format(args.rules_directory)
FIELDMAPPINGSCONFIG = args.field_mappings_config
ESURL = args.es_url
DEFAULTESINDEX = args.es_default_index
NOTEBOOKSPATH = args.output_notebooks_path
# Credentials are genuinely optional: when absent, the generated notebooks
# connect to Elasticsearch without authentication.
if args.es_user:
    ESUSER = args.es_user
if args.es_password:
    ESPASSWORD = args.es_password
SIGMACTARGET = "es-qs"  # sigmac backend: Elasticsearch query strings
# Reading field Mappings
print("[+] Reading index and field mappings from {}..".format(FIELDMAPPINGSCONFIG))
# Context manager so the config file handle is closed deterministically
# (the original `yaml.safe_load(open(...).read())` leaked it).
with open(FIELDMAPPINGSCONFIG) as _config_file:
    FIELDMAPPINGSDICT = yaml.safe_load(_config_file)
# ******* Transforming every sigma rule to specific backend *******
print("[+] Translating sigma rules to query strings..")
start_time = time.time()
originalCounter = 0  # rules discovered
actualCounter = 0    # rules successfully turned into notebooks
# Retrieve all yml files under the rules directory (recursive glob).
sigmaFiles = glob.glob(SIGMARULES, recursive=True)
for sfile in sigmaFiles:
    originalCounter += 1
    print(" [>] Processing {} rule".format(sfile))
    try:
        # safe_load_all is lazy, so materialize the documents while the
        # file handle is still open.
        with open(sfile) as rule_file:
            rule = list(yaml.safe_load_all(rule_file))
    except yaml.YAMLError as e:
        print(" [!!] Script could not parse the yaml contents of {} and failed with the following error: {}".format(sfile, e))
        # BUG FIX: without this `continue` the loop carried on with a stale
        # (or, on the first iteration, undefined) `rule`.
        continue
    # Testing if sigmac can translate sigma rule.
    sigmacCommand = ["sigmac", "-t", SIGMACTARGET, "-c", FIELDMAPPINGSCONFIG, sfile]
    p = subprocess.run(sigmacCommand, stdout=subprocess.PIPE, text=True)
    detection = p.stdout.splitlines()
    # If there is an error translating the rule, move to the next iteration.
    if not detection:
        print(" [!!] sigmac returned no queries. Skipping creation of notebook..")
        continue
    # Setting indices: product/service from the rule's logsource select a
    # pre-defined index from the mappings config, else the default index.
    try:
        product = rule[0]['logsource'].setdefault('product', None)
        service = rule[0]['logsource'].setdefault('service', None)
        category = rule[0]['logsource'].setdefault('category', None)
    except KeyError:
        product = None
        service = None
        category = None
    if None not in (product, service):
        try:
            ESINDEX = FIELDMAPPINGSDICT['logsources']['{}-{}'.format(product, service)]['index']
        except KeyError:
            # Narrowed from a bare `except:` — only a missing mapping entry
            # should fall back to the default index.
            print(" [!!] {} does not have pre-defined indices for this product and service..".format(FIELDMAPPINGSCONFIG))
            ESINDEX = DEFAULTESINDEX
    else:
        ESINDEX = DEFAULTESINDEX
    print(" [++] Setting Elasticsearch index to {}".format(ESINDEX))
    # ********* Creating Jupyter Notebook *************
    # Extracting field values
    TITLE = rule[0]['title']
    DESCRIPTION = rule[0]['description']
    RULE_CONTENT = yaml.dump(rule, sort_keys=False)
    # Initializing Notebook cells
    nb = nbf.v4.new_notebook()
    nb['cells'] = []
    # *** TITLE & DESCRIPTION ***
    nb['cells'].append(nbf.v4.new_markdown_cell("""# {}
{}""".format(TITLE, DESCRIPTION)))
    # *** RULE CONTENT ***
    nb['cells'].append(nbf.v4.new_markdown_cell("""## Rule Content
```
{}
```""".format(RULE_CONTENT)))
    # *** RULES ***
    nb['cells'].append(nbf.v4.new_markdown_cell("## Querying Elasticsearch"))
    # Import Modules
    nb['cells'].append(nbf.v4.new_markdown_cell("### Import Libraries"))
    nb['cells'].append(nbf.v4.new_code_cell("""from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import pandas as pd"""))
    # Initialize Elasticsearch client
    nb['cells'].append(nbf.v4.new_markdown_cell("### Initialize Elasticsearch client"))
    # Use the parsed args directly instead of probing `locals()`, and require
    # BOTH user and password (the original used ESUSER unchecked and emitted
    # it without quotes, producing invalid Python in the notebook).
    if args.es_user and args.es_password:
        nb['cells'].append(nbf.v4.new_code_cell("""es = Elasticsearch(['{}'], http_auth=('{}', '{}'))
searchContext = Search(using=es, index='{}', doc_type='doc')""".format(ESURL, ESUSER, ESPASSWORD, ESINDEX)))
    else:
        nb['cells'].append(nbf.v4.new_code_cell("""es = Elasticsearch(['{}'])
searchContext = Search(using=es, index='{}', doc_type='doc')""".format(ESURL, ESINDEX)))
    # Set ES Query
    nb['cells'].append(nbf.v4.new_markdown_cell("### Run Elasticsearch Query"))
    for d in detection:
        # Escape single quotes so the embedded query stays a valid Python
        # string literal in the generated cell.
        safe_query = d.replace("'", "\\'")
        nb['cells'].append(nbf.v4.new_code_cell("""s = searchContext.query('query_string', query='{}')
response = s.execute()
if response.success():
    df = pd.DataFrame((d.to_dict() for d in s.scan()))""".format(safe_query)))
    # Show Results
    nb['cells'].append(nbf.v4.new_markdown_cell("### Show Results"))
    nb['cells'].append(nbf.v4.new_code_cell("df.head()"))
    # Writing the Jupyter notebook; path.join works whether or not
    # NOTEBOOKSPATH ends with a separator.
    filename = path.splitext(path.basename(sfile))[0]
    notebookPath = path.join(NOTEBOOKSPATH, "{}.ipynb".format(filename))
    print(" [++] writing sigma rule to jupyter notebook {}".format(notebookPath))
    nbf.write(nb, notebookPath)
    actualCounter += 1
timeTaken = time.time() - start_time
print("\n--- {} out of {} sigma rules were turned into notebooks in {} seconds ---".format(actualCounter, originalCounter, timeTaken))