- Made compatible with version 1.2 of the technique admin YAML file.
- Added new functionality for the auto-update of visibility scores. - Added multiple new generic functions. - Multiple small improvements to the technique admin YAML file health check. - Replaced PyYAML with ruamel.yaml. - Multiple functions made "private". - Made compatible with v0.2.7 of attackcti.master
parent
6be77c3260
commit
d0f2a4946b
456
generic.py
456
generic.py
|
@ -1,10 +1,12 @@
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
import pickle
|
import pickle
|
||||||
|
import sys
|
||||||
|
from ruamel.yaml import YAML
|
||||||
|
from difflib import SequenceMatcher
|
||||||
from datetime import datetime as dt
|
from datetime import datetime as dt
|
||||||
import yaml
|
|
||||||
from upgrade import upgrade_yaml_file
|
from upgrade import upgrade_yaml_file
|
||||||
from constants import *
|
from constants import *
|
||||||
from difflib import SequenceMatcher
|
|
||||||
|
|
||||||
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
||||||
|
|
||||||
|
@ -21,23 +23,7 @@ def try_get_key(dictionary, key):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def try_except(self, stix_objects, object_type, nested_value=None):
|
def _save_attack_data(data, path):
|
||||||
if object_type in stix_objects:
|
|
||||||
specific_stix_object = stix_objects[object_type]
|
|
||||||
if isinstance(specific_stix_object, list):
|
|
||||||
if nested_value is None:
|
|
||||||
lists = self.handle_list(stix_objects, object_type)
|
|
||||||
return lists
|
|
||||||
else:
|
|
||||||
nested_result = self.handle_nested(stix_objects, object_type, nested_value)
|
|
||||||
return nested_result
|
|
||||||
else:
|
|
||||||
return stix_objects[object_type]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def save_attack_data(data, path):
|
|
||||||
"""
|
"""
|
||||||
Save ATT&CK data to disk for the purpose of caching. Data can be STIX objects our a custom schema.
|
Save ATT&CK data to disk for the purpose of caching. Data can be STIX objects our a custom schema.
|
||||||
:param data: the MITRE ATT&CK data to save
|
:param data: the MITRE ATT&CK data to save
|
||||||
|
@ -70,9 +56,9 @@ def load_attack_data(data_type):
|
||||||
|
|
||||||
attack_data = None
|
attack_data = None
|
||||||
if data_type == DATA_TYPE_STIX_ALL_RELATIONSHIPS:
|
if data_type == DATA_TYPE_STIX_ALL_RELATIONSHIPS:
|
||||||
attack_data = mitre.get_all_relationships()
|
attack_data = mitre.get_relationships()
|
||||||
if data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE:
|
if data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE:
|
||||||
attack_data = mitre.get_all_enterprise_techniques()
|
attack_data = mitre.get_enterprise_techniques()
|
||||||
if data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP:
|
if data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP:
|
||||||
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
|
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
|
||||||
# groups. This results in a dict: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...}
|
# groups. This results in a dict: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...}
|
||||||
|
@ -112,11 +98,11 @@ def load_attack_data(data_type):
|
||||||
attack_data = all_group_use
|
attack_data = all_group_use
|
||||||
|
|
||||||
elif data_type == DATA_TYPE_STIX_ALL_TECH:
|
elif data_type == DATA_TYPE_STIX_ALL_TECH:
|
||||||
attack_data = mitre.get_all_techniques()
|
attack_data = mitre.get_techniques()
|
||||||
elif data_type == DATA_TYPE_STIX_ALL_GROUPS:
|
elif data_type == DATA_TYPE_STIX_ALL_GROUPS:
|
||||||
attack_data = mitre.get_all_groups()
|
attack_data = mitre.get_groups()
|
||||||
elif data_type == DATA_TYPE_STIX_ALL_SOFTWARE:
|
elif data_type == DATA_TYPE_STIX_ALL_SOFTWARE:
|
||||||
attack_data = mitre.get_all_software()
|
attack_data = mitre.get_software()
|
||||||
elif data_type == DATA_TYPE_CUSTOM_TECH_BY_SOFTWARE:
|
elif data_type == DATA_TYPE_CUSTOM_TECH_BY_SOFTWARE:
|
||||||
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all software
|
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all software
|
||||||
# This results in a dict: {software_id: Sxxxx, technique_ref/attack-pattern_ref: ...}
|
# This results in a dict: {software_id: Sxxxx, technique_ref/attack-pattern_ref: ...}
|
||||||
|
@ -180,11 +166,17 @@ def load_attack_data(data_type):
|
||||||
})
|
})
|
||||||
attack_data = all_group_use
|
attack_data = all_group_use
|
||||||
|
|
||||||
save_attack_data(attack_data, "cache/" + data_type)
|
_save_attack_data(attack_data, "cache/" + data_type)
|
||||||
|
|
||||||
return attack_data
|
return attack_data
|
||||||
|
|
||||||
|
|
||||||
|
def init_yaml():
|
||||||
|
_yaml = YAML()
|
||||||
|
_yaml.Representer.ignore_aliases = lambda *args: True # disable anchors/aliases
|
||||||
|
return _yaml
|
||||||
|
|
||||||
|
|
||||||
def _get_base_template(name, description, stage, platform, sorting):
|
def _get_base_template(name, description, stage, platform, sorting):
|
||||||
"""
|
"""
|
||||||
Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
|
Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
|
||||||
|
@ -197,7 +189,7 @@ def _get_base_template(name, description, stage, platform, sorting):
|
||||||
:param sorting: sorting
|
:param sorting: sorting
|
||||||
:return: layer template dictionary
|
:return: layer template dictionary
|
||||||
"""
|
"""
|
||||||
layer = {}
|
layer = dict()
|
||||||
layer['name'] = name
|
layer['name'] = name
|
||||||
layer['version'] = '2.1'
|
layer['version'] = '2.1'
|
||||||
layer['domain'] = 'mitre-enterprise'
|
layer['domain'] = 'mitre-enterprise'
|
||||||
|
@ -348,6 +340,22 @@ def get_layer_template_layered(name, description, stage, platform):
|
||||||
return layer
|
return layer
|
||||||
|
|
||||||
|
|
||||||
|
def backup_file(filename):
|
||||||
|
"""
|
||||||
|
Create a backup of the provided file
|
||||||
|
:param filename: existing YAML filename
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
suffix = 1
|
||||||
|
backup_filename = filename.replace('.yaml', '_backup_' + str(suffix) + '.yaml')
|
||||||
|
while os.path.exists(backup_filename):
|
||||||
|
backup_filename = backup_filename.replace('_backup_' + str(suffix) + '.yaml', '_backup_' + str(suffix+1) + '.yaml')
|
||||||
|
suffix += 1
|
||||||
|
|
||||||
|
shutil.copy2(filename, backup_filename)
|
||||||
|
print('Written backup file: ' + backup_filename + '\n')
|
||||||
|
|
||||||
|
|
||||||
def get_attack_id(stix_obj):
|
def get_attack_id(stix_obj):
|
||||||
"""
|
"""
|
||||||
Get the Technique, Group or Software ID from the STIX object
|
Get the Technique, Group or Software ID from the STIX object
|
||||||
|
@ -386,6 +394,166 @@ def get_technique(techniques, technique_id):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def ask_yes_no(question):
|
||||||
|
"""
|
||||||
|
Ask the user to a question that needs to be answered with yes or no.
|
||||||
|
:param question: The question to be asked
|
||||||
|
:return: boolean value indicating a yes (True) or no (False0
|
||||||
|
"""
|
||||||
|
yes_no = ''
|
||||||
|
while not re.match('^(y|yes|n|no)$', yes_no, re.IGNORECASE):
|
||||||
|
yes_no = input(question + '\n >> y(yes) / n(no): ')
|
||||||
|
print('')
|
||||||
|
|
||||||
|
if re.match('^(y|yes)$', yes_no, re.IGNORECASE):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def ask_multiple_choice(question, list_answers):
|
||||||
|
"""
|
||||||
|
Ask a multiple choice question.
|
||||||
|
:param question: the question to ask
|
||||||
|
:param list_answers: a list of answer
|
||||||
|
:return: the answer
|
||||||
|
"""
|
||||||
|
answer = ''
|
||||||
|
answers = ''
|
||||||
|
x = 1
|
||||||
|
for a in list_answers:
|
||||||
|
a = a.replace('\n', '\n ')
|
||||||
|
answers += ' ' + str(x) + ') ' + a + '\n'
|
||||||
|
x += 1
|
||||||
|
|
||||||
|
# noinspection Annotator
|
||||||
|
while not re.match('(^[1-' + str(len(list_answers)) + ']{1}$)', answer):
|
||||||
|
print(question)
|
||||||
|
print(answers)
|
||||||
|
answer = input(' >> ')
|
||||||
|
print('')
|
||||||
|
|
||||||
|
return list_answers[int(answer)-1]
|
||||||
|
|
||||||
|
|
||||||
|
def fix_date(yaml_file, date, input_reamel=True, return_reamel=False):
|
||||||
|
"""
|
||||||
|
Remove the single quotes around the date key-value pair in the provided yaml_file
|
||||||
|
:param yaml_file: ruamel.yaml instance or location of YAML file
|
||||||
|
:param date: string date value (e.g. 2019-01-01)
|
||||||
|
:param input_reamel: input type can be a reamel instance or list
|
||||||
|
:param return_reamel: return list of YAML file lines or reamel instance
|
||||||
|
:return: YAML file lines in a list
|
||||||
|
"""
|
||||||
|
_yaml = init_yaml()
|
||||||
|
if input_reamel:
|
||||||
|
file = sys.path[0] + '/.tmp_tech_file'
|
||||||
|
|
||||||
|
with open(file, 'w') as fd:
|
||||||
|
_yaml.dump(yaml_file, fd)
|
||||||
|
else:
|
||||||
|
file = yaml_file
|
||||||
|
|
||||||
|
with open(file, 'r') as fd:
|
||||||
|
new_lines = fd.readlines()
|
||||||
|
x = 0
|
||||||
|
for line in new_lines:
|
||||||
|
if REGEX_YAML_DATE.match(line):
|
||||||
|
new_lines[x] = line.replace('\'' + date + '\'', date)
|
||||||
|
x += 1
|
||||||
|
|
||||||
|
# remove the temporary file
|
||||||
|
if input_reamel:
|
||||||
|
os.remove(file)
|
||||||
|
|
||||||
|
if return_reamel:
|
||||||
|
return _yaml.load(''.join(new_lines))
|
||||||
|
else:
|
||||||
|
return new_lines
|
||||||
|
|
||||||
|
|
||||||
|
def _get_latest_score_obj(yaml_object):
|
||||||
|
"""
|
||||||
|
Get the the score object in the score_logbook by date
|
||||||
|
:param yaml_object: a detection or visibility YAML object
|
||||||
|
:return: the latest score object
|
||||||
|
"""
|
||||||
|
if not isinstance(yaml_object['score_logbook'], list):
|
||||||
|
yaml_object['score_logbook'] = [yaml_object['score_logbook']]
|
||||||
|
|
||||||
|
if len(yaml_object['score_logbook']) > 0 and 'date' in yaml_object['score_logbook'][0]:
|
||||||
|
# for some weird reason 'sorted()' provides inconsistent results
|
||||||
|
newest_score_obj = None
|
||||||
|
newest_date = None
|
||||||
|
for score_obj in yaml_object['score_logbook']:
|
||||||
|
if not newest_score_obj or score_obj['date'] > newest_date:
|
||||||
|
newest_date = score_obj['date']
|
||||||
|
newest_score_obj = score_obj
|
||||||
|
|
||||||
|
return newest_score_obj
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_latest_comment(yaml_object, empty=' '):
|
||||||
|
"""
|
||||||
|
Return the latest comment present in the score_logbook
|
||||||
|
:param yaml_object: a detection or visibility YAML object
|
||||||
|
:param empty: value for an empty comment
|
||||||
|
:return: comment
|
||||||
|
"""
|
||||||
|
score_obj = _get_latest_score_obj(yaml_object)
|
||||||
|
if score_obj:
|
||||||
|
if score_obj['comment'] == '' or not score_obj['comment']:
|
||||||
|
return empty
|
||||||
|
else:
|
||||||
|
return score_obj['comment']
|
||||||
|
else:
|
||||||
|
return empty
|
||||||
|
|
||||||
|
|
||||||
|
def get_latest_date(yaml_object):
|
||||||
|
"""
|
||||||
|
Return the latest date present in the score_logbook
|
||||||
|
:param yaml_object: a detection or visibility YAML object
|
||||||
|
:return: date as a datetime object or None
|
||||||
|
"""
|
||||||
|
score_obj = _get_latest_score_obj(yaml_object)
|
||||||
|
if score_obj:
|
||||||
|
return score_obj['date']
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_latest_auto_generated(yaml_object):
|
||||||
|
"""
|
||||||
|
Return the latest auto_generated value present in the score_logbook
|
||||||
|
:param yaml_object: a detection or visibility YAML object
|
||||||
|
:return: True or False
|
||||||
|
"""
|
||||||
|
score_obj = _get_latest_score_obj(yaml_object)
|
||||||
|
if score_obj:
|
||||||
|
if 'auto_generated' in score_obj:
|
||||||
|
return score_obj['auto_generated']
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_latest_score(yaml_object):
|
||||||
|
"""
|
||||||
|
Return the latest score present in the score_logbook
|
||||||
|
:param yaml_object: a detection or visibility YAML object
|
||||||
|
:return: score as an integer or None
|
||||||
|
"""
|
||||||
|
score_obj = _get_latest_score_obj(yaml_object)
|
||||||
|
if score_obj:
|
||||||
|
return score_obj['score']
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def normalize_name_to_filename(name):
|
def normalize_name_to_filename(name):
|
||||||
"""
|
"""
|
||||||
Normalize the input filename to a lowercase filename and replace spaces with dashes.
|
Normalize the input filename to a lowercase filename and replace spaces with dashes.
|
||||||
|
@ -439,38 +607,39 @@ def get_all_mitre_data_sources():
|
||||||
return sorted(data_sources)
|
return sorted(data_sources)
|
||||||
|
|
||||||
|
|
||||||
def calculate_score(l, zero_value=0):
|
def calculate_score(list_detections, zero_value=0):
|
||||||
"""
|
"""
|
||||||
Calculates the average score in the given list which contains dictionaries with 'score' field.
|
Calculates the average score in the given list which may contain multiple detection dictionaries
|
||||||
:param l: list
|
:param list_detections: list
|
||||||
:param zero_value: the value when no scores are there, default 0
|
:param zero_value: the value when no scores are there, default 0
|
||||||
:return: average score
|
:return: average score
|
||||||
"""
|
"""
|
||||||
s = 0
|
avg_score = 0
|
||||||
number = 0
|
number = 0
|
||||||
for v in l:
|
for v in list_detections:
|
||||||
if v['score'] >= 0:
|
score = get_latest_score(v)
|
||||||
s += v['score']
|
if score >= 0:
|
||||||
|
avg_score += score
|
||||||
number += 1
|
number += 1
|
||||||
s = int(round(s / number, 0) if number > 0 else zero_value)
|
avg_score = int(round(avg_score / number, 0) if number > 0 else zero_value)
|
||||||
return s
|
return avg_score
|
||||||
|
|
||||||
|
|
||||||
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
|
def add_entry_to_list_in_dictionary(dictionary, technique_id, key, entry):
|
||||||
"""
|
"""
|
||||||
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
|
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
|
||||||
list. If the dict[technique_id] doesn't exist yet, it will be created.
|
list. If the dict[technique_id] doesn't exist yet, it will be created.
|
||||||
:param dict: the dictionary
|
:param dictionary: the dictionary
|
||||||
:param technique_id: the id of the technique in the main dict
|
:param technique_id: the id of the technique in the main dict
|
||||||
:param key: the key where the list in the dictionary resides
|
:param key: the key where the list in the dictionary resides
|
||||||
:param entry: the entry to add to the list
|
:param entry: the entry to add to the list
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
if technique_id not in dict.keys():
|
if technique_id not in dictionary.keys():
|
||||||
dict[technique_id] = {}
|
dictionary[technique_id] = {}
|
||||||
if not key in dict[technique_id].keys():
|
if key not in dictionary[technique_id].keys():
|
||||||
dict[technique_id][key] = []
|
dictionary[technique_id][key] = []
|
||||||
dict[technique_id][key].append(entry)
|
dictionary[technique_id][key].append(entry)
|
||||||
|
|
||||||
|
|
||||||
def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'):
|
def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'):
|
||||||
|
@ -484,26 +653,27 @@ def load_techniques(filename, detection_or_visibility='all', filter_applicable_t
|
||||||
"""
|
"""
|
||||||
|
|
||||||
my_techniques = {}
|
my_techniques = {}
|
||||||
|
_yaml = init_yaml()
|
||||||
with open(filename, 'r') as yaml_file:
|
with open(filename, 'r') as yaml_file:
|
||||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
yaml_content = _yaml.load(yaml_file)
|
||||||
for d in yaml_content['techniques']:
|
for d in yaml_content['techniques']:
|
||||||
# Add detection items:
|
# Add detection items:
|
||||||
if type(d['detection']) == dict: # There is just one detection entry
|
if isinstance(d['detection'], dict): # There is just one detection entry
|
||||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
||||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
|
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
|
||||||
elif type(d['detection']) == list: # There are multiple detection entries
|
elif isinstance(d['detection'], list): # There are multiple detection entries
|
||||||
for de in d['detection']:
|
for de in d['detection']:
|
||||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
||||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
|
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
|
||||||
|
|
||||||
# Add visibility items
|
# Add visibility items
|
||||||
if type(d['visibility']) == dict: # There is just one visibility entry
|
if isinstance(d['visibility'], dict): # There is just one visibility entry
|
||||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
||||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
|
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
|
||||||
elif type(d['visibility']) == list: # There are multiple visibility entries
|
elif isinstance(d['visibility'], list): # There are multiple visibility entries
|
||||||
for de in d['visibility']:
|
for de in d['visibility']:
|
||||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
||||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
|
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
|
||||||
|
|
||||||
name = yaml_content['name']
|
name = yaml_content['name']
|
||||||
platform = yaml_content['platform']
|
platform = yaml_content['platform']
|
||||||
|
@ -516,6 +686,111 @@ def _print_error_msg(msg, print_error):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _check_health_score_object(yaml_object, object_type, tech_id, health_is_called):
|
||||||
|
"""
|
||||||
|
Check the health of a score_logbook inside a visibility or detection YAML object
|
||||||
|
:param yaml_object: YAML file lines
|
||||||
|
:param object_type: 'detection' or 'visibility'
|
||||||
|
:param tech_id: ATT&CK technique ID
|
||||||
|
:param health_is_called: boolean that specifies if detailed errors in the file will be printed and then quit()
|
||||||
|
:return: True if the YAML file is unhealthy, otherwise False
|
||||||
|
"""
|
||||||
|
has_error = False
|
||||||
|
min_score = None
|
||||||
|
max_score = None
|
||||||
|
|
||||||
|
if object_type == 'detection':
|
||||||
|
min_score = -1
|
||||||
|
max_score = 5
|
||||||
|
elif object_type == 'visibility':
|
||||||
|
min_score = 0
|
||||||
|
max_score = 4
|
||||||
|
|
||||||
|
if not isinstance(yaml_object['score_logbook'], list):
|
||||||
|
yaml_object['score_logbook'] = [yaml_object['score_logbook']]
|
||||||
|
|
||||||
|
try:
|
||||||
|
for score_obj in yaml_object['score_logbook']:
|
||||||
|
for key in ['date', 'score', 'comment']:
|
||||||
|
if key not in score_obj:
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' + object_type + ' score object in the \'score_logbook\': ' + key, health_is_called)
|
||||||
|
|
||||||
|
if score_obj['score'] is None:
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object in the \'score_logbook\': score', health_is_called)
|
||||||
|
|
||||||
|
elif not isinstance(score_obj['score'], int):
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type + ' score object in the \'score_logbook\': score should be an integer', health_is_called)
|
||||||
|
|
||||||
|
if 'auto_generated' in score_obj:
|
||||||
|
if not isinstance(score_obj['auto_generated'], bool):
|
||||||
|
has_error = _print_error_msg(
|
||||||
|
'[!] Technique ID: ' + tech_id + ' has an INVALID auto_generated value in a ' + object_type + ' score object in the \'score_logbook\': auto_generated (if present) should be set to \'true\' or \'false\'', health_is_called)
|
||||||
|
|
||||||
|
if isinstance(score_obj['score'], int):
|
||||||
|
if score_obj['date'] is None and score_obj['score'] > -1:
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object in the \'score_logbook\': date', health_is_called)
|
||||||
|
|
||||||
|
# noinspection PyChainedComparisons
|
||||||
|
if not (score_obj['score'] >= min_score and score_obj['score'] <= max_score):
|
||||||
|
has_error = _print_error_msg(
|
||||||
|
'[!] Technique ID: ' + tech_id + ' has an INVALID ' + object_type + ' score in a score object in the \'score_logbook\': ' + str(score_obj['score']) + ' (should be between ' + str(min_score) + ' and ' + str(max_score) + ')', health_is_called)
|
||||||
|
|
||||||
|
if score_obj['score'] > min_score:
|
||||||
|
try:
|
||||||
|
# noinspection PyStatementEffect
|
||||||
|
score_obj['date'].year
|
||||||
|
# noinspection PyStatementEffect
|
||||||
|
score_obj['date'].month
|
||||||
|
# noinspection PyStatementEffect
|
||||||
|
score_obj['date'].day
|
||||||
|
except AttributeError:
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + ' score object in the \'score_logbook\': date (should be YYYY-MM-DD)', health_is_called)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return has_error
|
||||||
|
|
||||||
|
|
||||||
|
def _check_health_yaml_object(yaml_object, object_type, tech_id, health_is_called):
|
||||||
|
"""
|
||||||
|
Check the health of a visibility or detection YAML object
|
||||||
|
:param yaml_object: YAML file lines
|
||||||
|
:param object_type: 'detection' or 'visibility'
|
||||||
|
:param tech_id: ATT&CK technique ID
|
||||||
|
:param health_is_called: boolean that specifies if detailed errors in the file will be printed and then quit()
|
||||||
|
:return: True if the YAML file is unhealthy, otherwise False
|
||||||
|
"""
|
||||||
|
has_error = False
|
||||||
|
|
||||||
|
keys = ['applicable_to']
|
||||||
|
|
||||||
|
if object_type == 'detection':
|
||||||
|
keys.append('location')
|
||||||
|
|
||||||
|
try:
|
||||||
|
for key in keys:
|
||||||
|
if not isinstance(yaml_object[key], list):
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has for the key-value pair \'' + key + '\' in ' + object_type + ' a string value assigned (should be a list)', health_is_called)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
if yaml_object[key][0] is None:
|
||||||
|
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in ' + object_type + ': ' + key, health_is_called)
|
||||||
|
except TypeError:
|
||||||
|
has_error = _print_error_msg(
|
||||||
|
'[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in ' + object_type + ': ' + key, health_is_called)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return has_error
|
||||||
|
|
||||||
|
|
||||||
|
def _update_health_sate(current, update):
|
||||||
|
if current or update:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return update
|
||||||
|
|
||||||
|
|
||||||
def check_yaml_file_health(filename, file_type, health_is_called):
|
def check_yaml_file_health(filename, file_type, health_is_called):
|
||||||
"""
|
"""
|
||||||
Check on error in the provided YAML file.
|
Check on error in the provided YAML file.
|
||||||
|
@ -528,8 +803,9 @@ def check_yaml_file_health(filename, file_type, health_is_called):
|
||||||
has_error = False
|
has_error = False
|
||||||
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||||
# check for duplicate tech IDs
|
# check for duplicate tech IDs
|
||||||
|
_yaml = init_yaml()
|
||||||
with open(filename, 'r') as yaml_file:
|
with open(filename, 'r') as yaml_file:
|
||||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
yaml_content = _yaml.load(yaml_file)
|
||||||
|
|
||||||
tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques']))
|
tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques']))
|
||||||
tech_dup = []
|
tech_dup = []
|
||||||
|
@ -539,17 +815,20 @@ def check_yaml_file_health(filename, file_type, health_is_called):
|
||||||
else:
|
else:
|
||||||
has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech, health_is_called)
|
has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech, health_is_called)
|
||||||
|
|
||||||
|
# check if the technique has a valid format
|
||||||
|
if not REGEX_YAML_TECHNIQUE_ID_FORMAT.match(tech):
|
||||||
|
has_error = _print_error_msg('[!] Invalid technique ID: ' + tech, health_is_called)
|
||||||
|
|
||||||
# checks on:
|
# checks on:
|
||||||
# - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score'
|
# - empty key-value pairs: 'applicable_to', 'comment', 'location', 'score_logbook' , 'date', 'score'
|
||||||
# - invalid date format for: 'date_implemented', 'date_registered'
|
# - invalid date format for: 'date'
|
||||||
# - detection or visibility score out-of-range
|
# - detection or visibility score out-of-range
|
||||||
# - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment'
|
# - missing key-value pairs: 'applicable_to', 'comment', 'location', 'score_logbook', 'date', 'score'
|
||||||
# - check on 'applicable_to' values which are very similar
|
# - check on 'applicable_to' values which are very similar
|
||||||
|
|
||||||
all_applicable_to = set()
|
all_applicable_to = set()
|
||||||
techniques = load_techniques(filename)
|
techniques = load_techniques(filename)
|
||||||
for tech, v in techniques[0].items():
|
for tech, v in techniques[0].items():
|
||||||
|
|
||||||
for key in ['detection', 'visibility']:
|
for key in ['detection', 'visibility']:
|
||||||
if key not in v:
|
if key not in v:
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING ' + key, health_is_called)
|
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING ' + key, health_is_called)
|
||||||
|
@ -558,57 +837,24 @@ def check_yaml_file_health(filename, file_type, health_is_called):
|
||||||
all_applicable_to.update([a for v in v[key] for a in v['applicable_to']])
|
all_applicable_to.update([a for v in v[key] for a in v['applicable_to']])
|
||||||
|
|
||||||
for detection in v['detection']:
|
for detection in v['detection']:
|
||||||
for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']:
|
for key in ['applicable_to', 'location', 'comment', 'score_logbook']:
|
||||||
if key not in detection:
|
if key not in detection:
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key, health_is_called)
|
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in detection: ' + key, health_is_called)
|
||||||
|
|
||||||
try:
|
health = _check_health_yaml_object(detection, 'detection', tech, health_is_called)
|
||||||
if detection['score'] is None:
|
has_error = _update_health_sate(has_error, health)
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score', health_is_called)
|
health = _check_health_score_object(detection, 'detection', tech, health_is_called)
|
||||||
|
has_error = _update_health_sate(has_error, health)
|
||||||
elif not (detection['score'] >= -1 and detection['score'] <= 5):
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID detection score: '
|
|
||||||
+ str(detection['score']) + ' (should be between -1 and 5)', health_is_called)
|
|
||||||
|
|
||||||
elif detection['score'] > -1:
|
|
||||||
for key in ['date_implemented', 'date_registered']:
|
|
||||||
if not detection[key]:
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
detection[key].year
|
|
||||||
detection[key].month
|
|
||||||
detection[key].day
|
|
||||||
except AttributeError:
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech +
|
|
||||||
' has an INVALID data format for the key-value pair in detection: ' +
|
|
||||||
key + ' (should be YYYY-MM-DD)', health_is_called)
|
|
||||||
for key in ['location', 'applicable_to']:
|
|
||||||
if not isinstance(detection[key], list):
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has for the key-value pair \''
|
|
||||||
+ key + '\' a string value assigned (should be a list)', health_is_called)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if detection[key][0] is None:
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
|
||||||
except TypeError:
|
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
for visibility in v['visibility']:
|
for visibility in v['visibility']:
|
||||||
for key in ['applicable_to', 'score', 'comment']:
|
for key in ['applicable_to', 'comment', 'score_logbook']:
|
||||||
if key not in visibility:
|
if key not in visibility:
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key, health_is_called)
|
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in visibility: ' + key, health_is_called)
|
||||||
|
|
||||||
try:
|
health = _check_health_yaml_object(visibility, 'visibility', tech, health_is_called)
|
||||||
if visibility['score'] is None:
|
has_error = _update_health_sate(has_error, health)
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score', health_is_called)
|
health = _check_health_score_object(visibility, 'visibility', tech, health_is_called)
|
||||||
elif not (visibility['score'] >= 0 and visibility['score'] <= 4):
|
has_error = _update_health_sate(has_error, health)
|
||||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID visibility score: '
|
|
||||||
+ str(detection['score']) + ' (should be between 0 and 4)', health_is_called)
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# get values within the key-value pair 'applicable_to' which are a very close match
|
# get values within the key-value pair 'applicable_to' which are a very close match
|
||||||
similar = set()
|
similar = set()
|
||||||
|
@ -632,7 +878,7 @@ def check_yaml_file_health(filename, file_type, health_is_called):
|
||||||
print('') # print a newline
|
print('') # print a newline
|
||||||
|
|
||||||
|
|
||||||
def check_file_type(filename, file_type=None):
|
def _check_file_type(filename, file_type=None):
|
||||||
"""
|
"""
|
||||||
Check if the provided YAML file has the key 'file_type' and possible if that key matches a specific value.
|
Check if the provided YAML file has the key 'file_type' and possible if that key matches a specific value.
|
||||||
:param filename: path to a YAML file
|
:param filename: path to a YAML file
|
||||||
|
@ -642,9 +888,11 @@ def check_file_type(filename, file_type=None):
|
||||||
if not os.path.exists(filename):
|
if not os.path.exists(filename):
|
||||||
print('[!] File: \'' + filename + '\' does not exist')
|
print('[!] File: \'' + filename + '\' does not exist')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
_yaml = init_yaml()
|
||||||
with open(filename, 'r') as yaml_file:
|
with open(filename, 'r') as yaml_file:
|
||||||
try:
|
try:
|
||||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
yaml_content = _yaml.load(yaml_file)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||||
print(' ' + str(e)) # print more detailed error information to help the user in fixing the error.
|
print(' ' + str(e)) # print more detailed error information to help the user in fixing the error.
|
||||||
|
@ -679,7 +927,7 @@ def check_file(filename, file_type=None, health_is_called=False):
|
||||||
:return: the file_type if present, else None is returned
|
:return: the file_type if present, else None is returned
|
||||||
"""
|
"""
|
||||||
|
|
||||||
yaml_content = check_file_type(filename, file_type)
|
yaml_content = _check_file_type(filename, file_type)
|
||||||
|
|
||||||
# if the file is a valid YAML, continue. Else, return None
|
# if the file is a valid YAML, continue. Else, return None
|
||||||
if yaml_content:
|
if yaml_content:
|
||||||
|
|
Loading…
Reference in New Issue