- Replaced PyYAML with ruamel.yaml
- Added new functionality for the auto-update of visibility scores. - Made compatible with version 1.2 of the technique admin YAML file.master
parent
b0ba153c32
commit
94f4913670
|
@ -1,7 +1,8 @@
|
|||
import simplejson
|
||||
from generic import *
|
||||
import xlsxwriter
|
||||
import copy
|
||||
from generic import *
|
||||
|
||||
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
|
||||
|
||||
|
||||
|
@ -23,7 +24,7 @@ def generate_data_sources_layer(filename):
|
|||
output_filename = 'output/data_sources_' + normalize_name_to_filename(name) + '.json'
|
||||
with open(output_filename, 'w') as f:
|
||||
f.write(json_string)
|
||||
print("File written: " + output_filename)
|
||||
print('File written: ' + output_filename)
|
||||
|
||||
|
||||
def plot_data_sources_graph(filename):
|
||||
|
@ -53,7 +54,7 @@ def plot_data_sources_graph(filename):
|
|||
'layout': go.Layout(title="# of data sources for " + name)},
|
||||
filename=output_filename, auto_open=False
|
||||
)
|
||||
print("File written: " + output_filename)
|
||||
print("File written: " + output_filename)
|
||||
|
||||
|
||||
def export_data_source_list_to_excel(filename):
|
||||
|
@ -140,7 +141,7 @@ def export_data_source_list_to_excel(filename):
|
|||
worksheet.freeze_panes(3, 0)
|
||||
try:
|
||||
workbook.close()
|
||||
print("File written: " + excel_filename)
|
||||
print("File written: " + excel_filename)
|
||||
except Exception as e:
|
||||
print('[!] Error while writing Excel file: %s' % str(e))
|
||||
|
||||
|
@ -149,11 +150,12 @@ def _load_data_sources(filename, filter_empty_scores=True):
|
|||
"""
|
||||
Loads the data sources (including all properties) from the given yaml file.
|
||||
:param filename: the filename of the yaml file containing the data sources administration
|
||||
:return: dictionaty with data sources, name, platform and exceptions list.
|
||||
:return: dictionary with data sources, name, platform and exceptions list.
|
||||
"""
|
||||
my_data_sources = {}
|
||||
_yaml = init_yaml()
|
||||
with open(filename, 'r') as yaml_file:
|
||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
yaml_content = _yaml.load(yaml_file)
|
||||
for d in yaml_content['data_sources']:
|
||||
dq = d['data_quality']
|
||||
if not filter_empty_scores:
|
||||
|
@ -195,9 +197,8 @@ def _map_and_colorize_techniques(my_ds, exceptions):
|
|||
for t, v in my_techniques.items():
|
||||
if t not in exceptions:
|
||||
for tactic in v['tactics']:
|
||||
d = {}
|
||||
d = dict()
|
||||
d['techniqueID'] = t
|
||||
# d['score'] = 50
|
||||
d['color'] = technique_colors[t]
|
||||
d['comment'] = ''
|
||||
d['enabled'] = True
|
||||
|
@ -211,10 +212,244 @@ def _map_and_colorize_techniques(my_ds, exceptions):
|
|||
return output_techniques
|
||||
|
||||
|
||||
def generate_technique_administration_file(filename):
|
||||
def _indent_comment(comment, indent):
|
||||
"""
|
||||
Indent a multiline general, visibility, detection comment by x spaces
|
||||
:param comment: The comment to indent
|
||||
:param indent: The number of spaces to use in the indent
|
||||
:return: indented comment or the original
|
||||
"""
|
||||
if '\n' in comment:
|
||||
new_comment = comment.replace('\n', '\n' + ' ' * indent)
|
||||
return new_comment
|
||||
else:
|
||||
return comment
|
||||
|
||||
|
||||
def _get_technique_yaml_obj(techniques, tech_id):
|
||||
"""
|
||||
Get at technique YAML obj from the provided list of techniques YAML objects which as the provided technique ID
|
||||
:param techniques: list of technique YAML objects
|
||||
:param tech_id: ATT&CK ID
|
||||
:return: technique YAML obj
|
||||
"""
|
||||
for tech in techniques:
|
||||
if tech['technique_id'] == tech_id:
|
||||
return tech
|
||||
|
||||
|
||||
def update_technique_administration_file(file_data_sources, file_tech_admin):
|
||||
"""
|
||||
Update the visibility scores in the provided technique administration file
|
||||
:param file_data_sources: file location of the data source admin. file
|
||||
:param file_tech_admin: file location of the tech. admin. file
|
||||
:return:
|
||||
"""
|
||||
# first we generate the new visibility scores contained within a temporary tech. admin YAML 'file'
|
||||
new_visibility_scores = generate_technique_administration_file(file_data_sources, write_file=False)
|
||||
|
||||
# we get the date to remove the single quotes at the end of the code
|
||||
today = new_visibility_scores['techniques'][0]['visibility']['score_logbook'][0]['date']
|
||||
|
||||
# next we load the current visibility scores from the tech. admin file
|
||||
cur_visibility_scores, _, platform_tech_admin = load_techniques(file_tech_admin, 'visibility')
|
||||
|
||||
# if the platform does not match between the data source and tech. admin file we return
|
||||
if new_visibility_scores['platform'] != platform_tech_admin:
|
||||
print('[!] The MITRE ATT&CK platform key-value pair in the data source administration and technique '
|
||||
'administration file do not match.\n Visibility update canceled.')
|
||||
return
|
||||
|
||||
# we did not return, so init
|
||||
_yaml = init_yaml()
|
||||
with open(file_tech_admin) as fd:
|
||||
yaml_file_tech_admin = _yaml.load(fd)
|
||||
|
||||
# check if we have tech IDs for which we now have visibility, but which were not yet part of the tech. admin file
|
||||
cur_tech_ids = cur_visibility_scores.keys()
|
||||
new_tech_ids = list(map(lambda k: k['technique_id'], new_visibility_scores['techniques']))
|
||||
|
||||
tech_ids_new = []
|
||||
for tid in new_tech_ids:
|
||||
if tid not in cur_tech_ids:
|
||||
tech_ids_new.append(tid)
|
||||
|
||||
# Add the new tech. to the ruamel instance: 'yaml_file_tech_admin'
|
||||
are_scores_updated = False
|
||||
tech_new_print = []
|
||||
if len(tech_ids_new) > 0:
|
||||
|
||||
# do we want fill in a comment for all updated visibility scores?
|
||||
comment = ''
|
||||
if ask_yes_no('\nDo you want to fill in the visibility comment for the updated scores?'):
|
||||
comment = input(' >> Visibility comment for in the new \'score\' object: ')
|
||||
print('')
|
||||
|
||||
# add new techniques and set the comment
|
||||
x = 0
|
||||
for new_tech in new_visibility_scores['techniques']:
|
||||
|
||||
# set the comment for all new visibility scores
|
||||
# we will also be needing this later in the code to update the scores of already present techniques
|
||||
new_visibility_scores['techniques'][x]['visibility']['score_logbook'][0]['comment'] = comment
|
||||
|
||||
if new_tech['technique_id'] in tech_ids_new:
|
||||
are_scores_updated = True
|
||||
yaml_file_tech_admin['techniques'].append(new_tech)
|
||||
tech_new_print.append(' - ' + new_tech['technique_id']+'\n')
|
||||
x += 1
|
||||
|
||||
print('The following new technique IDs are added to the technique administration file with a visibility '
|
||||
'score derived from the nr. of data sources:')
|
||||
print(''.join(tech_new_print))
|
||||
else:
|
||||
print(' - No new techniques, for which we now have visibility, have been added to the techniques administration file.')
|
||||
|
||||
# determine how visibility scores have been assigned in the current YAML file (auto, manually or mixed)
|
||||
# also determine if we have any scores that can be updated
|
||||
manually_scored = False
|
||||
auto_scored = False
|
||||
mix_scores = False
|
||||
updated_vis_score_cnt = 0
|
||||
for cur_tech, cur_values in cur_visibility_scores.items():
|
||||
new_tech = _get_technique_yaml_obj(new_visibility_scores['techniques'], cur_tech)
|
||||
new_score = new_tech['visibility']['score_logbook'][0]['score']
|
||||
|
||||
for cur_obj in cur_values['visibility']:
|
||||
old_score = get_latest_score(cur_obj)
|
||||
|
||||
if get_latest_auto_generated(cur_obj) and old_score != new_score:
|
||||
auto_scored = True
|
||||
updated_vis_score_cnt += 1
|
||||
elif old_score != new_score:
|
||||
manually_scored = True
|
||||
updated_vis_score_cnt += 1
|
||||
|
||||
if manually_scored and auto_scored:
|
||||
mix_scores = True
|
||||
manually_scored = False
|
||||
auto_scored = False
|
||||
break
|
||||
|
||||
# stop if none of the present visibility scores are eligible for an update
|
||||
if not mix_scores and not manually_scored and not auto_scored:
|
||||
print(' - None of the already present techniques has a visibility score that is eligible for an update.')
|
||||
else:
|
||||
print('\nA total of ' + str(updated_vis_score_cnt) + ' visibility scores are eligible for an update.\n')
|
||||
# ask how the score should be updated
|
||||
answer = 0
|
||||
if manually_scored:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_ALL_MANUAL, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
elif auto_scored:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_ALL_AUTO, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
elif mix_scores:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4, V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
if answer == V_UPDATE_ANSWER_CANCEL:
|
||||
return
|
||||
|
||||
# identify which visibility scores have changed and set the action to perform on the score
|
||||
# tech_update {tech_id: ..., {obj_idx: { action: 1|2|3, score_obj: {...} } } }
|
||||
tech_update = dict()
|
||||
for new_tech in new_visibility_scores['techniques']:
|
||||
tech_id = new_tech['technique_id']
|
||||
new_score_obj = new_tech['visibility']['score_logbook'][0]
|
||||
new_score = new_score_obj['score']
|
||||
|
||||
if tech_id in cur_visibility_scores:
|
||||
old_visibility_objects = cur_visibility_scores[tech_id]['visibility']
|
||||
obj_idx = 0
|
||||
for old_vis_obj in old_visibility_objects:
|
||||
old_score = get_latest_score(old_vis_obj)
|
||||
auto_gen = get_latest_auto_generated(old_vis_obj)
|
||||
|
||||
# continue if score can be updated
|
||||
if old_score != new_score:
|
||||
if tech_id not in tech_update:
|
||||
tech_update[tech_id] = dict()
|
||||
|
||||
if (answer == V_UPDATE_ANSWER_1) or (answer == V_UPDATE_ANSWER_3 and auto_gen):
|
||||
tech_update[tech_id][obj_idx] = {'action': V_UPDATE_ACTION_AUTO, 'score_obj': new_score_obj}
|
||||
elif answer == V_UPDATE_ANSWER_2:
|
||||
tech_update[tech_id][obj_idx] = {'action': V_UPDATE_ACTION_DIFF, 'score_obj': new_score_obj}
|
||||
elif answer == V_UPDATE_ANSWER_4:
|
||||
if auto_gen:
|
||||
tech_update[tech_id][obj_idx] = {'action': V_UPDATE_ACTION_AUTO, 'score_obj': new_score_obj}
|
||||
else:
|
||||
tech_update[tech_id][obj_idx] = {'action': V_UPDATE_ACTION_DIFF, 'score_obj': new_score_obj}
|
||||
obj_idx += 1
|
||||
|
||||
# perform the above set actions
|
||||
score_updates_handled = 0
|
||||
for old_tech in yaml_file_tech_admin['techniques']:
|
||||
tech_id = old_tech['technique_id']
|
||||
tech_name = old_tech['technique_name']
|
||||
obj_idx = 0
|
||||
if tech_id in tech_update:
|
||||
if isinstance(old_tech['visibility'], list):
|
||||
old_vis_obj = old_tech['visibility']
|
||||
else:
|
||||
old_vis_obj = [old_tech['visibility']]
|
||||
|
||||
while obj_idx <= len(tech_update[tech_id]):
|
||||
# continue if an action has been set for this visibility object
|
||||
if obj_idx in tech_update[tech_id]:
|
||||
update_action = tech_update[tech_id][obj_idx]['action']
|
||||
new_score_obj = tech_update[tech_id][obj_idx]['score_obj']
|
||||
|
||||
if update_action == V_UPDATE_ACTION_AUTO:
|
||||
are_scores_updated = True
|
||||
old_vis_obj[obj_idx]['score_logbook'].insert(0, new_score_obj)
|
||||
print(' - Updated a score in technique ID: ' + tech_id +
|
||||
' (applicable to: ' + ', '.join(old_vis_obj[obj_idx]['applicable_to']) + ')')
|
||||
elif update_action == V_UPDATE_ACTION_DIFF:
|
||||
print('-' * 80)
|
||||
tmp_txt = '[updates remaining: ' + str(updated_vis_score_cnt - score_updates_handled) + ']'
|
||||
print(' ' * (80-len(tmp_txt)) + tmp_txt)
|
||||
print('')
|
||||
print('Visibility object:')
|
||||
print(' - ATT&CK ID/name ' + tech_id + ' / ' + tech_name)
|
||||
print(' - Applicable to: ' + ', '.join(old_vis_obj[obj_idx]['applicable_to']))
|
||||
print(' - General comment: ' + _indent_comment(old_vis_obj[obj_idx]['comment'], 23))
|
||||
print('')
|
||||
print('OLD score object:')
|
||||
print(' - Date: ' + get_latest_date(old_vis_obj[obj_idx]).strftime('%Y-%m-%d'))
|
||||
print(' - Score: ' + str(get_latest_score(old_vis_obj[obj_idx])))
|
||||
print(' - Visibility comment: ' + _indent_comment(get_latest_comment(old_vis_obj[obj_idx]), 23))
|
||||
print('NEW score object:')
|
||||
print(' - Date: ' + new_score_obj['date'])
|
||||
print(' - Score: ' + str(new_score_obj['score']))
|
||||
print(' - Visibility comment: ' + _indent_comment(new_score_obj['comment'], 23))
|
||||
print(' - Auto generated: true')
|
||||
print('')
|
||||
if ask_yes_no('Update the score?'):
|
||||
are_scores_updated = True
|
||||
old_vis_obj[obj_idx]['score_logbook'].insert(0, new_score_obj)
|
||||
print(' - Updated a score in technique ID: ' + tech_id +
|
||||
' (applicable to: ' + ', '.join(old_vis_obj[obj_idx]['applicable_to']) + ')')
|
||||
|
||||
score_updates_handled += 1
|
||||
|
||||
obj_idx += 1
|
||||
|
||||
# create backup of the current tech. admin YAML file
|
||||
if are_scores_updated:
|
||||
print('')
|
||||
backup_file(file_tech_admin)
|
||||
|
||||
yaml_file_tech_admin = fix_date(yaml_file_tech_admin, today, input_reamel=True, return_reamel=True)
|
||||
|
||||
with open(file_tech_admin, 'w') as fd:
|
||||
_yaml.dump(yaml_file_tech_admin, fd)
|
||||
print('File written: ' + file_tech_admin)
|
||||
else:
|
||||
print('No visibility scores have been updated.')
|
||||
|
||||
|
||||
def generate_technique_administration_file(filename, write_file=True):
|
||||
"""
|
||||
Generate a technique administration file based on the data source administration yaml file
|
||||
:param filename: the filename of the yaml file containing the data sources administration
|
||||
:param write_file: by default the file is written to disk
|
||||
:return:
|
||||
"""
|
||||
my_data_sources, name, platform, exceptions = _load_data_sources(filename)
|
||||
|
@ -222,18 +457,37 @@ def generate_technique_administration_file(filename):
|
|||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH_ENTERPRISE)
|
||||
|
||||
# This is part of the techniques administration YAML file and is used as a template
|
||||
dict_tech = {'technique_id': '', 'technique_name': '', 'detection': {'applicable_to': ['all'],
|
||||
'date_registered': None,
|
||||
'date_implemented': None,
|
||||
'score': -1, 'location': [''], 'comment': ''},
|
||||
'visibility': {'applicable_to': ['all'], 'score': 0, 'comment': ''}}
|
||||
dict_tech = {'technique_id': '',
|
||||
'technique_name': '',
|
||||
'detection':
|
||||
{'applicable_to': ['all'],
|
||||
'location': [''],
|
||||
'comment': '',
|
||||
'score_logbook':
|
||||
[
|
||||
{'date': None,
|
||||
'score': -1,
|
||||
'comment': ''}
|
||||
]},
|
||||
'visibility':
|
||||
{'applicable_to': ['all'],
|
||||
'comment': '',
|
||||
'score_logbook':
|
||||
[
|
||||
{'date': None,
|
||||
'score': 0,
|
||||
'comment': '',
|
||||
'auto_generated': True}
|
||||
]
|
||||
}}
|
||||
|
||||
yaml_file = {}
|
||||
yaml_file = dict()
|
||||
yaml_file['version'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION
|
||||
yaml_file['file_type'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION
|
||||
yaml_file['name'] = name
|
||||
yaml_file['platform'] = platform
|
||||
yaml_file['techniques'] = []
|
||||
today = dt.now().strftime('%Y-%m-%d')
|
||||
|
||||
# Score visibility based on the number of available data sources and the exceptions
|
||||
for t in techniques:
|
||||
|
@ -260,17 +514,36 @@ def generate_technique_administration_file(filename):
|
|||
tech = copy.deepcopy(dict_tech)
|
||||
tech['technique_id'] = tech_id
|
||||
tech['technique_name'] = t['name']
|
||||
tech['visibility']['score'] = score
|
||||
# noinspection PyUnresolvedReferences
|
||||
tech['visibility']['score_logbook'][0]['score'] = score
|
||||
# noinspection PyUnresolvedReferences
|
||||
tech['visibility']['score_logbook'][0]['date'] = today
|
||||
yaml_file['techniques'].append(tech)
|
||||
|
||||
yaml_string = '%YAML 1.2\n---\n' + yaml.dump(yaml_file, sort_keys=False).replace('null', '')
|
||||
output_filename = 'output/techniques-administration-' + normalize_name_to_filename(name+'-'+platform) + '.yaml'
|
||||
suffix = 1
|
||||
while os.path.exists(output_filename):
|
||||
output_filename = 'output/techniques-administration-' + normalize_name_to_filename(name + '-' + platform) + \
|
||||
'_' + str(suffix) + '.yaml'
|
||||
suffix += 1
|
||||
if write_file:
|
||||
# remove the single quotes around the date key-value pair
|
||||
_yaml = init_yaml()
|
||||
tmp_file = sys.path[0] + '/.tmp_tech_file'
|
||||
|
||||
with open(output_filename, 'w') as f:
|
||||
f.write(yaml_string)
|
||||
print("File written: " + output_filename)
|
||||
# create the file lines by writing it to disk
|
||||
with open(tmp_file, 'w') as fd_tmp:
|
||||
_yaml.dump(yaml_file, fd_tmp)
|
||||
|
||||
# remove the single quotes from the date
|
||||
yaml_file_lines = fix_date(tmp_file, today, input_reamel=False)
|
||||
|
||||
os.remove(tmp_file)
|
||||
|
||||
# create a output filename that prevents overwriting any existing files
|
||||
output_filename = 'output/techniques-administration-' + normalize_name_to_filename(name+'-'+platform) + '.yaml'
|
||||
suffix = 1
|
||||
while os.path.exists(output_filename):
|
||||
output_filename = 'output/techniques-administration-' + normalize_name_to_filename(name + '-' + platform) + \
|
||||
'_' + str(suffix) + '.yaml'
|
||||
suffix += 1
|
||||
|
||||
with open(output_filename, 'w') as f:
|
||||
f.writelines(yaml_file_lines)
|
||||
print("File written: " + output_filename)
|
||||
else:
|
||||
return yaml_file
|
||||
|
|
Loading…
Reference in New Issue