- Added output_filename as option for datasource, visbility, detection and group modes.

- Fixed bug when having both dates and datetimes in techniques YAML file.
master
Ruben Bouman 2020-05-25 11:44:13 +02:00
parent a8d0c3759c
commit 1ac6a4ce78
6 changed files with 138 additions and 67 deletions

View File

@ -8,10 +8,11 @@ from generic import *
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries. # Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
def generate_data_sources_layer(filename): def generate_data_sources_layer(filename, output_filename):
""" """
Generates a generic layer for data sources. Generates a generic layer for data sources.
:param filename: the filename of the YAML file containing the data sources administration :param filename: the filename of the YAML file containing the data sources administration
:param output_filename: the output filename defined by the user
:return: :return:
""" """
my_data_sources, name, platform, exceptions = _load_data_sources(filename) my_data_sources, name, platform, exceptions = _load_data_sources(filename)
@ -23,13 +24,16 @@ def generate_data_sources_layer(filename):
layer['techniques'] = my_techniques layer['techniques'] = my_techniques
json_string = simplejson.dumps(layer).replace('}, ', '},\n') json_string = simplejson.dumps(layer).replace('}, ', '},\n')
write_file('data_sources', name, json_string) if not output_filename:
output_filename = create_output_filename('data_sources', name)
write_file(output_filename, json_string)
def plot_data_sources_graph(filename): def plot_data_sources_graph(filename, output_filename):
""" """
Generates a line graph which shows the improvements on numbers of data sources through time. Generates a line graph which shows the improvements on numbers of data sources through time.
:param filename: the filename of the YAML file containing the data sources administration :param filename: the filename of the YAML file containing the data sources administration
:param output_filename: the output filename defined by the user
:return: :return:
""" """
# pylint: disable=unused-variable # pylint: disable=unused-variable
@ -45,7 +49,11 @@ def plot_data_sources_graph(filename):
df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum() df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum()
df['cumcount'] = df['count'].cumsum() df['cumcount'] = df['count'].cumsum()
output_filename = get_non_existing_filename('output/graph_data_sources', 'html') if not output_filename:
output_filename = 'graph_data_sources'
elif output_filename.endswith('.html'):
output_filename = output_filename.replace('.html', '')
output_filename = get_non_existing_filename('output/' + output_filename, 'html')
import plotly import plotly
import plotly.graph_objs as go import plotly.graph_objs as go
@ -57,18 +65,22 @@ def plot_data_sources_graph(filename):
print("File written: " + output_filename) print("File written: " + output_filename)
def export_data_source_list_to_excel(filename, eql_search=False): def export_data_source_list_to_excel(filename, output_filename, eql_search=False):
""" """
Makes an overview of all MITRE ATT&CK data sources (via techniques) and lists which data sources are present Makes an overview of all MITRE ATT&CK data sources (via techniques) and lists which data sources are present
in the YAML administration including all properties and data quality score. in the YAML administration including all properties and data quality score.
:param filename: the filename of the YAML file containing the data sources administration :param filename: the filename of the YAML file containing the data sources administration
:param output_filename: the output filename defined by the user
:param eql_search: specify if an EQL search was performed which may have resulted in missing ATT&CK data sources :param eql_search: specify if an EQL search was performed which may have resulted in missing ATT&CK data sources
:return: :return:
""" """
# pylint: disable=unused-variable # pylint: disable=unused-variable
my_data_sources, name, platforms, exceptions = _load_data_sources(filename, filter_empty_scores=False) my_data_sources, name, platforms, exceptions = _load_data_sources(filename, filter_empty_scores=False)
if not output_filename:
excel_filename = get_non_existing_filename('output/data_sources', 'xlsx') output_filename = 'data_sources'
elif output_filename.endswith('.xlsx'):
output_filename = output_filename.replace('.xlsx', '')
excel_filename = get_non_existing_filename('output/' + output_filename, 'xlsx')
workbook = xlsxwriter.Workbook(excel_filename) workbook = xlsxwriter.Workbook(excel_filename)
worksheet = workbook.add_worksheet('Data sources') worksheet = workbook.add_worksheet('Data sources')
@ -307,7 +319,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
:return: :return:
""" """
# first we generate the new visibility scores contained within a temporary tech. admin YAML 'file' # first we generate the new visibility scores contained within a temporary tech. admin YAML 'file'
new_visibility_scores = generate_technique_administration_file(file_data_sources, write_file=False) new_visibility_scores = generate_technique_administration_file(file_data_sources, None, write_file=False)
# we get the date to remove the single quotes at the end of the code # we get the date to remove the single quotes at the end of the code
today = new_visibility_scores['techniques'][0]['visibility']['score_logbook'][0]['date'] today = new_visibility_scores['techniques'][0]['visibility']['score_logbook'][0]['date']
@ -509,10 +521,11 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
def generate_technique_administration_file(filename, write_file=True, all_techniques=False): def generate_technique_administration_file(filename, output_filename, write_file=True, all_techniques=False):
""" """
Generate a technique administration file based on the data source administration YAML file Generate a technique administration file based on the data source administration YAML file
:param filename: the filename of the YAML file containing the data sources administration :param filename: the filename of the YAML file containing the data sources administration
:param output_filename: the output filename defined by the user
:param write_file: by default the file is written to disk :param write_file: by default the file is written to disk
:param all_techniques: include all ATT&CK techniques in the generated YAML file that are applicable to the :param all_techniques: include all ATT&CK techniques in the generated YAML file that are applicable to the
platform(s) specified in the data source YAML file platform(s) specified in the data source YAML file
@ -575,8 +588,11 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
# remove the single quotes from the date # remove the single quotes from the date
yaml_file_lines = fix_date_and_remove_null(file_lines, today, input_type='list') yaml_file_lines = fix_date_and_remove_null(file_lines, today, input_type='list')
output_filename = get_non_existing_filename('output/techniques-administration-' + if not output_filename:
normalize_name_to_filename(name + '-' + platform_to_name(platform)), 'yaml') output_filename = 'techniques-administration-' + normalize_name_to_filename(name + '-' + platform_to_name(platform))
elif output_filename.endswith('.yaml'):
output_filename = output_filename.replace('.yaml', '')
output_filename = get_non_existing_filename('output/' + output_filename, 'yaml')
with open(output_filename, 'w') as f: with open(output_filename, 'w') as f:
f.writelines(yaml_file_lines) f.writelines(yaml_file_lines)
print("File written: " + output_filename) print("File written: " + output_filename)

