diff --git a/generic.py b/generic.py index 07bd738..4f3325e 100644 --- a/generic.py +++ b/generic.py @@ -1,10 +1,12 @@ import os +import shutil import pickle +import sys +from ruamel.yaml import YAML +from difflib import SequenceMatcher from datetime import datetime as dt -import yaml from upgrade import upgrade_yaml_file from constants import * -from difflib import SequenceMatcher # Due to performance reasons the import of attackcti is within the function that makes use of this library. @@ -21,23 +23,7 @@ def try_get_key(dictionary, key): return None -def try_except(self, stix_objects, object_type, nested_value=None): - if object_type in stix_objects: - specific_stix_object = stix_objects[object_type] - if isinstance(specific_stix_object, list): - if nested_value is None: - lists = self.handle_list(stix_objects, object_type) - return lists - else: - nested_result = self.handle_nested(stix_objects, object_type, nested_value) - return nested_result - else: - return stix_objects[object_type] - else: - return None - - -def save_attack_data(data, path): +def _save_attack_data(data, path): """ Save ATT&CK data to disk for the purpose of caching. Data can be STIX objects our a custom schema. :param data: the MITRE ATT&CK data to save @@ -70,9 +56,9 @@ def load_attack_data(data_type): attack_data = None if data_type == DATA_TYPE_STIX_ALL_RELATIONSHIPS: - attack_data = mitre.get_all_relationships() + attack_data = mitre.get_relationships() if data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE: - attack_data = mitre.get_all_enterprise_techniques() + attack_data = mitre.get_enterprise_techniques() if data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP: # First we need to know which technique references (STIX Object type 'attack-pattern') we have for all # groups. This results in a dict: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...} @@ -112,11 +98,11 @@ def load_attack_data(data_type): attack_data = all_group_use elif data_type == DATA_TYPE_STIX_ALL_TECH: - attack_data = mitre.get_all_techniques() + attack_data = mitre.get_techniques() elif data_type == DATA_TYPE_STIX_ALL_GROUPS: - attack_data = mitre.get_all_groups() + attack_data = mitre.get_groups() elif data_type == DATA_TYPE_STIX_ALL_SOFTWARE: - attack_data = mitre.get_all_software() + attack_data = mitre.get_software() elif data_type == DATA_TYPE_CUSTOM_TECH_BY_SOFTWARE: # First we need to know which technique references (STIX Object type 'attack-pattern') we have for all software # This results in a dict: {software_id: Sxxxx, technique_ref/attack-pattern_ref: ...} @@ -180,11 +166,17 @@ def load_attack_data(data_type): }) attack_data = all_group_use - save_attack_data(attack_data, "cache/" + data_type) + _save_attack_data(attack_data, "cache/" + data_type) return attack_data +def init_yaml(): + _yaml = YAML() + _yaml.Representer.ignore_aliases = lambda *args: True # disable anchors/aliases + return _yaml + + def _get_base_template(name, description, stage, platform, sorting): """ Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator. @@ -197,7 +189,7 @@ def _get_base_template(name, description, stage, platform, sorting): :param sorting: sorting :return: layer template dictionary """ - layer = {} + layer = dict() layer['name'] = name layer['version'] = '2.1' layer['domain'] = 'mitre-enterprise' @@ -348,6 +340,22 @@ def get_layer_template_layered(name, description, stage, platform): return layer +def backup_file(filename): + """ + Create a backup of the provided file + :param filename: existing YAML filename + :return: + """ + suffix = 1 + backup_filename = filename.replace('.yaml', '_backup_' + str(suffix) + '.yaml') + while os.path.exists(backup_filename): + backup_filename = backup_filename.replace('_backup_' + str(suffix) + '.yaml', '_backup_' + str(suffix+1) + '.yaml') + suffix += 1 + + shutil.copy2(filename, backup_filename) + print('Written backup file: ' + backup_filename + '\n') + + def get_attack_id(stix_obj): """ Get the Technique, Group or Software ID from the STIX object @@ -386,6 +394,166 @@ def get_technique(techniques, technique_id): return None +def ask_yes_no(question): + """ + Ask the user to a question that needs to be answered with yes or no. + :param question: The question to be asked + :return: boolean value indicating a yes (True) or no (False0 + """ + yes_no = '' + while not re.match('^(y|yes|n|no)$', yes_no, re.IGNORECASE): + yes_no = input(question + '\n >> y(yes) / n(no): ') + print('') + + if re.match('^(y|yes)$', yes_no, re.IGNORECASE): + return True + else: + return False + + +def ask_multiple_choice(question, list_answers): + """ + Ask a multiple choice question. + :param question: the question to ask + :param list_answers: a list of answer + :return: the answer + """ + answer = '' + answers = '' + x = 1 + for a in list_answers: + a = a.replace('\n', '\n ') + answers += ' ' + str(x) + ') ' + a + '\n' + x += 1 + + # noinspection Annotator + while not re.match('(^[1-' + str(len(list_answers)) + ']{1}$)', answer): + print(question) + print(answers) + answer = input(' >> ') + print('') + + return list_answers[int(answer)-1] + + +def fix_date(yaml_file, date, input_reamel=True, return_reamel=False): + """ + Remove the single quotes around the date key-value pair in the provided yaml_file + :param yaml_file: ruamel.yaml instance or location of YAML file + :param date: string date value (e.g. 2019-01-01) + :param input_reamel: input type can be a reamel instance or list + :param return_reamel: return list of YAML file lines or reamel instance + :return: YAML file lines in a list + """ + _yaml = init_yaml() + if input_reamel: + file = sys.path[0] + '/.tmp_tech_file' + + with open(file, 'w') as fd: + _yaml.dump(yaml_file, fd) + else: + file = yaml_file + + with open(file, 'r') as fd: + new_lines = fd.readlines() + x = 0 + for line in new_lines: + if REGEX_YAML_DATE.match(line): + new_lines[x] = line.replace('\'' + date + '\'', date) + x += 1 + + # remove the temporary file + if input_reamel: + os.remove(file) + + if return_reamel: + return _yaml.load(''.join(new_lines)) + else: + return new_lines + + +def _get_latest_score_obj(yaml_object): + """ + Get the the score object in the score_logbook by date + :param yaml_object: a detection or visibility YAML object + :return: the latest score object + """ + if not isinstance(yaml_object['score_logbook'], list): + yaml_object['score_logbook'] = [yaml_object['score_logbook']] + + if len(yaml_object['score_logbook']) > 0 and 'date' in yaml_object['score_logbook'][0]: + # for some weird reason 'sorted()' provides inconsistent results + newest_score_obj = None + newest_date = None + for score_obj in yaml_object['score_logbook']: + if not newest_score_obj or score_obj['date'] > newest_date: + newest_date = score_obj['date'] + newest_score_obj = score_obj + + return newest_score_obj + else: + return None + + +def get_latest_comment(yaml_object, empty=' '): + """ + Return the latest comment present in the score_logbook + :param yaml_object: a detection or visibility YAML object + :param empty: value for an empty comment + :return: comment + """ + score_obj = _get_latest_score_obj(yaml_object) + if score_obj: + if score_obj['comment'] == '' or not score_obj['comment']: + return empty + else: + return score_obj['comment'] + else: + return empty + + +def get_latest_date(yaml_object): + """ + Return the latest date present in the score_logbook + :param yaml_object: a detection or visibility YAML object + :return: date as a datetime object or None + """ + score_obj = _get_latest_score_obj(yaml_object) + if score_obj: + return score_obj['date'] + else: + return None + + +def get_latest_auto_generated(yaml_object): + """ + Return the latest auto_generated value present in the score_logbook + :param yaml_object: a detection or visibility YAML object + :return: True or False + """ + score_obj = _get_latest_score_obj(yaml_object) + if score_obj: + if 'auto_generated' in score_obj: + return score_obj['auto_generated'] + else: + return False + else: + return False + + +def get_latest_score(yaml_object): + """ + Return the latest score present in the score_logbook + :param yaml_object: a detection or visibility YAML object + :return: score as an integer or None + """ + score_obj = _get_latest_score_obj(yaml_object) + if score_obj: + return score_obj['score'] + else: + return None + + def normalize_name_to_filename(name): """ Normalize the input filename to a lowercase filename and replace spaces with dashes. @@ -439,38 +607,39 @@ def get_all_mitre_data_sources(): return sorted(data_sources) -def calculate_score(l, zero_value=0): +def calculate_score(list_detections, zero_value=0): """ - Calculates the average score in the given list which contains dictionaries with 'score' field. - :param l: list + Calculates the average score in the given list which may contain multiple detection dictionaries + :param list_detections: list :param zero_value: the value when no scores are there, default 0 :return: average score """ - s = 0 + avg_score = 0 number = 0 - for v in l: - if v['score'] >= 0: - s += v['score'] + for v in list_detections: + score = get_latest_score(v) + if score >= 0: + avg_score += score number += 1 - s = int(round(s / number, 0) if number > 0 else zero_value) - return s + avg_score = int(round(avg_score / number, 0) if number > 0 else zero_value) + return avg_score -def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry): +def add_entry_to_list_in_dictionary(dictionary, technique_id, key, entry): """ Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the list. If the dict[technique_id] doesn't exist yet, it will be created. - :param dict: the dictionary + :param dictionary: the dictionary :param technique_id: the id of the technique in the main dict :param key: the key where the list in the dictionary resides :param entry: the entry to add to the list :return: """ - if technique_id not in dict.keys(): - dict[technique_id] = {} - if not key in dict[technique_id].keys(): - dict[technique_id][key] = [] - dict[technique_id][key].append(entry) + if technique_id not in dictionary.keys(): + dictionary[technique_id] = {} + if key not in dictionary[technique_id].keys(): + dictionary[technique_id][key] = [] + dictionary[technique_id][key].append(entry) def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'): @@ -484,26 +653,27 @@ def load_techniques(filename, detection_or_visibility='all', filter_applicable_t """ my_techniques = {} + _yaml = init_yaml() with open(filename, 'r') as yaml_file: - yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + yaml_content = _yaml.load(yaml_file) for d in yaml_content['techniques']: # Add detection items: - if type(d['detection']) == dict: # There is just one detection entry + if isinstance(d['detection'], dict): # There is just one detection entry if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection']) - elif type(d['detection']) == list: # There are multiple detection entries + add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection']) + elif isinstance(d['detection'], list): # There are multiple detection entries for de in d['detection']: if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de) + add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de) # Add visibility items - if type(d['visibility']) == dict: # There is just one visibility entry + if isinstance(d['visibility'], dict): # There is just one visibility entry if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility']) - elif type(d['visibility']) == list: # There are multiple visibility entries + add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility']) + elif isinstance(d['visibility'], list): # There are multiple visibility entries for de in d['visibility']: if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']: - _add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de) + add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de) name = yaml_content['name'] platform = yaml_content['platform'] @@ -516,6 +686,111 @@ def _print_error_msg(msg, print_error): return True +def _check_health_score_object(yaml_object, object_type, tech_id, health_is_called): + """ + Check the health of a score_logbook inside a visibility or detection YAML object + :param yaml_object: YAML file lines + :param object_type: 'detection' or 'visibility' + :param tech_id: ATT&CK technique ID + :param health_is_called: boolean that specifies if detailed errors in the file will be printed and then quit() + :return: True if the YAML file is unhealthy, otherwise False + """ + has_error = False + min_score = None + max_score = None + + if object_type == 'detection': + min_score = -1 + max_score = 5 + elif object_type == 'visibility': + min_score = 0 + max_score = 4 + + if not isinstance(yaml_object['score_logbook'], list): + yaml_object['score_logbook'] = [yaml_object['score_logbook']] + + try: + for score_obj in yaml_object['score_logbook']: + for key in ['date', 'score', 'comment']: + if key not in score_obj: + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' + object_type + ' score object in the \'score_logbook\': ' + key, health_is_called) + + if score_obj['score'] is None: + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object in the \'score_logbook\': score', health_is_called) + + elif not isinstance(score_obj['score'], int): + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type + ' score object in the \'score_logbook\': score should be an integer', health_is_called) + + if 'auto_generated' in score_obj: + if not isinstance(score_obj['auto_generated'], bool): + has_error = _print_error_msg( + '[!] Technique ID: ' + tech_id + ' has an INVALID auto_generated value in a ' + object_type + ' score object in the \'score_logbook\': auto_generated (if present) should be set to \'true\' or \'false\'', health_is_called) + + if isinstance(score_obj['score'], int): + if score_obj['date'] is None and score_obj['score'] > -1: + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object in the \'score_logbook\': date', health_is_called) + + # noinspection PyChainedComparisons + if not (score_obj['score'] >= min_score and score_obj['score'] <= max_score): + has_error = _print_error_msg( + '[!] Technique ID: ' + tech_id + ' has an INVALID ' + object_type + ' score in a score object in the \'score_logbook\': ' + str(score_obj['score']) + ' (should be between ' + str(min_score) + ' and ' + str(max_score) + ')', health_is_called) + + if score_obj['score'] > min_score: + try: + # noinspection PyStatementEffect + score_obj['date'].year + # noinspection PyStatementEffect + score_obj['date'].month + # noinspection PyStatementEffect + score_obj['date'].day + except AttributeError: + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + ' score object in the \'score_logbook\': date (should be YYYY-MM-DD)', health_is_called) + except KeyError: + pass + + return has_error + + +def _check_health_yaml_object(yaml_object, object_type, tech_id, health_is_called): + """ + Check the health of a visibility or detection YAML object + :param yaml_object: YAML file lines + :param object_type: 'detection' or 'visibility' + :param tech_id: ATT&CK technique ID + :param health_is_called: boolean that specifies if detailed errors in the file will be printed and then quit() + :return: True if the YAML file is unhealthy, otherwise False + """ + has_error = False + + keys = ['applicable_to'] + + if object_type == 'detection': + keys.append('location') + + try: + for key in keys: + if not isinstance(yaml_object[key], list): + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has for the key-value pair \'' + key + '\' in ' + object_type + ' a string value assigned (should be a list)', health_is_called) + else: + try: + if yaml_object[key][0] is None: + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in ' + object_type + ': ' + key, health_is_called) + except TypeError: + has_error = _print_error_msg( + '[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in ' + object_type + ': ' + key, health_is_called) + except KeyError: + pass + + return has_error + + +def _update_health_sate(current, update): + if current or update: + return True + else: + return update + + def check_yaml_file_health(filename, file_type, health_is_called): """ Check on error in the provided YAML file. @@ -528,8 +803,9 @@ def check_yaml_file_health(filename, file_type, health_is_called): has_error = False if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION: # check for duplicate tech IDs + _yaml = init_yaml() with open(filename, 'r') as yaml_file: - yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + yaml_content = _yaml.load(yaml_file) tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques'])) tech_dup = [] @@ -539,17 +815,20 @@ def check_yaml_file_health(filename, file_type, health_is_called): else: has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech, health_is_called) + # check if the technique has a valid format + if not REGEX_YAML_TECHNIQUE_ID_FORMAT.match(tech): + has_error = _print_error_msg('[!] Invalid technique ID: ' + tech, health_is_called) + # checks on: - # - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score' - # - invalid date format for: 'date_implemented', 'date_registered' + # - empty key-value pairs: 'applicable_to', 'comment', 'location', 'score_logbook' , 'date', 'score' + # - invalid date format for: 'date' # - detection or visibility score out-of-range - # - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment' + # - missing key-value pairs: 'applicable_to', 'comment', 'location', 'score_logbook', 'date', 'score' # - check on 'applicable_to' values which are very similar all_applicable_to = set() techniques = load_techniques(filename) for tech, v in techniques[0].items(): - for key in ['detection', 'visibility']: if key not in v: has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING ' + key, health_is_called) @@ -558,57 +837,24 @@ def check_yaml_file_health(filename, file_type, health_is_called): all_applicable_to.update([a for v in v[key] for a in v['applicable_to']]) for detection in v['detection']: - for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']: + for key in ['applicable_to', 'location', 'comment', 'score_logbook']: if key not in detection: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key, health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in detection: ' + key, health_is_called) - try: - if detection['score'] is None: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score', health_is_called) - - elif not (detection['score'] >= -1 and detection['score'] <= 5): - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID detection score: ' - + str(detection['score']) + ' (should be between -1 and 5)', health_is_called) - - elif detection['score'] > -1: - for key in ['date_implemented', 'date_registered']: - if not detection[key]: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called) - break - try: - detection[key].year - detection[key].month - detection[key].day - except AttributeError: - has_error = _print_error_msg('[!] Technique ID: ' + tech + - ' has an INVALID data format for the key-value pair in detection: ' + - key + ' (should be YYYY-MM-DD)', health_is_called) - for key in ['location', 'applicable_to']: - if not isinstance(detection[key], list): - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has for the key-value pair \'' - + key + '\' a string value assigned (should be a list)', health_is_called) - else: - try: - if detection[key][0] is None: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called) - except TypeError: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called) - except KeyError: - pass + health = _check_health_yaml_object(detection, 'detection', tech, health_is_called) + has_error = _update_health_sate(has_error, health) + health = _check_health_score_object(detection, 'detection', tech, health_is_called) + has_error = _update_health_sate(has_error, health) for visibility in v['visibility']: - for key in ['applicable_to', 'score', 'comment']: + for key in ['applicable_to', 'comment', 'score_logbook']: if key not in visibility: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key, health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in visibility: ' + key, health_is_called) - try: - if visibility['score'] is None: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score', health_is_called) - elif not (visibility['score'] >= 0 and visibility['score'] <= 4): - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID visibility score: ' - + str(detection['score']) + ' (should be between 0 and 4)', health_is_called) - except KeyError: - pass + health = _check_health_yaml_object(visibility, 'visibility', tech, health_is_called) + has_error = _update_health_sate(has_error, health) + health = _check_health_score_object(visibility, 'visibility', tech, health_is_called) + has_error = _update_health_sate(has_error, health) # get values within the key-value pair 'applicable_to' which are a very close match similar = set() @@ -632,7 +878,7 @@ def check_yaml_file_health(filename, file_type, health_is_called): print('') # print a newline -def check_file_type(filename, file_type=None): +def _check_file_type(filename, file_type=None): """ Check if the provided YAML file has the key 'file_type' and possible if that key matches a specific value. :param filename: path to a YAML file @@ -642,9 +888,11 @@ def check_file_type(filename, file_type=None): if not os.path.exists(filename): print('[!] File: \'' + filename + '\' does not exist') return None + + _yaml = init_yaml() with open(filename, 'r') as yaml_file: try: - yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + yaml_content = _yaml.load(yaml_file) except Exception as e: print('[!] File: \'' + filename + '\' is not a valid YAML file.') print(' ' + str(e)) # print more detailed error information to help the user in fixing the error. @@ -679,7 +927,7 @@ def check_file(filename, file_type=None, health_is_called=False): :return: the file_type if present, else None is returned """ - yaml_content = check_file_type(filename, file_type) + yaml_content = _check_file_type(filename, file_type) # if the file is a valid YAML, continue. Else, return None if yaml_content: