Merge branch 'master' of https://github.com/marcusbakker/DeTTECT-private
commit
45ca1b9e81
13
constants.py
13
constants.py
|
@ -144,6 +144,19 @@ YAML_OBJ_TECHNIQUE = {'technique_id': '',
|
|||
'detection': YAML_OBJ_DETECTION,
|
||||
'visibility': YAML_OBJ_VISIBILITY}
|
||||
|
||||
YAML_OBJ_DATA_SOURCE = {'data_source_name': '',
|
||||
'date_registered': None,
|
||||
'date_connected': None,
|
||||
'products': [''],
|
||||
'available_for_data_analytics': False,
|
||||
'comment': '',
|
||||
'data_quality': {
|
||||
'device_completeness': 0,
|
||||
'data_field_completeness': 0,
|
||||
'timeliness': 0,
|
||||
'consistency': 0,
|
||||
'retention': 0}}
|
||||
|
||||
# Interactive menu
|
||||
MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping'
|
||||
MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping'
|
||||
|
|
|
@ -109,39 +109,48 @@ def export_data_source_list_to_excel(filename):
|
|||
|
||||
# Putting the data sources data:
|
||||
y = 3
|
||||
for d in get_all_mitre_data_sources():
|
||||
|
||||
# check if an ATT&CK data source is missing from the data source YAML administration file
|
||||
my_ds_list = my_data_sources.keys()
|
||||
for ds in get_all_mitre_data_sources():
|
||||
if ds not in my_ds_list:
|
||||
ds_obj = deepcopy(YAML_OBJ_DATA_SOURCE)
|
||||
ds_obj['data_source_name'] = ds
|
||||
ds_obj['comment'] = 'ATT&CK data source is missing from the YAML file'
|
||||
my_data_sources[ds] = ds_obj
|
||||
|
||||
for d in sorted(my_data_sources.keys()):
|
||||
ds = my_data_sources[d]
|
||||
worksheet.write(y, 0, d, valign_top)
|
||||
if d in my_data_sources.keys():
|
||||
ds = my_data_sources[d]
|
||||
|
||||
date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered']
|
||||
date_connected = ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected']
|
||||
date_registered = ds['date_registered'].strftime('%Y-%m-%d') if isinstance(ds['date_registered'], datetime) else ds['date_registered']
|
||||
date_connected = ds['date_connected'].strftime('%Y-%m-%d') if isinstance(ds['date_connected'], datetime) else ds['date_connected']
|
||||
|
||||
worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text)
|
||||
worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top)
|
||||
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top)
|
||||
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top)
|
||||
worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top)
|
||||
worksheet.write(y, 1, str(date_registered).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 2, str(date_connected).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text)
|
||||
worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top)
|
||||
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top)
|
||||
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top)
|
||||
worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top)
|
||||
|
||||
score = 0
|
||||
score_count = 0
|
||||
for k, v in ds['data_quality'].items():
|
||||
# the below DQ dimensions are given more weight in the calculation of the DQ score.
|
||||
if k in ['device_completeness', 'data_field_completeness', 'retention']:
|
||||
score += (v * 2)
|
||||
score_count += 2
|
||||
else:
|
||||
score += v
|
||||
score_count += 1
|
||||
if score > 0:
|
||||
score = score/score_count
|
||||
score = 0
|
||||
score_count = 0
|
||||
for k, v in ds['data_quality'].items():
|
||||
# the below DQ dimensions are given more weight in the calculation of the DQ score.
|
||||
if k in ['device_completeness', 'data_field_completeness', 'retention']:
|
||||
score += (v * 2)
|
||||
score_count += 2
|
||||
else:
|
||||
score += v
|
||||
score_count += 1
|
||||
if score > 0:
|
||||
score = score/score_count
|
||||
|
||||
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
|
||||
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
|
||||
y += 1
|
||||
|
||||
worksheet.autofilter(2, 0, 2, 11)
|
||||
|
|
|
@ -614,7 +614,7 @@ def map_techniques_to_data_sources(techniques, my_data_sources):
|
|||
|
||||
def get_all_mitre_data_sources():
|
||||
"""
|
||||
Gets all the data sources from the techniques and make a unique sorted list of it.
|
||||
Gets all the data sources from the techniques and make a set.
|
||||
:return: a sorted list with all data sources
|
||||
"""
|
||||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
|
||||
|
@ -624,7 +624,7 @@ def get_all_mitre_data_sources():
|
|||
if 'x_mitre_data_sources' in t.keys():
|
||||
for ds in t['x_mitre_data_sources']:
|
||||
data_sources.add(ds)
|
||||
return sorted(data_sources)
|
||||
return data_sources
|
||||
|
||||
|
||||
def calculate_score(list_detections, zero_value=0):
|
||||
|
|
Loading…
Reference in New Issue