- Removed functionality due to the deprecation of the argument '-a, --applicable'.

- Renamed the Excel column 'General comment' to 'Technique comment'.
- Improved the function '_load_data_sources' to make use of StringIO instead of writing a temporary file to disk.
- Before the Excel file is created, it is made sure that the date is written in the following format "%Y-%m%d". This is necessary due to the new EQL query functionality.
- Added a try/except block to '_load_data_sources', for when an EQL query resulted in invalid data source administration YAML content.
master
Marcus Bakker 2019-08-08 14:41:34 +02:00
parent 7a0aedb2a3
commit 1d2fd69a5b
1 changed files with 82 additions and 74 deletions

View File

@ -1,62 +1,61 @@
import simplejson
from generic import *
import xlsxwriter
from pprint import pprint
from datetime import datetime
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
def generate_detection_layer(filename_techniques, filename_data_sources, overlay, filter_applicable_to):
def generate_detection_layer(filename_techniques, filename_data_sources, overlay):
"""
Generates layer for detection coverage and optionally an overlaid version with visibility coverage.
:param filename_techniques: the filename of the yaml file containing the techniques administration
:param filename_data_sources: the filename of the yaml file containing the data sources administration
:param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return:
"""
if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
layer_detection = get_layer_template_detections('Detections ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', filter_applicable_to, name)
layer_detection = get_layer_template_detections('Detections ' + name, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', name)
else:
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques)
my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay, filter_applicable_to):
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay):
"""
Generates layer for visibility coverage and optionally an overlaid version with detection coverage.
:param filename_techniques: the filename of the yaml file containing the techniques administration
:param filename_data_sources: the filename of the yaml file containing the data sources administration
:param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return:
"""
my_data_sources = _load_data_sources(filename_data_sources)
if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques, 'visibility', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
layer_visibility = get_layer_template_visibility('Visibility ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', filter_applicable_to, name)
layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name)
else:
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
def plot_detection_graph(filename, filter_applicable_to):
def plot_detection_graph(filename):
"""
Generates a line graph which shows the improvements on detections through the time.
:param filename: the filename of the yaml file containing the techniques administration
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:param filename: the filename of the YAML file containing the techniques administration
:return:
"""
my_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename)
graph_values = []
for t in my_techniques.values():
@ -70,49 +69,61 @@ def plot_detection_graph(filename, filter_applicable_to):
df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum()
df['cumcount'] = df.ix[::1, 'count'].cumsum()[::1]
output_filename = 'output/graph_detection_%s.html' % filter_applicable_to
output_filename = 'output/graph_detection.html'
import plotly
import plotly.graph_objs as go
plotly.offline.plot(
{'data': [go.Scatter(x=df['date'], y=df['cumcount'])],
'layout': go.Layout(title="# of detections for %s %s" % (name, filter_applicable_to))},
'layout': go.Layout(title="# of detections for %s" % name)},
filename=output_filename, auto_open=False
)
print("File written: " + output_filename)
def _load_data_sources(filename):
def _load_data_sources(file):
"""
Loads the data sources (including all properties) from the given yaml file.
:param filename: the filename of the yaml file containing the data sources administration
:return: dictionary with data sources (including properties)
Loads the data sources (including all properties) from the given YAML file.
:param file: the file location of the YAML file containing the data sources administration or a dict
:return: dictionary with data sources, name, platform and exceptions list.
"""
my_data_sources = {}
_yaml = init_yaml()
with open(filename, 'r') as yaml_file:
yaml_content = _yaml.load(yaml_file)
if isinstance(file, dict):
# file is a dict instance created due to the use of an EQL query by the user
yaml_content = file
else:
# file is a file location on disk
_yaml = init_yaml()
with open(file, 'r') as yaml_file:
yaml_content = _yaml.load(yaml_file)
try:
for d in yaml_content['data_sources']:
dq = d['data_quality']
if dq['device_completeness'] > 0 and dq['data_field_completeness'] > 0 and dq['timeliness'] > 0 and dq['consistency'] > 0:
my_data_sources[d['data_source_name']] = d
except KeyError:
# When using an EQL that does not result in a dict having 'data_sources' objects. Trow an error.
print(EQL_INVALID_RESULT_DS)
pprint(yaml_content)
quit()
return my_data_sources
def _write_layer(layer, mapped_techniques, filename_prefix, filename_suffix, name):
def _write_layer(layer, mapped_techniques, filename_prefix, name):
"""
Writes the json layer file to disk.
:param layer: the prepped layer dictionary
:param mapped_techniques: the techniques section that will be included in the layer
:param filename_prefix: the prefix for the output filename
:param filename_suffix: the suffix for the output filename
:param name: the name that will be used in the filename together with the prefix
:return:
"""
layer['techniques'] = mapped_techniques
json_string = simplejson.dumps(layer).replace('}, ', '},\n')
filename_suffix = '_' + filename_suffix if filename_suffix != '' else ''
output_filename = normalize_name_to_filename('output/%s_%s%s.json' % (filename_prefix, name, filename_suffix))
output_filename = normalize_name_to_filename('output/%s_%s.json' % (filename_prefix, name))
with open(output_filename, 'w') as f:
f.write(json_string)
print("File written: " + output_filename)
@ -120,7 +131,7 @@ def _write_layer(layer, mapped_techniques, filename_prefix, filename_suffix, nam
def _map_and_colorize_techniques_for_detections(my_techniques):
"""
Determine the color of the techniques based on the detection score in the given yaml file.
Determine the color of the techniques based on the detection score in the given YAML file.
:param my_techniques: the configured techniques
:return: a dictionary with techniques that can be used in the layer's output file
"""
@ -159,7 +170,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
x['metadata'].append({'name': '-Detection score', 'value': str(d_score)})
x['metadata'].append({'name': '-Detection location', 'value': location})
x['metadata'].append({'name': '-General comment', 'value': general_comment})
x['metadata'].append({'name': '-Technique comment', 'value': general_comment})
x['metadata'].append({'name': '-Detection comment', 'value': get_latest_comment(detection)})
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
@ -174,7 +185,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
"""
Determine the color of the techniques based on the visibility score in the given yaml file.
Determine the color of the techniques based on the visibility score in the given YAML file.
:param my_techniques: the configured techniques
:param my_data_sources: the configured data sources
:return: a dictionary with techniques that can be used in the layer's output file
@ -215,7 +226,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
general_comment = str(visibility['comment']) if str(visibility['comment']) != '' else '-'
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
x['metadata'].append({'name': '-Visibility score', 'value': str(get_latest_score(visibility))})
x['metadata'].append({'name': '-General comment', 'value': general_comment})
x['metadata'].append({'name': '-Technique comment', 'value': general_comment})
x['metadata'].append({'name': '-Visibility comment', 'value': get_latest_comment(visibility)})
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
@ -242,12 +253,11 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
return mapped_techniques
def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, filter_applicable_to):
def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources):
"""
Determine the color of the techniques based on both detection and visibility.
:param my_techniques: the configured techniques
:param my_data_sources: the configured data sources
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: a dictionary with techniques that can be used in the layer's output file
"""
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
@ -264,15 +274,6 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
detection = True if detection_score > 0 else False
visibility = True if visibility_score > 0 else False
# Additional filtering based on applicable_to field. Overrules the score.
a2_d = set([a for d in technique_data['detection'] for a in d['applicable_to']])
a2_v = set([a for v in technique_data['detection'] for a in v['applicable_to']])
if filter_applicable_to != 'all' and filter_applicable_to not in a2_d and 'all' not in a2_d:
detection = False
if filter_applicable_to != 'all' and filter_applicable_to not in a2_v and 'all' not in a2_v:
visibility = False
if detection and visibility:
color = COLOR_OVERLAY_BOTH
elif detection and not visibility:
@ -297,10 +298,10 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
# Metadata for detection:
cnt = 1
tcnt = len([d for d in technique_data['detection'] if get_latest_score(d) >= 0 and (filter_applicable_to == 'all' or filter_applicable_to in d['applicable_to'] or 'all' in d['applicable_to'])])
tcnt = len([d for d in technique_data['detection'] if get_latest_score(d) >= 0])
for detection in technique_data['detection']:
d_score = get_latest_score(detection)
if d_score >= 0 and (filter_applicable_to == 'all' or filter_applicable_to in detection['applicable_to'] or 'all' in detection['applicable_to']):
if d_score >= 0:
location = ', '.join(detection['location'])
location = location if location != '' else '-'
applicable_to = ', '.join(detection['applicable_to'])
@ -308,7 +309,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
x['metadata'].append({'name': '-Detection score', 'value': str(d_score)})
x['metadata'].append({'name': '-Detection location', 'value': location})
x['metadata'].append({'name': '-General comment', 'value': general_comment})
x['metadata'].append({'name': '-Technique comment', 'value': general_comment})
x['metadata'].append({'name': '-Detection comment', 'value': get_latest_comment(detection)})
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
@ -318,18 +319,17 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
if tcnt > 0:
x['metadata'].append({'name': '---', 'value': '---'})
cnt = 1
tcnt = len([v for v in technique_data['visibility'] if filter_applicable_to == 'all' or filter_applicable_to in v['applicable_to'] or 'all' in v['applicable_to']])
tcnt = len([v for v in technique_data['visibility']])
for visibility in technique_data['visibility']:
if filter_applicable_to == 'all' or filter_applicable_to in visibility['applicable_to'] or 'all' in visibility['applicable_to']:
applicable_to = ', '.join(visibility['applicable_to'])
general_comment = str(visibility['comment']) if str(visibility['comment']) != '' else '-'
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
x['metadata'].append({'name': '-Visibility score', 'value': str(get_latest_score(visibility))})
x['metadata'].append({'name': '-General comment', 'value': general_comment})
x['metadata'].append({'name': '-Visibility comment', 'value': get_latest_comment(visibility)})
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
cnt += 1
applicable_to = ', '.join(visibility['applicable_to'])
general_comment = str(visibility['comment']) if str(visibility['comment']) != '' else '-'
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
x['metadata'].append({'name': '-Visibility score', 'value': str(get_latest_score(visibility))})
x['metadata'].append({'name': '-Technique comment', 'value': general_comment})
x['metadata'].append({'name': '-Visibility comment', 'value': get_latest_comment(visibility)})
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
cnt += 1
mapped_techniques.append(x)
@ -339,10 +339,10 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
def export_techniques_list_to_excel(filename):
"""
Makes an overview of the MITRE ATT&CK techniques from the YAML administration file.
:param filename: the filename of the yaml file containing the techniques administration
:param filename: the filename of the YAML file containing the techniques administration
:return:
"""
my_techniques, name, platform = load_techniques(filename, 'all')
my_techniques, name, platform = load_techniques(filename)
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
mitre_techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
@ -390,7 +390,7 @@ def export_techniques_list_to_excel(filename):
worksheet_detections.write(y, 4, 'Date', format_bold_left)
worksheet_detections.write(y, 5, 'Score', format_bold_left)
worksheet_detections.write(y, 6, 'Location', format_bold_left)
worksheet_detections.write(y, 7, 'General comment', format_bold_left)
worksheet_detections.write(y, 7, 'Technique comment', format_bold_left)
worksheet_detections.write(y, 8, 'Detection comment', format_bold_left)
worksheet_detections.set_column(0, 0, 14)
worksheet_detections.set_column(1, 1, 40)
@ -410,7 +410,11 @@ def export_techniques_list_to_excel(filename):
get_tactics(get_technique(mitre_techniques, technique_id))),
valign_top)
worksheet_detections.write(y, 3, ', '.join(detection['applicable_to']), wrap_text)
worksheet_detections.write(y, 4, str(get_latest_date(detection)).replace('None', ''), valign_top)
# make sure the date format is '%Y-%m-%d'. When we've done a EQL query this will become '%Y-%m-%d %H %M $%S'
tmp_date = get_latest_date(detection)
if isinstance(tmp_date, datetime):
tmp_date = tmp_date.strftime('%Y-%m-%d')
worksheet_detections.write(y, 4, str(tmp_date).replace('None', ''), valign_top)
ds = get_latest_score(detection)
worksheet_detections.write(y, 5, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score)
worksheet_detections.write(y, 6, '\n'.join(detection['location']), wrap_text)
@ -429,7 +433,7 @@ def export_techniques_list_to_excel(filename):
worksheet_visibility.write(y, 3, 'Applicable to', format_bold_left)
worksheet_visibility.write(y, 4, 'Date', format_bold_left)
worksheet_visibility.write(y, 5, 'Score', format_bold_left)
worksheet_visibility.write(y, 6, 'General Comment', format_bold_left)
worksheet_visibility.write(y, 6, 'Technique comment', format_bold_left)
worksheet_visibility.write(y, 7, 'Visibility comment', format_bold_left)
worksheet_visibility.set_column(0, 0, 14)
worksheet_visibility.set_column(1, 1, 40)
@ -447,7 +451,11 @@ def export_techniques_list_to_excel(filename):
worksheet_visibility.write(y, 2, ', '.join(t.capitalize() for t in
get_tactics(get_technique(mitre_techniques, technique_id))), valign_top)
worksheet_visibility.write(y, 3, ', '.join(visibility['applicable_to']), wrap_text)
worksheet_visibility.write(y, 4, str(get_latest_date(visibility)).replace('None', ''), valign_top)
# make sure the date format is '%Y-%m-%d'. When we've done a EQL query this will become '%Y-%m-%d %H %M $%S'
tmp_date = get_latest_date(visibility)
if isinstance(tmp_date, datetime):
tmp_date = tmp_date.strftime('%Y-%m-%d')
worksheet_visibility.write(y, 4, str(tmp_date).replace('None', ''), valign_top)
vs = get_latest_score(visibility)
worksheet_visibility.write(y, 5, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score)
v_comment = get_latest_comment(visibility, empty='')