mirror of https://github.com/infosecn1nja/HELK.git
524 lines
16 KiB
"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"# Introduction to Spark Graphframes and Sysmon\n",
"## Goals:\n",
"* Get familiarized with the basics of Spark Graphframes\n",
"* Confirm Jupyter can import Graphframes library\n",
"* Confirm Spark can read data from Elasticsearch\n",
"* Learn to create a graphframe from sysmon Index\n",
"* Learn the basics of GraphFrames Motifs"
"cell_type": "markdown",
"metadata": {},
"source": [
"## What is Graphframes?\n",
"GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs. \n",
"* It provides high-level APIs in Scala, Java, and Python. \n",
"* It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames.\n",
"* This extended functionality includes motif finding, DataFrame-based serialization, and highly expressive graph queries."
"cell_type": "markdown",
"metadata": {},
"source": [
"### What is a graph?\n",
"GraphFrames represent graphs: \n",
"* Vertices (e.g., users)\n",
"* Edges (e.g., relationships between users)."
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import SparkSession Class"
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a SparkSession instance\n",
"* Define a **spark** variable\n",
"* Pass values to the **appName** and **master** functions\n",
" * For the master function, we are going to use the HELK's Spark Master container (helk-spark-master)\n",
"* This time add the **config()** function to set Elasticsearch information needed to read from it"
"cell_type": "markdown",
"metadata": {},
"source": [
"[**config(key=None, value=None, conf=None)**](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.Builder.config)\n",
"* Sets a config option.\n",
"* Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration."
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession.builder \\\n",
" .appName(\"HELK Graphs\") \\\n",
" .master(\"spark://helk-spark-master:7077\") \\\n",
" .config(\"es.read.field.as.array.include\", \"tags\") \\\n",
" .config(\"es.nodes\",\"helk-elasticsearch:9200\") \\\n",
" .config(\"es.net.http.auth.user\",\"elastic\") \\\n",
" .config(\"es.net.http.auth.pass\",\"elasticpassword\") \\\n",
" .enableHiveSupport() \\\n",
" .getOrCreate()\n",
" #If you are using elastic TRIAL license, then you need the es.net.http.auth.pass value\n",
" #If you are using elastic BASIC license, then you can remove the es.net.http.auth.pass value"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Check the SparkSession variable"
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
"data": {
"text/html": [
" <div>\n",
" <p><b>SparkSession - hive</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
" <p><a href=\"http://403892d82956:4041\">Spark UI</a></p>\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.4.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>spark://helk-spark-master:7077</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>HELK Graphs</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x7f23dc07e898>"
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import Graphframes & SQL Functions"
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from graphframes import *\n",
"from pyspark.sql.functions import *"
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"| id|inDegree|\n",
"| c| 1|\n",
"| b| 2|\n",
"CPU times: user 492 ms, sys: 1.2 s, total: 1.69 s\n",
"Wall time: 53.2 s\n"
"source": [
"# Create a Vertex DataFrame with unique ID column \"id\"# Creat \n",
"v = spark.createDataFrame([\n",
" (\"a\", \"Alice\", 34),\n",
" (\"b\", \"Bob\", 36),\n",
" (\"c\", \"Charlie\", 30),\n",
"], [\"id\", \"name\", \"age\"])\n",
"# Create an Edge DataFrame with \"src\" and \"dst\" columns\n",
"e = spark.createDataFrame([\n",
" (\"a\", \"b\", \"friend\"),\n",
" (\"b\", \"c\", \"follow\"),\n",
" (\"c\", \"b\", \"follow\"),\n",
"], [\"src\", \"dst\", \"relationship\"])\n",
"# Create a GraphFrame\n",
"from graphframes import *\n",
"g = GraphFrame(v, e)\n",
"# Query: Get in-degree of each vertex.\n",
"# Query: Count the number of \"follow\" connections in the graph.\n",
"g.edges.filter(\"relationship = 'follow'\").count()"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read data from the HELK Elasticsearch via Spark SQL"
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using the Dataframe API to access Elasticsearch index (Elasticsearch-Sysmon Index)"
"cell_type": "markdown",
"metadata": {},
"source": [
"* As we know, Spark SQL is a Spark module for structured data processing, and provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.\n",
"* Elasticsearch becomes a native source for Spark SQL so that data can be indexed and queried from Spark SQL transparently\n",
"* Spark SQL works with structured data - in other words, all entries are expected to have the same structure (same number of fields, of the same type and name)\n",
"* Using unstructured data (documents with different structures) is not supported and will cause problems.\n",
"* Through the **org.elasticsearch.spark.sql** package, esDF methods are available on the SQLContext API\n",
"Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"es_reader = (spark\n",
" .read\n",
" .format(\"org.elasticsearch.spark.sql\")\n",
" .option(\"inferSchema\", \"true\")\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"[**load(path=None, format=None, schema=None, **options)**](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load)\n",
"* Loads data from a data source and returns it as a :class`DataFrame`."
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 2.2 s\n"
"source": [
"sysmon_df = es_reader.load(\"logs-endpoint-winevent-sysmon-*/doc\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"# ProcessCreate & Motifs"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Vertices Dataframe"
"cell_type": "markdown",
"metadata": {},
"source": [
"We are going to replace the column name from **process_guid** to **id** because thats the column name that Graphframes uses for the row IDs.\n",
"[**withColumn(colName, col)**](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join)\n",
"* Returns a new DataFrame by adding a column or replacing the existing column that has the same name.\n",
"* The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error."
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"vertices = (sysmon_df.withColumn(\"id\", sysmon_df.process_guid)\n",
" .select(\"id\",\"user_name\",\"host_name\",\"process_parent_name\",\"process_name\",\"action\")\n",
" )\n",
"vertices = vertices.filter(vertices.action == \"processcreate\")"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"|id |user_name |host_name |process_parent_name|process_name |action |\n",
"|1C9FDC81-9806-5C6F-0000-00100CDDD000|system |DESKTOP-LFD11QP.RIVENDELL.local|cmd.exe |conhost.exe |processcreate|\n",
"|1C9FDC81-9806-5C6F-0000-001051DAD000|system |DESKTOP-LFD11QP.RIVENDELL.local|svchost.exe |taskhostw.exe|processcreate|\n",
"|1C9FDC81-9807-5C6F-0000-00100EEED000|system |DESKTOP-LFD11QP.RIVENDELL.local|svchost.exe |wsqmcons.exe |processcreate|\n",
"|1C9FDC81-9809-5C6F-0000-00100E28D100|network service|DESKTOP-LFD11QP.RIVENDELL.local|gpupdate.exe |conhost.exe |processcreate|\n",
"|1C9FDC81-980A-5C6F-0000-0010903BD100|cbrown |DESKTOP-LFD11QP.RIVENDELL.local|services.exe |svchost.exe |processcreate|\n",
"only showing top 5 rows\n",
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 2.55 s\n"
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Edges Dataframe"
"cell_type": "markdown",
"metadata": {},
"source": [
"We are going to make sure we also rename our **process_parent_guid** to **src** and **process_guid** to **dst**. This is to look for that relationship across our whole environment"
"cell_type": "markdown",
"metadata": {},
"source": [
"* Projects a set of SQL expressions and returns a new DataFrame.\n",
"* This is a variant of select() that accepts SQL expressions.\n",
"* You can also combine selecting columns and renaming columns in a single step with selectExpr"
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"edges = (sysmon_df\n",
" .filter(sysmon_df.action == \"processcreate\")\n",
" .selectExpr(\"process_parent_guid as src\",\"process_guid as dst\")\n",
" .withColumn(\"relationship\", lit(\"spawned\"))\n",
" )"
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"|src |dst |relationship|\n",
"|1C9FDC81-9806-5C6F-0000-001054D8D000|1C9FDC81-9806-5C6F-0000-00100CDDD000|spawned |\n",
"|1C9FDC81-84C9-5C6D-0000-001065210100|1C9FDC81-9806-5C6F-0000-001051DAD000|spawned |\n",
"|1C9FDC81-84C9-5C6D-0000-001065210100|1C9FDC81-9807-5C6F-0000-00100EEED000|spawned |\n",
"|1C9FDC81-9806-5C6F-0000-00102CEAD000|1C9FDC81-9809-5C6F-0000-00100E28D100|spawned |\n",
"|1C9FDC81-84C7-5C6D-0000-001025A90000|1C9FDC81-980A-5C6F-0000-0010903BD100|spawned |\n",
"only showing top 5 rows\n",
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 458 ms\n"
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a Graph (Vertices & Edges DataFrames)"
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"g = GraphFrame(vertices, edges)"
"cell_type": "markdown",
"metadata": {},
"source": [
"## Process A spawning Process B AND Process B Spawning Process C"
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 4 ms, total: 4 ms\n",
"Wall time: 208 ms\n"
"source": [
"motifs = g.find(\"(a)-[]->(b);(b)-[]->(c)\")"
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4 ms, sys: 4 ms, total: 8 ms\n",
"Wall time: 18.1 s\n"
"source": [
" .select(\"a.process_parent_name\",\"a.process_name\",\"b.process_name\",\"c.process_name\")\n",
" .show(20,truncate=False)\n",
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 12 ms, sys: 0 ns, total: 12 ms\n",
"Wall time: 12.9 s\n"
"source": [
"metadata": {
"kernelspec": {
"display_name": "PySpark_Python3",
"language": "python",
"name": "pyspark3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
"nbformat": 4,
"nbformat_minor": 2