Introduced new option to set the name of the Navigator layer

master
Ruben Bouman 2020-06-08 16:56:56 +02:00
parent ee144b374e
commit 217980bbd3
5 changed files with 49 additions and 28 deletions

View File

@ -8,11 +8,12 @@ from generic import *
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
def generate_data_sources_layer(filename, output_filename):
def generate_data_sources_layer(filename, output_filename, layer_name):
"""
Generates a generic layer for data sources.
:param filename: the filename of the YAML file containing the data sources administration
:param output_filename: the output filename defined by the user
:param layer_name: the name of the Navigator layer
:return:
"""
my_data_sources, name, platform, exceptions = _load_data_sources(filename)
@ -20,7 +21,10 @@ def generate_data_sources_layer(filename, output_filename):
# Do the mapping between my data sources and MITRE data sources:
my_techniques = _map_and_colorize_techniques(my_data_sources, platform, exceptions)
layer = get_layer_template_data_sources("Data sources " + name, 'description', 'attack', platform)
if not layer_name:
layer_name = 'Data sources ' + name
layer = get_layer_template_data_sources(layer_name, 'description', 'attack', platform)
layer['techniques'] = my_techniques
json_string = simplejson.dumps(layer).replace('}, ', '},\n')

View File

@ -59,7 +59,8 @@ def _init_menu():
'not updated without your approval. The updated visibility '
'scores are calculated in the same way as with the option: '
'-y, --yaml', action='store_true')
parser_data_sources.add_argument('-of', '--output-filename', help='define the output filename')
parser_data_sources.add_argument('-of', '--output-filename', help='set the output filename')
parser_data_sources.add_argument('-ln', '--layer-name', help='set the name of the Navigator layer')
parser_data_sources.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the visibility parser
@ -88,7 +89,8 @@ def _init_menu():
'the ATT&CK navigator', action='store_true')
parser_visibility.add_argument('-g', '--graph', help='generate a graph with visibility added through time',
action='store_true')
parser_visibility.add_argument('-of', '--output-filename', help='define the output filename')
parser_visibility.add_argument('-of', '--output-filename', help='set the output filename')
parser_visibility.add_argument('-ln', '--layer-name', help='set the name of the Navigator layer')
parser_visibility.add_argument('--health', help='check the YAML file for errors', action='store_true')
# create the detection parser
@ -119,7 +121,8 @@ def _init_menu():
'the ATT&CK navigator', action='store_true')
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
action='store_true')
parser_detection.add_argument('-of', '--output-filename', help='define the output filename')
parser_detection.add_argument('-of', '--output-filename', help='set the output filename')
parser_detection.add_argument('-ln', '--layer-name', help='set the name of the Navigator layer')
parser_detection.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the group parser
@ -157,7 +160,8 @@ def _init_menu():
'the EQL search. The default behaviour is to only include the '
'most recent \'score\' objects',
action='store_true', default=False)
parser_group.add_argument('-of', '--output-filename', help='define the output filename')
parser_group.add_argument('-of', '--output-filename', help='set the output filename')
parser_group.add_argument('-ln', '--layer-name', help='set the name of the Navigator layer')
parser_group.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the generic parser
@ -206,7 +210,7 @@ def _menu(menu_parser):
if args.update and check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health):
update_technique_administration_file(file_ds, args.file_tech)
if args.layer:
generate_data_sources_layer(file_ds, args.output_filename)
generate_data_sources_layer(file_ds, args.output_filename, args.layer_name)
if args.excel:
export_data_source_list_to_excel(file_ds, args.output_filename, eql_search=args.search)
if args.graph:
@ -232,9 +236,9 @@ def _menu(menu_parser):
if not file_tech:
quit() # something went wrong in executing the search or 0 results where returned
if args.layer:
generate_visibility_layer(file_tech, args.file_ds, False, args.output_filename)
generate_visibility_layer(file_tech, args.file_ds, False, args.output_filename, args.layer_name)
if args.overlay:
generate_visibility_layer(file_tech, args.file_ds, True, args.output_filename)
generate_visibility_layer(file_tech, args.file_ds, True, args.output_filename, args.layer_name)
if args.graph:
plot_graph(file_tech, 'visibility', args.output_filename)
if args.excel:
@ -244,7 +248,7 @@ def _menu(menu_parser):
elif args.subparser in ['group', 'g']:
if not generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform,
args.software_group, args.search_visibility, args.search_detection, args.health,
args.output_filename, include_all_score_objs=args.all_scores):
args.output_filename, args.layer_name, include_all_score_objs=args.all_scores):
quit() # something went wrong in executing the search or 0 results where returned
elif args.subparser in ['detection', 'd']:
@ -264,9 +268,9 @@ def _menu(menu_parser):
if not file_tech:
quit() # something went wrong in executing the search or 0 results where returned
if args.layer:
generate_detection_layer(file_tech, args.file_ds, False, args.output_filename)
generate_detection_layer(file_tech, args.file_ds, False, args.output_filename, args.layer_name)
if args.overlay and check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
generate_detection_layer(file_tech, args.file_ds, True, args.output_filename)
generate_detection_layer(file_tech, args.file_ds, True, args.output_filename, args.layer_name)
if args.graph:
plot_graph(file_tech, 'detection', args.output_filename)
if args.excel:

