- Non-MITRE ATT&CK data sources are now also exported to Excel.

- Any ATT&CK data sources that are missing within the YAML file are added to the Excel with a comment stating it is missing.
master
Marcus Bakker 2019-08-15 20:31:20 +02:00
parent 7ad8fe16c7
commit 84f9f0440a
3 changed files with 52 additions and 30 deletions

View File

@ -144,6 +144,19 @@ YAML_OBJ_TECHNIQUE = {'technique_id': '',
'detection': YAML_OBJ_DETECTION,
'visibility': YAML_OBJ_VISIBILITY}
YAML_OBJ_DATA_SOURCE = {'data_source_name': '',
'date_registered': None,
'date_connected': None,
'products': [''],
'available_for_data_analytics': False,
'comment': '',
'data_quality': {
'device_completeness': 0,
'data_field_completeness': 0,
'timeliness': 0,
'consistency': 0,
'retention': 0}}
# Interactive menu
MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping'
MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping'

View File

@ -112,39 +112,48 @@ def export_data_source_list_to_excel(filename):
# Putting the data sources data:
y = 3
for d in get_all_mitre_data_sources():
# check if an ATT&CK data source is missing from the data source YAML administration file
my_ds_list = my_data_sources.keys()
for ds in get_all_mitre_data_sources():
if ds not in my_ds_list:
ds_obj = deepcopy(YAML_OBJ_DATA_SOURCE)
ds_obj['data_source_name'] = ds
ds_obj['comment'] = 'ATT&CK data source is missing from the YAML file'
my_data_sources[ds] = ds_obj
for d in sorted(my_data_sources.keys()):
ds = my_data_sources[d]
worksheet.write(y, 0, d, valign_top)
if d in my_data_sources.keys():
ds = my_data_sources[d]
date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered']
date_connected = ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected']
date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered']
date_connected = ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected']
worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top)
worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top)
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top)
worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text)
worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top)
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top)
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top)
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top)
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top)
worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top)
worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top)
worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top)
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top)
worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text)
worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top)
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top)
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top)
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top)
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top)
worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top)
score = 0
score_count = 0
for k, v in ds['data_quality'].items():
# the below DQ dimensions are given more weight in the calculation of the DQ score.
if k in ['device_completeness', 'data_field_completeness', 'retention']:
score += (v * 2)
score_count += 2
else:
score += v
score_count += 1
if score > 0:
score = score/score_count
score = 0
score_count = 0
for k, v in ds['data_quality'].items():
# the below DQ dimensions are given more weight in the calculation of the DQ score.
if k in ['device_completeness', 'data_field_completeness', 'retention']:
score += (v * 2)
score_count += 2
else:
score += v
score_count += 1
if score > 0:
score = score/score_count
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
y += 1
worksheet.autofilter(2, 0, 2, 11)

View File

@ -579,7 +579,7 @@ def map_techniques_to_data_sources(techniques, my_data_sources):
def get_all_mitre_data_sources():
"""
Gets all the data sources from the techniques and make a unique sorted list of it.
Gets all the data sources from the techniques and make a set.
:return: a sorted list with all data sources
"""
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
@ -589,7 +589,7 @@ def get_all_mitre_data_sources():
if 'x_mitre_data_sources' in t.keys():
for ds in t['x_mitre_data_sources']:
data_sources.add(ds)
return sorted(data_sources)
return data_sources
def calculate_score(list_detections, zero_value=0):