diff --git a/constants.py b/constants.py index 596f290..a4c43f8 100644 --- a/constants.py +++ b/constants.py @@ -144,6 +144,19 @@ YAML_OBJ_TECHNIQUE = {'technique_id': '', 'detection': YAML_OBJ_DETECTION, 'visibility': YAML_OBJ_VISIBILITY} +YAML_OBJ_DATA_SOURCE = {'data_source_name': '', + 'date_registered': None, + 'date_connected': None, + 'products': [''], + 'available_for_data_analytics': False, + 'comment': '', + 'data_quality': { + 'device_completeness': 0, + 'data_field_completeness': 0, + 'timeliness': 0, + 'consistency': 0, + 'retention': 0}} + # Interactive menu MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping' MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping' diff --git a/data_source_mapping.py b/data_source_mapping.py index 0bc039a..9416957 100644 --- a/data_source_mapping.py +++ b/data_source_mapping.py @@ -112,39 +112,48 @@ def export_data_source_list_to_excel(filename): # Putting the data sources data: y = 3 - for d in get_all_mitre_data_sources(): + + # check if an ATT&CK data source is missing from the data source YAML administration file + my_ds_list = my_data_sources.keys() + for ds in get_all_mitre_data_sources(): + if ds not in my_ds_list: + ds_obj = deepcopy(YAML_OBJ_DATA_SOURCE) + ds_obj['data_source_name'] = ds + ds_obj['comment'] = 'ATT&CK data source is missing from the YAML file' + my_data_sources[ds] = ds_obj + + for d in sorted(my_data_sources.keys()): + ds = my_data_sources[d] worksheet.write(y, 0, d, valign_top) - if d in my_data_sources.keys(): - ds = my_data_sources[d] - date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered'] - date_connected = ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected'] + date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered'] + date_connected = 
ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected'] - worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top) - worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top) - worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top) - worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text) - worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top) - worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top) - worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top) - worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top) - worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top) - worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top) + worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top) + worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top) + worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top) + worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text) + worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top) + worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top) + worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top) + worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top) + worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top) + worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top) - score = 0 - score_count = 0 - for k, v in ds['data_quality'].items(): - # the below DQ dimensions are given more weight in the calculation of the DQ score. 
- if k in ['device_completeness', 'data_field_completeness', 'retention']: - score += (v * 2) - score_count += 2 - else: - score += v - score_count += 1 - if score > 0: - score = score/score_count + score = 0 + score_count = 0 + for k, v in ds['data_quality'].items(): + # the below DQ dimensions are given more weight in the calculation of the DQ score. + if k in ['device_completeness', 'data_field_completeness', 'retention']: + score += (v * 2) + score_count += 2 + else: + score += v + score_count += 1 + if score > 0: + score = score/score_count - worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score) + worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score) y += 1 worksheet.autofilter(2, 0, 2, 11) diff --git a/generic.py b/generic.py index 9e87939..340571c 100644 --- a/generic.py +++ b/generic.py @@ -579,7 +579,7 @@ def map_techniques_to_data_sources(techniques, my_data_sources): def get_all_mitre_data_sources(): """ - Gets all the data sources from the techniques and make a unique sorted list of it. + Gets all the data sources from the techniques and makes a set. - :return: a sorted list with all data sources + :return: a set with all data sources """ techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH) @@ -589,7 +589,7 @@ def get_all_mitre_data_sources(): if 'x_mitre_data_sources' in t.keys(): for ds in t['x_mitre_data_sources']: data_sources.add(ds) - return sorted(data_sources) + return data_sources def calculate_score(list_detections, zero_value=0):