#!/usr/bin/env python3 # Author: Roberto Rodriguez (@Cyb3rWard0g) # License: GPL-3.0 import nbformat as nbf import glob import yaml from os import path import subprocess import time import argparse # Bannner print(r""" _________.__ ________ ___________ _________ / _____/|__| ____ _____ _____ \_____ \ \_ _____// _____/ \_____ \ | |/ ___\ / \\__ \ / ____/ | __)_ \_____ \ / \| / /_/ > Y Y \/ __ \_/ \ | \/ \ /_______ /|__\___ /|__|_| (____ /\_______ \/_______ /_______ / \/ /_____/ \/ \/ \/ \/ \/ _______ __ ___. __ \ \ _____/ |_ ____\_ |__ ____ ____ | | __ ______ / | \ / _ \ __\/ __ \| __ \ / _ \ / _ \| |/ / / ___/ / | ( <_> ) | \ ___/| \_\ ( <_> | <_> ) < \___ \ \____|__ /\____/|__| \___ >___ /\____/ \____/|__|_ \/____ > \/ \/ \/ \/ \/ V0.1 Creator: Roberto Rodriguez @Cyb3rWard0g License: GPL-3.0 """) # Initial description text = "This script allows you to automate the creation of Jupyter notebooks with sigma rules converted to Elasticsearch query strings by sigmac that you could use to run against an Elasticsearch database" # Initiate the parser parser = argparse.ArgumentParser(description=text) # Add arguments (store_true means no argument needed) parser.add_argument("-v", "--version", help="shows version of script", action="store_true") parser.add_argument("-r", "--rules-directory", help="sigma rules directory. i.e sigma/rules/", type=str , required=False) parser.add_argument("-c", "--field-mappings-config", help="path including file name to field mappings config", type=str , required=False) parser.add_argument("-e", "--es-url", help="elasticsearch url. i.e. http://helk-elasticsearch:9200", type=str , required=False) parser.add_argument("-i", "--es-default-index", help="default elasticsearch index. i.e. logs-*", type=str , required=False) parser.add_argument("-u", "--es-user", help="elasticsearch user name. i.e elastic", type=str , required=False) parser.add_argument("-p", "--es-password", help="elasticsearch user password", type=str , required=False) parser.add_argument("-o", "--output-notebooks-path", help="path to where to write the notebooks. i.e. notebooks/sigma/", type=str , required=False) args = parser.parse_args() if args.version: print("script version 0.1") if args.rules_directory: SIGMARULES = "{}**/*.yml".format(args.rules_directory) if args.field_mappings_config: FIELDMAPPINGSCONFIG = args.field_mappings_config if args.es_url: ESURL = args.es_url if args.es_default_index: DEFAULTESINDEX = args.es_default_index if args.output_notebooks_path: NOTEBOOKSPATH = args.output_notebooks_path if args.es_user: ESUSER = args.es_user if args.es_password: ESPASSWORD = args.es_password SIGMACTARGET = "es-qs" # Reading field Mappings print("[+] Reading index and field mappings from {}..".format(FIELDMAPPINGSCONFIG)) FIELDMAPPINGSDICT = yaml.safe_load(open(FIELDMAPPINGSCONFIG).read()) # ******* Transforming every sigma rule to specific backend ******* print("[+] Translating sigma rules to query strings..") start_time = time.time() originalCounter=0 actualCounter=0 # retrieve all yml file from the Windows category rules sigmaFiles = glob.glob(SIGMARULES, recursive=True) for sfile in sigmaFiles: originalCounter += 1 print(" [>] Processing {} rule".format(sfile)) try: rule = list(yaml.safe_load_all(open(sfile).read())) except yaml.YAMLError as e: print(" [!!] Script could not parse the yaml contents of {} and failed with the following error: {}".format(sfile, e)) # Testing if sigmac can translate sigma rule. sigmacCommand = ["sigmac", "-t", SIGMACTARGET, "-c", FIELDMAPPINGSCONFIG, "{}".format(sfile)] p = subprocess.run(sigmacCommand, stdout=subprocess.PIPE, text=True) detection = p.stdout.splitlines() # If there is an error translating rule, loop moves to the next iteration if not detection: print(" [!!] sigmac returned no queries. Skipping creation of notebook..") continue # Setting indices try: product = rule[0]['logsource'].setdefault('product', None) service = rule[0]['logsource'].setdefault('service', None) category = rule[0]['logsource'].setdefault('category', None) except KeyError: product = None service = None category = None if None not in (product, service): try: ESINDEX = FIELDMAPPINGSDICT['logsources']['{}-{}'.format(product, service)]['index'] except: print( " [!!] {} does not have pre-defined indices for this product and service..".format(FIELDMAPPINGSCONFIG)) ESINDEX = DEFAULTESINDEX else: ESINDEX = DEFAULTESINDEX print( " [++] Setting Elasticsearch index to {}".format(ESINDEX)) # ********* Creating Jupyter Notebook ************* # Extracting field values TITLE = rule[0]['title'] DESCRIPTION = rule[0]['description'] RULE_CONTENT = yaml.dump(rule, sort_keys=False) # Initializing Notebooks Cells nb = nbf.v4.new_notebook() nb['cells'] = [] # *** TITLE & DESCRIPTION**** nb['cells'].append(nbf.v4.new_markdown_cell("""# {} {}""".format(TITLE, DESCRIPTION))) # *** RULE CONTENT **** nb['cells'].append(nbf.v4.new_markdown_cell("""## Rule Content ``` {} ```""".format(RULE_CONTENT))) # *** RULES **** nb['cells'].append(nbf.v4.new_markdown_cell("## Querying Elasticsearch")) # Import Modules nb['cells'].append(nbf.v4.new_markdown_cell("### Import Libraries")) nb['cells'].append(nbf.v4.new_code_cell("""from elasticsearch import Elasticsearch from elasticsearch_dsl import Search import pandas as pd""")) # Initialize Elasticsearch client nb['cells'].append(nbf.v4.new_markdown_cell("### Initialize Elasticsearch client")) if 'ESPASSWORD' in locals(): nb['cells'].append(nbf.v4.new_code_cell("""es = Elasticsearch(['{}'], http_auth=({}, {})) searchContext = Search(using=es, index='{}', doc_type='doc')""".format(ESURL, ESUSER, ESPASSWORD, ESINDEX))) else: nb['cells'].append(nbf.v4.new_code_cell("""es = Elasticsearch(['{}']) searchContext = Search(using=es, index='{}', doc_type='doc')""".format(ESURL, ESINDEX))) # Set ES Query nb['cells'].append(nbf.v4.new_markdown_cell("### Run Elasticsearch Query")) for d in detection: nb['cells'].append(nbf.v4.new_code_cell("""s = searchContext.query('query_string', query='{}') response = s.execute() if response.success(): df = pd.DataFrame((d.to_dict() for d in s.scan()))""".format(d))) # Show Results # Set ES Query nb['cells'].append(nbf.v4.new_markdown_cell("### Show Results")) nb['cells'].append(nbf.v4.new_code_cell("df.head()")) # writing to Jupyter Notebook filename = path.splitext(path.basename(sfile))[0] notebookPath = "{}{}.ipynb".format(NOTEBOOKSPATH,filename) print( " [++] writing sigma rule to jupyter notebook {}".format(notebookPath)) nbf.write(nb, notebookPath) actualCounter += 1 timeTaken = time.time() - start_time print("\n--- {} out of {} sigma rules were turned into notebooks in {} seconds ---".format(actualCounter, originalCounter, timeTaken))