View File

@ -59,6 +59,7 @@ def _init_menu():
'not updated without your approval. The updated visibility ' 'not updated without your approval. The updated visibility '
'scores are calculated in the same way as with the option: ' 'scores are calculated in the same way as with the option: '
'-y, --yaml', action='store_true') '-y, --yaml', action='store_true')
parser_data_sources.add_argument('-of', '--output-filename', help='define the output filename')
parser_data_sources.add_argument('--health', help='check the YAML file(s) for errors', action='store_true') parser_data_sources.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the visibility parser # create the visibility parser
@ -87,6 +88,7 @@ def _init_menu():
'the ATT&CK navigator', action='store_true') 'the ATT&CK navigator', action='store_true')
parser_visibility.add_argument('-g', '--graph', help='generate a graph with visibility added through time', parser_visibility.add_argument('-g', '--graph', help='generate a graph with visibility added through time',
action='store_true') action='store_true')
parser_visibility.add_argument('-of', '--output-filename', help='define the output filename')
parser_visibility.add_argument('--health', help='check the YAML file for errors', action='store_true') parser_visibility.add_argument('--health', help='check the YAML file for errors', action='store_true')
# create the detection parser # create the detection parser
@ -117,6 +119,7 @@ def _init_menu():
'the ATT&CK navigator', action='store_true') 'the ATT&CK navigator', action='store_true')
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time', parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
action='store_true') action='store_true')
parser_detection.add_argument('-of', '--output-filename', help='define the output filename')
parser_detection.add_argument('--health', help='check the YAML file(s) for errors', action='store_true') parser_detection.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the group parser # create the group parser
@ -154,6 +157,7 @@ def _init_menu():
'the EQL search. The default behaviour is to only include the ' 'the EQL search. The default behaviour is to only include the '
'most recent \'score\' objects', 'most recent \'score\' objects',
action='store_true', default=False) action='store_true', default=False)
parser_group.add_argument('-of', '--output-filename', help='define the output filename')
parser_group.add_argument('--health', help='check the YAML file(s) for errors', action='store_true') parser_group.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
# create the generic parser # create the generic parser
@ -202,13 +206,13 @@ def _menu(menu_parser):
if args.update and check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health): if args.update and check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health):
update_technique_administration_file(file_ds, args.file_tech) update_technique_administration_file(file_ds, args.file_tech)
if args.layer: if args.layer:
generate_data_sources_layer(file_ds) generate_data_sources_layer(file_ds, args.output_filename)
if args.excel: if args.excel:
export_data_source_list_to_excel(file_ds, eql_search=args.search) export_data_source_list_to_excel(file_ds, args.output_filename, eql_search=args.search)
if args.graph: if args.graph:
plot_data_sources_graph(file_ds) plot_data_sources_graph(file_ds, args.output_filename)
if args.yaml: if args.yaml:
generate_technique_administration_file(file_ds, all_techniques=args.yaml_all_techniques) generate_technique_administration_file(file_ds, args.output_filename, all_techniques=args.yaml_all_techniques)
elif args.subparser in ['visibility', 'v']: elif args.subparser in ['visibility', 'v']:
if args.layer or args.overlay: if args.layer or args.overlay:
@ -228,19 +232,19 @@ def _menu(menu_parser):
if not file_tech: if not file_tech:
quit() # something went wrong in executing the search or 0 results where returned quit() # something went wrong in executing the search or 0 results where returned
if args.layer: if args.layer:
generate_visibility_layer(file_tech, args.file_ds, False) generate_visibility_layer(file_tech, args.file_ds, False, args.output_filename)
if args.overlay: if args.overlay:
generate_visibility_layer(file_tech, args.file_ds, True) generate_visibility_layer(file_tech, args.file_ds, True, args.output_filename)
if args.graph: if args.graph:
plot_graph(file_tech, 'visibility') plot_graph(file_tech, 'visibility', args.output_filename)
if args.excel: if args.excel:
export_techniques_list_to_excel(file_tech) export_techniques_list_to_excel(file_tech, args.output_filename)
# todo add search capabilities # todo add search capabilities
elif args.subparser in ['group', 'g']: elif args.subparser in ['group', 'g']:
if not generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, if not generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform,
args.software_group, args.search_visibility, args.search_detection, args.health, args.software_group, args.search_visibility, args.search_detection, args.health,
include_all_score_objs=args.all_scores): args.output_filename, include_all_score_objs=args.all_scores):
quit() # something went wrong in executing the search or 0 results where returned quit() # something went wrong in executing the search or 0 results where returned
elif args.subparser in ['detection', 'd']: elif args.subparser in ['detection', 'd']:
@ -260,13 +264,13 @@ def _menu(menu_parser):
if not file_tech: if not file_tech:
quit() # something went wrong in executing the search or 0 results where returned quit() # something went wrong in executing the search or 0 results where returned
if args.layer: if args.layer:
generate_detection_layer(file_tech, args.file_ds, False) generate_detection_layer(file_tech, args.file_ds, False, args.output_filename)
if args.overlay and check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health): if args.overlay and check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
generate_detection_layer(file_tech, args.file_ds, True) generate_detection_layer(file_tech, args.file_ds, True, args.output_filename)
if args.graph: if args.graph:
plot_graph(file_tech, 'detection') plot_graph(file_tech, 'detection', args.output_filename)
if args.excel: if args.excel:
export_techniques_list_to_excel(file_tech) export_techniques_list_to_excel(file_tech, args.output_filename)
elif args.subparser in ['generic', 'ge']: elif args.subparser in ['generic', 'ge']:
if args.datasources: if args.datasources:

