From 5fdcb2376d4f42bee5964632d5706b3f214b98f2 Mon Sep 17 00:00:00 2001 From: Marcus Bakker Date: Tue, 14 May 2019 12:58:06 +0200 Subject: [PATCH] added a new option '--health' to check a technique administration YAML file on errors. --- dettact.py | 6 ++ generic.py | 139 +++++++++++++++++++++++++++++++++++++++++++ group_mapping.py | 5 +- technique_mapping.py | 66 ++------------------ 4 files changed, 153 insertions(+), 63 deletions(-) diff --git a/dettact.py b/dettact.py index 0407335..d9bc7eb 100644 --- a/dettact.py +++ b/dettact.py @@ -58,6 +58,7 @@ def init_menu(): action='store_true') parser_visibility.add_argument('-o', '--overlay', help='generate a visibility layer overlayed with detections for ' 'the ATT&CK navigator', action='store_true') + parser_visibility.add_argument('--health', help='check the technique YAML file for errors', action='store_true') # create the detection parser parser_detection = subparsers.add_parser('detection', aliases=['d'], @@ -81,6 +82,7 @@ def init_menu(): 'the ATT&CK navigator', action='store_true') parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time', action='store_true') + parser_detection.add_argument('--health', help='check the technique YAML file for errors', action='store_true') # create the group parser parser_group = subparsers.add_parser('group', aliases=['g'], @@ -171,6 +173,8 @@ def menu(menu_parser): export_techniques_list_to_excel(args.file_tech) if args.excel and args.applicable != 'all': print("[!] Filtering on 'applicable_to' is not supported for Excel output") + if args.health: + check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) elif args.subparser in ['group', 'g']: generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group, args.applicable) @@ -194,6 +198,8 @@ def menu(menu_parser): export_techniques_list_to_excel(args.file_tech) if args.excel and args.applicable != 'all': print("[!] Filtering on 'applicable_to' is not supported for Excel output") + if args.health: + check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) elif args.subparser in ['generic', 'ge']: if args.statistics: diff --git a/generic.py b/generic.py index 219310d..50155c6 100644 --- a/generic.py +++ b/generic.py @@ -337,3 +337,142 @@ def calculate_score(l, zero_value=0): number += 1 s = int(round(s / number, 0) if number > 0 else zero_value) return s + + +def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry): + """ + Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the + list. If the dict[technique_id] doesn't exist yet, it will be created. + :param dict: the dictionary + :param technique_id: the id of the technique in the main dict + :param key: the key where the list in the dictionary resides + :param entry: the entry to add to the list + :return: + """ + if technique_id not in dict.keys(): + dict[technique_id] = {} + if not key in dict[technique_id].keys(): + dict[technique_id][key] = [] + dict[technique_id][key].append(entry) + + +def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'): + """ + Loads the techniques (including detection and visibility properties) from the given yaml file. + :param filename: the filename of the yaml file containing the techniques administration + :param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When + using 'all' no filtering will be applied. + :param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file + :return: dictionary with techniques (incl. properties), name and platform + """ + + my_techniques = {} + with open(filename, 'r') as yaml_file: + yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + for d in yaml_content['techniques']: + # Add detection items: + if type(d['detection']) == dict: # There is just one detection entry + if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: + _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection']) + elif type(d['detection']) == list: # There are multiple detection entries + for de in d['detection']: + if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: + _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de) + + # Add visibility items + if type(d['visibility']) == dict: # There is just one visibility entry + if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: + _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility']) + elif type(d['visibility']) == list: # There are multiple visibility entries + for de in d['visibility']: + if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: + _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de) + + name = yaml_content['name'] + platform = yaml_content['platform'] + return my_techniques, name, platform + + +def check_yaml_file_health(filename, file_type): + """ + Check on error in the provided YAML file. + :param filename: YAML file location + :param file_type: currenlty only 'FILE_TYPE_TECHNIQUE_ADMINISTRATION' is being supported + :return: + """ + if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION: + # check for duplicate tech IDs + with open(filename, 'r') as yaml_file: + yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + + tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques'])) + tech_dup = [] + for tech in tech_ids: + if tech not in tech_dup: + tech_dup.append(tech) + else: + print('[!] Duplicate technique ID: ' + tech) + + # checks on: + # - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score' + # - invalid date format for: 'date_implemented', 'date_registered' + # - detection or visibility score out-of-range + # - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment' + + techniques = load_techniques(filename) + for tech, v in techniques[0].items(): + for key in ['detection', 'visibility']: + if key not in v: + print('[!] Technique ID: ' + tech + ' is MISSING ' + key) + + for detection in v['detection']: + for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']: + if key not in detection: + print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key) + + try: + if detection['score'] is None: + print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score') + + elif not (detection['score'] >= -1 and detection['score'] <= 5): + print('[!] Technique ID: ' + tech + ' has an INVALID detection score: ' + str(detection['score']) + + ' (should be between -1 and 5)') + + elif detection['score'] > -1: + for key in ['date_implemented', 'date_registered']: + if not detection[key]: + print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key) + break + try: + detection[key].year + detection[key].month + detection[key].day + except AttributeError: + print('[!] Technique ID: ' + tech + ' has an INVALID data format for the key-value pair ' + 'in detection: ' + key + ' (should be YYYY-MM-DD)') + + for key in ['location', 'applicable_to']: + try: + if detection[key][0] is None: + print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key) + except TypeError: + print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key) + except KeyError: + pass + + for visibility in v['visibility']: + for key in ['applicable_to', 'score', 'comment']: + if key not in detection: + print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key) + + try: + if visibility['score'] is None: + print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score') + elif not (visibility['score'] >= 0 and visibility['score'] <= 4): + print('[!] Technique ID: ' + tech + ' has an INVALID visibility score: ' + str(detection['score']) + + ' (should be between 0 and 4)') + except KeyError: + pass + + + diff --git a/group_mapping.py b/group_mapping.py index a3e306d..253a8e6 100644 --- a/group_mapping.py +++ b/group_mapping.py @@ -1,6 +1,5 @@ import simplejson from generic import * -from technique_mapping import _load_techniques CG_GROUPS = {} @@ -211,7 +210,7 @@ def get_detection_techniques(filename, filter_applicable_to): # { group_id: {group_name: NAME, techniques: set{id, ...} } } groups_dict = {} - detection_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to) + detection_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to) group_id = 'DETECTION' groups_dict[group_id] = {} @@ -235,7 +234,7 @@ def get_visibility_techniques(filename, filter_applicable_to): # { group_id: {group_name: NAME, techniques: set{id, ...} } } groups_dict = {} - visibility_techniques, name, platform = _load_techniques(filename, 'visibility', filter_applicable_to) + visibility_techniques, name, platform = load_techniques(filename, 'visibility', filter_applicable_to) group_id = 'VISIBILITY' groups_dict[group_id] = {} diff --git a/technique_mapping.py b/technique_mapping.py index 782bfcc..174030b 100644 --- a/technique_mapping.py +++ b/technique_mapping.py @@ -14,12 +14,12 @@ def generate_detection_layer(filename_techniques, filename_data_sources, overlay :return: """ if not overlay: - my_techniques, name, platform = _load_techniques(filename_techniques, 'detection', filter_applicable_to) + my_techniques, name, platform = load_techniques(filename_techniques, 'detection', filter_applicable_to) mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques) layer_detection = get_layer_template_detections('Detections ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform) _write_layer(layer_detection, mapped_techniques_detection, 'detection', filter_applicable_to, name) else: - my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to) + my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to) my_data_sources = _load_data_sources(filename_data_sources) mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to) layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform) @@ -38,12 +38,12 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla my_data_sources = _load_data_sources(filename_data_sources) if not overlay: - my_techniques, name, platform = _load_techniques(filename_techniques, 'visibility', filter_applicable_to) + my_techniques, name, platform = load_techniques(filename_techniques, 'visibility', filter_applicable_to) mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources) layer_visibility = get_layer_template_visibility('Visibility ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform) _write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', filter_applicable_to, name) else: - my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to) + my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to) mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to) layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform) _write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name) @@ -56,7 +56,7 @@ def plot_detection_graph(filename, filter_applicable_to): :param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file :return: """ - my_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to) + my_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to) graph_values = [] for t in my_techniques.values(): @@ -80,60 +80,6 @@ def plot_detection_graph(filename, filter_applicable_to): print("File written: " + output_filename) -def _load_techniques(filename, detection_or_visibility, filter_applicable_to='all'): - """ - Loads the techniques (including detection and visibility properties) from the given yaml file. - :param filename: the filename of the yaml file containing the techniques administration - :param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When - using 'all' no filtering will be applied. - :param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file - :return: dictionary with techniques (incl. properties), name and platform - """ - - my_techniques = {} - with open(filename, 'r') as yaml_file: - yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) - for d in yaml_content['techniques']: - # Add detection items: - if type(d['detection']) == dict: # There is just one detection entry - if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection']) - elif type(d['detection']) == list: # There are multiple detection entries - for de in d['detection']: - if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de) - - # Add visibility items - if type(d['visibility']) == dict: # There is just one visibility entry - if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility']) - elif type(d['visibility']) == list: # There are multiple visibility entries - for de in d['visibility']: - if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de) - - name = yaml_content['name'] - platform = yaml_content['platform'] - return my_techniques, name, platform - - -def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry): - """ - Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the - list. If the dict[technique_id] doesn't exist yet, it will be created. - :param dict: the dictionary - :param technique_id: the id of the technique in the main dict - :param key: the key where the list in the dictionary resides - :param entry: the entry to add to the list - :return: - """ - if technique_id not in dict.keys(): - dict[technique_id] = {} - if not key in dict[technique_id].keys(): - dict[technique_id][key] = [] - dict[technique_id][key].append(entry) - - def _load_data_sources(filename): """ Loads the data sources (including all properties) from the given yaml file. @@ -384,7 +330,7 @@ def export_techniques_list_to_excel(filename): :param filename: the filename of the yaml file containing the techniques administration :return: """ - my_techniques, name, platform = _load_techniques(filename, 'all') + my_techniques, name, platform = load_techniques(filename, 'all') my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False)) mitre_techniques = load_attack_data(DATATYPE_ALL_TECH)