View File

@ -464,8 +464,8 @@ def _get_group_list(groups, file_type):
return groups
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups,
search_visibility, search_detection, health_is_called, output_filename, include_all_score_objs=False):
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups, search_visibility,
search_detection, health_is_called, output_filename, layer_name, include_all_score_objs=False):
"""
Calls all functions that are necessary for the generation of the heat map and write a json layer to disk.
:param groups: threat actor groups
@ -479,6 +479,7 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
:param search_detection: detection EQL search query
:param health_is_called: boolean that specifies if detailed errors in the file will be printed
:param output_filename: output filename defined by the user
:param layer_name: the name of the Navigator layer
:param include_all_score_objs: include all score objects within the score_logbook for the EQL query
:return: returns nothing when something's wrong
"""
@ -572,8 +573,10 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
desc = 'stage: ' + stage + ' | platform(s): ' + platform_to_name(platform, separator=', ') + ' | group(s): ' \
+ ', '.join(groups_list) + ' | overlay group(s): ' + ', '.join(overlay_list)
layer = get_layer_template_groups(stage[0].upper() + stage[1:] + ' - ' + platform_to_name(platform, separator=', '),
max_count, desc, stage, platform, overlay_type)
if not layer_name:
layer_name = stage[0].upper() + stage[1:] + ' - ' + platform_to_name(platform, separator=', ')
layer = get_layer_template_groups(layer_name, max_count, desc, stage, platform, overlay_type)
layer['techniques'] = technique_layer
json_string = simplejson.dumps(layer).replace('}, ', '},\n')

View File

@ -273,7 +273,7 @@ def _menu_data_source(filename_ds):
_menu_data_source(filename_ds)
if choice == '3':
print('Writing data sources layer...')
generate_data_sources_layer(file_ds, None)
generate_data_sources_layer(file_ds, None, None)
_wait()
elif choice == '4':
print('Drawing the graph...')
@ -352,14 +352,14 @@ def _menu_detection(filename_t):
_menu_detection(filename_t)
if choice == '4':
print('Writing detection coverage layer...')
generate_detection_layer(file_tech, None, False, None)
generate_detection_layer(file_tech, None, False, None, None)
_wait()
elif choice == '5':
filename_ds = _select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'data sources (used to add metadata on the '
'involved data sources to the heat map)',
FILE_TYPE_DATA_SOURCE_ADMINISTRATION, False)
print('Writing detection coverage layer with visibility as overlay...')
generate_detection_layer(file_tech, filename_ds, True, None)
generate_detection_layer(file_tech, filename_ds, True, None, None)
_wait()
elif choice == '6':
print('Drawing the graph...')
@ -430,11 +430,11 @@ def _menu_visibility(filename_t, filename_ds):
_menu_visibility(filename_t, filename_ds)
if choice == '4':
print('Writing visibility coverage layer...')
generate_visibility_layer(file_tech, filename_ds, False, None)
generate_visibility_layer(file_tech, filename_ds, False, None, None)
_wait()
elif choice == '5':
print('Writing visibility coverage layer overlaid with detections...')
generate_visibility_layer(file_tech, filename_ds, True, None)
generate_visibility_layer(file_tech, filename_ds, True, None, None)
_wait()
elif choice == '6':
print('Drawing the graph...')
@ -540,7 +540,7 @@ def _menu_groups():
elif choice == '7':
if not generate_group_heat_map(groups, groups_overlay, overlay_type, default_stage, default_platform,
software_group, eql_query_visibility, eql_query_detection, False,
None, include_all_score_objs=eql_all_scores):
None, None, include_all_score_objs=eql_all_scores):
_wait()
_menu_groups()
_wait()

View File

@ -5,35 +5,41 @@ from datetime import datetime
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
def generate_detection_layer(filename_techniques, filename_data_sources, overlay, output_filename):
def generate_detection_layer(filename_techniques, filename_data_sources, overlay, output_filename, layer_name):
"""
Generates layer for detection coverage and optionally an overlaid version with visibility coverage.
:param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param layer_name: the name of the Navigator layer
:param output_filename: the output filename defined by the user
:return:
"""
if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
layer_detection = get_layer_template_detections('Detections ' + name, 'description', 'attack', platform)
if not layer_name:
layer_name = 'Detections ' + name
layer_detection = get_layer_template_detections(layer_name, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', name, output_filename)
else:
my_techniques, name, platform = load_techniques(filename_techniques)
my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
if not layer_name:
layer_name = 'Visibility and Detection ' + name
layer_both = get_layer_template_layered(layer_name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name, output_filename)
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay, output_filename):
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay, output_filename, layer_name):
"""
Generates layer for visibility coverage and optionally an overlaid version with detection coverage.
:param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param output_filename: the output filename defined by the user
:param layer_name: the name of the Navigator layer
:return:
"""
my_data_sources = _load_data_sources(filename_data_sources)
@ -41,12 +47,16 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platform)
layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
if not layer_name:
layer_name = 'Visibility ' + name
layer_visibility = get_layer_template_visibility(layer_name, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name, output_filename)
else:
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
if not layer_name:
layer_name = 'Visibility and Detection ' + name
layer_both = get_layer_template_layered(layer_name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name, output_filename)