View File

@ -342,16 +342,25 @@ def get_layer_template_layered(name, description, stage, platform):
return layer return layer
def write_file(filename_prefix, filename, content): def create_output_filename(filename_prefix, filename):
"""
Creates a filename using pre determined convention.
:param filename_prefix: prefix part of the filename
:param filename: filename
:return:
"""
return '%s_%s' % (filename_prefix, normalize_name_to_filename(filename))
def write_file(filename, content):
""" """
Writes content to a file and ensures if the file already exists it won't be overwritten by appending a number Writes content to a file and ensures if the file already exists it won't be overwritten by appending a number
as suffix. as suffix.
:param filename_prefix: prefix part of the filename
:param filename: filename :param filename: filename
:param content: the content of the file that needs to be written to the file :param content: the content of the file that needs to be written to the file
:return: :return:
""" """
output_filename = 'output/%s_%s' % (filename_prefix, normalize_name_to_filename(filename)) output_filename = 'output/%s' % clean_filename(filename)
output_filename = get_non_existing_filename(output_filename, 'json') output_filename = get_non_existing_filename(output_filename, 'json')
with open(output_filename, 'w') as f: with open(output_filename, 'w') as f:
@ -367,6 +376,8 @@ def get_non_existing_filename(filename, extension):
:param extension: :param extension:
:return: :return:
""" """
if filename.endswith('.' + extension):
filename = filename.replace('.' + extension, '')
if os.path.exists('%s.%s' % (filename, extension)): if os.path.exists('%s.%s' % (filename, extension)):
suffix = 1 suffix = 1
while os.path.exists('%s_%s.%s' % (filename, suffix, extension)): while os.path.exists('%s_%s.%s' % (filename, suffix, extension)):
@ -514,8 +525,15 @@ def get_latest_score_obj(yaml_object):
newest_score_obj = None newest_score_obj = None
newest_date = None newest_date = None
for score_obj in yaml_object['score_logbook']: for score_obj in yaml_object['score_logbook']:
if not newest_score_obj or score_obj['date'] > newest_date: # Scores in the score_logbook can be dates (yyyy-mm-dd) but also datetimes (yyyy-mm-dd hh:mm:ss.ffffff).
newest_date = score_obj['date'] # So convert the datetimes to dates to make it possible to compare.
if type(score_obj['date']) == dt: # dt is the name of the datetime class (see import table)
score_obj_date = score_obj['date'].date()
else:
score_obj_date = score_obj['date']
if not newest_score_obj or score_obj_date > newest_date:
newest_date = score_obj_date
newest_score_obj = score_obj newest_score_obj = score_obj
return newest_score_obj return newest_score_obj
@ -1003,3 +1021,12 @@ def get_platform_from_yaml(yaml_content):
valid_platform_list.append(PLATFORMS[p]) valid_platform_list.append(PLATFORMS[p])
platform = valid_platform_list platform = valid_platform_list
return platform return platform
def clean_filename(filename):
"""
Remove invalid characters from filename and maximize it to 200 characters
:param filename: Input filename
:return: sanitized filename
"""
return filename.replace('/', '').replace('\\', '').replace(':', '')[:200]

