DeTTECT/upgrade.py

577 lines
28 KiB
Python

from constants import *
import simplejson
from io import StringIO
import os
def _load_techniques(yaml_file_lines):
"""
Loads the techniques (including detection and visibility properties) from the given YAML file.
:param yaml_file_lines: list with the YAML file lines containing the techniques administration
:return: dictionary with techniques (incl. properties)
"""
from generic import add_entry_to_list_in_dictionary, init_yaml, set_yaml_dv_comments
my_techniques = {}
_yaml = init_yaml()
yaml_content = _yaml.load(''.join(yaml_file_lines))
for d in yaml_content['techniques']:
# Add detection items
if isinstance(d['detection'], dict): # There is just one detection entry
d['detection'] = set_yaml_dv_comments(d['detection'])
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif isinstance(d['detection'], list): # There are multiple detection entries
for de in d['detection']:
de = set_yaml_dv_comments(de)
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if isinstance(d['visibility'], dict): # There is just one visibility entry
d['visibility'] = set_yaml_dv_comments(d['visibility'])
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif isinstance(d['visibility'], list): # There are multiple visibility entries
for de in d['visibility']:
de = set_yaml_dv_comments(de)
add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
return my_techniques
def _create_upgrade_text(file_type, file_version):
"""
Create text on the upgrades to be performed on the YAML file.
:param file_type: YAML file type
:param file_version: version of the YAML file
:return: upgrade text to be displayed in the console
"""
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
text = 'You are using an old version of the YAML file.\n' \
'The following upgrades will be performed on the techniques administration file:\n'
for version in FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT:
if file_version < version:
text += '- Version: ' + str(version) + '\n'
text += FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT[version] + '\n'
return text
def _get_indent_chars(file_lines):
"""
Identify and return the characters that are used to indent the YAML file
:param file_lines: List of lines in the YAML file
:return: indent characters
"""
indent_chars = ' '
for l in file_lines:
if REGEX_YAML_TECHNIQUE_ID.match(l):
indent_chars = REGEX_YAML_INDENT_CHARS.search(l).groups()[0]
indent_chars = len(indent_chars) * ' '
break
return indent_chars
# noinspection PyDictCreation
def upgrade_yaml_file(filename, file_type, file_version, attack_tech_data):
"""
Main function to upgrade the YAML file to a new version
:param filename: YAML administration file
:param file_type: YAML file type
:param file_version: version of the YAML file
:param attack_tech_data: ATT&CK data on techniques
:return:
"""
from generic import ask_yes_no, backup_file
is_upgraded = False
tech_upgrade_func = {}
tech_upgrade_func[1.1] = _upgrade_technique_yaml_10_to_11
tech_upgrade_func[1.2] = _upgrade_technique_yaml_11_to_12
with open(filename, 'r') as file:
file_new_lines = file.readlines()
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
if file_version < FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION:
upgrade_text = _create_upgrade_text(file_type, file_version)
print(upgrade_text)
upgrade_question = 'Do you want to upgrade the below file. A backup will be created of the current file.\n' + \
'[!] Not upgrading the file will brake functionality within DeTT&CT.\n' + \
' - ' + filename
if ask_yes_no(upgrade_question):
is_upgraded = True
# create backup of the non-upgraded file
backup_file(filename)
for tech_f in tech_upgrade_func.keys():
if file_version < tech_f:
file_new_lines = tech_upgrade_func[tech_f](file_new_lines, attack_tech_data)
else:
print('Upgrade cancelled\n')
print('-' * 80)
return
if is_upgraded:
# write the upgraded file to disk
with open(filename, 'w') as f:
f.writelines(file_new_lines)
print('Written upgraded file: ' + filename)
print('\nUpgrade complete')
print('-' * 80)
def _upgrade_technique_yaml_10_to_11(file_lines, attack_tech_data):
"""
Upgrade the YAML technique administration file from 1.0 to 1.1.
:param file_lines: list containing the lines within the tech. admin. file
:param attack_tech_data: ATT&CK data on techniques
:return: array with new lines to be written to disk
"""
from generic import get_technique
# identify the indent characters used
indent_chars = _get_indent_chars(file_lines)
file_new_lines = []
x = 0
for l in file_lines:
if REGEX_YAML_VERSION_10.match(l):
file_new_lines.append(l.replace('1.0', '1.1'))
elif REGEX_YAML_TECHNIQUE_ID.match(l):
file_new_lines.append(l)
tech_id = REGEX_YAML_TECHNIQUE_ID_GROUP.search(l).group(1)
tech_name = get_technique(attack_tech_data, tech_id)['name']
file_new_lines.append(indent_chars + 'technique_name: ' + tech_name + '\n')
elif REGEX_YAML_DETECTION.match(l):
file_new_lines.append(l)
file_new_lines.append((indent_chars * 2) + "applicable_to: ['all']\n")
elif REGEX_YAML_VISIBILITY.match(l):
file_new_lines.append(l)
file_new_lines.append((indent_chars * 2) + "applicable_to: ['all']\n")
else:
file_new_lines.append(l)
x += 1
return file_new_lines
def _print_error_msg(msg):
print(msg)
return True
def _check_yaml_file_health_v11(file_lines):
"""
Check on error in the provided YAML file version 1.1
:param file_lines: YAML file lines
:return: True for a healthy file, and False when encountering health issues.
"""
from generic import init_yaml
has_error = False
# check for duplicate tech IDs
_yaml = init_yaml()
yaml_content = _yaml.load(''.join(file_lines))
tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques']))
tech_dup = []
for tech in tech_ids:
if tech not in tech_dup:
tech_dup.append(tech)
else:
has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech)
# checks on:
# - empty key-value pairs: 'date_implemented', 'location', 'applicable_to', 'score'
# - invalid date format for: 'date_implemented'
# - detection or visibility score out-of-range
# - missing key-value pairs: 'applicable_to', 'date_implemented', 'score', 'location', 'comment'
# - check on 'applicable_to' values which are very similar
dict_yaml_techniques = _load_techniques(file_lines)
all_applicable_to = set()
for tech, v in dict_yaml_techniques.items():
for key in ['detection', 'visibility']:
if key not in v:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING ' + key)
elif 'applicable_to' in v:
# create at set containing all values for 'applicable_to'
all_applicable_to.update([a for v in v[key] for a in v['applicable_to']])
for detection in v['detection']:
for key in ['applicable_to', 'date_implemented', 'score', 'location', 'comment']:
if key not in detection:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key)
try:
# noinspection PyChainedComparisons
if detection['score'] is None:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an EMPTY key-value pair in detection: score')
elif not (detection['score'] >= -1 and detection['score'] <= 5):
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID detection score: '
+ str(detection['score']) + ' (should be between -1 and 5)')
elif detection['score'] > -1:
if not detection['date_implemented']:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an EMPTY key-value pair in detection: ' + 'date_implemented')
break
try:
# noinspection PyStatementEffect
detection['date_implemented'].year
# noinspection PyStatementEffect
detection['date_implemented'].month
# noinspection PyStatementEffect
detection['date_implemented'].day
except AttributeError:
has_error = _print_error_msg('[!] Technique ID: ' + tech +
' has an INVALID data format for the key-value pair in detection: ' +
'date_implemented (should be YYYY-MM-DD)')
for key in ['location', 'applicable_to']:
if not isinstance(detection[key], list):
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has for the key-value pair \''
+ key + '\' a string value assigned (should be a list)')
else:
try:
if detection[key][0] is None:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an EMPTY key-value pair in detection: ' + key)
except TypeError:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an EMPTY key-value pair in detection: ' + key)
except KeyError:
pass
for visibility in v['visibility']:
for key in ['applicable_to', 'score', 'comment']:
if key not in visibility:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key)
try:
if visibility['score'] is None:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an EMPTY key-value pair in visibility: score')
elif not (0 <= visibility['score'] <= 4):
# noinspection PyUnboundLocalVariable
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID visibility score: '
+ str(detection['score']) + ' (should be between 0 and 4)')
except KeyError:
pass
if has_error:
print('')
return False
else:
return True
def _upgrade_technique_yaml_11_to_12(file_lines, attack_tech_data):
"""
Upgrade the YAML technique administration file from 1.1 to 1.2.
:param file_lines: array containing the lines within the tech. admin. file
:param attack_tech_data: Not used, but necessary to be compatible with other upgrade methods.
:return: array with new lines to be written to disk
"""
from generic import ask_yes_no, fix_date_and_remove_null, init_yaml
# we will first do a health check on the tech. admin file version 1.1. Having health issues in the file could
# result in an upgraded file with errors.
print('Checking the health of the file before we to the upgrade from version 1.1 to 1.2')
healthy_file = _check_yaml_file_health_v11(file_lines)
if not healthy_file:
print('[!] Health issues found. It is advisable first to fix the health issues before continuing the upgrade.')
if not ask_yes_no('Are you sure that you want to continue the upgrade?'):
print('Upgrade cancelled')
quit()
else:
print(' - No health issues found. We continue the upgrade to version 1.2\n')
keep_date_registered = ask_yes_no("Do you want to keep the key-value pair 'date_registered' in your technique "
"administration file even though DeTT&CT no longer makes use of it?")
date_for_visibility = ''
print("Which date do you want to fill in for the visibility scores already present in the new key-value pair 'date'?")
while not REGEX_YAML_VALID_DATE.match(date_for_visibility):
date_for_visibility = input(' >> YYYY-MM-DD: ')
if not REGEX_YAML_VALID_DATE.match(date_for_visibility):
print(' Invalid date format')
print('')
auto_generated = ask_yes_no('Are ALL of the current visibility scores within the technique administration file directly derived from the nr. of data sources?\n'
' * Generated using the option \'-y, --yaml\' from the \'datasoure\' mode in dettect.py\n'
' * Which means NONE of them have been scored manually?')
_yaml = init_yaml()
yaml_file = _yaml.load(''.join(file_lines))
yaml_file['version'] = 1.2
# upgrade to the new v1.2 tech. admin file
for tech in yaml_file['techniques']:
if isinstance(tech['detection'], list):
detections = tech['detection']
else:
detections = [tech['detection']]
for d in detections:
score = d['score']
date = d['date_implemented']
try:
if not keep_date_registered:
del d['date_registered']
del d['date_implemented']
del d['score']
except KeyError:
pass
d['score_logbook'] = [{'date': date, 'score': score, 'comment': ''}]
if isinstance(tech['visibility'], list):
visibility = tech['visibility']
else:
visibility = [tech['visibility']]
for v in visibility:
score = v['score']
try:
del v['score']
except KeyError:
pass
v['score_logbook'] = [{'date': date_for_visibility, 'score': score, 'comment': ''}]
if auto_generated:
v['score_logbook'][0]['auto_generated'] = True
# remove the single quotes around the date
new_lines = fix_date_and_remove_null(yaml_file, date_for_visibility, input_type='ruamel')
return new_lines
def check_yaml_updated_to_sub_techniques(filename):
"""
Checks if the YAML technique administration file is already updated to ATT&CK with sub-techniques by comparing the techniques to the the crosswalk file.
:param filename: YAML administration file
:return: return False if an update is required, otherwise True
"""
from generic import init_yaml, backup_file, fix_date_and_remove_null, load_attack_data, get_technique, get_technique_from_yaml, remove_technique_from_yaml
# Open the crosswalk file from MITRE:
conversion_table = None
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mitre-data/subtechniques-crosswalk.json'), 'r') as f:
conversion_table = simplejson.load(f)
# Open the techniques YAML file:
_yaml = init_yaml()
with open(filename, 'r') as yaml_file:
yaml_content = _yaml.load(yaml_file)
# Keep track which techniques can be auto updated and which need manual updating
auto_updatable_techniques = []
manual_update_techniques = []
for item in conversion_table:
for element in item:
if element.startswith('T'):
for migrate_item in item[element]:
# Check if technique is in YAML file:
yaml_technique = get_technique_from_yaml(yaml_content, element)
if yaml_technique is None:
break
else:
# Possible types of changes:
# - Remains Technique
# - Became a Sub-Technique
# - Multiple Techniques Became New Sub-Technique
# - One or More Techniques Became New Technique
# - Merged into Existing Technique
# - Deprecated
# - Became Multiple Sub-Techniques
if item['change-type'] == 'Became a Sub-Technique':
auto_updatable_techniques.append(element)
elif item['change-type'] == 'Multiple Techniques Became New Sub-Technique':
manual_update_techniques.append(element)
elif item['change-type'] == 'One or More Techniques Became New Technique':
manual_update_techniques.append(element)
elif item['change-type'] == 'Merged into Existing Technique':
manual_update_techniques.append(element)
elif item['change-type'] == 'Deprecated':
auto_updatable_techniques.append(element)
elif item['change-type'] == 'Became Multiple Sub-Techniques':
manual_update_techniques.append(element)
if len(auto_updatable_techniques) > 0:
print('[!] File: \'' + filename + '\' needs to be updated to ATT&CK with sub-techniques. Use the option \'--update-to-sub-techniques\' to perform the update.')
return False
elif len(auto_updatable_techniques) == 0 and len(manual_update_techniques) > 0:
print('[!] File: \'' + filename + '\' needs some manual work to upgrade to ATT&CK with sub-techniques. See the list below on what needs to be changed.\n')
upgrade_to_sub_techniques(filename, notify_only=True)
return False
elif len(auto_updatable_techniques) == 0 and len(manual_update_techniques) == 0:
return True
else:
return False
def upgrade_to_sub_techniques(filename, notify_only=False):
"""
Upgrade the YAML technique administration file to ATT&CK with sub-techniques
:param filename: YAML administration file
:param notify_only: set to True by 'check_yaml_updated_to_sub_techniques' when no automatic upgrade of techniques can be performed because these require manual action
:return:
"""
from generic import init_yaml, backup_file, load_attack_data, get_technique, get_technique_from_yaml, remove_technique_from_yaml, ask_yes_no, local_stix_path, get_latest_score, get_latest_auto_generated
if not notify_only and not ask_yes_no('DeTT&CT is going to update \'' + filename + '\' to ATT&CK with sub-techniques. A backup of this file will be generated. Do you want to continue:'):
quit()
# Open the crosswalk file from MITRE:
conversion_table = None
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mitre-data/subtechniques-crosswalk.json'), 'r') as f:
conversion_table = simplejson.load(f)
# Open the techniques YAML file:
_yaml = init_yaml()
with open(filename, 'r') as yaml_file:
yaml_content = _yaml.load(yaml_file)
# Get the MITRE ATT&CK techniques (e.g. to get the new name for renamed techniques):
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH_ENTERPRISE)
# Check if STIX object collection (TAXII server or local STIX objects) contain sub-techniques, by checking the existence of the first sub-technique (T1001.001)
stix_sub_tech_check = get_technique(techniques, 'T1001.001')
if stix_sub_tech_check is None:
if local_stix_path:
print('[!] The local STIX repository \'' + local_stix_path +
'\' doesn\'t contain ATT&CK sub-techniques. This is necessary to perform the update.')
else:
print('[!] The TAXII server doesn\'t contain ATT&CK sub-techniques. This is necessary to perform the update.')
quit()
# Keep an ignore list for techniques that are already been taken care of:
ignore_list = []
# Collect messages and show them at the end grouped by comparable messages:
become_subtech_msgs = []
deprecated_msgs = []
renamed_msgs = []
subtech_added_msgs = []
warning_msgs = []
for item in conversion_table:
for element in item:
if element.startswith('T'):
for migrate_item in item[element]:
# Check if technique is in YAML file:
yaml_technique = get_technique_from_yaml(yaml_content, element)
# Only apply changes to techniques that are in the YAML file:
if yaml_technique is not None and element not in ignore_list:
change_name = False
# Possible types of changes:
# - Remains Technique
# - Became a Sub-Technique
# - Multiple Techniques Became New Sub-Technique
# - One or More Techniques Became New Technique
# - Merged into Existing Technique
# - Deprecated
# - Became Multiple Sub-Techniques
if item['change-type'] == 'Remains Technique':
# No upgrade necessary because techniques "Remains Technique". Only name changes can occur, these will be handled beneath.
change_name = True
# Only check if "new sub-techniques added" is within the explanation:
if 'new sub-techniques added' in migrate_item['explanation'].lower():
has_detection = False
is_auto_generated = False
if isinstance(yaml_technique['detection'], dict): # There is just one detection entry
has_detection = get_latest_score(yaml_technique['detection']) >= 0
is_auto_generated = get_latest_auto_generated(yaml_technique['visibility'])
elif isinstance(yaml_technique['detection'], list): # There are multiple detection entries
has_detection = len([d for d in yaml_technique['detection'] if get_latest_score(d) >= 0]) > 0
is_auto_generated = any([get_latest_auto_generated(v) for v in yaml_technique['visibility']])
if has_detection or not is_auto_generated:
subtech_added_msgs.append(migrate_item['id'])
elif item['change-type'] == 'Became a Sub-Technique':
# Conversion from technique to sub-technique:
yaml_technique['technique_id'] = migrate_item['id']
become_subtech_msgs.append('[i] Technique ' + element + ' has become sub-technique: ' +
migrate_item['id'] + '. Change applied in the YAML file.')
change_name = True
elif item['change-type'] == 'Multiple Techniques Became New Sub-Technique':
# No conversion possible: Multiple techniques became new sub-technique:
warning_msgs.append(
'[!] Technique ' + element + ' has been consolidated with multiple other techniques into one sub-technique: ' + migrate_item['id'] + '. You need to migrate this technique manually.')
elif item['change-type'] == 'One or More Techniques Became New Technique':
# No conversion possible: One or more techniques became new technique:
warning_msgs.append(
'[!] Technique ' + element + ' has been consolidated (with multiple other techniques) into one technique: ' + migrate_item['id'] + '. You need to migrate this technique manually.')
elif item['change-type'] == 'Merged into Existing Technique':
# No conversion possible: Technique merged into existing technique:
warning_msgs.append('[!] Technique ' + element + ' is merged with ' + migrate_item['id'] +
'. You need to migrate this technique manually.')
elif item['change-type'] == 'Deprecated':
# Remove deprecated items:
remove_technique_from_yaml(yaml_content, element)
deprecated_msgs.append('[i] Technique ' + element + ' is deprecated. Technique bas been removed from the YAML file.')
elif item['change-type'] == 'Became Multiple Sub-Techniques':
# No conversion: One technique became multiple sub techniques:
sub_ids = []
for i in item[element]:
sub_ids.append(i['id'])
warning_msgs.append('[!] Technique ' + element + ' is deprecated and split into multiple sub-techniques: ' +
', '.join(sub_ids) + '. You need to migrate this technique manually.')
ignore_list.append(element)
# Get the latest description from ATT&CK:
if change_name and migrate_item['id'] != 'N/A':
new_name = get_technique(techniques, migrate_item['id'])['name']
if yaml_technique['technique_name'] != new_name:
renamed_msgs.append('[i] Technique ' + element + ' is renamed from \'' +
yaml_technique['technique_name'] + '\' to \'' + new_name + '\'.')
yaml_technique['technique_name'] = new_name
# Print the results:
if len(become_subtech_msgs + deprecated_msgs + renamed_msgs) > 0:
print('Informational messages (no action needed):')
for item in become_subtech_msgs:
print(item)
for item in deprecated_msgs:
print(item)
for item in renamed_msgs:
print(item)
print('')
if len(warning_msgs) > 0:
print('Messages that need your attention:')
for item in warning_msgs:
print(item)
print('')
if len(become_subtech_msgs + deprecated_msgs + renamed_msgs + warning_msgs) == 0:
print('[i] No techniques found that need to be updated to ATT&CK sub-techniques.\n')
else:
if len(subtech_added_msgs) > 0:
print('The following techniques remained the same technique. However, to multiple techniques, sub-techniques were added. Please review the following list as there might be a sub-technique that fits better. Techniques: ' + ', '.join(subtech_added_msgs))
print('')
if not notify_only:
# Create backup of the YAML file:
backup_file(filename)
with open(filename, 'w') as fd:
# ruamel does not support output to a variable. Therefore we make use of StringIO.
string_io = StringIO()
_yaml.dump(yaml_content, string_io)
string_io.seek(0)
new_lines = string_io.readlines()
fd.writelines(new_lines)
print('File written: ' + filename + '\n')
print('Some last notes before you continue:')
print('- Please read MITRE\'s blog for more information on how to migrate techniques that need to be migrated manually: https://medium.com/mitre-attack/attack-with-sub-techniques-is-now-just-attack-8fc20997d8de')
print('- It is recommended to run the \'--update function\' in the datasource (ds) mode, to update the visibility scores for all new techniques, new sub-techniques and changed techniques.')
print('')
# Quit DeTT&CT when manual work needs to be done:
if len(warning_msgs) > 0:
quit()