import os
import shutil
import pickle
import re
from io import StringIO
from datetime import datetime as dt
from ruamel.yaml import YAML
from upgrade import upgrade_yaml_file
from constants import *
from health import check_yaml_file_health

# For performance reasons, the import of attackcti is done within the function that makes use of this library.


def _save_attack_data(data, path):
    """
    Save ATT&CK data to disk for the purpose of caching. Data can be STIX objects or a custom schema.
    :param data: the MITRE ATT&CK data to save
    :param path: file path to write to, including filename
    :return:
    """
    if not os.path.exists('cache/'):
        os.mkdir('cache/')
    with open(path, 'wb') as f:
        pickle.dump([data, dt.now()], f)


def load_attack_data(data_type):
    """
    Load the cached ATT&CK data from disk, if not expired (i.e. the data file on disk is not older than
    EXPIRE_TIME seconds).
    :param data_type: the desired data type, see DATATYPE_XX constants.
    :return: MITRE ATT&CK data object (STIX or custom schema)
    """
    if os.path.exists("cache/" + data_type):
        with open("cache/" + data_type, 'rb') as f:
            cached = pickle.load(f)
            write_time = cached[1]
            if not (dt.now() - write_time).total_seconds() >= EXPIRE_TIME:
                # the first item in the list contains the ATT&CK data
                return cached[0]

    from attackcti import attack_client
    mitre = attack_client()

    attack_data = None
    if data_type == DATA_TYPE_STIX_ALL_RELATIONSHIPS:
        attack_data = mitre.get_relationships()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE:
        attack_data = mitre.get_enterprise_techniques()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP:
        # First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
        # groups. This results in a list of dicts: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...}
        groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
        relationships = load_attack_data(DATA_TYPE_STIX_ALL_RELATIONSHIPS)

        all_groups_relationships = []
        for g in groups:
            for r in relationships:
                if g['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
                        r['target_ref'].startswith('attack-pattern--'):
                    # much more information on the group can be added. Only the minimal required data is now added.
                    all_groups_relationships.append(
                        {
                            'group_id': get_attack_id(g),
                            'name': g['name'],
                            'aliases': g.get('aliases', None),
                            'technique_ref': r['target_ref']
                        })

        # Now we resolve the 'technique_ref/attack-pattern_ref' part of the dict created above,
        # and we add some more data to the final result.
        all_group_use = []
        techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
        for gr in all_groups_relationships:
            for t in techniques:
                if t['id'] == gr['technique_ref']:
                    all_group_use.append(
                        {
                            'group_id': gr['group_id'],
                            'name': gr['name'],
                            'aliases': gr['aliases'],
                            'technique_id': get_attack_id(t),
                            'x_mitre_platforms': t.get('x_mitre_platforms', None),
                            'matrix': t['external_references'][0]['source_name']
                        })

        attack_data = all_group_use

    elif data_type == DATA_TYPE_STIX_ALL_TECH:
        attack_data = mitre.get_techniques()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_STIX_ALL_GROUPS:
        attack_data = mitre.get_groups()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_STIX_ALL_SOFTWARE:
        attack_data = mitre.get_software()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_CUSTOM_TECH_BY_SOFTWARE:
        # First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
        # software. This results in a list of dicts: {software_id: Sxxxx, technique_ref/attack-pattern_ref: ...}
        software = load_attack_data(DATA_TYPE_STIX_ALL_SOFTWARE)
        relationships = load_attack_data(DATA_TYPE_STIX_ALL_RELATIONSHIPS)

        all_software_relationships = []
        for s in software:
            for r in relationships:
                if s['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
                        r['target_ref'].startswith('attack-pattern--'):
                    # much more information (e.g. description, aliases, platform) on the software can be added to the
                    # dict if necessary. Only the minimal required data is now added.
                    all_software_relationships.append({'software_id': get_attack_id(s), 'technique_ref': r['target_ref']})

        # Now we resolve the 'technique_ref/attack-pattern_ref' part of the dict created above.
        techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)

        all_software_use = []
        for sr in all_software_relationships:
            for t in techniques:
                if t['id'] == sr['technique_ref']:
                    # much more information on the technique can be added to the dict. Only the minimal required data
                    # is now added (i.e. resolving the technique ref to an actual ATT&CK ID)
                    all_software_use.append({'software_id': sr['software_id'], 'technique_id': get_attack_id(t)})

        attack_data = all_software_use

    elif data_type == DATA_TYPE_CUSTOM_SOFTWARE_BY_GROUP:
        # First we need to know which software references (STIX Object type 'malware' or 'tool') we have for all
        # groups. This results in a list of dicts: {group_id: Gxxxx, software_ref/malware-tool_ref: ...}
        groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
        relationships = load_attack_data(DATA_TYPE_STIX_ALL_RELATIONSHIPS)

        all_groups_relationships = []
        for g in groups:
            for r in relationships:
                if g['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
                        (r['target_ref'].startswith('tool--') or r['target_ref'].startswith('malware--')):
                    # much more information on the group can be added. Only the minimal required data is now added.
                    all_groups_relationships.append(
                        {
                            'group_id': get_attack_id(g),
                            'name': g['name'],
                            'aliases': g.get('aliases', None),
                            'software_ref': r['target_ref']
                        })

        # Now we resolve the 'software_ref/malware-tool_ref' part of the dict created above,
        # and we add some more data to the final result.
        all_group_use = []
        software = load_attack_data(DATA_TYPE_STIX_ALL_SOFTWARE)
        for gr in all_groups_relationships:
            for s in software:
                if s['id'] == gr['software_ref']:
                    all_group_use.append(
                        {
                            'group_id': gr['group_id'],
                            'name': gr['name'],
                            'aliases': gr['aliases'],
                            'software_id': get_attack_id(s),
                            'x_mitre_platforms': s.get('x_mitre_platforms', None),
                            'matrix': s['external_references'][0]['source_name']
                        })

        attack_data = all_group_use

    elif data_type == DATA_TYPE_STIX_ALL_ENTERPRISE_MITIGATIONS:
        attack_data = mitre.get_enterprise_mitigations()
        attack_data = mitre.remove_revoked(attack_data)
    elif data_type == DATA_TYPE_STIX_ALL_MOBILE_MITIGATIONS:
        attack_data = mitre.get_mobile_mitigations()
        attack_data = mitre.remove_revoked(attack_data)

    _save_attack_data(attack_data, "cache/" + data_type)

    return attack_data
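
# Illustrative usage of the caching helpers above (a sketch, not part of the module's public behaviour;
# the DATA_TYPE_* constants and EXPIRE_TIME come from constants.py):
#
#   techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)        # first call fetches via attackcti and caches
#   techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)        # later calls read from 'cache/' until EXPIRE_TIME
#   group_techniques = load_attack_data(DATA_TYPE_CUSTOM_TECH_BY_GROUP)
#   print(group_techniques[0]['group_id'], group_techniques[0]['technique_id'])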


def init_yaml():
    """
    Initialize ruamel.yaml with the correct settings
    :return: a ruamel.yaml object
    """
    _yaml = YAML()
    _yaml.Representer.ignore_aliases = lambda *args: True  # disable anchors/aliases
    return _yaml


def _get_base_template(name, description, stage, platform, sorting):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the layer format can be found here:
    https://github.com/mitre/attack-navigator/blob/master/layers/
    :param name: name
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :param sorting: sorting
    :return: layer template dictionary
    """
    layer = dict()
    layer['name'] = name
    layer['version'] = '2.2'
    layer['domain'] = 'mitre-enterprise'
    layer['description'] = description

    if platform == 'all':
        platform = list(PLATFORMS.values())

    if stage == 'attack':
        layer['filters'] = {'stages': ['act'], 'platforms': platform}
    else:
        layer['filters'] = {'stages': ['prepare'], 'platforms': platform}

    layer['sorting'] = sorting
    layer['viewMode'] = 0
    layer['hideDisable'] = False
    layer['techniques'] = []

    layer['showTacticRowBackground'] = False
    layer['tacticRowBackground'] = COLOR_TACTIC_ROW_BACKGRND
    layer['selectTechniquesAcrossTactics'] = True
    return layer


def get_layer_template_groups(name, max_count, description, stage, platform, overlay_type):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the version 2.2 layer format:
    https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
    :param name: name
    :param max_count: the sum of all count values
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :param overlay_type: group, visibility or detection
    :return: layer template dictionary
    """
    layer = _get_base_template(name, description, stage, platform, 3)
    layer['gradient'] = {'colors': [COLOR_GRADIENT_MIN, COLOR_GRADIENT_MAX], 'minValue': 0, 'maxValue': max_count}
    layer['legendItems'] = []
    layer['legendItems'].append({'label': 'Tech. not often used', 'color': COLOR_GRADIENT_MIN})
    layer['legendItems'].append({'label': 'Tech. used frequently', 'color': COLOR_GRADIENT_MAX})

    if overlay_type == OVERLAY_TYPE_GROUP:
        layer['legendItems'].append({'label': 'Groups overlay: tech. in group + overlay', 'color': COLOR_GROUP_OVERLAY_MATCH})
        layer['legendItems'].append({'label': 'Groups overlay: tech. in overlay', 'color': COLOR_GROUP_OVERLAY_NO_MATCH})
        layer['legendItems'].append({'label': 'Src. of tech. is only software', 'color': COLOR_SOFTWARE})
        layer['legendItems'].append({'label': 'Src. of tech. is group(s)/overlay + software', 'color': COLOR_GROUP_AND_SOFTWARE})
    elif overlay_type == OVERLAY_TYPE_DETECTION:
        layer['legendItems'].append({'label': 'Tech. in group + detection', 'color': COLOR_GROUP_OVERLAY_MATCH})
        layer['legendItems'].append({'label': 'Tech. in detection', 'color': COLOR_GROUP_OVERLAY_ONLY_DETECTION})
    elif overlay_type == OVERLAY_TYPE_VISIBILITY:
        layer['legendItems'].append({'label': 'Tech. in group + visibility', 'color': COLOR_GROUP_OVERLAY_MATCH})
        layer['legendItems'].append({'label': 'Tech. in visibility', 'color': COLOR_GROUP_OVERLAY_ONLY_VISIBILITY})

    return layer
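
# Sketch of how a layer template could be turned into a Navigator layer file (illustrative only;
# the technique ID and values are hypothetical, and 'json' would need to be imported where this is used):
#
#   layer = get_layer_template_groups('Group heat map', 10, 'example description', 'attack', ['Windows'], OVERLAY_TYPE_GROUP)
#   layer['techniques'].append({'techniqueID': 'T1086', 'score': 3, 'color': COLOR_GRADIENT_MAX})
#   write_file('example', 'group-heat-map', json.dumps(layer, indent=2))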


def get_layer_template_detections(name, description, stage, platform):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the version 2.2 layer format:
    https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
    :param name: name
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :return: layer template dictionary
    """
    layer = _get_base_template(name, description, stage, platform, 0)
    layer['legendItems'] = \
        [
            {'label': 'Detection score 0: Forensics/Context', 'color': COLOR_D_0},
            {'label': 'Detection score 1: Basic', 'color': COLOR_D_1},
            {'label': 'Detection score 2: Fair', 'color': COLOR_D_2},
            {'label': 'Detection score 3: Good', 'color': COLOR_D_3},
            {'label': 'Detection score 4: Very good', 'color': COLOR_D_4},
            {'label': 'Detection score 5: Excellent', 'color': COLOR_D_5}
        ]
    return layer


def get_layer_template_data_sources(name, description, stage, platform):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the version 2.2 layer format:
    https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
    :param name: name
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :return: layer template dictionary
    """
    layer = _get_base_template(name, description, stage, platform, 0)
    layer['legendItems'] = \
        [
            {'label': '1-25% of data sources available', 'color': COLOR_DS_25p},
            {'label': '26-50% of data sources available', 'color': COLOR_DS_50p},
            {'label': '51-75% of data sources available', 'color': COLOR_DS_75p},
            {'label': '76-99% of data sources available', 'color': COLOR_DS_99p},
            {'label': '100% of data sources available', 'color': COLOR_DS_100p}
        ]
    return layer


def get_layer_template_visibility(name, description, stage, platform):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the version 2.2 layer format:
    https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
    :param name: name
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :return: layer template dictionary
    """
    layer = _get_base_template(name, description, stage, platform, 0)
    layer['legendItems'] = \
        [
            {'label': 'Visibility score 1: Minimal', 'color': COLOR_V_1},
            {'label': 'Visibility score 2: Medium', 'color': COLOR_V_2},
            {'label': 'Visibility score 3: Good', 'color': COLOR_V_3},
            {'label': 'Visibility score 4: Excellent', 'color': COLOR_V_4}
        ]
    return layer


def get_layer_template_layered(name, description, stage, platform):
    """
    Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
    More information on the version 2.2 layer format:
    https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
    :param name: name
    :param description: description
    :param stage: stage (act | prepare)
    :param platform: platform
    :return: layer template dictionary
    """
    layer = _get_base_template(name, description, stage, platform, 0)
    layer['legendItems'] = \
        [
            {'label': 'Visibility', 'color': COLOR_OVERLAY_VISIBILITY},
            {'label': 'Detection', 'color': COLOR_OVERLAY_DETECTION},
            {'label': 'Visibility and detection', 'color': COLOR_OVERLAY_BOTH}
        ]
    return layer


def write_file(filename_prefix, filename, content):
    """
    Writes content to a file. If the file already exists, it will not be overwritten: a number is appended to the
    filename as a suffix.
    :param filename_prefix: prefix part of the filename
    :param filename: filename
    :param content: the content that needs to be written to the file
    :return:
    """
    output_filename = 'output/%s_%s' % (filename_prefix, normalize_name_to_filename(filename))
    output_filename = get_non_existing_filename(output_filename, 'json')

    with open(output_filename, 'w') as f:
        f.write(content)

    print('File written: ' + output_filename)


def get_non_existing_filename(filename, extension):
    """
    Generates a filename that doesn't exist yet, based on the given filename, by appending a number as suffix.
    :param filename: the filename (without extension)
    :param extension: the file extension
    :return: a filename that does not yet exist on disk
    """
    if os.path.exists('%s.%s' % (filename, extension)):
        suffix = 1
        while os.path.exists('%s_%s.%s' % (filename, suffix, extension)):
            suffix += 1
        output_filename = '%s_%s.%s' % (filename, suffix, extension)
    else:
        output_filename = '%s.%s' % (filename, extension)
    return output_filename


def backup_file(filename):
    """
    Create a backup of the provided file
    :param filename: existing YAML filename
    :return:
    """
    suffix = 1
    backup_filename = filename.replace('.yaml', '_backup_' + str(suffix) + '.yaml')
    while os.path.exists(backup_filename):
        backup_filename = backup_filename.replace('_backup_' + str(suffix) + '.yaml', '_backup_' + str(suffix + 1) + '.yaml')
        suffix += 1

    shutil.copy2(filename, backup_filename)
    print('Written backup file: ' + backup_filename + '\n')


def get_attack_id(stix_obj):
    """
    Get the Technique, Group or Software ID from the STIX object
    :param stix_obj: STIX object (Technique, Software or Group)
    :return: ATT&CK ID
    """
    for ext_ref in stix_obj['external_references']:
        if ext_ref['source_name'] in ['mitre-attack', 'mitre-mobile-attack', 'mitre-pre-attack']:
            return ext_ref['external_id']


def get_tactics(technique):
    """
    Get all tactics from a given technique
    :param technique: technique STIX object
    :return: list with tactics
    """
    tactics = []
    if 'kill_chain_phases' in technique:
        for phase in technique['kill_chain_phases']:
            tactics.append(phase['phase_name'])

    return tactics


def get_technique(techniques, technique_id):
    """
    Generic function to lookup a specific technique_id in a list of dictionaries with techniques.
    :param techniques: list with all techniques
    :param technique_id: technique_id to look for
    :return: the technique you're searching for. None if not found.
    """
    for tech in techniques:
        if technique_id == get_attack_id(tech):
            return tech
    return None


def ask_yes_no(question):
    """
    Ask the user a question that needs to be answered with yes or no.
    :param question: the question to be asked
    :return: boolean value indicating a yes (True) or no (False)
    """
    yes_no = ''
    while not re.match('^(y|yes|n|no)$', yes_no, re.IGNORECASE):
        yes_no = input(question + '\n >> y(yes) / n(no): ')
        print('')

    if re.match('^(y|yes)$', yes_no, re.IGNORECASE):
        return True
    else:
        return False
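
# Illustrative behaviour of the filename helpers above (hypothetical paths):
#
#   get_non_existing_filename('output/layer', 'json')
#   # -> 'output/layer.json' if it does not exist yet, otherwise 'output/layer_1.json', 'output/layer_2.json', ...
#
#   backup_file('techniques.yaml')
#   # -> copies the file to 'techniques_backup_1.yaml' (or _backup_2, _backup_3, ... if backups already exist)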


def ask_multiple_choice(question, list_answers):
    """
    Ask a multiple choice question.
    :param question: the question to ask
    :param list_answers: a list of answers
    :return: the answer
    """
    answer = ''
    answers = ''
    x = 1
    for a in list_answers:
        a = a.replace('\n', '\n ')
        answers += ' ' + str(x) + ') ' + a + '\n'
        x += 1

    # noinspection Annotator
    while not re.match('(^[1-' + str(len(list_answers)) + ']{1}$)', answer):
        print(question)
        print(answers)
        answer = input(' >> ')
        print('')

    return list_answers[int(answer) - 1]


def fix_date_and_remove_null(yaml_file, date, input_type='ruamel'):
    """
    Remove the single quotes around the date key-value pair in the provided yaml_file and remove any 'null' values
    :param yaml_file: ruamel.yaml instance or location of YAML file
    :param date: string date value (e.g. 2019-01-01)
    :param input_type: input type can be a ruamel.yaml instance ('ruamel'), a list of lines ('list') or a file
    object ('file')
    :return: YAML file lines in a list
    """
    _yaml = init_yaml()
    if input_type == 'ruamel':
        # ruamel does not support output to a variable. Therefore we make use of StringIO.
        file = StringIO()
        _yaml.dump(yaml_file, file)
        file.seek(0)
        new_lines = file.readlines()
    elif input_type == 'list':
        new_lines = yaml_file
    elif input_type == 'file':
        new_lines = yaml_file.readlines()

    fixed_lines = [l.replace('\'' + date + '\'', date).replace('null', '')
                   if REGEX_YAML_DATE.match(l) else l.replace('null', '') for l in new_lines]

    return fixed_lines


def get_latest_score_obj(yaml_object):
    """
    Get the latest score object from the score_logbook (determined by date)
    :param yaml_object: a detection or visibility YAML object
    :return: the latest score object
    """
    if not isinstance(yaml_object['score_logbook'], list):
        yaml_object['score_logbook'] = [yaml_object['score_logbook']]

    if len(yaml_object['score_logbook']) > 0 and 'date' in yaml_object['score_logbook'][0]:
        # for some weird reason 'sorted()' provides inconsistent results
        newest_score_obj = None
        newest_date = None
        for score_obj in yaml_object['score_logbook']:
            if not newest_score_obj or score_obj['date'] > newest_date:
                newest_date = score_obj['date']
                newest_score_obj = score_obj

        return newest_score_obj
    else:
        return None


def get_latest_comment(yaml_object):
    """
    Return the latest comment present in the score_logbook
    :param yaml_object: a detection or visibility YAML object
    :return: comment
    """
    score_obj = get_latest_score_obj(yaml_object)
    if score_obj:
        if score_obj['comment'] == '' or not score_obj['comment']:
            return ''
        else:
            return score_obj['comment']
    else:
        return ''


def get_latest_date(yaml_object):
    """
    Return the latest date present in the score_logbook
    :param yaml_object: a detection or visibility YAML object
    :return: date as a datetime object or None
    """
    score_obj = get_latest_score_obj(yaml_object)
    if score_obj:
        return score_obj['date']
    else:
        return None


def get_latest_auto_generated(yaml_object):
    """
    Return the latest auto_generated value present in the score_logbook
    :param yaml_object: a detection or visibility YAML object
    :return: True or False
    """
    score_obj = get_latest_score_obj(yaml_object)
    if score_obj:
        if 'auto_generated' in score_obj:
            return score_obj['auto_generated']
        else:
            return False
    else:
        return False


def get_latest_score(yaml_object):
    """
    Return the latest score present in the score_logbook
    :param yaml_object: a detection or visibility YAML object
    :return: score as an integer or None
    """
    score_obj = get_latest_score_obj(yaml_object)
    if score_obj:
        return score_obj['score']
    else:
        return None
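
# Illustrative example of the score_logbook helpers above (hypothetical visibility object; in real YAML
# files the 'date' values are date objects produced by the YAML loader, ISO strings shown here for brevity):
#
#   visibility = {'score_logbook': [{'date': '2019-01-01', 'score': 1, 'comment': 'initial'},
#                                   {'date': '2019-03-01', 'score': 2, 'comment': 'improved'}]}
#   get_latest_score(visibility)    # -> 2
#   get_latest_date(visibility)     # -> '2019-03-01'
#   get_latest_comment(visibility)  # -> 'improved'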


def normalize_name_to_filename(name):
    """
    Normalize the input filename to a lowercase filename and replace spaces with dashes.
    :param name: input filename
    :return: normalized filename
    """
    return name.lower().replace(' ', '-')


def platform_to_name(platform, separator='-'):
    """
    Makes a filename friendly version of the platform parameter which can be a string or list.
    :param platform: the platform variable (a string or a list)
    :param separator: a string value that separates multiple platforms. Default is '-'
    :return: a filename friendly representation of the value of platform
    """
    if platform == 'all':
        return 'all'
    elif isinstance(platform, list):
        return separator.join(platform)
    else:
        return ''


def get_applicable_data_sources_platform(platforms):
    """
    Get the applicable ATT&CK data sources for the provided platform(s)
    :param platforms: the ATT&CK platform(s)
    :return: a list of applicable ATT&CK data sources
    """
    applicable_data_sources = set()
    if platforms == 'all' or 'all' in platforms:
        for v in DATA_SOURCES.values():
            applicable_data_sources.update(v)
    else:
        for p in platforms:
            applicable_data_sources.update(DATA_SOURCES[p])

    return list(applicable_data_sources)


def get_applicable_data_sources_technique(technique_data_sources, platform_applicable_data_sources):
    """
    Get the applicable ATT&CK data sources for the provided technique's data sources (for which the source is ATT&CK CTI)
    :param technique_data_sources: the ATT&CK technique's data sources
    :param platform_applicable_data_sources: a list of applicable ATT&CK data sources based on 'DATA_SOURCES'
    :return: a list of applicable data sources
    """
    applicable_data_sources = set()
    for ds in technique_data_sources:
        if ds in platform_applicable_data_sources:
            applicable_data_sources.add(ds)

    return list(applicable_data_sources)


def map_techniques_to_data_sources(techniques, my_data_sources):
    """
    This function maps the MITRE ATT&CK techniques to your data sources.
    :param techniques: list with all MITRE ATT&CK techniques
    :param my_data_sources: your configured data sources
    :return: a dictionary containing techniques that can be used in the layer output file.
    """
    my_techniques = {}
    for i_ds in my_data_sources.keys():
        # Loop through all techniques, to find techniques using that data source:
        for t in techniques:
            # If your data source is in the list of data sources for this technique AND if the
            # technique isn't added yet (by another data source):
            tech_id = get_attack_id(t)
            if 'x_mitre_data_sources' in t:
                if i_ds in t['x_mitre_data_sources'] and tech_id not in my_techniques.keys():
                    my_techniques[tech_id] = {}
                    my_techniques[tech_id]['my_data_sources'] = [i_ds, ]
                    my_techniques[tech_id]['data_sources'] = t['x_mitre_data_sources']
                    # create a list of tactics (default to an empty list when no kill chain phases are present)
                    my_techniques[tech_id]['tactics'] = list(map(lambda k: k['phase_name'], t.get('kill_chain_phases', [])))
                    my_techniques[tech_id]['products'] = set(my_data_sources[i_ds]['products'])
                elif t['x_mitre_data_sources'] and i_ds in t['x_mitre_data_sources'] and tech_id in my_techniques.keys():
                    my_techniques[tech_id]['my_data_sources'].append(i_ds)
                    my_techniques[tech_id]['products'].update(my_data_sources[i_ds]['products'])

    return my_techniques


def get_all_mitre_data_sources():
    """
    Gets all the data sources from the techniques and makes a set.
    :return: a set with all data sources
    """
    techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)

    data_sources = set()
    for t in techniques:
        if 'x_mitre_data_sources' in t.keys():
            for ds in t['x_mitre_data_sources']:
                data_sources.add(ds)
    return data_sources
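
# Illustrative usage of the platform/data source helpers above (a sketch; the exact platform keys and data
# source names depend on the DATA_SOURCES mapping in constants.py and on ATT&CK CTI):
#
#   platform_to_name(['Windows', 'Linux'])   # -> 'Windows-Linux'
#   applicable = get_applicable_data_sources_platform('all')
#   get_applicable_data_sources_technique(['Process monitoring'], applicable)
#   # -> only those technique data sources that are applicable for the chosen platform(s)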


def calculate_score(list_detections, zero_value=0):
    """
    Calculates the average score in the given list which may contain multiple detection dictionaries
    :param list_detections: list
    :param zero_value: the value to return when there are no scores, default 0
    :return: average score
    """
    avg_score = 0
    number = 0
    for v in list_detections:
        score = get_latest_score(v)
        if score and score >= 0:
            avg_score += score
            number += 1

    avg_score = int(round(avg_score / number, 0) if number > 0 else zero_value)
    return avg_score


def add_entry_to_list_in_dictionary(dictionary, technique_id, key, entry):
    """
    Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to
    the list. If the dict[technique_id] doesn't exist yet, it will be created.
    :param dictionary: the dictionary
    :param technique_id: the id of the technique in the main dict
    :param key: the key where the list in the dictionary resides
    :param entry: the entry to add to the list
    :return:
    """
    if technique_id not in dictionary.keys():
        dictionary[technique_id] = {}
    if key not in dictionary[technique_id].keys():
        dictionary[technique_id][key] = []
    dictionary[technique_id][key].append(entry)


def set_yaml_dv_comments(yaml_object):
    """
    Set all comments in the detection or visibility YAML object when the 'comment' key-value pair is missing or is
    None. This gives the user the flexibility to have YAML files with missing 'comment' key-value pairs.
    :param yaml_object: detection or visibility object
    :return: detection or visibility object for which empty comments are filled with an empty string
    """
    yaml_object['comment'] = yaml_object.get('comment', '')
    if yaml_object['comment'] is None:
        yaml_object['comment'] = ''

    if 'score_logbook' in yaml_object:
        for score_obj in yaml_object['score_logbook']:
            score_obj['comment'] = score_obj.get('comment', '')
            if score_obj['comment'] is None:
                score_obj['comment'] = ''

    return yaml_object


def load_techniques(file):
    """
    Loads the techniques (including detection and visibility properties).
    :param file: the file location of the YAML file or a dict containing the techniques administration
    :return: dictionary with techniques (incl. properties), name and platform
    """
    my_techniques = {}

    if isinstance(file, dict):
        # file is a dict and created due to the use of an EQL query by the user
        yaml_content = file
    else:
        # file is a file location on disk
        _yaml = init_yaml()
        with open(file, 'r') as yaml_file:
            yaml_content = _yaml.load(yaml_file)

    for d in yaml_content['techniques']:
        if 'detection' in d:
            # Add detection items:
            if isinstance(d['detection'], dict):  # There is just one detection entry
                d['detection'] = set_yaml_dv_comments(d['detection'])
                add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
            elif isinstance(d['detection'], list):  # There are multiple detection entries
                for de in d['detection']:
                    de = set_yaml_dv_comments(de)
                    add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)

        if 'visibility' in d:
            # Add visibility items
            if isinstance(d['visibility'], dict):  # There is just one visibility entry
                d['visibility'] = set_yaml_dv_comments(d['visibility'])
                add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
            elif isinstance(d['visibility'], list):  # There are multiple visibility entries
                for de in d['visibility']:
                    de = set_yaml_dv_comments(de)
                    add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)

    name = yaml_content['name']
    platform = get_platform_from_yaml(yaml_content)

    return my_techniques, name, platform
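
# Illustrative example for calculate_score() (hypothetical detection objects with a score_logbook):
#
#   detections = [{'score_logbook': [{'date': '2019-01-01', 'score': 2}]},
#                 {'score_logbook': [{'date': '2019-01-01', 'score': 4}]}]
#   calculate_score(detections)           # -> 3 (average of 2 and 4)
#   calculate_score([], zero_value=-1)    # -> -1 (no scores present, so the zero_value is returned)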


def _check_file_type(filename, file_type=None):
    """
    Check if the provided YAML file has the key 'file_type' and, when file_type is provided, whether that key
    matches the given value.
    :param filename: path to a YAML file
    :param file_type: value to check against the 'file_type' key in the YAML file
    :return: the YAML content if the file is valid, else None is returned
    """
    if not os.path.exists(filename):
        print('[!] File: \'' + filename + '\' does not exist')
        return None

    _yaml = init_yaml()
    with open(filename, 'r') as yaml_file:
        try:
            yaml_content = _yaml.load(yaml_file)
        except Exception as e:
            print('[!] File: \'' + filename + '\' is not a valid YAML file.')
            print(' ' + str(e))  # print more detailed error information to help the user in fixing the error.
            return None

        # This check is performed because a text file will also be considered to be valid YAML. But, we are using
        # key-value pairs within the YAML files.
        if not hasattr(yaml_content, 'keys'):
            print('[!] File: \'' + filename + '\' is not a valid YAML file.')
            return None

        if 'file_type' not in yaml_content.keys():
            print('[!] File: \'' + filename + '\' does not contain a file_type key.')
            return None
        elif file_type:
            if file_type != yaml_content['file_type']:
                print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
                return None
            else:
                return yaml_content
        else:
            return yaml_content


def check_file(filename, file_type=None, health_is_called=False):
    """
    Calls three functions to perform the following checks: is the file a valid YAML file, does the file need to be
    upgraded, does the file contain errors.
    :param filename: path to a YAML file
    :param file_type: value to check against the 'file_type' key in the YAML file
    :param health_is_called: boolean that specifies if detailed errors in the file will be printed by the function
    'check_yaml_file_health'
    :return: the file_type if present, else None is returned
    """
    yaml_content = _check_file_type(filename, file_type)

    # if the file is a valid YAML file, continue. Else, return None
    if yaml_content:
        upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATA_TYPE_STIX_ALL_TECH))
        check_yaml_file_health(filename, file_type, health_is_called)

        return yaml_content['file_type']

    return yaml_content  # value is None
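
# Illustrative flow for the file checks above (hypothetical filename and file_type value):
#
#   file_type = check_file('techniques-administration-example.yaml', health_is_called=True)
#   if file_type == 'technique-administration':   # hypothetical 'file_type' value from the YAML file
#       my_techniques, name, platform = load_techniques('techniques-administration-example.yaml')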


def make_layer_metadata_compliant(metadata):
    """
    Make sure the metadata values in the Navigator layer file are compliant with the expected data structure
    from the latest version on: https://github.com/mitre-attack/attack-navigator/tree/master/layers
    :param metadata: list of metadata dictionaries
    :return: compliant list of metadata dictionaries
    """
    for md_item in metadata:
        if not md_item['value'] or md_item['value'] == '':
            md_item['value'] = '-'

    return metadata


def get_updates(update_type, sort='modified'):
    """
    Print a list of updates for techniques, groups or software. Sort by modified or creation date.
    :param update_type: the type of update: techniques, groups or software
    :param sort: sort the list by modified or creation date
    :return:
    """
    if update_type[:-1] == 'technique':
        techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
        sorted_techniques = sorted(techniques, key=lambda k: k[sort])

        for t in sorted_techniques:
            print(get_attack_id(t) + ' ' + t['name'])
            print(' ' * 6 + 'created: ' + t['created'].strftime('%Y-%m-%d'))
            print(' ' * 6 + 'modified: ' + t['modified'].strftime('%Y-%m-%d'))
            print(' ' * 6 + 'matrix: ' + t['external_references'][0]['source_name'][6:])
            tactics = get_tactics(t)
            if tactics:
                print(' ' * 6 + 'tactic: ' + ', '.join(tactics))
            else:
                print(' ' * 6 + 'tactic: None')
            print('')

    elif update_type[:-1] == 'group':
        groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
        sorted_groups = sorted(groups, key=lambda k: k[sort])

        for g in sorted_groups:
            print(get_attack_id(g) + ' ' + g['name'])
            print(' ' * 6 + 'created: ' + g['created'].strftime('%Y-%m-%d'))
            print(' ' * 6 + 'modified: ' + g['modified'].strftime('%Y-%m-%d'))
            print('')

    elif update_type == 'software':
        software = load_attack_data(DATA_TYPE_STIX_ALL_SOFTWARE)
        sorted_software = sorted(software, key=lambda k: k[sort])

        for s in sorted_software:
            print(get_attack_id(s) + ' ' + s['name'])
            print(' ' * 6 + 'created: ' + s['created'].strftime('%Y-%m-%d'))
            print(' ' * 6 + 'modified: ' + s['modified'].strftime('%Y-%m-%d'))
            print(' ' * 6 + 'matrix: ' + s['external_references'][0]['source_name'][6:])
            print(' ' * 6 + 'type: ' + s['type'])
            if 'x_mitre_platforms' in s:
                print(' ' * 6 + 'platform: ' + ', '.join(s['x_mitre_platforms']))
            else:
                print(' ' * 6 + 'platform: None')
            print('')


def get_statistics_mitigations(matrix):
    """
    Print out statistics related to mitigations and how many techniques they cover
    :param matrix: the matrix to get the statistics for ('enterprise' or 'mobile')
    :return:
    """
    if matrix == 'enterprise':
        mitigations = load_attack_data(DATA_TYPE_STIX_ALL_ENTERPRISE_MITIGATIONS)
    elif matrix == 'mobile':
        mitigations = load_attack_data(DATA_TYPE_STIX_ALL_MOBILE_MITIGATIONS)

    mitigations_dict = dict()
    for m in mitigations:
        if m['external_references'][0]['external_id'].startswith('M'):
            mitigations_dict[m['id']] = {'mID': m['external_references'][0]['external_id'], 'name': m['name']}

    relationships = load_attack_data(DATA_TYPE_STIX_ALL_RELATIONSHIPS)
    relationships_mitigates = [r for r in relationships
                               if r['relationship_type'] == 'mitigates'
                               if r['source_ref'].startswith('course-of-action')
                               if r['target_ref'].startswith('attack-pattern')
                               if r['source_ref'] in mitigations_dict]

    # {mitigation ID: {count: ..., name: ...}}
    count_dict = dict()
    for r in relationships_mitigates:
        src_ref = r['source_ref']

        m = mitigations_dict[src_ref]
        if m['mID'] not in count_dict:
            count_dict[m['mID']] = dict()
            count_dict[m['mID']]['count'] = 1
            count_dict[m['mID']]['name'] = m['name']
        else:
            count_dict[m['mID']]['count'] += 1

    count_dict_sorted = dict(sorted(count_dict.items(), key=lambda kv: kv[1]['count'], reverse=True))

    str_format = '{:<6s} {:<14s} {:s}'
    print(str_format.format('Count', 'Mitigation ID', 'Name'))
    print('-' * 60)
    for k, v in count_dict_sorted.items():
        print(str_format.format(str(v['count']), k, v['name']))
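
# Illustrative example for make_layer_metadata_compliant() (hypothetical metadata entries):
#
#   metadata = [{'name': 'Detection score', 'value': ''}, {'name': 'Comment', 'value': 'reviewed'}]
#   make_layer_metadata_compliant(metadata)
#   # -> [{'name': 'Detection score', 'value': '-'}, {'name': 'Comment', 'value': 'reviewed'}]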


def get_statistics_data_sources():
    """
    Print out statistics related to data sources and how many techniques they cover.
    :return:
    """
    techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)

    # {data_source: {techniques: [T0001, ...], count: ...}}
    data_sources_dict = {}
    for tech in techniques:
        tech_id = get_attack_id(tech)
        # Not every technique has a data source listed
        data_sources = tech.get('x_mitre_data_sources', None)
        if data_sources:
            for ds in data_sources:
                if ds not in data_sources_dict:
                    data_sources_dict[ds] = {'techniques': [tech_id], 'count': 1}
                else:
                    data_sources_dict[ds]['techniques'].append(tech_id)
                    data_sources_dict[ds]['count'] += 1

    # sort the dict on the value of 'count'
    data_sources_dict_sorted = dict(sorted(data_sources_dict.items(), key=lambda kv: kv[1]['count'], reverse=True))
    str_format = '{:<6s} {:s}'
    print(str_format.format('Count', 'Data Source'))
    print('-' * 50)
    for k, v in data_sources_dict_sorted.items():
        print(str_format.format(str(v['count']), k))


def get_platform_from_yaml(yaml_content):
    """
    Read the platform field from the YAML file supporting both string and list values.
    :param yaml_content: the content of the YAML file containing the platform field
    :return: the platform value
    """
    platform = yaml_content.get('platform', None)
    if platform is None:
        return []
    if isinstance(platform, str):
        platform = [platform]
    platform = [p.lower() for p in platform if p is not None]

    if platform == ['all']:
        platform = 'all'
    else:
        valid_platform_list = []
        for p in platform:
            if p in PLATFORMS.keys():
                valid_platform_list.append(PLATFORMS[p])
        platform = valid_platform_list
    return platform
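
# Illustrative example for get_platform_from_yaml() (hypothetical YAML content; the exact canonical
# platform names depend on the PLATFORMS mapping in constants.py):
#
#   get_platform_from_yaml({'platform': 'all'})          # -> 'all'
#   get_platform_from_yaml({'platform': ['Windows']})    # -> the canonical name(s) from PLATFORMS, e.g. ['Windows']
#   get_platform_from_yaml({})                           # -> []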