View File

@ -109,7 +109,7 @@ def _get_software_techniques(groups, stage, platform):
# software matches the ATT&CK Matrix and platform # software matches the ATT&CK Matrix and platform
# and the group is a group we are interested in # and the group is a group we are interested in
if s['x_mitre_platforms']: # there is software that do not have a platform, skip those if s['x_mitre_platforms']: # there is software that do not have a platform, skip those
if s['matrix'] == 'mitre-'+stage and (platform == 'all' or len(set(s['x_mitre_platforms']).intersection(set(platform))) > 0) and \ if s['matrix'] == 'mitre-' + stage and (platform == 'all' or len(set(s['x_mitre_platforms']).intersection(set(platform))) > 0) and \
(groups[0] == 'all' or s['group_id'].lower() in groups or _is_in_group(s['aliases'], groups)): (groups[0] == 'all' or s['group_id'].lower() in groups or _is_in_group(s['aliases'], groups)):
if s['group_id'] not in groups_dict: if s['group_id'] not in groups_dict:
groups_dict[s['group_id']] = {'group_name': s['name']} groups_dict[s['group_id']] = {'group_name': s['name']}
@ -197,7 +197,7 @@ def _get_group_techniques(groups, stage, platform, file_type):
platforms = 'Windows' platforms = 'Windows'
# group matches the: matrix/stage, platform and the group(s) we are interested in # group matches the: matrix/stage, platform and the group(s) we are interested in
if gr['matrix'] == 'mitre-'+stage and (platform == 'all' or len(set(platforms).intersection(set(platform))) > 0) and \ if gr['matrix'] == 'mitre-' + stage and (platform == 'all' or len(set(platforms).intersection(set(platform))) > 0) and \
(groups[0] == 'all' or gr['group_id'].lower() in groups or _is_in_group(gr['aliases'], groups)): (groups[0] == 'all' or gr['group_id'].lower() in groups or _is_in_group(gr['aliases'], groups)):
if gr['group_id'] not in groups_dict: if gr['group_id'] not in groups_dict:
groups_found.add(gr['group_id']) groups_found.add(gr['group_id'])
@ -462,7 +462,7 @@ def _get_group_list(groups, file_type):
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups, def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups,
search_visibility, search_detection, health_is_called, include_all_score_objs=False): search_visibility, search_detection, health_is_called, output_filename, include_all_score_objs=False):
""" """
Calls all functions that are necessary for the generation of the heat map and write a json layer to disk. Calls all functions that are necessary for the generation of the heat map and write a json layer to disk.
:param groups: threat actor groups :param groups: threat actor groups
@ -475,6 +475,7 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
:param search_visibility: visibility EQL search query :param search_visibility: visibility EQL search query
:param search_detection: detection EQL search query :param search_detection: detection EQL search query
:param health_is_called: boolean that specifies if detailed errors in the file will be printed :param health_is_called: boolean that specifies if detailed errors in the file will be printed
:param output_filename: output filename defined by the user
:param include_all_score_objs: include all score objects within the score_logbook for the EQL query :param include_all_score_objs: include all score objects within the score_logbook for the EQL query
:return: returns nothing when something's wrong :return: returns nothing when something's wrong
""" """
@ -574,11 +575,15 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
json_string = simplejson.dumps(layer).replace('}, ', '},\n') json_string = simplejson.dumps(layer).replace('}, ', '},\n')
if stage == 'pre-attack': if not output_filename:
filename = '_'.join(groups_list) if stage == 'pre-attack':
elif overlay: filename = '_'.join(groups_list)
filename = platform_to_name(platform) + '_' + '_'.join(groups_list) + '-overlay_' + '_'.join(overlay_list) elif overlay:
else: filename = platform_to_name(platform) + '_' + '_'.join(groups_list) + '-overlay_' + '_'.join(overlay_list)
filename = platform_to_name(platform) + '_' + '_'.join(groups_list) else:
filename = platform_to_name(platform) + '_' + '_'.join(groups_list)
write_file(stage, filename[:255], json_string) filename = create_output_filename(stage, filename)
write_file(filename, json_string)
else:
write_file(output_filename, json_string)

