Added functionality to migrate technique administration YAML files with version 1.0 to version 1.1
parent
8999c431a8
commit
55010f8dbb
66
generic.py
66
generic.py
|
@ -2,69 +2,11 @@ import os
|
||||||
import pickle
|
import pickle
|
||||||
from datetime import datetime as dt
|
from datetime import datetime as dt
|
||||||
import yaml
|
import yaml
|
||||||
|
from upgrade import upgrade_yaml_file
|
||||||
|
from constants import *
|
||||||
|
|
||||||
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
||||||
|
|
||||||
APP_NAME = 'DeTT&CT'
|
|
||||||
APP_DESC = 'Detect Tactics, Techniques & Combat Threats'
|
|
||||||
VERSION = '1.0'
|
|
||||||
|
|
||||||
EXPIRE_TIME = 60*60*24
|
|
||||||
|
|
||||||
DATATYPE_TECH_BY_GROUP = 'mitre_techniques_used_by_group'
|
|
||||||
DATATYPE_ALL_TECH = 'mitre_all_techniques'
|
|
||||||
DATATYPE_ALL_GROUPS = 'mitre_all_groups'
|
|
||||||
DATATYPE_ALL_SOFTWARE = 'mitre_all_software'
|
|
||||||
DATATYPE_TECH_BY_SOFTWARE = 'mitre_techniques_used_by_software'
|
|
||||||
DATATYPE_SOFTWARE_BY_GROUP = 'mitre_software_used_by_group'
|
|
||||||
|
|
||||||
# Group colors
|
|
||||||
COLOR_GROUP_OVERLAY_MATCH = '#f9a825' # orange
|
|
||||||
COLOR_GROUP_OVERLAY_NO_MATCH = '#ffee58' # yellow
|
|
||||||
COLOR_SOFTWARE = '#0d47a1 ' # dark blue
|
|
||||||
COLOR_GROUP_AND_SOFTWARE = '#64b5f6 ' # light blue
|
|
||||||
COLOR_GRADIENT_MIN = '#ffcece' # light red
|
|
||||||
COLOR_GRADIENT_MAX = '#ff0000' # red
|
|
||||||
COLOR_TACTIC_ROW_BACKGRND = '#dddddd' # light grey
|
|
||||||
COLOR_GROUP_OVERLAY_ONLY_DETECTION = '#8BC34A' # green
|
|
||||||
COLOR_GROUP_OVERLAY_ONLY_VISIBILITY = '#1976D2' # blue
|
|
||||||
|
|
||||||
# data source colors (purple range)
|
|
||||||
COLOR_DS_25p = '#E1BEE7'
|
|
||||||
COLOR_DS_50p = '#CE93D8'
|
|
||||||
COLOR_DS_75p = '#AB47BC'
|
|
||||||
COLOR_DS_99p = '#7B1FA2'
|
|
||||||
COLOR_DS_100p = '#4A148C'
|
|
||||||
|
|
||||||
# data source colors HAPPY (green range)
|
|
||||||
COLOR_DS_25p_HAPPY = '#DCEDC8'
|
|
||||||
COLOR_DS_50p_HAPPY = '#AED581'
|
|
||||||
COLOR_DS_75p_HAPPY = '#8BC34A'
|
|
||||||
COLOR_DS_99p_HAPPY = '#689F38'
|
|
||||||
COLOR_DS_100p_HAPPY = '#33691E'
|
|
||||||
|
|
||||||
# Detection colors (green range)
|
|
||||||
COLOR_D_0 = '#64B5F6' # Blue: Forensics/Context
|
|
||||||
COLOR_D_1 = '#DCEDC8'
|
|
||||||
COLOR_D_2 = '#AED581'
|
|
||||||
COLOR_D_3 = '#8BC34A'
|
|
||||||
COLOR_D_4 = '#689F38'
|
|
||||||
COLOR_D_5 = '#33691E'
|
|
||||||
|
|
||||||
# Visibility colors (blue range)
|
|
||||||
COLOR_V_1 = '#BBDEFB'
|
|
||||||
COLOR_V_2 = '#64B5F6'
|
|
||||||
COLOR_V_3 = '#1976D2'
|
|
||||||
COLOR_V_4 = '#0D47A1'
|
|
||||||
|
|
||||||
# Detection and visibility overlay color:
|
|
||||||
COLOR_OVERLAY_VISIBILITY = COLOR_V_3
|
|
||||||
COLOR_OVERLAY_DETECTION = COLOR_D_3
|
|
||||||
COLOR_OVERLAY_BOTH = COLOR_GROUP_OVERLAY_MATCH
|
|
||||||
|
|
||||||
FILE_TYPE_DATA_SOURCE_ADMINISTRATION = 'data-source-administration'
|
|
||||||
FILE_TYPE_TECHNIQUE_ADMINISTRATION = 'technique-administration'
|
|
||||||
FILE_TYPE_GROUP_ADMINISTRATION = 'group-administration'
|
|
||||||
|
|
||||||
|
|
||||||
def save_attack_data(data, path):
|
def save_attack_data(data, path):
|
||||||
"""
|
"""
|
||||||
|
@ -363,7 +305,9 @@ def check_file_type(filename, file_type=None):
|
||||||
print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
|
print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
|
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||||
return yaml_content['file_type']
|
return yaml_content['file_type']
|
||||||
else:
|
else:
|
||||||
|
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||||
return yaml_content['file_type']
|
return yaml_content['file_type']
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ import glob
|
||||||
from data_source_mapping import *
|
from data_source_mapping import *
|
||||||
from technique_mapping import *
|
from technique_mapping import *
|
||||||
from group_mapping import *
|
from group_mapping import *
|
||||||
|
from constants import *
|
||||||
|
|
||||||
|
|
||||||
groups = 'all'
|
groups = 'all'
|
||||||
|
@ -13,11 +14,6 @@ groups_overlay = ''
|
||||||
overlay_type = ''
|
overlay_type = ''
|
||||||
filter_applicable_to = 'all'
|
filter_applicable_to = 'all'
|
||||||
|
|
||||||
MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping'
|
|
||||||
MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping'
|
|
||||||
MENU_NAME_DETECTION_COVERAGE_MAPPING = 'Detection coverage mapping'
|
|
||||||
MENU_NAME_THREAT_ACTOR_GROUP_MAPPING = 'Threat actor group mapping'
|
|
||||||
|
|
||||||
|
|
||||||
def clear():
|
def clear():
|
||||||
"""
|
"""
|
||||||
|
@ -51,7 +47,7 @@ def wait():
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
print('')
|
print('')
|
||||||
print('Press a key to return to the last menu')
|
print('Press a key to continue')
|
||||||
input('')
|
input('')
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,6 +135,8 @@ def select_file(title, what, expected_file_type, b_clear=True, path='sample-data
|
||||||
filename = files[int(choice) - 1]
|
filename = files[int(choice) - 1]
|
||||||
file_type = check_file_type(filename, file_type=expected_file_type)
|
file_type = check_file_type(filename, file_type=expected_file_type)
|
||||||
if file_type:
|
if file_type:
|
||||||
|
print('Selected file: ' + filename)
|
||||||
|
wait()
|
||||||
return filename
|
return filename
|
||||||
else:
|
else:
|
||||||
print("[!] Invalid choice")
|
print("[!] Invalid choice")
|
||||||
|
|
|
@ -0,0 +1,150 @@
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from constants import *
|
||||||
|
|
||||||
|
|
||||||
|
def create_upgrade_text(file_type, file_version):
|
||||||
|
"""
|
||||||
|
Create text on the upgrades to be performed on the YAML file.
|
||||||
|
:param file_type: YAML file type
|
||||||
|
:param file_version: version of the YAML file
|
||||||
|
:return: upgrade text to be displayed in the console
|
||||||
|
"""
|
||||||
|
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||||
|
text = 'You are using an old version of the YAML file.\n' \
|
||||||
|
'The following upgrades will be performed on the techniques administration file:\n'
|
||||||
|
for version in FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT:
|
||||||
|
if file_version < version:
|
||||||
|
text += FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT[version] + '\n'
|
||||||
|
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def ask_to_upgrade(filename):
|
||||||
|
"""
|
||||||
|
Ask the user to upgrade the YAML file or not.
|
||||||
|
:param filename: YAML administration file
|
||||||
|
:return: boolean value indicating if the upgrade can be performed
|
||||||
|
"""
|
||||||
|
yes_no = ''
|
||||||
|
while not re.match('^(y|yes|n|no)$', yes_no, re.IGNORECASE):
|
||||||
|
yes_no = input('Do you want to upgrade the below file. A backup will be created of the current file.\n'
|
||||||
|
'[!] Not upgrading the file will brake some functionality within DeTT&CT.\n'
|
||||||
|
' - ' + filename + '\n >> y(yes)/n(no): ')
|
||||||
|
|
||||||
|
if re.match('^(y|yes)$', yes_no, re.IGNORECASE):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade_yaml_file(filename, file_type, file_version, attack_tech_data):
|
||||||
|
"""
|
||||||
|
Main function to upgrade the YAML file to a new version
|
||||||
|
:param filename: YAML administration file
|
||||||
|
:param file_type: YAML file type
|
||||||
|
:param file_version: version of the YAML file
|
||||||
|
:param attack_tech_data: ATT&CK data on techniques
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
|
||||||
|
is_upgraded = False
|
||||||
|
tech_upgrade_func = {}
|
||||||
|
tech_upgrade_func[1.1] = upgrade_technique_yaml_10_to_11
|
||||||
|
|
||||||
|
with open(filename, 'r') as file:
|
||||||
|
file_new_lines = file.readlines()
|
||||||
|
|
||||||
|
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||||
|
if file_version != FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION:
|
||||||
|
upgrade_text = create_upgrade_text(file_type, file_version)
|
||||||
|
print(upgrade_text)
|
||||||
|
if ask_to_upgrade(filename):
|
||||||
|
is_upgraded = True
|
||||||
|
# create backup of the non-upgraded file
|
||||||
|
backup_filename = get_backup_filename(filename)
|
||||||
|
shutil.copy2(filename, backup_filename)
|
||||||
|
print('Written backup file: ' + backup_filename)
|
||||||
|
|
||||||
|
for tech_f in tech_upgrade_func.keys():
|
||||||
|
if file_version < tech_f:
|
||||||
|
file_new_lines = tech_upgrade_func[tech_f](file_new_lines, attack_tech_data)
|
||||||
|
else:
|
||||||
|
print('Upgrade cancelled\n')
|
||||||
|
print('-' * 80)
|
||||||
|
return
|
||||||
|
|
||||||
|
if is_upgraded:
|
||||||
|
# write the upgraded file to disk
|
||||||
|
with open(filename, 'w') as f:
|
||||||
|
f.writelines(file_new_lines)
|
||||||
|
print('Written upgraded file: ' + filename)
|
||||||
|
|
||||||
|
print('\nUpgrade complete')
|
||||||
|
print('-'*80)
|
||||||
|
|
||||||
|
|
||||||
|
def get_technique(techniques, technique_id):
|
||||||
|
"""
|
||||||
|
Generic function to lookup a specific technique_id in a list of dictionaries with techniques.
|
||||||
|
:param techniques: list with all techniques
|
||||||
|
:param technique_id: technique_id to look for
|
||||||
|
:return: the technique you're searching for. None if not found.
|
||||||
|
"""
|
||||||
|
for t in techniques:
|
||||||
|
if technique_id == t['technique_id']:
|
||||||
|
return t
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_backup_filename(filename):
|
||||||
|
"""
|
||||||
|
Create a filename to be used for backup of the YAML file
|
||||||
|
:param filename: existing YAML filename
|
||||||
|
:return: a name for the backup file
|
||||||
|
"""
|
||||||
|
suffix = 1
|
||||||
|
backup_filename = filename.replace('.yaml', '_backup_' + str(suffix) + '.yaml')
|
||||||
|
while os.path.exists(backup_filename):
|
||||||
|
backup_filename = backup_filename.replace('_backup_' + str(suffix) + '.yaml', '_backup_' + str(suffix+1) + '.yaml')
|
||||||
|
suffix += 1
|
||||||
|
|
||||||
|
return backup_filename
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade_technique_yaml_10_to_11(file_lines, attack_tech_data):
|
||||||
|
"""
|
||||||
|
Upgrade the YAML technique administration file from 1.0 to 1.1.
|
||||||
|
:param file_lines: array containing the lines within the tech. admin. file
|
||||||
|
:param attack_tech_data: ATT&CK data on techniques
|
||||||
|
:return: array with new lines to be written to disk
|
||||||
|
"""
|
||||||
|
regex_version = re.compile(r'^\s*version:\s+1\.0\s*$', re.IGNORECASE)
|
||||||
|
regex_tech = re.compile(r'^-\s+technique_id:\s+T[0-9]{4}\s*$', re.IGNORECASE)
|
||||||
|
regex_tech_id = re.compile(r'^-\s+technique_id:\s+(T[0-9]{4})\s*$', re.IGNORECASE)
|
||||||
|
regex_detection = re.compile(r'^\s+detection:\s*$', re.IGNORECASE)
|
||||||
|
regex_visibility = re.compile(r'^\s+visibility:\s*$', re.IGNORECASE)
|
||||||
|
|
||||||
|
file_new_lines = []
|
||||||
|
x = 0
|
||||||
|
for l in file_lines:
|
||||||
|
if regex_version.match(l):
|
||||||
|
file_new_lines.append(l.replace('1.0', '1.1'))
|
||||||
|
elif regex_tech.match(l):
|
||||||
|
file_new_lines.append(l)
|
||||||
|
|
||||||
|
tech_id = regex_tech_id.search(l).group(1)
|
||||||
|
tech_name = get_technique(attack_tech_data, tech_id)['technique']
|
||||||
|
file_new_lines.append(' technique_name: ' + tech_name+'\n')
|
||||||
|
elif regex_detection.match(l):
|
||||||
|
file_new_lines.append(l)
|
||||||
|
file_new_lines.append(" applicable_to: ['all']\n")
|
||||||
|
elif regex_visibility.match(l):
|
||||||
|
file_new_lines.append(l)
|
||||||
|
file_new_lines.append(" applicable_to: ['all']\n")
|
||||||
|
else:
|
||||||
|
file_new_lines.append(l)
|
||||||
|
x += 1
|
||||||
|
|
||||||
|
return file_new_lines
|
Loading…
Reference in New Issue