diff --git a/README.md b/README.md index 16f7fef..65f9bd3 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ DeTT&CT #### Detect Tactics, Techniques & Combat Threats -Latest version: [1.2.6](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-126) +Latest version: [1.2.7](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-127) To get started with DeTT&CT, check out this [page](https://github.com/rabobank-cdc/DeTTECT/wiki/Getting-started), our [talk](https://www.youtube.com/watch?v=_kWpekkhomU) at hack.lu 2019 and our blog on: - [mbsecure.nl/blog/2019/5/dettact-mapping-your-blue-team-to-mitre-attack](https://www.mbsecure.nl/blog/2019/5/dettact-mapping-your-blue-team-to-mitre-attack) or diff --git a/constants.py b/constants.py index ccb62fd..fb8958a 100644 --- a/constants.py +++ b/constants.py @@ -2,7 +2,7 @@ import re APP_NAME = 'DeTT&CT' APP_DESC = 'Detect Tactics, Techniques & Combat Threats' -VERSION = '1.2.6' +VERSION = '1.2.7' EXPIRE_TIME = 60 * 60 * 24 @@ -131,6 +131,7 @@ YAML_OBJ_VISIBILITY = {'applicable_to': ['all'], 'auto_generated': True} ] } + YAML_OBJ_DETECTION = {'applicable_to': ['all'], 'location': [''], 'comment': '', @@ -139,7 +140,7 @@ YAML_OBJ_DETECTION = {'applicable_to': ['all'], {'date': None, 'score': -1, 'comment': ''} - ]} + ]} YAML_OBJ_TECHNIQUE = {'technique_id': '', 'technique_name': '', @@ -175,3 +176,42 @@ HEALTH_ERROR_TXT = '[!] The below YAML file contains possible errors. It\'s reco PLATFORMS = {'windows': 'Windows', 'linux': 'Linux', 'macos': 'macOS', 'aws': 'AWS', 'gcp': 'GCP', 'azure': 'Azure', 'azure ad': 'Azure AD', 'office 365': 'Office 365', 'saas': 'SaaS'} + +# Data sources applicable per platform +DATA_SOURCES = {'Windows': ['Access tokens', 'Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions', + 'Component firmware', 'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DLL monitoring', 'DNS records', 'EFI', 'Email gateway', + 'Environment variable', 'File monitoring', 'Host network interface', 'Kernel drivers', 'Loaded DLLs', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes', + 'Netflow/Enclave netflow', 'Network device logs', 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs', + 'Process command-line parameters', 'Process monitoring', 'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls', + 'Third-party application logs', 'User interface', 'VBR', 'Web application firewall logs', 'Web logs', 'Web proxy', 'Windows Error Reporting', 'Windows event logs', + 'Windows Registry', 'WMI Objects'], + 'Linux': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions', 'Component firmware', + 'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DNS records', 'EFI', 'Email gateway', 'Environment variable', 'File monitoring', + 'Host network interface', 'Kernel drivers', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes', 'Netflow/Enclave netflow', 'Network device logs', + 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs', 'Process command-line parameters', 'Process monitoring', + 'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls', 
'Third-party application logs', 'User interface', 'VBR',
+ 'Web application firewall logs', 'Web logs', 'Web proxy'],
+ 'macOS': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions', 'Component firmware',
+ 'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DNS records', 'EFI', 'Email gateway', 'Environment variable', 'File monitoring',
+ 'Host network interface', 'Kernel drivers', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes', 'Netflow/Enclave netflow', 'Network device logs',
+ 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs', 'Process command-line parameters', 'Process monitoring',
+ 'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls', 'Third-party application logs', 'User interface', 'VBR',
+ 'Web application firewall logs', 'Web logs', 'Web proxy'],
+ 'AWS': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'AWS CloudTrail logs', 'AWS OS logs', 'Binary file metadata', 'Data loss prevention',
+ 'Detonation chamber', 'DNS records', 'Email gateway', 'File monitoring', 'Mail server', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs',
+ 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Third-party application logs',
+ 'Web application firewall logs', 'Web logs', 'Web proxy'],
+ 'GCP': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'Data loss prevention', 'Detonation chamber',
+ 'DNS records', 'Email gateway', 'File monitoring', 'Mail server', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs',
+ 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Stackdriver logs',
+ 'Third-party application logs', 'Web application firewall logs', 'Web logs', 'Web proxy'],
+ 'Azure': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Azure activity logs', 'Azure OS logs', 'Binary file metadata',
+ 'Data loss prevention', 'DNS records', 'File monitoring', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs', 'Network intrusion detection system',
+ 'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Third-party application logs', 'Web application firewall logs',
+ 'Web logs', 'Web proxy'],
+ 'Azure AD': ['API monitoring', 'Authentication logs', 'Azure activity logs', 'Malware reverse engineering', 'Sensor health and status'],
+ 'Office 365': ['Anti-virus', 'API monitoring', 'Authentication logs', 'Azure activity logs', 'Data loss prevention', 'Detonation chamber', 'Email gateway', 'Mail server',
+ 'Malware reverse engineering', 'Office 365 account logs', 'Office 365 audit logs', 'Office 365 trace logs', 'Sensor health and status'],
+ 'SaaS': ['Anti-virus', 'API monitoring', 'Application logs', 'Authentication logs', 'Data loss prevention', 'Detonation chamber', 'Email gateway', 'Mail server',
+ 'Malware reverse engineering', 'OAuth audit logs', 'Sensor health and status', 'Third-party application logs', 'Web application firewall logs', 'Web logs']
+ }
diff --git a/data_source_mapping.py
b/data_source_mapping.py index 7ff4be1..cbf0c6f 100644
--- a/data_source_mapping.py
+++ b/data_source_mapping.py
@@ -1,7 +1,7 @@
 from copy import deepcopy
 from datetime import datetime
-import simplejson
 import xlsxwriter
+import simplejson
 from generic import *
@@ -17,7 +17,7 @@ def generate_data_sources_layer(filename):
 my_data_sources, name, platform, exceptions = _load_data_sources(filename)
 # Do the mapping between my data sources and MITRE data sources:
- my_techniques = _map_and_colorize_techniques(my_data_sources, exceptions)
+ my_techniques = _map_and_colorize_techniques(my_data_sources, platform, exceptions)
 layer = get_layer_template_data_sources("Data sources " + name, 'description', 'attack', platform)
 layer['techniques'] = my_techniques
@@ -66,7 +66,7 @@ def export_data_source_list_to_excel(filename, eql_search=False):
 :return:
 """
 # pylint: disable=unused-variable
- my_data_sources, name, platform, exceptions = _load_data_sources(filename, filter_empty_scores=False)
+ my_data_sources, name, platforms, exceptions = _load_data_sources(filename, filter_empty_scores=False)
 excel_filename = get_non_existing_filename('output/data_sources', 'xlsx')
 workbook = xlsxwriter.Workbook(excel_filename)
@@ -121,7 +121,9 @@ def export_data_source_list_to_excel(filename, eql_search=False):
 ds_miss_text = 'ATT&CK data source is missing from the YAML file'
 # pylint: disable=consider-iterating-dictionary
 my_ds_list = [ds.lower() for ds in my_data_sources.keys()]
- for ds in get_all_mitre_data_sources():
+ applicable_data_sources = get_applicable_data_sources_platform(platforms)
+
+ for ds in applicable_data_sources:
 if ds.lower() not in my_ds_list:
 ds_obj = deepcopy(YAML_OBJ_DATA_SOURCE)
 ds_obj['data_source_name'] = ds
@@ -157,9 +159,9 @@ def export_data_source_list_to_excel(filename, eql_search=False):
 score += v
 score_count += 1
 if score > 0:
- score = score/score_count
+ score = score / score_count
- worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
+ worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)  # noqa
 y += 1
 worksheet.autofilter(2, 0, 2, 11)
@@ -207,22 +209,40 @@ def _load_data_sources(file, filter_empty_scores=True):
 return my_data_sources, name, platform, exceptions
-def _map_and_colorize_techniques(my_ds, exceptions):
+def _count_applicable_data_sources(technique, applicable_data_sources):
+ """
+ Get the count of applicable data sources for the provided technique.
+ This takes into account which data sources are applicable to the given platform(s)
+ :param technique: ATT&CK CTI technique object
+ :param applicable_data_sources: a list of applicable ATT&CK data sources
+ :return: a count of the applicable data sources for this technique
+ """
+ applicable_ds_count = 0
+ for ds in technique['x_mitre_data_sources']:
+ if ds in applicable_data_sources:
+ applicable_ds_count += 1
+ return applicable_ds_count
+
+
+def _map_and_colorize_techniques(my_ds, platforms, exceptions):
 """
 Determine the color of the techniques based on how many data sources are available per technique.
:param my_ds: the configured data sources
+ :param platforms: the configured platform(s)
+ :param exceptions: the list of ATT&CK technique exceptions within the data source YAML file
 :return: a dictionary with techniques that can be used in the layer's output file
 """
 techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
+ applicable_data_sources = get_applicable_data_sources_platform(platforms)
 technique_colors = {}
 # Color the techniques based on how many data sources are available.
 for t in techniques:
 if 'x_mitre_data_sources' in t:
- total_ds_count = len(t['x_mitre_data_sources'])
+ total_ds_count = _count_applicable_data_sources(t, applicable_data_sources)
 ds_count = 0
 for ds in t['x_mitre_data_sources']:
- if ds in my_ds.keys():
+ if ds in my_ds.keys() and ds in applicable_data_sources:
 ds_count += 1
 if total_ds_count > 0:
 result = (float(ds_count) / float(total_ds_count)) * 100
@@ -234,7 +254,7 @@ def _map_and_colorize_techniques(my_ds, exceptions):
 output_techniques = []
 for t, v in my_techniques.items():
- if t not in exceptions:
+ if t not in exceptions and t in technique_colors:
 for tactic in v['tactics']:
 d = dict()
 d['techniqueID'] = t
@@ -243,8 +263,10 @@
 d['enabled'] = True
 d['tactic'] = tactic.lower().replace(' ', '-')
 d['metadata'] = [{'name': '-Available data sources', 'value': ', '.join(v['my_data_sources'])},
- {'name': '-ATT&CK data sources', 'value': ', '.join(v['data_sources'])},
+ {'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(v['data_sources'],
+ applicable_data_sources))},
 {'name': '-Products', 'value': ', '.join(v['products'])}]
+ d['metadata'] = make_layer_metadata_compliant(d['metadata'])
 output_techniques.append(d)
@@ -335,7 +357,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 if new_tech['technique_id'] in tech_ids_new:
 are_scores_updated = True
 yaml_file_tech_admin['techniques'].append(new_tech)
- tech_new_print.append(' - ' + new_tech['technique_id']+'\n')
+ tech_new_print.append(' - ' + new_tech['technique_id'] + '\n')
 x += 1
 print('The following new technique IDs are added to the technique administration file with a visibility '
@@ -353,7 +375,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 for cur_tech, cur_values in cur_visibility_scores.items():
 new_tech = _get_technique_yaml_obj(new_visibility_scores['techniques'], cur_tech)
 if new_tech:  # new_tech will be None if technique_id is part of the 'exceptions' list within the
- # data source administration file
+ # data source administration file
 new_score = new_tech['visibility']['score_logbook'][0]['score']
 for cur_obj in cur_values['visibility']:
@@ -377,7 +399,8 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 # ask how the score should be updated
 answer = 0
 if mix_scores:
- answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4, V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
+ answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4,
+ V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
 elif manually_scored:
 answer = ask_multiple_choice(V_UPDATE_Q_ALL_MANUAL, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
 elif auto_scored:
@@ -442,7 +465,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 elif update_action == V_UPDATE_ACTION_DIFF:
 print('-' * 80)
 tmp_txt = '[updates remaining: 
' + str(updated_vis_score_cnt - score_updates_handled) + ']'
- print(' ' * (80-len(tmp_txt)) + tmp_txt)
+ print(' ' * (80 - len(tmp_txt)) + tmp_txt)
 print('')
 print('Visibility object:')
 print(' - ATT&CK ID/name ' + tech_id + ' / ' + tech_name)
@@ -458,7 +481,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 print(' - Date: ' + new_score_obj['date'])
 print(' - Score: ' + str(new_score_obj['score']))
 print(' - Visibility comment: ' + _indent_comment(new_score_obj['comment'], 23))
- print(' - Auto generated: true')
+ print(' - Auto generated: True')
 print('')
 if ask_yes_no('Update the score?'):
 are_scores_updated = True
@@ -484,6 +507,8 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
 print('No visibility scores have been updated.')
 # pylint: disable=redefined-outer-name
+
+
 def generate_technique_administration_file(filename, write_file=True, all_techniques=False):
 """
 Generate a technique administration file based on the data source administration YAML file
@@ -496,6 +521,7 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
 my_data_sources, name, platform, exceptions = _load_data_sources(filename)
 techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH_ENTERPRISE)
+ applicable_data_sources = get_applicable_data_sources_platform(platform)
 yaml_file = dict()
 yaml_file['version'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION
@@ -511,10 +537,10 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
 if platform == 'all' or len(set(platforms).intersection(set(platform))) > 0:
 # not every technique has data source listed
 if 'x_mitre_data_sources' in t:
- total_ds_count = len(t['x_mitre_data_sources'])
+ total_ds_count = _count_applicable_data_sources(t, applicable_data_sources)
 ds_count = 0
 for ds in t['x_mitre_data_sources']:
- if ds in my_data_sources.keys():
+ if ds in my_data_sources.keys() and ds in applicable_data_sources:
 ds_count += 1
 if total_ds_count > 0:
 result = (float(ds_count) / float(total_ds_count)) * 100
@@ -549,7 +575,8 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
 # remove the single quotes from the date
 yaml_file_lines = fix_date_and_remove_null(file_lines, today, input_type='list')
- output_filename = get_non_existing_filename('output/techniques-administration-' + normalize_name_to_filename(name + '-' + platform_to_name(platform)), 'yaml')
+ output_filename = get_non_existing_filename('output/techniques-administration-' +
+ normalize_name_to_filename(name + '-' + platform_to_name(platform)), 'yaml')
 with open(output_filename, 'w') as f:
 f.writelines(yaml_file_lines)
 print("File written: " + output_filename)
diff --git a/generic.py b/generic.py
index a977551..2eb3bea 100644
--- a/generic.py
+++ b/generic.py
@@ -2,8 +2,8 @@ import os
 import shutil
 import pickle
 from io import StringIO
-from ruamel.yaml import YAML
 from datetime import datetime as dt
+from ruamel.yaml import YAML
 from upgrade import upgrade_yaml_file
 from constants import *
 from health import check_yaml_file_health
@@ -173,6 +173,10 @@ def load_attack_data(data_type):
 def init_yaml():
+ """
+ Initialize ruamel.yaml with the correct settings
+ :return: a ruamel.yaml object
+ """
 _yaml = YAML()
 _yaml.Representer.ignore_aliases = lambda *args: True  # disable anchors/aliases
 return _yaml
@@ -268,7 +272,7 @@ def get_layer_template_detections(name, description, stage, platform):
 {'label': 'Detection score 3: Good', 'color': COLOR_D_3},
 {'label': 'Detection
score 4: Very good', 'color': COLOR_D_4},
 {'label': 'Detection score 5: Excellent', 'color': COLOR_D_5}
- ]
+ ]
 return layer
@@ -291,7 +295,7 @@ def get_layer_template_data_sources(name, description, stage, platform):
 {'label': '51-75% of data sources available', 'color': COLOR_DS_75p},
 {'label': '76-99% of data sources available', 'color': COLOR_DS_99p},
 {'label': '100% of data sources available', 'color': COLOR_DS_100p}
- ]
+ ]
 return layer
@@ -313,7 +317,7 @@ def get_layer_template_visibility(name, description, stage, platform):
 {'label': 'Visibility score 2: Medium', 'color': COLOR_V_2},
 {'label': 'Visibility score 3: Good', 'color': COLOR_V_3},
 {'label': 'Visibility score 4: Excellent', 'color': COLOR_V_4}
- ]
+ ]
 return layer
@@ -334,7 +338,7 @@ def get_layer_template_layered(name, description, stage, platform):
 {'label': 'Visibility', 'color': COLOR_OVERLAY_VISIBILITY},
 {'label': 'Detection', 'color': COLOR_OVERLAY_DETECTION},
 {'label': 'Visibility and detection', 'color': COLOR_OVERLAY_BOTH}
- ]
+ ]
 return layer
@@ -601,6 +605,39 @@ def platform_to_name(platform, separator='-'):
 return ''
+def get_applicable_data_sources_platform(platforms):
+ """
+ Get the applicable ATT&CK data sources for the provided platform(s)
+ :param platforms: the ATT&CK platform(s)
+ :return: a list of applicable ATT&CK data sources
+ """
+ applicable_data_sources = set()
+ if platforms == 'all' or 'all' in platforms:
+ # pylint: disable=unused-variable
+ for k, v in DATA_SOURCES.items():
+ applicable_data_sources.update(v)
+ else:
+ for p in platforms:
+ applicable_data_sources.update(DATA_SOURCES[p])
+
+ return list(applicable_data_sources)
+
+
+def get_applicable_data_sources_technique(technique_data_sources, platform_applicable_data_sources):
+ """
+ Get the applicable ATT&CK data sources for the provided technique's data sources (as defined in ATT&CK CTI)
+ :param technique_data_sources: the ATT&CK technique's data sources
+ :param platform_applicable_data_sources: a list of applicable ATT&CK data sources based on 'DATA_SOURCES'
+ :return: a list of applicable data sources
+ """
+ applicable_data_sources = set()
+ for ds in technique_data_sources:
+ if ds in platform_applicable_data_sources:
+ applicable_data_sources.add(ds)
+
+ return list(applicable_data_sources)
+
+
 def map_techniques_to_data_sources(techniques, my_data_sources):
 """
 This function maps the MITRE ATT&CK techniques to your data sources.
@@ -806,6 +843,20 @@ def check_file(filename, file_type=None, health_is_called=False):
 return yaml_content  # value is None
+def make_layer_metadata_compliant(metadata):
+ """
+ Make sure the metadata values in the Navigator layer file are compliant with the expected data structure
+ from the latest version on: https://github.com/mitre-attack/attack-navigator/tree/master/layers
+ :param metadata: list of metadata dictionaries
+ :return: compliant list of metadata dictionaries
+ """
+ for md_item in metadata:
+ if not md_item['value'] or md_item['value'] == '':
+ md_item['value'] = '-'
+
+ return metadata
+
+
 def get_updates(update_type, sort='modified'):
 """
 Print a list of updates for techniques, groups or software. Sort by modified or creation date.
diff --git a/health.py b/health.py
index 4834bd9..312451c 100644
--- a/health.py
+++ b/health.py
@@ -94,7 +94,7 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
 ATT&CK Platform is not part of the EQL search result
 :return: False if no errors have been found, otherwise True
 """
- from generic import get_all_mitre_data_sources
+ from generic import get_applicable_data_sources_platform
 has_error = False
 platform = ds_content.get('platform', None)
@@ -113,8 +113,8 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
 health_is_called)
 ds_list = [kv['data_source_name'].lower() for kv in ds_content['data_sources']]
- ds_list_mitre = get_all_mitre_data_sources()
- for ds in ds_list_mitre:
+ applicable_data_sources = get_applicable_data_sources_platform(platform)
+ for ds in applicable_data_sources:
 if ds.lower() not in ds_list:
 has_error = _print_error_msg('[!] Data source: \'' + ds + '\' is MISSING from the YAML file', health_is_called)
@@ -122,30 +122,33 @@
 # check for missing keys
 for key in ['data_source_name', 'date_registered', 'date_connected', 'products', 'available_for_data_analytics', 'comment', 'data_quality']:
 if key not in ds:
- has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair: ' + key, health_is_called)
+ has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
+ '\' is MISSING a key-value pair: ' + key, health_is_called)
 for key in ['date_registered', 'date_connected']:
 if key in ds and not ds[key] is None:
 try:
- # noinspection PyStatementEffect
+ # pylint: disable=pointless-statement
 ds[key].year
- # noinspection PyStatementEffect
+ # pylint: disable=pointless-statement
 ds[key].month
- # noinspection PyStatementEffect
+ # pylint: disable=pointless-statement
 ds[key].day
 except AttributeError:
- has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data format for the dimension \'' + dimension
+ has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data format for the key-value pair \'' + key
+ '\': ' + ds[key] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
 if 'available_for_data_analytics' in ds:
 if not isinstance(ds['available_for_data_analytics'], bool):
- has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID \'available_for_data_analytics\' value: should be set to \'true\' or \'false\'', health_is_called)
+ has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
+ '\' has an INVALID \'available_for_data_analytics\' value: should be set to \'true\' or \'false\'', health_is_called)
 if 'data_quality' in ds:
 if isinstance(ds['data_quality'], dict):
 for dimension in ['device_completeness', 'data_field_completeness', 'timeliness', 'consistency', 'retention']:
 if dimension not in ds['data_quality']:
- has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair in \'data_quality\': ' + dimension, health_is_called)
+ has_error = _print_error_msg('[!]
Data source: \'' + ds['data_source_name'] +
+ '\' is MISSING a key-value pair in \'data_quality\': ' + dimension, health_is_called)
 else:
 if isinstance(ds['data_quality'][dimension], int):
 if not 0 <= ds['data_quality'][dimension] <= 5:
@@ -155,14 +159,16 @@
 has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data quality score for the dimension \'' + dimension + '\': ' + str(ds['data_quality'][dimension]) + ' (should be an integer)', health_is_called)
 else:
- has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' the key-value pair \'data_quality\' is NOT a dictionary with data quality dimension scores', health_is_called)
+ has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
+ '\' the key-value pair \'data_quality\' is NOT a dictionary with data quality dimension scores', health_is_called)
 if 'exceptions' in ds_content:
 for tech in ds_content['exceptions']:
 tech_id = str(tech['technique_id'])
 if not REGEX_YAML_TECHNIQUE_ID_FORMAT.match(tech_id) and tech_id != 'None':
- has_error = _print_error_msg('[!] INVALID technique ID in the \'exceptions\' list of data source admin. file: ' + tech_id, health_is_called)
+ has_error = _print_error_msg(
+ '[!] INVALID technique ID in the \'exceptions\' list of data source admin. file: ' + tech_id, health_is_called)
 if has_error and not health_is_called and not no_print:
 print(HEALTH_ERROR_TXT + filename)
@@ -199,13 +205,16 @@ def _check_health_score_object(yaml_object, object_type, tech_id, health_is_call
 for score_obj in yaml_object['score_logbook']:
 for key in ['date', 'score', 'comment']:
 if key not in score_obj:
- has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' + object_type + ' score object within the \'score_logbook\': ' + key, health_is_called)
+ has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' +
+ object_type + ' score object within the \'score_logbook\': ' + key, health_is_called)
 if score_obj['score'] is None:
- has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': score', health_is_called)
+ has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' +
+ object_type + ' score object within the \'score_logbook\': score', health_is_called)
 elif not isinstance(score_obj['score'], int):
- has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['score'] + ' (should be an integer)', health_is_called)
+ has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type +
+ ' score object within the \'score_logbook\': ' + score_obj['score'] + ' (should be an integer)', health_is_called)
 if 'auto_generated' in score_obj:
 if not isinstance(score_obj['auto_generated'], bool):
@@ -214,23 +223,24 @@
 if isinstance(score_obj['score'], int):
 if score_obj['date'] is None and ((score_obj['score'] > -1 and object_type == 'detection') or (score_obj['score'] > 0 and object_type == 'visibility')):
- has_error = _print_error_msg('[!]
Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': date', health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + + object_type + ' score object within the \'score_logbook\': date', health_is_called) - # noinspection PyChainedComparisons if not (score_obj['score'] >= min_score and score_obj['score'] <= max_score): has_error = _print_error_msg( '[!] Technique ID: ' + tech_id + ' has an INVALID ' + object_type + ' score in a score object within the \'score_logbook\': ' + str(score_obj['score']) + ' (should be between ' + str(min_score) + ' and ' + str(max_score) + ')', health_is_called) if not score_obj['date'] is None: try: - # noinspection PyStatementEffect + # pylint: disable=pointless-statement score_obj['date'].year - # noinspection PyStatementEffect + # pylint: disable=pointless-statement score_obj['date'].month - # noinspection PyStatementEffect + # pylint: disable=pointless-statement score_obj['date'].day except AttributeError: - has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['date'] + ' (should be YYYY-MM-DD without quotes)', health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + + ' score object within the \'score_logbook\': ' + score_obj['date'] + ' (should be YYYY-MM-DD without quotes)', health_is_called) except KeyError: pass @@ -296,12 +306,14 @@ def _check_health_techniques(filename, technique_content, health_is_called): for okey in obj_keys: if okey not in obj: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in \'' + obj_type + '\': ' + okey, health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech + + ' is MISSING a key-value pair in \'' + obj_type + '\': ' + okey, health_is_called) for okey in obj_keys_list: if okey in obj: if not isinstance(obj[okey], list): - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' is NOT a list', health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + + '\' in \'' + obj_type + '\' is NOT a list', health_is_called) for okey in obj_keys_not_none: if okey in obj: @@ -310,9 +322,11 @@ def _check_health_techniques(filename, technique_content, health_is_called): if item is None: none_count += 1 if none_count == 1: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' has an EMPTY value (an empty string is allowed: \'\')', health_is_called) + has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + + obj_type + '\' has an EMPTY value (an empty string is allowed: \'\')', health_is_called) elif none_count > 1: - has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' has multiple EMPTY values (an empty string is allowed: \'\')', health_is_called) + has_error = _print_error_msg('[!] 
Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type +
+ '\' has multiple EMPTY values (an empty string is allowed: \'\')', health_is_called)
 health = _check_health_score_object(obj, obj_type, tech, health_is_called)
 has_error = _update_health_state(has_error, health)
@@ -330,7 +344,8 @@ def _check_health_techniques(filename, technique_content, health_is_called):
 similar.add(i2)
 if len(similar) > 0:
- has_error = _print_error_msg('[!] There are values in the key-value pairs for \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
+ has_error = _print_error_msg(
+ '[!] There are values in the key-value pairs for \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
 for s in similar:
 _print_error_msg(' - ' + s, health_is_called)
diff --git a/technique_mapping.py b/technique_mapping.py
index a6b67e4..15d6765 100644
--- a/technique_mapping.py
+++ b/technique_mapping.py
@@ -1,6 +1,6 @@
 import simplejson
-from generic import *
 import xlsxwriter
+from generic import *
 from datetime import datetime
 # pandas and plotly are imported inside the functions that use them, for performance reasons.
@@ -21,7 +21,7 @@ def generate_detection_layer(filename_techniques, filename_data_sources, overlay
 else:
 my_techniques, name, platform = load_techniques(filename_techniques)
 my_data_sources = _load_data_sources(filename_data_sources)
- mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
+ mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
 layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
 _write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
@@ -38,28 +38,29 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
 if not overlay:
 my_techniques, name, platform = load_techniques(filename_techniques)
- mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
+ mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platform)
 layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
 _write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name)
 else:
 my_techniques, name, platform = load_techniques(filename_techniques)
- mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
+ mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
 layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
 _write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
-def plot_graph(filename, type):
+def plot_graph(filename, type_graph):
 """
 Generates a line graph which shows the improvements on detections or visibility over time.
:param filename: the filename of the YAML file containing the techniques administration - :param type: indicates the type of the graph: detection or visibility + :param type_graph: indicates the type of the graph: detection or visibility :return: """ + # pylint: disable=unused-variable my_techniques, name, platform = load_techniques(filename) graph_values = [] for t in my_techniques.values(): - for item in t[type]: + for item in t[type_graph]: date = get_latest_date(item) if date: yyyymm = date.strftime('%Y-%m') @@ -69,13 +70,13 @@ def plot_graph(filename, type): df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum() df['cumcount'] = df['count'].cumsum() - output_filename = get_non_existing_filename('output/graph_%s' % type, 'html') + output_filename = get_non_existing_filename('output/graph_%s' % type_graph, 'html') import plotly import plotly.graph_objs as go plotly.offline.plot( {'data': [go.Scatter(x=df['date'], y=df['cumcount'])], - 'layout': go.Layout(title="# of %s items for %s" % (type, name))}, + 'layout': go.Layout(title="# of %s items for %s" % (type_graph, name))}, filename=output_filename, auto_open=False ) print("File written: " + output_filename) @@ -122,20 +123,6 @@ def _write_layer(layer, mapped_techniques, filename_prefix, name): write_file(filename_prefix, name, json_string) -def _layer_metadata_make_compliant(metadata): - """ - Make sure the metadata values in the Navigator layer file are compliant with the expected data structure - from the latest version on: https://github.com/mitre-attack/attack-navigator/tree/master/layers - :param metadata: list of metadata dictionaries - :return: compliant list of metadata dictionaries - """ - for md_item in metadata: - if not md_item['value'] or md_item['value'] == '': - md_item['value'] = '-' - - return metadata - - def _map_and_colorize_techniques_for_detections(my_techniques): """ Determine the color of the techniques based on the detection score in the given YAML file. @@ -153,7 +140,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques): if s != -1: color = COLOR_D_0 if s == 0 else COLOR_D_1 if s == 1 else COLOR_D_2 if s == 2 else COLOR_D_3 \ - if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else '' + if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else '' technique = get_technique(techniques, technique_id) for tactic in get_tactics(technique): @@ -180,7 +167,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques): if cnt != tcnt: x['metadata'].append({'name': '---', 'value': '---'}) cnt += 1 - x['metadata'] = _layer_metadata_make_compliant(x['metadata']) + x['metadata'] = make_layer_metadata_compliant(x['metadata']) mapped_techniques.append(x) except Exception as e: print('[!] Possible error in YAML file at: %s. Error: %s' % (technique_id, str(e))) @@ -189,14 +176,16 @@ def _map_and_colorize_techniques_for_detections(my_techniques): return mapped_techniques -def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources): +def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platforms): """ Determine the color of the techniques based on the visibility score in the given YAML file. 
:param my_techniques: the configured techniques :param my_data_sources: the configured data sources + :param platforms: the configured platform(s) :return: a dictionary with techniques that can be used in the layer's output file """ techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH) + applicable_data_sources = get_applicable_data_sources_platform(platforms) technique_ds_mapping = map_techniques_to_data_sources(techniques, my_data_sources) @@ -208,7 +197,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources): if s == 0: s = None - my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' + my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' # noqa technique = get_technique(techniques, technique_id) color = COLOR_V_1 if s == 1 else COLOR_V_2 if s == 2 else COLOR_V_3 if s == 3 else COLOR_V_4 if s == 4 else '' @@ -221,7 +210,8 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources): x['tactic'] = tactic.lower().replace(' ', '-') x['metadata'] = [] x['metadata'].append({'name': '-Available data sources', 'value': my_ds}) - x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])}) + x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(technique['x_mitre_data_sources'], + applicable_data_sources))}) x['metadata'].append({'name': '---', 'value': '---'}) x['score'] = s @@ -237,7 +227,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources): x['metadata'].append({'name': '---', 'value': '---'}) cnt += 1 - x['metadata'] = _layer_metadata_make_compliant(x['metadata']) + x['metadata'] = make_layer_metadata_compliant(x['metadata']) mapped_techniques.append(x) for t in techniques: @@ -251,23 +241,25 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources): x['comment'] = '' x['enabled'] = True x['tactic'] = tactic.lower().replace(' ', '-') - ds = ', '.join(t['x_mitre_data_sources']) if 'x_mitre_data_sources' in t else '' + ds = ', '.join(get_applicable_data_sources_technique(t['x_mitre_data_sources'], applicable_data_sources)) if 'x_mitre_data_sources' in t else '' # noqa x['metadata'] = [{'name': '-ATT&CK data sources', 'value': ds}] - x['metadata'] = _layer_metadata_make_compliant(x['metadata']) + x['metadata'] = make_layer_metadata_compliant(x['metadata']) mapped_techniques.append(x) return mapped_techniques -def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources): +def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platforms): """ Determine the color of the techniques based on both detection and visibility. 
:param my_techniques: the configured techniques :param my_data_sources: the configured data sources + :param platforms: the configured platform(s) :return: a dictionary with techniques that can be used in the layer's output file """ techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH) + applicable_data_sources = get_applicable_data_sources_platform(platforms) technique_ds_mapping = map_techniques_to_data_sources(techniques, my_data_sources) @@ -290,7 +282,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources): else: color = COLOR_WHITE - my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' + my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' # noqa technique = get_technique(techniques, technique_id) for tactic in get_tactics(technique): @@ -302,7 +294,8 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources): x['tactic'] = tactic.lower().replace(' ', '-') x['metadata'] = [] x['metadata'].append({'name': '-Available data sources', 'value': my_ds}) - x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])}) + x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(technique['x_mitre_data_sources'], + applicable_data_sources))}) x['metadata'].append({'name': '---', 'value': '---'}) # Metadata for detection: @@ -337,7 +330,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources): x['metadata'].append({'name': '---', 'value': '---'}) cnt += 1 - x['metadata'] = _layer_metadata_make_compliant(x['metadata']) + x['metadata'] = make_layer_metadata_compliant(x['metadata']) mapped_techniques.append(x) return mapped_techniques @@ -349,6 +342,7 @@ def export_techniques_list_to_excel(filename): :param filename: the filename of the YAML file containing the techniques administration :return: """ + # pylint: disable=unused-variable my_techniques, name, platform = load_techniques(filename) my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False)) mitre_techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH) @@ -422,7 +416,7 @@ def export_techniques_list_to_excel(filename): tmp_date = tmp_date.strftime('%Y-%m-%d') worksheet_detections.write(y, 4, str(tmp_date).replace('None', ''), valign_top) ds = get_latest_score(detection) - worksheet_detections.write(y, 5, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score) + worksheet_detections.write(y, 5, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score) # noqa worksheet_detections.write(y, 6, '\n'.join(detection['location']), wrap_text) worksheet_detections.write(y, 7, detection['comment'][:-1] if detection['comment'].endswith('\n') else detection['comment'], wrap_text) d_comment = get_latest_comment(detection) @@ -463,7 +457,7 @@ def export_techniques_list_to_excel(filename): tmp_date = tmp_date.strftime('%Y-%m-%d') worksheet_visibility.write(y, 4, 
str(tmp_date).replace('None', ''), valign_top) vs = get_latest_score(visibility) - worksheet_visibility.write(y, 5, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score) + worksheet_visibility.write(y, 5, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score) # noqa v_comment = get_latest_comment(visibility) worksheet_visibility.write(y, 6, visibility['comment'][:-1] if visibility['comment'].endswith('\n') else visibility['comment'], wrap_text) worksheet_visibility.write(y, 7, v_comment[:-1] if v_comment.endswith('\n') else v_comment, wrap_text)
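
Reviewer note: below is a minimal, self-contained sketch (not part of the patch) of the platform-aware data source filtering this change introduces. The DATA_SOURCES dict is a trimmed stand-in for the full constant added to constants.py, the two helpers mirror the logic added to generic.py, and my_data_sources stands in for a hypothetical set of data sources scored in a YAML administration file.

# sketch.py - illustrative only; trimmed stand-in for constants.DATA_SOURCES
DATA_SOURCES = {
    'Windows': ['Process monitoring', 'Windows Registry', 'PowerShell logs'],
    'Linux': ['Process monitoring', 'File monitoring'],
    'AWS': ['AWS CloudTrail logs', 'Netflow/Enclave netflow'],
}


def get_applicable_data_sources_platform(platforms):
    # Union of the data sources of every configured platform ('all' selects everything).
    applicable = set()
    if platforms == 'all' or 'all' in platforms:
        for sources in DATA_SOURCES.values():
            applicable.update(sources)
    else:
        for p in platforms:
            applicable.update(DATA_SOURCES[p])
    return list(applicable)


def get_applicable_data_sources_technique(technique_data_sources, platform_data_sources):
    # Keep only the technique's data sources that can exist on the configured platform(s).
    return [ds for ds in technique_data_sources if ds in platform_data_sources]


if __name__ == '__main__':
    applicable = get_applicable_data_sources_platform(['Windows'])
    # A technique listing a mix of host and cloud data sources:
    technique_ds = ['Process monitoring', 'AWS CloudTrail logs', 'Windows Registry']
    relevant = get_applicable_data_sources_technique(technique_ds, applicable)
    my_data_sources = ['Process monitoring']  # hypothetical: data sources scored in the YAML file
    coverage = 100.0 * len([ds for ds in relevant if ds in my_data_sources]) / len(relevant)
    print(relevant)                    # ['Process monitoring', 'Windows Registry']
    print('%d%% covered' % coverage)   # 50% covered

Without the platform filter, 'AWS CloudTrail logs' would count toward this technique's total and the same YAML file would score 33% instead of 50%, which is why layers generated for a single platform previously looked artificially under-covered.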