View File

@ -31,9 +31,9 @@ def _clear():
name = '-= %s =-' % APP_NAME name = '-= %s =-' % APP_NAME
desc = '-- %s --' % APP_DESC desc = '-- %s --' % APP_DESC
version = 'version %s' % VERSION version = 'version %s' % VERSION
print(' ' * int((len(desc)-len(name))/2) + name) print(' ' * int((len(desc) - len(name)) / 2) + name)
print(desc) print(desc)
print(' ' * int((len(desc)-len(version))/2) + version) print(' ' * int((len(desc) - len(version)) / 2) + version)
print('') print('')
@ -273,19 +273,19 @@ def _menu_data_source(filename_ds):
_menu_data_source(filename_ds) _menu_data_source(filename_ds)
if choice == '3': if choice == '3':
print('Writing data sources layer...') print('Writing data sources layer...')
generate_data_sources_layer(file_ds) generate_data_sources_layer(file_ds, None)
_wait() _wait()
elif choice == '4': elif choice == '4':
print('Drawing the graph...') print('Drawing the graph...')
plot_data_sources_graph(file_ds) plot_data_sources_graph(file_ds, None)
_wait() _wait()
elif choice == '5': elif choice == '5':
print('Generating Excel file...') print('Generating Excel file...')
export_data_source_list_to_excel(file_ds, eql_search=eql_query_data_sources) export_data_source_list_to_excel(file_ds, None, eql_search=eql_query_data_sources)
_wait() _wait()
elif choice == '6': elif choice == '6':
print('Generating YAML file...') print('Generating YAML file...')
generate_technique_administration_file(file_ds, all_techniques=yaml_all_techniques) generate_technique_administration_file(file_ds, None, all_techniques=yaml_all_techniques)
_wait() _wait()
elif choice == '7': elif choice == '7':
filename_t = _select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'techniques (used to score the level of visibility)', filename_t = _select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'techniques (used to score the level of visibility)',
@ -352,22 +352,22 @@ def _menu_detection(filename_t):
_menu_detection(filename_t) _menu_detection(filename_t)
if choice == '4': if choice == '4':
print('Writing detection coverage layer...') print('Writing detection coverage layer...')
generate_detection_layer(file_tech, None, False) generate_detection_layer(file_tech, None, False, None)
_wait() _wait()
elif choice == '5': elif choice == '5':
filename_ds = _select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'data sources (used to add metadata on the ' filename_ds = _select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'data sources (used to add metadata on the '
'involved data sources to the heat map)', 'involved data sources to the heat map)',
FILE_TYPE_DATA_SOURCE_ADMINISTRATION, False) FILE_TYPE_DATA_SOURCE_ADMINISTRATION, False)
print('Writing detection coverage layer with visibility as overlay...') print('Writing detection coverage layer with visibility as overlay...')
generate_detection_layer(file_tech, filename_ds, True) generate_detection_layer(file_tech, filename_ds, True, None)
_wait() _wait()
elif choice == '6': elif choice == '6':
print('Drawing the graph...') print('Drawing the graph...')
plot_graph(file_tech, 'detection') plot_graph(file_tech, 'detection', None)
_wait() _wait()
elif choice == '7': elif choice == '7':
print('Generating Excel file...') print('Generating Excel file...')
export_techniques_list_to_excel(file_tech) export_techniques_list_to_excel(file_tech, None)
_wait() _wait()
elif choice == '8': elif choice == '8':
print('Checking the technique YAML file for errors...') print('Checking the technique YAML file for errors...')
@ -430,19 +430,19 @@ def _menu_visibility(filename_t, filename_ds):
_menu_visibility(filename_t, filename_ds) _menu_visibility(filename_t, filename_ds)
if choice == '4': if choice == '4':
print('Writing visibility coverage layer...') print('Writing visibility coverage layer...')
generate_visibility_layer(file_tech, filename_ds, False) generate_visibility_layer(file_tech, filename_ds, False, None)
_wait() _wait()
elif choice == '5': elif choice == '5':
print('Writing visibility coverage layer overlaid with detections...') print('Writing visibility coverage layer overlaid with detections...')
generate_visibility_layer(file_tech, filename_ds, True) generate_visibility_layer(file_tech, filename_ds, True, None)
_wait() _wait()
elif choice == '6': elif choice == '6':
print('Drawing the graph...') print('Drawing the graph...')
plot_graph(file_tech, 'visibility') plot_graph(file_tech, 'visibility', None)
_wait() _wait()
elif choice == '7': elif choice == '7':
print('Generating Excel file...') print('Generating Excel file...')
export_techniques_list_to_excel(file_tech) export_techniques_list_to_excel(file_tech, None)
_wait() _wait()
elif choice == '8': elif choice == '8':
print('Checking the technique YAML file for errors...') print('Checking the technique YAML file for errors...')
@ -540,7 +540,7 @@ def _menu_groups():
elif choice == '7': elif choice == '7':
if not generate_group_heat_map(groups, groups_overlay, overlay_type, default_stage, default_platform, if not generate_group_heat_map(groups, groups_overlay, overlay_type, default_stage, default_platform,
software_group, eql_query_visibility, eql_query_detection, False, software_group, eql_query_visibility, eql_query_detection, False,
include_all_score_objs=eql_all_scores): None, include_all_score_objs=eql_all_scores):
_wait() _wait()
_menu_groups() _menu_groups()
_wait() _wait()

View File

@ -5,33 +5,35 @@ from datetime import datetime
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries. # Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
def generate_detection_layer(filename_techniques, filename_data_sources, overlay): def generate_detection_layer(filename_techniques, filename_data_sources, overlay, output_filename):
""" """
Generates layer for detection coverage and optionally an overlaid version with visibility coverage. Generates layer for detection coverage and optionally an overlaid version with visibility coverage.
:param filename_techniques: the filename of the YAML file containing the techniques administration :param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration :param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated :param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param output_filename: the output filename defined by the user
:return: :return:
""" """
if not overlay: if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques) my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques) mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
layer_detection = get_layer_template_detections('Detections ' + name, 'description', 'attack', platform) layer_detection = get_layer_template_detections('Detections ' + name, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', name) _write_layer(layer_detection, mapped_techniques_detection, 'detection', name, output_filename)
else: else:
my_techniques, name, platform = load_techniques(filename_techniques) my_techniques, name, platform = load_techniques(filename_techniques)
my_data_sources = _load_data_sources(filename_data_sources) my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform) mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform) layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name) _write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name, output_filename)
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay): def generate_visibility_layer(filename_techniques, filename_data_sources, overlay, output_filename):
""" """
Generates layer for visibility coverage and optionally an overlaid version with detection coverage. Generates layer for visibility coverage and optionally an overlaid version with detection coverage.
:param filename_techniques: the filename of the YAML file containing the techniques administration :param filename_techniques: the filename of the YAML file containing the techniques administration
:param filename_data_sources: the filename of the YAML file containing the data sources administration :param filename_data_sources: the filename of the YAML file containing the data sources administration
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated :param overlay: boolean value to specify if an overlay between detection and visibility should be generated
:param output_filename: the output filename defined by the user
:return: :return:
""" """
my_data_sources = _load_data_sources(filename_data_sources) my_data_sources = _load_data_sources(filename_data_sources)
@ -40,19 +42,20 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
my_techniques, name, platform = load_techniques(filename_techniques) my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platform) mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platform)
layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform) layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name) _write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name, output_filename)
else: else:
my_techniques, name, platform = load_techniques(filename_techniques) my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform) mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform) layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name) _write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name, output_filename)
def plot_graph(filename, type_graph): def plot_graph(filename, type_graph, output_filename):
""" """
Generates a line graph which shows the improvements on detections through the time. Generates a line graph which shows the improvements on detections through the time.
:param filename: the filename of the YAML file containing the techniques administration :param filename: the filename of the YAML file containing the techniques administration
:param type_graph: indicates the type of the graph: detection or visibility :param type_graph: indicates the type of the graph: detection or visibility
:param output_filename: the output filename defined by the user
:return: :return:
""" """
# pylint: disable=unused-variable # pylint: disable=unused-variable
@ -70,7 +73,11 @@ def plot_graph(filename, type_graph):
df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum() df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum()
df['cumcount'] = df['count'].cumsum() df['cumcount'] = df['count'].cumsum()
output_filename = get_non_existing_filename('output/graph_%s' % type_graph, 'html') if not output_filename:
output_filename = 'graph_' + type_graph
elif output_filename.endswith('.html'):
output_filename = output_filename.replace('.html', '')
output_filename = get_non_existing_filename('output/' + output_filename, 'html')
import plotly import plotly
import plotly.graph_objs as go import plotly.graph_objs as go
@ -108,19 +115,26 @@ def _load_data_sources(file):
return my_data_sources return my_data_sources
def _write_layer(layer, mapped_techniques, filename_prefix, name): def _write_layer(layer, mapped_techniques, filename_prefix, name, output_filename):
""" """
Writes the json layer file to disk. Writes the json layer file to disk.
:param layer: the prepped layer dictionary :param layer: the prepped layer dictionary
:param mapped_techniques: the techniques section that will be included in the layer :param mapped_techniques: the techniques section that will be included in the layer
:param filename_prefix: the prefix for the output filename :param filename_prefix: the prefix for the output filename
:param name: the name that will be used in the filename together with the prefix :param name: the name that will be used in the filename together with the prefix
:param output_filename: the output filename defined by the user
:return: :return:
""" """
layer['techniques'] = mapped_techniques layer['techniques'] = mapped_techniques
json_string = simplejson.dumps(layer).replace('}, ', '},\n') json_string = simplejson.dumps(layer).replace('}, ', '},\n')
write_file(filename_prefix, name, json_string) if not output_filename:
output_filename = create_output_filename(filename_prefix, name)
else:
if output_filename.endswith('.json'):
output_filename = output_filename.replace('.json', '')
if filename_prefix == 'visibility_and_detection':
output_filename += '_overlay'
write_file(output_filename, json_string)
def _map_and_colorize_techniques_for_detections(my_techniques): def _map_and_colorize_techniques_for_detections(my_techniques):
@ -336,10 +350,11 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, pl
return mapped_techniques return mapped_techniques
def export_techniques_list_to_excel(filename): def export_techniques_list_to_excel(filename, output_filename):
""" """
Makes an overview of the MITRE ATT&CK techniques from the YAML administration file. Makes an overview of the MITRE ATT&CK techniques from the YAML administration file.
:param filename: the filename of the YAML file containing the techniques administration :param filename: the filename of the YAML file containing the techniques administration
:param output_filename: the output filename defined by the user
:return: :return:
""" """
# pylint: disable=unused-variable # pylint: disable=unused-variable
@ -347,7 +362,11 @@ def export_techniques_list_to_excel(filename):
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False)) my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
mitre_techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH) mitre_techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
excel_filename = get_non_existing_filename('output/techniques', 'xlsx') if not output_filename:
output_filename = 'techniques'
elif output_filename.endswith('.xlsx'):
output_filename = output_filename.replace('.xlsx', '')
excel_filename = get_non_existing_filename('output/' + output_filename, 'xlsx')
workbook = xlsxwriter.Workbook(excel_filename) workbook = xlsxwriter.Workbook(excel_filename)
worksheet_detections = workbook.add_worksheet('Detections') worksheet_detections = workbook.add_worksheet('Detections')
worksheet_visibility = workbook.add_worksheet('Visibility') worksheet_visibility = workbook.add_worksheet('Visibility')