commit
4277bc134c
27
README.md
27
README.md
|
@ -1,13 +1,14 @@
|
|||
<img src="https://github.com/rabobank-cdc/DeTTACT/wiki/images/logo.png" alt="DeTT&CT" width=30% height=30%>
|
||||
|
||||
#### Detect Tactics, Techniques & Combat Threats
|
||||
Latest version: [1.1](Changelog#version-11)
|
||||
|
||||
To get started with DeTT&CT, check out the
|
||||
[Wiki](https://github.com/rabobank-cdc/DeTTACT/wiki/Getting-started).
|
||||
|
||||
DeTT&CT will help blue teams in scoring and comparing data source quality, visibility coverage, detection coverage and threat actor behaviours. The DeTT&CT framework consists of a Python tool, YAML administration files and [scoring tables](https://github.com/rabobank-cdc/DeTTACT/raw/master/scoring_table.xlsx) for the different aspects.
|
||||
DeTT&CT aims to assist blue teams using ATT&CK to score and compare data log source quality, visibility coverage, detection coverage and threat actor behaviours. All of which can help, in different ways, to get more resilient against attacks targeting your organisation. The DeTT&CT framework consists of a Python tool, YAML administration files and [scoring tables](https://github.com/rabobank-cdc/DeTTACT/raw/master/scoring_table.xlsx) for the different aspects.
|
||||
|
||||
DeTT&CT will help you to:
|
||||
DeTT&CT provides the following functionality:
|
||||
|
||||
- Administrate and score the quality of your data sources.
|
||||
- Get insight on the visibility you have on for example endpoints.
|
||||
|
@ -36,33 +37,13 @@ of which can be visualised by loading JSON layer files into the [ATT&CK Navigato
|
|||
|
||||
See below an example of mapping your data sources to ATT&CK which gives you a rough overview of your visibility coverage:
|
||||
|
||||
<img src="https://github.com/rabobank-cdc/DeTTACT/wiki/images/example_data_sources.png" alt="DeTT&CT"><br>
|
||||
<img src="images/example_data_sources.png" alt="DeTT&CT - Data quality">
|
||||
|
||||
|
||||
## Installation and requirements
|
||||
|
||||
See our GitHub Wiki: [Installation and requirements](https://github.com/rabobank-cdc/DeTTACT/wiki/Installation-and-requirements).
|
||||
|
||||
## Future developments
|
||||
|
||||
- Add more graphs:
|
||||
- [ ] Detections: improvement based on newly added detections and improvements on the level/score of existing detections. Possibly with a changelog.
|
||||
- [ ] Visibility: improvement in the quality of an existing data source.
|
||||
- Groups:
|
||||
- [ ] Have a group YAML file type that contains a count on how popular a certain technique is. This can be very useful to map things such as Red Canary's [Threat Detection Report 2019](https://redcanary.com/resources/guides/threat-detection-report/).
|
||||
- Excel output for:
|
||||
- [ ] Techniques administration YAML file: visibility coverage.
|
||||
- [ ] Techniques administration YAML file: detection coverage.
|
||||
- Data quality Excel sheet:
|
||||
- [ ] Add colours to the data quality scores in the Excel sheet.
|
||||
- YAML files:
|
||||
- [ ] Create an option within the tool to migrate an old administration YAML file version to a new version (such as adding specific key-value pairs).
|
||||
- MITRE ATT&CK updates
|
||||
- [ ] Have a smart way of knowing what to update in your data source and technique administration files once MITRE publishes updates.
|
||||
- [ ] Data sources: check for missing data sources in data sources administration files.
|
||||
- Minimal visibility
|
||||
- [ ] Integrate information into the framework on what a minimal set of visibility for a technique should be, before you can say to have useful visibility (e.g. technique X requires at least to have visibility on process monitoring, process command line monitoring and DLL monitoring).
|
||||
|
||||
## License: GPL-3.0
|
||||
[DeTT&CT's GNU General Public License v3.0](https://github.com/rabobank-cdc/DeTTACT/blob/master/LICENSE)
|
||||
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
APP_NAME = 'DeTT&CT'
|
||||
APP_DESC = 'Detect Tactics, Techniques & Combat Threats'
|
||||
VERSION = '1.1'
|
||||
|
||||
EXPIRE_TIME = 60*60*24
|
||||
|
||||
DATATYPE_TECH_BY_GROUP = 'mitre_techniques_used_by_group'
|
||||
DATATYPE_ALL_TECH = 'mitre_all_techniques'
|
||||
DATATYPE_ALL_GROUPS = 'mitre_all_groups'
|
||||
DATATYPE_ALL_SOFTWARE = 'mitre_all_software'
|
||||
DATATYPE_TECH_BY_SOFTWARE = 'mitre_techniques_used_by_software'
|
||||
DATATYPE_SOFTWARE_BY_GROUP = 'mitre_software_used_by_group'
|
||||
|
||||
# Group colors
|
||||
COLOR_GROUP_OVERLAY_MATCH = '#f9a825' # orange
|
||||
COLOR_GROUP_OVERLAY_NO_MATCH = '#ffee58' # yellow
|
||||
COLOR_SOFTWARE = '#0d47a1 ' # dark blue
|
||||
COLOR_GROUP_AND_SOFTWARE = '#64b5f6 ' # light blue
|
||||
COLOR_GRADIENT_MIN = '#ffcece' # light red
|
||||
COLOR_GRADIENT_MAX = '#ff0000' # red
|
||||
COLOR_TACTIC_ROW_BACKGRND = '#dddddd' # light grey
|
||||
COLOR_GROUP_OVERLAY_ONLY_DETECTION = '#8BC34A' # green
|
||||
COLOR_GROUP_OVERLAY_ONLY_VISIBILITY = '#1976D2' # blue
|
||||
|
||||
# data source colors (purple range)
|
||||
COLOR_DS_25p = '#E1BEE7'
|
||||
COLOR_DS_50p = '#CE93D8'
|
||||
COLOR_DS_75p = '#AB47BC'
|
||||
COLOR_DS_99p = '#7B1FA2'
|
||||
COLOR_DS_100p = '#4A148C'
|
||||
|
||||
# data source colors HAPPY (green range)
|
||||
COLOR_DS_25p_HAPPY = '#DCEDC8'
|
||||
COLOR_DS_50p_HAPPY = '#AED581'
|
||||
COLOR_DS_75p_HAPPY = '#8BC34A'
|
||||
COLOR_DS_99p_HAPPY = '#689F38'
|
||||
COLOR_DS_100p_HAPPY = '#33691E'
|
||||
|
||||
# Detection colors (green range)
|
||||
COLOR_D_0 = '#64B5F6' # Blue: Forensics/Context
|
||||
COLOR_D_1 = '#DCEDC8'
|
||||
COLOR_D_2 = '#AED581'
|
||||
COLOR_D_3 = '#8BC34A'
|
||||
COLOR_D_4 = '#689F38'
|
||||
COLOR_D_5 = '#33691E'
|
||||
|
||||
# Visibility colors (blue range)
|
||||
COLOR_V_1 = '#BBDEFB'
|
||||
COLOR_V_2 = '#64B5F6'
|
||||
COLOR_V_3 = '#1976D2'
|
||||
COLOR_V_4 = '#0D47A1'
|
||||
|
||||
# Detection and visibility overlay color:
|
||||
COLOR_OVERLAY_VISIBILITY = COLOR_V_3
|
||||
COLOR_OVERLAY_DETECTION = COLOR_D_3
|
||||
COLOR_OVERLAY_BOTH = COLOR_GROUP_OVERLAY_MATCH
|
||||
|
||||
# Overlay types as used within the group functionality
|
||||
OVERLAY_TYPE_GROUP = 'group'
|
||||
OVERLAY_TYPE_VISIBILITY = 'visibility'
|
||||
OVERLAY_TYPE_DETECTION = 'detection'
|
||||
|
||||
FILE_TYPE_DATA_SOURCE_ADMINISTRATION = 'data-source-administration'
|
||||
FILE_TYPE_TECHNIQUE_ADMINISTRATION = 'technique-administration'
|
||||
FILE_TYPE_GROUP_ADMINISTRATION = 'group-administration'
|
||||
|
||||
# YAML administration file versions
|
||||
FILE_TYPE_DATA_SOURCE_ADMINISTRATION_VERSION = 1.0
|
||||
FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION = 1.1
|
||||
FILE_TYPE_GROUP_ADMINISTRATION_VERSION = 1.0
|
||||
|
||||
# YAML file upgrade text
|
||||
FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT = {1.1: " - Adding new key 'technique_name' containing the ATT&CK technique name.\n"
|
||||
" - Adding new key 'applicable_to' for both detection and visibility. Default value is ['all']."}
|
||||
# Interactive menu
|
||||
MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping'
|
||||
MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping'
|
||||
MENU_NAME_DETECTION_COVERAGE_MAPPING = 'Detection coverage mapping'
|
||||
MENU_NAME_THREAT_ACTOR_GROUP_MAPPING = 'Threat actor group mapping'
|
|
@ -72,7 +72,15 @@ def export_data_source_list_to_excel(filename):
|
|||
# Formatting:
|
||||
format_bold_left = workbook.add_format({'align': 'left', 'bold': True})
|
||||
format_title = workbook.add_format({'align': 'left', 'bold': True, 'font_size': '14'})
|
||||
format_center = workbook.add_format({'align': 'center'})
|
||||
format_center_valign_top = workbook.add_format({'align': 'center', 'valign': 'top'})
|
||||
wrap_text = workbook.add_format({'text_wrap': True, 'valign': 'top'})
|
||||
valign_top = workbook.add_format({'valign': 'top'})
|
||||
no_score = workbook.add_format({'valign': 'top', 'align': 'center'})
|
||||
dq_score_1 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_DS_25p})
|
||||
dq_score_2 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_DS_50p})
|
||||
dq_score_3 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_DS_75p, 'font_color': '#ffffff'})
|
||||
dq_score_4 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_DS_99p, 'font_color': '#ffffff'})
|
||||
dq_score_5 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_DS_100p, 'font_color': '#ffffff'})
|
||||
|
||||
# Title
|
||||
worksheet.write(0, 0, 'Data sources for ' + name, format_title)
|
||||
|
@ -102,19 +110,19 @@ def export_data_source_list_to_excel(filename):
|
|||
# Putting the data sources data:
|
||||
y = 3
|
||||
for d in get_all_mitre_data_sources():
|
||||
worksheet.write(y, 0, d)
|
||||
worksheet.write(y, 0, d, valign_top)
|
||||
if d in my_data_sources.keys():
|
||||
ds = my_data_sources[d]
|
||||
worksheet.write(y, 1, str(ds['date_registered']).replace('None', ''))
|
||||
worksheet.write(y, 2, str(ds['date_connected']).replace('None', ''))
|
||||
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''))
|
||||
worksheet.write(y, 4, str(ds['comment']) if ds['comment'] else '')
|
||||
worksheet.write(y, 5, str(ds['available_for_data_analytics']))
|
||||
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center)
|
||||
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center)
|
||||
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center)
|
||||
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center)
|
||||
worksheet.write(y, 10, ds['data_quality']['retention'], format_center)
|
||||
worksheet.write(y, 1, str(ds['date_registered']).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 2, str(ds['date_connected']).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 3, ', '.join(ds['products']).replace('None', ''), valign_top)
|
||||
worksheet.write(y, 4, ds['comment'][:-1] if ds['comment'].endswith('\n') else ds['comment'], wrap_text)
|
||||
worksheet.write(y, 5, str(ds['available_for_data_analytics']), valign_top)
|
||||
worksheet.write(y, 6, ds['data_quality']['device_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 7, ds['data_quality']['data_field_completeness'], format_center_valign_top)
|
||||
worksheet.write(y, 8, ds['data_quality']['timeliness'], format_center_valign_top)
|
||||
worksheet.write(y, 9, ds['data_quality']['consistency'], format_center_valign_top)
|
||||
worksheet.write(y, 10, ds['data_quality']['retention'], format_center_valign_top)
|
||||
|
||||
score = 0
|
||||
score_count = 0
|
||||
|
@ -125,7 +133,7 @@ def export_data_source_list_to_excel(filename):
|
|||
if score > 0:
|
||||
score = score/score_count
|
||||
|
||||
worksheet.write(y, 11, score, format_center)
|
||||
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
|
||||
y += 1
|
||||
|
||||
worksheet.autofilter(2, 0, 2, 11)
|
||||
|
@ -214,12 +222,14 @@ def generate_technique_administration_file(filename):
|
|||
techniques = load_attack_data(DATATYPE_ALL_TECH)
|
||||
|
||||
# This is part of the techniques administration YAML file and is used as a template
|
||||
dict_tech = {'technique_id': '', 'detection': {'date_registered': None, 'date_implemented': None, 'score': -1,
|
||||
'location': [''], 'comment': ''},
|
||||
'visibility': {'score': 0, 'comment': ''}}
|
||||
dict_tech = {'technique_id': '', 'technique_name': '', 'detection': {'applicable_to': ['all'],
|
||||
'date_registered': None,
|
||||
'date_implemented': None,
|
||||
'score': -1, 'location': [''], 'comment': ''},
|
||||
'visibility': {'applicable_to': ['all'], 'score': 0, 'comment': ''}}
|
||||
|
||||
yaml_file = {}
|
||||
yaml_file['version'] = 1.0
|
||||
yaml_file['version'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION
|
||||
yaml_file['file_type'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION
|
||||
yaml_file['name'] = name
|
||||
yaml_file['platform'] = platform
|
||||
|
@ -248,6 +258,7 @@ def generate_technique_administration_file(filename):
|
|||
if score > 0 and t['technique_id'] not in techniques_upper:
|
||||
tech = copy.deepcopy(dict_tech)
|
||||
tech['technique_id'] = t['technique_id']
|
||||
tech['technique_name'] = get_technique(techniques, t['technique_id'])['technique']
|
||||
tech['visibility']['score'] = score
|
||||
yaml_file['techniques'].append(tech)
|
||||
|
||||
|
|
59
dettact.py
59
dettact.py
|
@ -48,12 +48,16 @@ def init_menu():
|
|||
parser_visibility.add_argument('-ft', '--file-tech', help='path to the technique administration YAML file (used to '
|
||||
'score the level of visibility)', required=True)
|
||||
parser_visibility.add_argument('-fd', '--file-ds', help='path to the data source administration YAML file (used to '
|
||||
'add metadata on the involved data sources)',
|
||||
required=True)
|
||||
'add metadata on the involved data sources)')
|
||||
parser_visibility.add_argument('-a', '--applicable', help='filter techniques based on the \'applicable_to\' field '
|
||||
'in the technique administration YAML file. '
|
||||
'Not supported for Excel output', default='all')
|
||||
parser_visibility.add_argument('-l', '--layer', help='generate a visibility layer for the ATT&CK navigator',
|
||||
action='store_true')
|
||||
parser_visibility.add_argument('-e', '--excel', help='generate an Excel sheet with all administrated techniques',
|
||||
action='store_true')
|
||||
parser_visibility.add_argument('-o', '--overlay', help='generate a visibility layer overlayed with detections for '
|
||||
'the ATT&CK navigator.', action='store_true')
|
||||
'the ATT&CK navigator', action='store_true')
|
||||
|
||||
# create the detection parser
|
||||
parser_detection = subparsers.add_parser('detection', aliases=['d'],
|
||||
|
@ -66,10 +70,15 @@ def init_menu():
|
|||
parser_detection.add_argument('-fd', '--file-ds', help='path to the data source administration YAML file (used in '
|
||||
'the overlay with visibility to add metadata on the '
|
||||
'involved data sources)')
|
||||
parser_detection.add_argument('-l', '--layer', help='generate detection layer for the ATT&CK navigator',
|
||||
parser_detection.add_argument('-a', '--applicable', help='filter techniques based on the \'applicable_to\' field '
|
||||
'in the technique administration YAML file. '
|
||||
'Not supported for Excel output', default='all')
|
||||
parser_detection.add_argument('-l', '--layer', help='generate detection layer for the ATT&CK navigator',
|
||||
action='store_true')
|
||||
parser_detection.add_argument('-e', '--excel', help='generate an Excel sheet with all administrated techniques',
|
||||
action='store_true')
|
||||
parser_detection.add_argument('-o', '--overlay', help='generate a detection layer overlayed with visibility for '
|
||||
'the ATT&CK navigator.', action='store_true')
|
||||
'the ATT&CK navigator', action='store_true')
|
||||
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
|
||||
action='store_true')
|
||||
|
||||
|
@ -91,6 +100,9 @@ def init_menu():
|
|||
'VISIBILITY provide a YAML with the technique administration.')
|
||||
parser_group.add_argument('-t', '--overlay-type', help='specify the type of overlay (default = group)',
|
||||
choices=['group', 'visibility', 'detection'], default='group')
|
||||
parser_group.add_argument('-a', '--applicable', help='filter techniques in the detection or visibility overlay '
|
||||
'based on the \'applicable_to\' field in the technique '
|
||||
'administration YAML file. ', default='all')
|
||||
parser_group.add_argument('--software-group', help='add techniques to the heat map by checking which software is '
|
||||
'used by group(s), and hence which techniques the software '
|
||||
'supports (does not influence the scores). If overlay group(s) '
|
||||
|
@ -142,15 +154,26 @@ def menu(menu_parser):
|
|||
generate_technique_administration_file(args.file)
|
||||
|
||||
elif args.subparser in ['visibility', 'v']:
|
||||
if check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) and \
|
||||
check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if args.layer:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, False)
|
||||
if args.overlay:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, True)
|
||||
if args.layer or args.overlay:
|
||||
if not args.file_ds:
|
||||
print('[!] Generating a visibility layer or doing an overlay requires adding the data source'
|
||||
'administration YAML file (\'--file-ds\')')
|
||||
quit()
|
||||
|
||||
if check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) and \
|
||||
check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if args.layer:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, False, args.applicable)
|
||||
if args.overlay:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, True, args.applicable)
|
||||
|
||||
if args.excel and check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) and args.applicable == 'all':
|
||||
export_techniques_list_to_excel(args.file_tech)
|
||||
if args.excel and args.applicable != 'all':
|
||||
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
|
||||
|
||||
elif args.subparser in ['group', 'g']:
|
||||
generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group)
|
||||
generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group, args.applicable)
|
||||
|
||||
elif args.subparser in ['detection', 'd']:
|
||||
if args.overlay:
|
||||
|
@ -162,11 +185,15 @@ def menu(menu_parser):
|
|||
|
||||
if check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION):
|
||||
if args.layer:
|
||||
generate_detection_layer(args.file_tech, args.file_ds, False)
|
||||
generate_detection_layer(args.file_tech, args.file_ds, False, args.applicable)
|
||||
if args.overlay and check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
generate_detection_layer(args.file_tech, args.file_ds, True)
|
||||
generate_detection_layer(args.file_tech, args.file_ds, True, args.applicable)
|
||||
if args.graph:
|
||||
plot_detection_graph(args.file_tech)
|
||||
plot_detection_graph(args.file_tech, args.applicable)
|
||||
if args.excel and args.applicable == 'all':
|
||||
export_techniques_list_to_excel(args.file_tech)
|
||||
if args.excel and args.applicable != 'all':
|
||||
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
|
||||
|
||||
elif args.subparser in ['generic', 'ge']:
|
||||
if args.statistics:
|
||||
|
@ -174,6 +201,8 @@ def menu(menu_parser):
|
|||
elif args.updates:
|
||||
get_updates(args.updates, args.sort)
|
||||
|
||||
else:
|
||||
menu_parser.print_help()
|
||||
|
||||
def prepare_folders():
|
||||
"""
|
||||
|
|
122
generic.py
122
generic.py
|
@ -2,69 +2,11 @@ import os
|
|||
import pickle
|
||||
from datetime import datetime as dt
|
||||
import yaml
|
||||
from upgrade import upgrade_yaml_file
|
||||
from constants import *
|
||||
|
||||
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
||||
|
||||
APP_NAME = 'DeTT&CT'
|
||||
APP_DESC = 'Detect Tactics, Techniques & Combat Threats'
|
||||
VERSION = '1.0'
|
||||
|
||||
EXPIRE_TIME = 60*60*24
|
||||
|
||||
DATATYPE_TECH_BY_GROUP = 'mitre_techniques_used_by_group'
|
||||
DATATYPE_ALL_TECH = 'mitre_all_techniques'
|
||||
DATATYPE_ALL_GROUPS = 'mitre_all_groups'
|
||||
DATATYPE_ALL_SOFTWARE = 'mitre_all_software'
|
||||
DATATYPE_TECH_BY_SOFTWARE = 'mitre_techniques_used_by_software'
|
||||
DATATYPE_SOFTWARE_BY_GROUP = 'mitre_software_used_by_group'
|
||||
|
||||
# Group colors
|
||||
COLOR_GROUP_OVERLAY_MATCH = '#f9a825' # orange
|
||||
COLOR_GROUP_OVERLAY_NO_MATCH = '#ffee58' # yellow
|
||||
COLOR_SOFTWARE = '#0d47a1 ' # dark blue
|
||||
COLOR_GROUP_AND_SOFTWARE = '#64b5f6 ' # light blue
|
||||
COLOR_GRADIENT_MIN = '#ffcece' # light red
|
||||
COLOR_GRADIENT_MAX = '#ff0000' # red
|
||||
COLOR_TACTIC_ROW_BACKGRND = '#dddddd' # light grey
|
||||
COLOR_GROUP_OVERLAY_ONLY_DETECTION = '#8BC34A' # green
|
||||
COLOR_GROUP_OVERLAY_ONLY_VISIBILITY = '#1976D2' # blue
|
||||
|
||||
# data source colors (purple range)
|
||||
COLOR_DS_25p = '#E1BEE7'
|
||||
COLOR_DS_50p = '#CE93D8'
|
||||
COLOR_DS_75p = '#AB47BC'
|
||||
COLOR_DS_99p = '#7B1FA2'
|
||||
COLOR_DS_100p = '#4A148C'
|
||||
|
||||
# data source colors HAPPY (green range)
|
||||
COLOR_DS_25p_HAPPY = '#DCEDC8'
|
||||
COLOR_DS_50p_HAPPY = '#AED581'
|
||||
COLOR_DS_75p_HAPPY = '#8BC34A'
|
||||
COLOR_DS_99p_HAPPY = '#689F38'
|
||||
COLOR_DS_100p_HAPPY = '#33691E'
|
||||
|
||||
# Detection colors (green range)
|
||||
COLOR_D_0 = '#64B5F6' # Blue: Forensics/Context
|
||||
COLOR_D_1 = '#DCEDC8'
|
||||
COLOR_D_2 = '#AED581'
|
||||
COLOR_D_3 = '#8BC34A'
|
||||
COLOR_D_4 = '#689F38'
|
||||
COLOR_D_5 = '#33691E'
|
||||
|
||||
# Visibility colors (blue range)
|
||||
COLOR_V_1 = '#BBDEFB'
|
||||
COLOR_V_2 = '#64B5F6'
|
||||
COLOR_V_3 = '#1976D2'
|
||||
COLOR_V_4 = '#0D47A1'
|
||||
|
||||
# Detection and visibility overlay color:
|
||||
COLOR_OVERLAY_VISIBILITY = COLOR_V_3
|
||||
COLOR_OVERLAY_DETECTION = COLOR_D_3
|
||||
COLOR_OVERLAY_BOTH = COLOR_GROUP_OVERLAY_MATCH
|
||||
|
||||
FILE_TYPE_DATA_SOURCE_ADMINISTRATION = 'data-source-administration'
|
||||
FILE_TYPE_TECHNIQUE_ADMINISTRATION = 'technique-administration'
|
||||
FILE_TYPE_GROUP_ADMINISTRATION = 'group-administration'
|
||||
|
||||
|
||||
def save_attack_data(data, path):
|
||||
"""
|
||||
|
@ -154,29 +96,37 @@ def _get_base_template(name, description, stage, platform, sorting):
|
|||
return layer
|
||||
|
||||
|
||||
def get_layer_template_groups(name, max_score, description, stage, platform):
|
||||
def get_layer_template_groups(name, max_score, description, stage, platform, overlay_type):
|
||||
"""
|
||||
Prepares a base template for the json layer file that can be loaded into the MITRE ATT&CK Navigator.
|
||||
More information on the version 2.1 layer format:
|
||||
https://github.com/mitre/attack-navigator/blob/master/layers/LAYERFORMATv2_1.md
|
||||
:param name: name
|
||||
:param max_score: max_score
|
||||
:param max_score: max_score = max_tech_count_group
|
||||
:param description: description
|
||||
:param stage: stage (act | prepare)
|
||||
:param platform: platform
|
||||
:param overlay_type: group, visibility or detection
|
||||
:return: layer template dictionary
|
||||
"""
|
||||
layer = _get_base_template(name, description, stage, platform, 3)
|
||||
layer['gradient'] = {'colors': [COLOR_GRADIENT_MIN, COLOR_GRADIENT_MAX], 'minValue': 0, 'maxValue': max_score}
|
||||
layer['legendItems'] = \
|
||||
[
|
||||
{'label': 'Tech. ref. for ' + str(1) + ' group', 'color': COLOR_GRADIENT_MIN},
|
||||
{'label': 'Tech. ref. for ' + str(max_score) + ' groups', 'color': COLOR_GRADIENT_MAX},
|
||||
{'label': 'Groups overlay: tech. in group + overlay', 'color': COLOR_GROUP_OVERLAY_MATCH},
|
||||
{'label': 'Groups overlay: tech. in overlay', 'color': COLOR_GROUP_OVERLAY_NO_MATCH},
|
||||
{'label': 'Src. of tech. is only software', 'color': COLOR_SOFTWARE},
|
||||
{'label': 'Src. of tech. is group(s)/overlay + software', 'color': COLOR_GROUP_AND_SOFTWARE}
|
||||
]
|
||||
layer['legendItems'] = []
|
||||
layer['legendItems'].append({'label': 'Tech. ref. for ' + str(1) + ' group', 'color': COLOR_GRADIENT_MIN})
|
||||
layer['legendItems'].append({'label': 'Tech. ref. for ' + str(max_score) + ' groups', 'color': COLOR_GRADIENT_MAX})
|
||||
|
||||
if overlay_type == OVERLAY_TYPE_GROUP:
|
||||
layer['legendItems'].append({'label': 'Groups overlay: tech. in group + overlay', 'color': COLOR_GROUP_OVERLAY_MATCH})
|
||||
layer['legendItems'].append({'label': 'Groups overlay: tech. in overlay', 'color': COLOR_GROUP_OVERLAY_NO_MATCH})
|
||||
layer['legendItems'].append({'label': 'Src. of tech. is only software', 'color': COLOR_SOFTWARE})
|
||||
layer['legendItems'].append({'label': 'Src. of tech. is group(s)/overlay + software', 'color': COLOR_GROUP_AND_SOFTWARE})
|
||||
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
||||
layer['legendItems'].append({'label': 'Tech. in group + detection', 'color': COLOR_GROUP_OVERLAY_MATCH})
|
||||
layer['legendItems'].append({'label': 'Tech. in detection', 'color': COLOR_GROUP_OVERLAY_ONLY_DETECTION})
|
||||
elif overlay_type == OVERLAY_TYPE_VISIBILITY:
|
||||
layer['legendItems'].append({'label': 'Tech. in group + visibility', 'color': COLOR_GROUP_OVERLAY_MATCH})
|
||||
layer['legendItems'].append({'label': 'Tech. in visibility', 'color': COLOR_GROUP_OVERLAY_ONLY_VISIBILITY})
|
||||
|
||||
return layer
|
||||
|
||||
|
||||
|
@ -192,7 +142,6 @@ def get_layer_template_detections(name, description, stage, platform):
|
|||
:return: layer template dictionary
|
||||
"""
|
||||
layer = _get_base_template(name, description, stage, platform, 0)
|
||||
layer['gradient'] = {'colors': ['#ff6666', '#ffe766', '#8ec843'], 'minValue': 0, 'maxValue': 100}
|
||||
layer['legendItems'] = \
|
||||
[
|
||||
{'label': 'Detection score 0: Forensics/Context', 'color': COLOR_D_0},
|
||||
|
@ -217,7 +166,6 @@ def get_layer_template_data_sources(name, description, stage, platform):
|
|||
:return: layer template dictionary
|
||||
"""
|
||||
layer = _get_base_template(name, description, stage, platform, 0)
|
||||
layer['gradient'] = {'colors': ['#ff6666', '#ffe766', '#8ec843'], 'minValue': 0, 'maxValue': 100}
|
||||
layer['legendItems'] = \
|
||||
[
|
||||
{'label': '1-25% of data sources available', 'color': COLOR_DS_25p},
|
||||
|
@ -241,7 +189,6 @@ def get_layer_template_visibility(name, description, stage, platform):
|
|||
:return: layer template dictionary
|
||||
"""
|
||||
layer = _get_base_template(name, description, stage, platform, 0)
|
||||
layer['gradient'] = {'colors': ['#ff6666', '#ffe766', '#8ec843'], 'minValue': 0, 'maxValue': 100}
|
||||
layer['legendItems'] = \
|
||||
[
|
||||
{'label': 'Visibility score 1: Minimal', 'color': COLOR_V_1},
|
||||
|
@ -264,7 +211,6 @@ def get_layer_template_layered(name, description, stage, platform):
|
|||
:return: layer template dictionary
|
||||
"""
|
||||
layer = _get_base_template(name, description, stage, platform, 0)
|
||||
layer['gradient'] = {'colors': ['#ff6666', '#ffe766', '#8ec843'], 'minValue': 0, 'maxValue': 100}
|
||||
layer['legendItems'] = \
|
||||
[
|
||||
{'label': 'Visibility', 'color': COLOR_OVERLAY_VISIBILITY},
|
||||
|
@ -350,10 +296,13 @@ def check_file_type(filename, file_type=None):
|
|||
with open(filename, 'r') as yaml_file:
|
||||
try:
|
||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
except:
|
||||
except Exception as e:
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
print(' ' + str(e)) # print more detailed error information to help the user in fixing the error.
|
||||
return None
|
||||
|
||||
# This check is performed because a text file will also be considered to be valid YAML. But, we are using
|
||||
# key-value pairs within the YAML files.
|
||||
if not hasattr(yaml_content, 'keys'):
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
return None
|
||||
|
@ -366,6 +315,25 @@ def check_file_type(filename, file_type=None):
|
|||
print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
|
||||
return None
|
||||
else:
|
||||
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||
return yaml_content['file_type']
|
||||
else:
|
||||
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||
return yaml_content['file_type']
|
||||
|
||||
|
||||
def calculate_score(l, zero_value=0):
|
||||
"""
|
||||
Calculates the average score in the given list which contains dictionaries with 'score' field.
|
||||
:param l: list
|
||||
:param zero_value: the value when no scores are there, default 0
|
||||
:return: average score
|
||||
"""
|
||||
s = 0
|
||||
number = 0
|
||||
for v in l:
|
||||
if v['score'] >= 0:
|
||||
s += v['score']
|
||||
number += 1
|
||||
s = int(round(s / number, 0) if number > 0 else zero_value)
|
||||
return s
|
||||
|
|
114
group_mapping.py
114
group_mapping.py
|
@ -1,6 +1,6 @@
|
|||
import simplejson
|
||||
from generic import *
|
||||
from technique_mapping import _load_detections
|
||||
from technique_mapping import _load_techniques
|
||||
|
||||
CG_GROUPS = {}
|
||||
|
||||
|
@ -201,56 +201,62 @@ def get_group_techniques(groups, stage, platform, file_type):
|
|||
return groups_dict
|
||||
|
||||
|
||||
def get_detection_techniques(filename):
|
||||
def get_detection_techniques(filename, filter_applicable_to):
|
||||
"""
|
||||
Get all techniques (in a dict) from the detection administration
|
||||
:param filename: path to the YAML technique administration file
|
||||
:return: dictionary
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return: groups dictionary, loaded techniques from administration YAML file
|
||||
"""
|
||||
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
||||
groups_dict = {}
|
||||
|
||||
detection_techniques, name, platform = _load_detections(filename)
|
||||
detection_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
|
||||
|
||||
group_id = 'DETECTION'
|
||||
groups_dict[group_id] = {}
|
||||
groups_dict[group_id]['group_name'] = 'Detection'
|
||||
groups_dict[group_id]['techniques'] = set()
|
||||
for t, v in detection_techniques.items():
|
||||
if 'detection' in v.keys() and v['detection']['score'] > 0:
|
||||
s = calculate_score(v['detection'])
|
||||
if s > 0:
|
||||
groups_dict[group_id]['techniques'].add(t)
|
||||
|
||||
return groups_dict
|
||||
return groups_dict, detection_techniques
|
||||
|
||||
|
||||
def get_visibility_techniques(filename):
|
||||
def get_visibility_techniques(filename, filter_applicable_to):
|
||||
"""
|
||||
Get all techniques (in a dict) from the detections administration
|
||||
:param filename: path to the YAML technique administration file
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return: dictionary
|
||||
"""
|
||||
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
||||
groups_dict = {}
|
||||
|
||||
detection_techniques, name, platform = _load_detections(filename)
|
||||
visibility_techniques, name, platform = _load_techniques(filename, 'visibility', filter_applicable_to)
|
||||
|
||||
group_id = 'VISIBILITY'
|
||||
groups_dict[group_id] = {}
|
||||
groups_dict[group_id]['group_name'] = 'Visibility'
|
||||
groups_dict[group_id]['techniques'] = set()
|
||||
for t, v in detection_techniques.items():
|
||||
if 'visibility' in v.keys() and v['visibility']['score'] > 0:
|
||||
for t, v in visibility_techniques.items():
|
||||
s = calculate_score(v['visibility'])
|
||||
if s > 0:
|
||||
groups_dict[group_id]['techniques'].add(t)
|
||||
|
||||
return groups_dict
|
||||
return groups_dict, visibility_techniques
|
||||
|
||||
|
||||
def get_technique_count(groups, groups_overlay, groups_software):
|
||||
def get_technique_count(groups, groups_overlay, groups_software, overlay_type, all_techniques):
|
||||
"""
|
||||
Create a dict with all involved techniques and their relevant count/score
|
||||
:param groups: a dict with data on groups
|
||||
:param groups_overlay: a dict with data on the groups to overlay
|
||||
:param groups_software: a dict with with data on which techniques are used within related software
|
||||
:param overlay_type: group, visibility or detection
|
||||
:param all_techniques: dict containing all technique data for visibility or detection
|
||||
:return: dictionary
|
||||
"""
|
||||
# { technique_id: {count: ..., groups: set{} }
|
||||
|
@ -262,26 +268,49 @@ def get_technique_count(groups, groups_overlay, groups_software):
|
|||
techniques_dict[tech] = dict()
|
||||
techniques_dict[tech]['groups'] = set()
|
||||
techniques_dict[tech]['count'] = 1
|
||||
|
||||
# We only want to increase the score when comparing groups and not for visibility or detection.
|
||||
# This allows to have proper sorting of the heat map, which in turn improves the ability to visually
|
||||
# compare this heat map with the detection/visibility ATT&CK Navigator layers.
|
||||
else:
|
||||
techniques_dict[tech]['count'] += 1
|
||||
techniques_dict[tech]['groups'].add(group)
|
||||
|
||||
max_tech_count_group = max(techniques_dict.values(), key=lambda v: v['count'])['count']
|
||||
|
||||
# create dict {tech_id: score+max_tech_count} to be used for when doing an overlay of the type visibility or detection
|
||||
if overlay_type != OVERLAY_TYPE_GROUP:
|
||||
dict_tech_score = {}
|
||||
list_tech = groups_overlay[overlay_type.upper()]['techniques']
|
||||
for tech in list_tech:
|
||||
dict_tech_score[tech] = calculate_score(all_techniques[tech][overlay_type]) + max_tech_count_group
|
||||
|
||||
for group, v in groups_overlay.items():
|
||||
for tech in v['techniques']:
|
||||
if tech not in techniques_dict:
|
||||
techniques_dict[tech] = dict()
|
||||
techniques_dict[tech]['groups'] = set()
|
||||
techniques_dict[tech]['count'] = 1
|
||||
if overlay_type == OVERLAY_TYPE_GROUP:
|
||||
techniques_dict[tech]['count'] = 1
|
||||
else:
|
||||
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
||||
elif group in groups:
|
||||
if tech not in groups[group]['techniques']:
|
||||
techniques_dict[tech]['count'] += 1
|
||||
if overlay_type == OVERLAY_TYPE_GROUP:
|
||||
techniques_dict[tech]['count'] += 1
|
||||
else:
|
||||
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
||||
# Only to this when it was not already counted by being part of 'groups'.
|
||||
# Meaning the group in 'groups_overlay' was also part of 'groups' (match on Group ID) and the
|
||||
# technique was already counted for that group / it is not a new technique for that group coming
|
||||
# from a YAML file
|
||||
else:
|
||||
techniques_dict[tech]['count'] += 1
|
||||
# increase count when the group in the YAML file is a custom group
|
||||
if overlay_type == OVERLAY_TYPE_GROUP:
|
||||
# increase count when the group in the YAML file is a custom group
|
||||
techniques_dict[tech]['count'] += 1
|
||||
else:
|
||||
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
||||
|
||||
techniques_dict[tech]['groups'].add(group)
|
||||
|
||||
for group, v in groups_software.items():
|
||||
|
@ -295,25 +324,28 @@ def get_technique_count(groups, groups_overlay, groups_software):
|
|||
techniques_dict[tech]['groups'] = set()
|
||||
techniques_dict[tech]['groups'].add(group)
|
||||
|
||||
return techniques_dict
|
||||
return techniques_dict, max_tech_count_group
|
||||
|
||||
|
||||
def get_technique_layer(techniques, groups, overlay, groups_software, overlay_file_type, overlay_type):
|
||||
def get_technique_layer(techniques_count, groups, overlay, groups_software, overlay_file_type, overlay_type,
|
||||
all_techniques, max_tech_count_group):
|
||||
"""
|
||||
Create the technique layer that will be part of the ATT&CK navigator json file
|
||||
:param techniques: involved techniques with count (to be used within the scores)
|
||||
:param techniques_count: involved techniques with count (to be used within the scores)
|
||||
:param groups: a dict with data on groups
|
||||
:param overlay: a dict with data on the groups to overlay
|
||||
:param groups_software: a dict with with data on which techniques are used within related software
|
||||
:param overlay_file_type: the file type of the YAML file as present in the key 'file_type'
|
||||
:param overlay_type: group, visibility or detection
|
||||
:param all_techniques: dictionary with all techniques loaded from techniques administration YAML file
|
||||
:param max_tech_count_group: the maximum number of times a technique is used among threat actor groups
|
||||
:return: dictionary
|
||||
"""
|
||||
techniques_layer = []
|
||||
|
||||
# { technique_id: {count: ..., groups: set{} }
|
||||
# add the technique count/scoring
|
||||
for tech, v in techniques.items():
|
||||
for tech, v in techniques_count.items():
|
||||
t = dict()
|
||||
t['techniqueID'] = tech
|
||||
t['score'] = v['count']
|
||||
|
@ -335,19 +367,25 @@ def get_technique_layer(techniques, groups, overlay, groups_software, overlay_fi
|
|||
# change the color and add metadata to make the groups overlay visible
|
||||
for group, values in overlay.items():
|
||||
if tech in values['techniques']:
|
||||
# Determine color:
|
||||
if len(v['groups'].intersection(set(groups.keys()))) > 0:
|
||||
# if the technique is both present in the group (-g/--groups) and the groups overlay (-o/--overlay)
|
||||
t['color'] = COLOR_GROUP_OVERLAY_MATCH
|
||||
else:
|
||||
# the technique is only present in the overlay and not in the provided groups (-g/--groups)
|
||||
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
if overlay_type == 'visibility':
|
||||
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
||||
t['color'] = COLOR_GROUP_OVERLAY_ONLY_VISIBILITY
|
||||
elif overlay_type == 'detection':
|
||||
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
||||
t['color'] = COLOR_GROUP_OVERLAY_ONLY_DETECTION
|
||||
else:
|
||||
t['color'] = COLOR_GROUP_OVERLAY_NO_MATCH
|
||||
|
||||
# Add applicable_to to metadata in case of overlay for detection/visibility:
|
||||
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
metadata_dict['Applicable to'] = set([a for v in all_techniques[tech][overlay_type] for a in v['applicable_to']])
|
||||
metadata_dict[overlay_type.capitalize() + ' score'] = [str(techniques_count[tech]['count'] - max_tech_count_group)]
|
||||
|
||||
if 'Overlay' not in metadata_dict:
|
||||
metadata_dict['Overlay'] = set()
|
||||
metadata_dict['Overlay'].add(values['group_name'])
|
||||
|
@ -401,7 +439,7 @@ def get_group_list(groups, file_type):
|
|||
return groups
|
||||
|
||||
|
||||
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups):
|
||||
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups, filter_applicable_to):
|
||||
"""
|
||||
Calls all functions that are necessary for the generation of the heat map and write a json layer to disk.
|
||||
:param groups: threat actor groups
|
||||
|
@ -411,6 +449,7 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
:param stage: attack or pre-attack
|
||||
:param platform: all, Linux, macOS, Windows
|
||||
:param software_groups: specify if techniques from related software should be included.
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return: returns nothing when something's wrong
|
||||
"""
|
||||
overlay_dict = {}
|
||||
|
@ -429,7 +468,9 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
overlay_file_type = None
|
||||
if overlay:
|
||||
if os.path.isfile(overlay):
|
||||
expected_file_type = FILE_TYPE_GROUP_ADMINISTRATION if overlay_type == 'group' else FILE_TYPE_TECHNIQUE_ADMINISTRATION if overlay_type in ['visibility', 'detection'] else None
|
||||
expected_file_type = FILE_TYPE_GROUP_ADMINISTRATION if overlay_type == OVERLAY_TYPE_GROUP \
|
||||
else FILE_TYPE_TECHNIQUE_ADMINISTRATION \
|
||||
if overlay_type in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION] else None
|
||||
overlay_file_type = check_file_type(overlay, expected_file_type)
|
||||
if not overlay_file_type:
|
||||
return
|
||||
|
@ -439,11 +480,12 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
else:
|
||||
overlay = []
|
||||
|
||||
all_techniques = None
|
||||
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
if overlay_type == 'visibility':
|
||||
overlay_dict = get_visibility_techniques(overlay)
|
||||
elif overlay_type == 'detection':
|
||||
overlay_dict = get_detection_techniques(overlay)
|
||||
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
||||
overlay_dict, all_techniques = get_visibility_techniques(overlay, filter_applicable_to)
|
||||
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
||||
overlay_dict, all_techniques = get_detection_techniques(overlay, filter_applicable_to)
|
||||
elif len(overlay) > 0:
|
||||
overlay_dict = get_group_techniques(overlay, stage, platform, overlay_file_type)
|
||||
if not overlay_dict:
|
||||
|
@ -456,17 +498,17 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
print('[!] Empty layer.') # the provided groups dit not result in any techniques
|
||||
return
|
||||
|
||||
# check if we are doing an software group overlay
|
||||
# check if we are doing a software group overlay
|
||||
if software_groups and overlay: # TODO add support for campaign info in layer metadata
|
||||
# if a group overlay is provided, get the software techniques for the overlay
|
||||
groups_software_dict = get_software_techniques(overlay, stage, platform)
|
||||
if overlay_type not in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION]:
|
||||
# if a group overlay is provided, get the software techniques for the overlay
|
||||
groups_software_dict = get_software_techniques(overlay, stage, platform)
|
||||
elif software_groups:
|
||||
groups_software_dict = get_software_techniques(groups, stage, platform)
|
||||
|
||||
technique_count = get_technique_count(groups_dict, overlay_dict, groups_software_dict)
|
||||
technique_count, max_tech_count_group = get_technique_count(groups_dict, overlay_dict, groups_software_dict, overlay_type, all_techniques)
|
||||
technique_layer = get_technique_layer(technique_count, groups_dict, overlay_dict, groups_software_dict,
|
||||
overlay_file_type, overlay_type)
|
||||
max_technique_count = max(technique_count.values(), key=lambda v: v['count'])['count']
|
||||
overlay_file_type, overlay_type, all_techniques, max_tech_count_group)
|
||||
|
||||
# make a list group names for the involved groups.
|
||||
if groups == ['all']:
|
||||
|
@ -478,13 +520,13 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
desc = 'stage: ' + stage + ' | platform: ' + platform + ' | group(s): ' + ', '.join(groups_list) + \
|
||||
' | overlay group(s): ' + ', '.join(overlay_list)
|
||||
|
||||
layer = get_layer_template_groups(stage[0].upper() + stage[1:] + ' ' + platform, max_technique_count, desc, stage, platform)
|
||||
layer = get_layer_template_groups(stage[0].upper() + stage[1:] + ' ' + platform, max_tech_count_group, desc, stage, platform, overlay_type)
|
||||
layer['techniques'] = technique_layer
|
||||
|
||||
json_string = simplejson.dumps(layer).replace('}, ', '},\n')
|
||||
|
||||
if overlay:
|
||||
filename = "output/" + stage + '_' + platform.lower() + '_' + '_'.join(groups_list) + '-overlay_' + '_'.join(overlay_list) + '.json'
|
||||
filename = "output/" + stage + '_' + platform.lower() + '_' + '_'.join(groups_list) + '-overlay_' + '_'.join(overlay_list) + '_' + filter_applicable_to.replace(' ', '_') + '.json'
|
||||
else:
|
||||
filename = "output/" + stage + '_' + platform.lower() + '_' + '_'.join(groups_list) + '.json'
|
||||
with open(filename, 'w') as f: # write layer file to disk
|
||||
|
|
|
@ -3,6 +3,7 @@ import glob
|
|||
from data_source_mapping import *
|
||||
from technique_mapping import *
|
||||
from group_mapping import *
|
||||
from constants import *
|
||||
|
||||
|
||||
groups = 'all'
|
||||
|
@ -10,12 +11,9 @@ software_group = False
|
|||
platform = 'Windows'
|
||||
stage = 'attack'
|
||||
groups_overlay = ''
|
||||
overlay_type = ''
|
||||
|
||||
MENU_NAME_DATA_SOURCE_MAPPING = 'Data source mapping'
|
||||
MENU_NAME_VISIBILITY_MAPPING = 'Visibility coverage mapping'
|
||||
MENU_NAME_DETECTION_COVERAGE_MAPPING = 'Detection coverage mapping'
|
||||
MENU_NAME_THREAT_ACTOR_GROUP_MAPPING = 'Threat actor group mapping'
|
||||
overlay_type = 'group'
|
||||
filter_applicable_to = 'all'
|
||||
yaml_path = 'sample-data/'
|
||||
|
||||
|
||||
def clear():
|
||||
|
@ -50,7 +48,7 @@ def wait():
|
|||
:return:
|
||||
"""
|
||||
print('')
|
||||
print('Press a key to return to the last menu')
|
||||
print('Press a key to continue')
|
||||
input('')
|
||||
|
||||
|
||||
|
@ -88,7 +86,7 @@ def interactive_menu():
|
|||
interactive_menu()
|
||||
|
||||
|
||||
def select_file(title, what, expected_file_type, b_clear=True, path='sample-data/'):
|
||||
def select_file(title, what, expected_file_type, b_clear=True):
|
||||
"""
|
||||
Prints and handles the file selection in the terminal. It shows just .yaml files.
|
||||
:param title: title to print on top of this menu
|
||||
|
@ -98,16 +96,17 @@ def select_file(title, what, expected_file_type, b_clear=True, path='sample-data
|
|||
:param path: the path to look in
|
||||
:return: filename of the selected file
|
||||
"""
|
||||
global yaml_path
|
||||
if b_clear:
|
||||
clear()
|
||||
print('Menu: %s' % title)
|
||||
print('')
|
||||
print('Select the YAML file with %s:' % what)
|
||||
print('')
|
||||
print('Path: %s' % path)
|
||||
print('Path: %s' % yaml_path)
|
||||
n = 1
|
||||
files = []
|
||||
for f in glob.glob(path + '*.yaml'):
|
||||
for f in glob.glob(yaml_path + '*.yaml'):
|
||||
files.append(f)
|
||||
print('%d. %s' % (n, f))
|
||||
n += 1
|
||||
|
@ -124,11 +123,12 @@ def select_file(title, what, expected_file_type, b_clear=True, path='sample-data
|
|||
choice = ask_input()
|
||||
choice = choice if choice.endswith('/') else choice + '/'
|
||||
if os.path.exists(choice):
|
||||
return select_file(title, what, expected_file_type, b_clear, choice)
|
||||
yaml_path = choice
|
||||
return select_file(title, what, expected_file_type, b_clear)
|
||||
else:
|
||||
print("[!] Path doesn't exist")
|
||||
wait()
|
||||
return select_file(title, what, expected_file_type, b_clear, path)
|
||||
return select_file(title, what, expected_file_type, b_clear)
|
||||
elif choice == str(back_nr):
|
||||
interactive_menu()
|
||||
elif choice == 'q':
|
||||
|
@ -138,12 +138,14 @@ def select_file(title, what, expected_file_type, b_clear=True, path='sample-data
|
|||
filename = files[int(choice) - 1]
|
||||
file_type = check_file_type(filename, file_type=expected_file_type)
|
||||
if file_type:
|
||||
print('Selected file: ' + filename)
|
||||
wait()
|
||||
return filename
|
||||
else:
|
||||
print("[!] Invalid choice")
|
||||
|
||||
wait()
|
||||
return select_file(title, what, expected_file_type, b_clear, path)
|
||||
return select_file(title, what, expected_file_type, b_clear)
|
||||
|
||||
|
||||
def menu_updates():
|
||||
|
@ -250,29 +252,44 @@ def menu_detection(filename_t):
|
|||
:param filename_t:
|
||||
:return:
|
||||
"""
|
||||
global filter_applicable_to
|
||||
clear()
|
||||
print('Menu: %s' % MENU_NAME_DETECTION_COVERAGE_MAPPING)
|
||||
print('')
|
||||
print('Selected techniques YAML file: %s' % filename_t)
|
||||
print('')
|
||||
print('Options:')
|
||||
print('1. Filter techniques based on the \'applicable_to\' field in the technique administration YAML file (not '
|
||||
'for Excel output): %s' % filter_applicable_to)
|
||||
print('')
|
||||
print('Select what you want to do:')
|
||||
print('1. Generate a layer for detection coverage for the ATT&CK Navigator.')
|
||||
print('2. Generate a layer for detection coverage overlayed with visibility for the ATT&CK Navigator.')
|
||||
print('3. Generate a graph with detections added through time.')
|
||||
print('2. Generate a layer for detection coverage for the ATT&CK Navigator.')
|
||||
print('3. Generate a layer for detection coverage overlayed with visibility for the ATT&CK Navigator.')
|
||||
print('4. Generate a graph with detections added through time.')
|
||||
print('5. Generate an Excel sheet with all administrated techniques.')
|
||||
print('9. Back to main menu.')
|
||||
choice = ask_input()
|
||||
if choice == '1':
|
||||
print('Writing detection coverage layer...')
|
||||
generate_detection_layer(filename_t, None, False)
|
||||
wait()
|
||||
print('Specify your filter for the applicable_to field:')
|
||||
filter_applicable_to = ask_input().lower()
|
||||
elif choice == '2':
|
||||
filename_ds = select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'data sources (used to add metadata on the involved data sources to the heat map)', FILE_TYPE_DATA_SOURCE_ADMINISTRATION, False)
|
||||
print('Writing detection coverage layer with visibility as overlay...')
|
||||
generate_detection_layer(filename_t, filename_ds, True)
|
||||
print('Writing detection coverage layer...')
|
||||
generate_detection_layer(filename_t, None, False, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '3':
|
||||
filename_ds = select_file(MENU_NAME_DETECTION_COVERAGE_MAPPING, 'data sources (used to add metadata on the '
|
||||
'involved data sources to the heat map)',
|
||||
FILE_TYPE_DATA_SOURCE_ADMINISTRATION, False)
|
||||
print('Writing detection coverage layer with visibility as overlay...')
|
||||
generate_detection_layer(filename_t, filename_ds, True, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '4':
|
||||
print('Drawing the graph...')
|
||||
plot_detection_graph(filename_t)
|
||||
plot_detection_graph(filename_t, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '5':
|
||||
print('Generating Excel file...')
|
||||
export_techniques_list_to_excel(filename_t)
|
||||
wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
|
@ -288,24 +305,37 @@ def menu_visibility(filename_t, filename_ds):
|
|||
:param filename_ds:
|
||||
:return:
|
||||
"""
|
||||
global filter_applicable_to
|
||||
clear()
|
||||
print('Menu: %s' % MENU_NAME_VISIBILITY_MAPPING)
|
||||
print('')
|
||||
print('Selected techniques YAML file: %s' % filename_t)
|
||||
print('Selected data source YAML file: %s' % filename_ds)
|
||||
print('')
|
||||
print('Options:')
|
||||
print('1. Filter techniques based on the \'applicable_to\' field in the technique administration YAML file (not for '
|
||||
'Excel output): %s' % filter_applicable_to)
|
||||
print('')
|
||||
print('Select what you want to do:')
|
||||
print('1. Generate a layer for visibility for the ATT&CK Navigator.')
|
||||
print('2. Generate a layers for visibility overlayed with detection coverage for the ATT&CK Navigator.')
|
||||
print('2. Generate a layer for visibility for the ATT&CK Navigator.')
|
||||
print('3. Generate a layer for visibility overlayed with detection coverage for the ATT&CK Navigator.')
|
||||
print('4. Generate an Excel sheet with all administrated techniques.')
|
||||
print('9. Back to main menu.')
|
||||
choice = ask_input()
|
||||
if choice == '1':
|
||||
print('Writing visibility coverage layer...')
|
||||
generate_visibility_layer(filename_t, filename_ds, False)
|
||||
wait()
|
||||
print('Specify your filter for the applicable_to field:')
|
||||
filter_applicable_to = ask_input().lower()
|
||||
elif choice == '2':
|
||||
print('Writing visibility coverage layers overlayed with detections...')
|
||||
generate_visibility_layer(filename_t, filename_ds, True)
|
||||
print('Writing visibility coverage layer...')
|
||||
generate_visibility_layer(filename_t, filename_ds, False, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '3':
|
||||
print('Writing visibility coverage layer overlayed with detections...')
|
||||
generate_visibility_layer(filename_t, filename_ds, True, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '4':
|
||||
print('Generating Excel file...')
|
||||
export_techniques_list_to_excel(filename_t)
|
||||
wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
|
@ -319,7 +349,7 @@ def menu_groups():
|
|||
Prints and handles the Threat actor group mapping functionality.
|
||||
:return:
|
||||
"""
|
||||
global groups, software_group, platform, stage, groups_overlay, overlay_type
|
||||
global groups, software_group, platform, stage, groups_overlay, overlay_type, filter_applicable_to
|
||||
clear()
|
||||
print('Menu: %s' % MENU_NAME_THREAT_ACTOR_GROUP_MAPPING)
|
||||
print('')
|
||||
|
@ -331,8 +361,10 @@ def menu_groups():
|
|||
print('5. Overlay: ')
|
||||
print(' - %s: %s' % ('File' if os.path.exists(groups_overlay) else 'Groups', groups_overlay))
|
||||
print(' - Type: %s' % overlay_type)
|
||||
print('6. Filter techniques in the detection or visibility overlay based on the \'applicable_to\' field in the '
|
||||
'technique administration YAML file: %s' % filter_applicable_to)
|
||||
print('')
|
||||
print('6. Generate a heat map layer.')
|
||||
print('7. Generate a heat map layer.')
|
||||
print('9. Back to main menu.')
|
||||
choice = ask_input()
|
||||
if choice == '1':
|
||||
|
@ -349,7 +381,8 @@ def menu_groups():
|
|||
elif choice == '4':
|
||||
print('Specify the groups to include separated using commas. Group can be their ID, name or alias '
|
||||
'(default is all groups). Other option is to provide a YAML file with a custom group(s)')
|
||||
groups = ask_input()
|
||||
g = ask_input()
|
||||
groups = g if g is not '' else 'all'
|
||||
elif choice == '5':
|
||||
print('')
|
||||
print('1. Overlay with groups.')
|
||||
|
@ -361,19 +394,22 @@ def menu_groups():
|
|||
print('Specify the group(s) to overlay (in a different color) on the one specified in the Groups option. '
|
||||
'A group can be their ID, name or alias separated using commas. Other option is to provide a YAML '
|
||||
'file with a custom group(s).')
|
||||
overlay_type = 'group'
|
||||
overlay_type = OVERLAY_TYPE_GROUP
|
||||
groups_overlay = ask_input()
|
||||
elif choice == '2':
|
||||
overlay_type = 'detection'
|
||||
overlay_type = OVERLAY_TYPE_DETECTION
|
||||
groups_overlay = select_file(MENU_NAME_THREAT_ACTOR_GROUP_MAPPING, 'techniques', FILE_TYPE_TECHNIQUE_ADMINISTRATION, False)
|
||||
elif choice == '3':
|
||||
overlay_type = 'visibility'
|
||||
overlay_type = OVERLAY_TYPE_VISIBILITY
|
||||
groups_overlay = select_file(MENU_NAME_THREAT_ACTOR_GROUP_MAPPING, 'techniques', FILE_TYPE_TECHNIQUE_ADMINISTRATION, False)
|
||||
elif choice == '4':
|
||||
overlay_type = ''
|
||||
groups_overlay = ''
|
||||
elif choice == '6':
|
||||
generate_group_heat_map(groups, groups_overlay, overlay_type, stage, platform, software_group)
|
||||
print('Specify your filter for the applicable_to field:')
|
||||
filter_applicable_to = ask_input().lower()
|
||||
elif choice == '7':
|
||||
generate_group_heat_map(groups, groups_overlay, overlay_type, stage, platform, software_group, filter_applicable_to)
|
||||
wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
|
|
|
@ -201,7 +201,7 @@ data_sources:
|
|||
retention: 0
|
||||
- data_source_name: SSL/TLS inspection
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2000-01-01
|
||||
date_connected: 2000-01-01
|
||||
products: [Proxy Product]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
@ -213,7 +213,7 @@ data_sources:
|
|||
retention: 5
|
||||
- data_source_name: Anti-virus
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2000-01-01
|
||||
date_connected: 2000-01-01
|
||||
products: [AV Product]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
@ -225,7 +225,7 @@ data_sources:
|
|||
retention: 5
|
||||
- data_source_name: Network intrusion detection system
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2016-01-01
|
||||
date_connected: 2016-01-01
|
||||
products: [NIDS]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
@ -261,7 +261,7 @@ data_sources:
|
|||
retention: 0
|
||||
- data_source_name: Email gateway
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2000-01-01
|
||||
date_connected: 2000-01-01
|
||||
products: [Email Gateway Product]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
@ -285,7 +285,7 @@ data_sources:
|
|||
retention: 0
|
||||
- data_source_name: Web proxy
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2000-01-01
|
||||
date_connected: 2000-01-01
|
||||
products: [Proxy Product]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
@ -525,7 +525,7 @@ data_sources:
|
|||
retention: 0
|
||||
- data_source_name: Disk forensics
|
||||
date_registered: 2019-01-10
|
||||
date_connected: 2019-01-01
|
||||
date_connected: 2019-01-01
|
||||
products: [Manual, Commercial tool]
|
||||
available_for_data_analytics: True
|
||||
comment: ''
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,95 +1,139 @@
|
|||
import simplejson
|
||||
from generic import *
|
||||
import xlsxwriter
|
||||
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
|
||||
|
||||
|
||||
def generate_detection_layer(filename_techniques, filename_data_sources, overlay):
|
||||
def generate_detection_layer(filename_techniques, filename_data_sources, overlay, filter_applicable_to):
|
||||
"""
|
||||
Generates layer for detection coverage and optionally an overlayed version with visibility coverage.
|
||||
:param filename_techniques: the filename of the yaml file containing the techniques administration
|
||||
:param filename_data_sources: the filename of the yaml file containing the data sources administration
|
||||
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return:
|
||||
"""
|
||||
my_techniques, name, platform = _load_detections(filename_techniques)
|
||||
|
||||
if not overlay:
|
||||
my_techniques, name, platform = _load_techniques(filename_techniques, 'detection', filter_applicable_to)
|
||||
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
|
||||
layer_detection = get_layer_template_detections('Detections ' + name, 'description', 'attack', platform)
|
||||
_write_layer(layer_detection, mapped_techniques_detection, 'detection', name)
|
||||
layer_detection = get_layer_template_detections('Detections ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
|
||||
_write_layer(layer_detection, mapped_techniques_detection, 'detection', filter_applicable_to, name)
|
||||
else:
|
||||
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
|
||||
my_data_sources = _load_data_sources(filename_data_sources)
|
||||
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources)
|
||||
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
|
||||
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
|
||||
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
|
||||
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
|
||||
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
|
||||
|
||||
|
||||
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay):
|
||||
def generate_visibility_layer(filename_techniques, filename_data_sources, overlay, filter_applicable_to):
|
||||
"""
|
||||
Generates layer for visibility coverage and optionally an overlayed version with detection coverage.
|
||||
:param filename_techniques: the filename of the yaml file containing the techniques administration
|
||||
:param filename_data_sources: the filename of the yaml file containing the data sources administration
|
||||
:param overlay: boolean value to specify if an overlay between detection and visibility should be generated
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return:
|
||||
"""
|
||||
my_techniques, name, platform = _load_detections(filename_techniques)
|
||||
my_data_sources = _load_data_sources(filename_data_sources)
|
||||
|
||||
if not overlay:
|
||||
my_techniques, name, platform = _load_techniques(filename_techniques, 'visibility', filter_applicable_to)
|
||||
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
|
||||
layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
|
||||
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name)
|
||||
layer_visibility = get_layer_template_visibility('Visibility ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
|
||||
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', filter_applicable_to, name)
|
||||
else:
|
||||
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources)
|
||||
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
|
||||
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
|
||||
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
|
||||
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
|
||||
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
|
||||
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
|
||||
|
||||
|
||||
def plot_detection_graph(filename):
|
||||
def plot_detection_graph(filename, filter_applicable_to):
|
||||
"""
|
||||
Generates a line graph which shows the improvements on detections through the time.
|
||||
:param filename: the filename of the yaml file containing the techniques administration
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return:
|
||||
"""
|
||||
my_techniques, name, platform = _load_detections(filename)
|
||||
my_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
|
||||
|
||||
graph_values = []
|
||||
for t in my_techniques.values():
|
||||
if 'detection' in t.keys() and t['detection']['date_implemented']:
|
||||
yyyymm = t['detection']['date_implemented'].strftime('%Y-%m')
|
||||
graph_values.append({'date': yyyymm, 'count': 1})
|
||||
for detection in t['detection']:
|
||||
if detection['date_implemented']:
|
||||
yyyymm = detection['date_implemented'].strftime('%Y-%m')
|
||||
graph_values.append({'date': yyyymm, 'count': 1})
|
||||
|
||||
import pandas as pd
|
||||
df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum()
|
||||
df['cumcount'] = df.ix[::1, 'count'].cumsum()[::1]
|
||||
|
||||
output_filename = 'output/graph_detection.html'
|
||||
output_filename = 'output/graph_detection_%s.html' % filter_applicable_to
|
||||
import plotly
|
||||
import plotly.graph_objs as go
|
||||
plotly.offline.plot(
|
||||
{'data': [go.Scatter(x=df['date'], y=df['cumcount'])],
|
||||
'layout': go.Layout(title="# of detections for " + name)},
|
||||
'layout': go.Layout(title="# of detections for %s %s" % (name, filter_applicable_to))},
|
||||
filename=output_filename, auto_open=False
|
||||
)
|
||||
print("File written: " + output_filename)
|
||||
|
||||
|
||||
def _load_detections(filename):
|
||||
def _load_techniques(filename, detection_or_visibility, filter_applicable_to='all'):
|
||||
"""
|
||||
Loads the techniques (including detection and visibility properties) from the given yaml file.
|
||||
:param filename: the filename of the yaml file containing the techniques administration
|
||||
:param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When
|
||||
using 'all' no filtering will be applied.
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return: dictionary with techniques (incl. properties), name and platform
|
||||
"""
|
||||
|
||||
my_techniques = {}
|
||||
with open(filename, 'r') as yaml_file:
|
||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
for d in yaml_content['techniques']:
|
||||
my_techniques[d['technique_id']] = d
|
||||
# Add detection items:
|
||||
if type(d['detection']) == dict: # There is just one detection entry
|
||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
|
||||
elif type(d['detection']) == list: # There are multiple detection entries
|
||||
for de in d['detection']:
|
||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
|
||||
|
||||
# Add visibility items
|
||||
if type(d['visibility']) == dict: # There is just one visibility entry
|
||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
|
||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
|
||||
elif type(d['visibility']) == list: # There are multiple visibility entries
|
||||
for de in d['visibility']:
|
||||
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
|
||||
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
|
||||
|
||||
name = yaml_content['name']
|
||||
platform = yaml_content['platform']
|
||||
return my_techniques, name, platform
|
||||
|
||||
|
||||
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
|
||||
"""
|
||||
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
|
||||
list. If the dict[technique_id] doesn't exist yet, it will be created.
|
||||
:param dict: the dictionary
|
||||
:param technique_id: the id of the technique in the main dict
|
||||
:param key: the key where the list in the dictionary resides
|
||||
:param entry: the entry to add to the list
|
||||
:return:
|
||||
"""
|
||||
if technique_id not in dict.keys():
|
||||
dict[technique_id] = {}
|
||||
if not key in dict[technique_id].keys():
|
||||
dict[technique_id][key] = []
|
||||
dict[technique_id][key].append(entry)
|
||||
|
||||
|
||||
def _load_data_sources(filename):
|
||||
"""
|
||||
Loads the data sources (including all properties) from the given yaml file.
|
||||
|
@ -106,19 +150,21 @@ def _load_data_sources(filename):
|
|||
return my_data_sources
|
||||
|
||||
|
||||
def _write_layer(layer, mapped_techniques, filename_prefix, name):
|
||||
def _write_layer(layer, mapped_techniques, filename_prefix, filename_suffix, name):
|
||||
"""
|
||||
Writes the json layer file to disk.
|
||||
:param layer: the prepped layer dictionary
|
||||
:param mapped_techniques: the techniques section that will be included in the layer
|
||||
:param filename_prefix: the prefix for the output filename
|
||||
:param filename_suffix: the suffix for the output filename
|
||||
:param name: the name that will be used in the filename together with the prefix
|
||||
:return:
|
||||
"""
|
||||
|
||||
layer['techniques'] = mapped_techniques
|
||||
json_string = simplejson.dumps(layer).replace('}, ', '},\n')
|
||||
output_filename = 'output/%s_%s.json' % (filename_prefix, normalize_name_to_filename(name))
|
||||
filename_suffix = '_' + filename_suffix if filename_suffix != '' else ''
|
||||
output_filename = normalize_name_to_filename('output/%s_%s%s.json' % (filename_prefix, name, filename_suffix))
|
||||
with open(output_filename, 'w') as f:
|
||||
f.write(json_string)
|
||||
print("File written: " + output_filename)
|
||||
|
@ -136,26 +182,41 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
|
|||
# techniques to be used in the layer output file.
|
||||
mapped_techniques = []
|
||||
try:
|
||||
for d, c in my_techniques.items():
|
||||
s = -1 if 'detection' not in c.keys() else c['detection']['score']
|
||||
color = COLOR_D_0 if s == 0 else COLOR_D_1 if s == 1 else COLOR_D_2 if s == 2 else COLOR_D_3 \
|
||||
if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
|
||||
technique = get_technique(techniques, d)
|
||||
for tactic in technique['tactic']:
|
||||
location = ', '.join(c['detection']['location']) if 'detection' in c.keys() else '-'
|
||||
location = location if location != '' else '-'
|
||||
x = {}
|
||||
x['techniqueID'] = d
|
||||
x['color'] = color
|
||||
x['comment'] = ''
|
||||
x['enabled'] = True
|
||||
x['tactic'] = tactic.lower().replace(' ', '-')
|
||||
x['metadata'] = [{'name': '-Detection score', 'value': str(s)},
|
||||
{'name': '-Detection location', 'value': location}]
|
||||
for technique_id, technique_data in my_techniques.items():
|
||||
s = calculate_score(technique_data['detection'], zero_value=-1)
|
||||
|
||||
mapped_techniques.append(x)
|
||||
except Exception:
|
||||
print('[!] Possible error in YAML file at: ' + d)
|
||||
if s != -1:
|
||||
color = COLOR_D_0 if s == 0 else COLOR_D_1 if s == 1 else COLOR_D_2 if s == 2 else COLOR_D_3 \
|
||||
if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
|
||||
technique = get_technique(techniques, technique_id)
|
||||
|
||||
for tactic in technique['tactic']:
|
||||
x = {}
|
||||
x['techniqueID'] = technique_id
|
||||
x['color'] = color
|
||||
x['comment'] = ''
|
||||
x['enabled'] = True
|
||||
x['tactic'] = tactic.lower().replace(' ', '-')
|
||||
x['metadata'] = []
|
||||
x['score'] = s
|
||||
cnt = 1
|
||||
tcnt = len([d for d in technique_data['detection'] if d['score'] >= 0])
|
||||
for detection in technique_data['detection']:
|
||||
if detection['score'] >= 0:
|
||||
location = ', '.join(detection['location'])
|
||||
location = location if location != '' else '-'
|
||||
applicable_to = ', '.join(detection['applicable_to'])
|
||||
comment = str(detection['comment']) if str(detection['comment']) != '' else '-'
|
||||
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
|
||||
x['metadata'].append({'name': '-Detection score', 'value': str(detection['score'])})
|
||||
x['metadata'].append({'name': '-Detection location', 'value': location})
|
||||
x['metadata'].append({'name': '-Comment', 'value': comment})
|
||||
if cnt != tcnt:
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
cnt += 1
|
||||
mapped_techniques.append(x)
|
||||
except Exception as e:
|
||||
print('[!] Possible error in YAML file at: %s. Error: %s' % (technique_id, str(e)))
|
||||
quit()
|
||||
|
||||
return mapped_techniques
|
||||
|
@ -175,26 +236,37 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
|
|||
# Color the techniques based on how the coverage defined in the detections definition and generate a list with
|
||||
# techniques to be used in the layer output file.
|
||||
mapped_techniques = []
|
||||
for d, c in my_techniques.items():
|
||||
s = 0 if 'visibility' not in c.keys() else c['visibility']['score']
|
||||
if 'visibility' in c.keys():
|
||||
comment = str(c['visibility']['comment']) if str(c['visibility']['comment']) != '' else '-'
|
||||
else:
|
||||
comment = '-'
|
||||
my_ds = ', '.join(technique_ds_mapping[d]['my_data_sources']) if d in technique_ds_mapping.keys() and technique_ds_mapping[d]['my_data_sources'] else '-'
|
||||
for technique_id, technique_data in my_techniques.items():
|
||||
s = calculate_score(technique_data['visibility'])
|
||||
|
||||
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '-'
|
||||
technique = get_technique(techniques, technique_id)
|
||||
color = COLOR_V_1 if s == 1 else COLOR_V_2 if s == 2 else COLOR_V_3 if s == 3 else COLOR_V_4 if s == 4 else ''
|
||||
technique = get_technique(techniques, d)
|
||||
|
||||
for tactic in technique['tactic']:
|
||||
x = {}
|
||||
x['techniqueID'] = d
|
||||
x['techniqueID'] = technique_id
|
||||
x['color'] = color
|
||||
x['comment'] = ''
|
||||
x['enabled'] = True
|
||||
x['tactic'] = tactic.lower().replace(' ', '-')
|
||||
x['metadata'] = [{'name': '-Visibility score', 'value': str(s)},
|
||||
{'name': '-Comment', 'value': comment},
|
||||
{'name': '-Available data sources', 'value': my_ds},
|
||||
{'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])}]
|
||||
x['metadata'] = []
|
||||
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
|
||||
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])})
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
x['score'] = s
|
||||
|
||||
cnt = 1
|
||||
tcnt = len(technique_data['visibility'])
|
||||
for visibility in technique_data['visibility']:
|
||||
comment = str(visibility['comment']) if str(visibility['comment']) != '' else '-'
|
||||
applicable_to = ', '.join(visibility['applicable_to'])
|
||||
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
|
||||
x['metadata'].append({'name': '-Visibility score', 'value': str(visibility['score'])})
|
||||
x['metadata'].append({'name': '-Comment', 'value': comment})
|
||||
if cnt != tcnt:
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
cnt += 1
|
||||
|
||||
mapped_techniques.append(x)
|
||||
|
||||
|
@ -215,11 +287,12 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
|
|||
return mapped_techniques
|
||||
|
||||
|
||||
def _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources):
|
||||
def _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to):
|
||||
"""
|
||||
Determine the color of the techniques based on both detection and visibility.
|
||||
:param my_techniques: the configured techniques
|
||||
:param my_data_sources: the configured data sources
|
||||
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
|
||||
:return: a dictionary with techniques that can be used in the layer's output file
|
||||
"""
|
||||
techniques = load_attack_data(DATATYPE_ALL_TECH)
|
||||
|
@ -229,13 +302,22 @@ def _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources):
|
|||
# Color the techniques based on how the coverage defined in the detections definition and generate a list with
|
||||
# techniques to be used in the layer output file.
|
||||
mapped_techniques = []
|
||||
for d, c in my_techniques.items():
|
||||
detection_score = 0 if 'detection' not in c.keys() else c['detection']['score']
|
||||
visibility_score = 0 if 'visibility' not in c.keys() else c['visibility']['score']
|
||||
for technique_id, technique_data in my_techniques.items():
|
||||
detection_score = calculate_score(technique_data['detection'], zero_value=-1)
|
||||
visibility_score = calculate_score(technique_data['visibility'])
|
||||
|
||||
detection = True if detection_score > 0 else False
|
||||
visibility = True if visibility_score > 0 else False
|
||||
|
||||
# Additional filtering based on applicable_to field. Overrules the score.
|
||||
a2_d = set([a for d in technique_data['detection'] for a in d['applicable_to']])
|
||||
a2_v = set([a for v in technique_data['detection'] for a in v['applicable_to']])
|
||||
|
||||
if filter_applicable_to != 'all' and filter_applicable_to not in a2_d and 'all' not in a2_d:
|
||||
detection = False
|
||||
if filter_applicable_to != 'all' and filter_applicable_to not in a2_v and 'all' not in a2_v:
|
||||
visibility = False
|
||||
|
||||
if detection and visibility:
|
||||
color = COLOR_OVERLAY_BOTH
|
||||
elif detection and not visibility:
|
||||
|
@ -243,31 +325,173 @@ def _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources):
|
|||
elif not detection and visibility:
|
||||
color = COLOR_OVERLAY_VISIBILITY
|
||||
|
||||
location = ', '.join(c['detection']['location']) if 'detection' in c.keys() else '-'
|
||||
location = location if location != '' else '-'
|
||||
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '-'
|
||||
|
||||
if 'visibility' in c.keys():
|
||||
comment = str(c['visibility']['comment']) if str(c['visibility']['comment']) != '' else '-'
|
||||
else:
|
||||
comment = '-'
|
||||
|
||||
my_ds = ', '.join(technique_ds_mapping[d]['my_data_sources']) if d in technique_ds_mapping.keys() and technique_ds_mapping[d]['my_data_sources'] else '-'
|
||||
|
||||
technique = get_technique(techniques, d)
|
||||
technique = get_technique(techniques, technique_id)
|
||||
for tactic in technique['tactic']:
|
||||
x = {}
|
||||
x['techniqueID'] = d
|
||||
x['techniqueID'] = technique_id
|
||||
x['color'] = color
|
||||
x['comment'] = ''
|
||||
x['enabled'] = True
|
||||
x['tactic'] = tactic.lower().replace(' ', '-')
|
||||
x['metadata'] = [{'name': '-Visibility score', 'value': str(visibility_score)},
|
||||
{'name': '-Comment', 'value': comment},
|
||||
{'name': '-Available data sources', 'value': my_ds},
|
||||
{'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])},
|
||||
{'name': '-Detection score', 'value': str(detection_score)},
|
||||
{'name': '-Detection location', 'value': location}]
|
||||
x['metadata'] = []
|
||||
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
|
||||
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])})
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
|
||||
# Metadata for detection:
|
||||
cnt = 1
|
||||
tcnt = len([d for d in technique_data['detection'] if d['score'] >= 0 and (filter_applicable_to == 'all' or filter_applicable_to in d['applicable_to'] or 'all' in d['applicable_to'])])
|
||||
for detection in technique_data['detection']:
|
||||
if detection['score'] >= 0 and (filter_applicable_to == 'all' or filter_applicable_to in detection['applicable_to'] or 'all' in detection['applicable_to']):
|
||||
location = ', '.join(detection['location'])
|
||||
location = location if location != '' else '-'
|
||||
applicable_to = ', '.join(detection['applicable_to'])
|
||||
comment = str(detection['comment']) if str(detection['comment']) != '' else '-'
|
||||
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
|
||||
x['metadata'].append({'name': '-Detection score', 'value': str(detection['score'])})
|
||||
x['metadata'].append({'name': '-Detection location', 'value': location})
|
||||
x['metadata'].append({'name': '-Comment', 'value': comment})
|
||||
if cnt != tcnt:
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
cnt += 1
|
||||
|
||||
# Metadata for visibility:
|
||||
if tcnt > 0:
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
cnt = 1
|
||||
tcnt = len([v for v in technique_data['visibility'] if filter_applicable_to == 'all' or filter_applicable_to in v['applicable_to'] or 'all' in v['applicable_to']])
|
||||
for visibility in technique_data['visibility']:
|
||||
if filter_applicable_to == 'all' or filter_applicable_to in visibility['applicable_to'] or 'all' in visibility['applicable_to']:
|
||||
comment = str(visibility['comment']) if str(visibility['comment']) != '' else '-'
|
||||
applicable_to = ', '.join(visibility['applicable_to'])
|
||||
x['metadata'].append({'name': '-Applicable to', 'value': applicable_to})
|
||||
x['metadata'].append({'name': '-Visibility score', 'value': str(visibility['score'])})
|
||||
x['metadata'].append({'name': '-Comment', 'value': comment})
|
||||
if cnt != tcnt:
|
||||
x['metadata'].append({'name': '---', 'value': '---'})
|
||||
cnt += 1
|
||||
|
||||
mapped_techniques.append(x)
|
||||
|
||||
return mapped_techniques
|
||||
|
||||
|
||||
def export_techniques_list_to_excel(filename):
|
||||
"""
|
||||
Makes an overview of the MITRE ATT&CK techniques from the YAML administration file.
|
||||
:param filename: the filename of the yaml file containing the techniques administration
|
||||
:return:
|
||||
"""
|
||||
my_techniques, name, platform = _load_techniques(filename, 'all')
|
||||
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
|
||||
mitre_techniques = load_attack_data(DATATYPE_ALL_TECH)
|
||||
|
||||
excel_filename = 'output/techniques.xlsx'
|
||||
workbook = xlsxwriter.Workbook(excel_filename)
|
||||
worksheet_detections = workbook.add_worksheet('Detections')
|
||||
worksheet_visibility = workbook.add_worksheet('Visibility')
|
||||
|
||||
# Formatting:
|
||||
format_bold_left = workbook.add_format({'align': 'left', 'bold': True})
|
||||
format_title = workbook.add_format({'align': 'left', 'bold': True, 'font_size': '14'})
|
||||
format_bold_center_bggrey = workbook.add_format({'align': 'center', 'bold': True, 'bg_color': '#dbdbdb'})
|
||||
format_bold_center_bgreen = workbook.add_format({'align': 'center', 'bold': True, 'bg_color': '#8bc34a'})
|
||||
format_bold_center_bgblue = workbook.add_format({'align': 'center', 'bold': True, 'bg_color': '#64b5f6'})
|
||||
wrap_text = workbook.add_format({'text_wrap': True, 'valign': 'top'})
|
||||
valign_top = workbook.add_format({'valign': 'top'})
|
||||
no_score = workbook.add_format({'valign': 'top', 'align': 'center'})
|
||||
detection_score_0 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_0})
|
||||
detection_score_1 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_1})
|
||||
detection_score_2 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_2})
|
||||
detection_score_3 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_3})
|
||||
detection_score_4 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_4, 'font_color': '#ffffff'})
|
||||
detection_score_5 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_D_5, 'font_color': '#ffffff'})
|
||||
visibility_score_1 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_V_1})
|
||||
visibility_score_2 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_V_2})
|
||||
visibility_score_3 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_V_3, 'font_color': '#ffffff'})
|
||||
visibility_score_4 = workbook.add_format({'valign': 'top', 'align': 'center', 'bg_color': COLOR_V_4, 'font_color': '#ffffff'})
|
||||
|
||||
# Title
|
||||
worksheet_detections.write(0, 0, 'Overview of detections for ' + name, format_title)
|
||||
worksheet_visibility.write(0, 0, 'Overview of visibility for ' + name, format_title)
|
||||
|
||||
# Header columns
|
||||
worksheet_detections.merge_range(2, 0, 2, 2, 'Technique', format_bold_center_bggrey)
|
||||
worksheet_visibility.merge_range(2, 0, 2, 2, 'Technique', format_bold_center_bggrey)
|
||||
worksheet_detections.merge_range(2, 3, 2, 8, 'Detection', format_bold_center_bgreen)
|
||||
worksheet_visibility.merge_range(2, 3, 2, 5, 'Visibility', format_bold_center_bgblue)
|
||||
|
||||
# Writing the detections:
|
||||
y = 3
|
||||
worksheet_detections.write(y, 0, 'ID', format_bold_left)
|
||||
worksheet_detections.write(y, 1, 'Description', format_bold_left)
|
||||
worksheet_detections.write(y, 2, 'Tactic', format_bold_left)
|
||||
worksheet_detections.write(y, 3, 'Applicable to', format_bold_left)
|
||||
worksheet_detections.write(y, 4, 'Date registered', format_bold_left)
|
||||
worksheet_detections.write(y, 5, 'Date implemented', format_bold_left)
|
||||
worksheet_detections.write(y, 6, 'Score', format_bold_left)
|
||||
worksheet_detections.write(y, 7, 'Location', format_bold_left)
|
||||
worksheet_detections.write(y, 8, 'Comment', format_bold_left)
|
||||
worksheet_detections.set_column(0, 0, 14)
|
||||
worksheet_detections.set_column(1, 1, 40)
|
||||
worksheet_detections.set_column(2, 2, 50)
|
||||
worksheet_detections.set_column(3, 3, 18)
|
||||
worksheet_detections.set_column(4, 4, 15)
|
||||
worksheet_detections.set_column(5, 5, 18)
|
||||
worksheet_detections.set_column(6, 6, 8)
|
||||
worksheet_detections.set_column(7, 7, 25)
|
||||
worksheet_detections.set_column(8, 8, 40)
|
||||
y = 4
|
||||
for technique_id, technique_data in my_techniques.items():
|
||||
# Add row for every detection that is defined:
|
||||
for detection in technique_data['detection']:
|
||||
worksheet_detections.write(y, 0, technique_id, valign_top)
|
||||
worksheet_detections.write(y, 1, get_technique(mitre_techniques, technique_id)['technique'], valign_top)
|
||||
worksheet_detections.write(y, 2, ', '.join(t.capitalize() for t in get_technique(mitre_techniques, technique_id)['tactic']), valign_top)
|
||||
worksheet_detections.write(y, 3, ', '.join(detection['applicable_to']), wrap_text)
|
||||
worksheet_detections.write(y, 4, str(detection['date_registered']).replace('None', ''), valign_top)
|
||||
worksheet_detections.write(y, 5, str(detection['date_implemented']).replace('None', ''), valign_top)
|
||||
ds = detection['score']
|
||||
worksheet_detections.write(y, 6, ds, detection_score_0 if ds == 0 else detection_score_1 if ds ==1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score)
|
||||
worksheet_detections.write(y, 7, '\n'.join(detection['location']), wrap_text)
|
||||
worksheet_detections.write(y, 8, detection['comment'][:-1] if detection['comment'].endswith('\n') else detection['comment'], wrap_text)
|
||||
y += 1
|
||||
worksheet_detections.autofilter(3, 0, 3, 8)
|
||||
worksheet_detections.freeze_panes(4, 0)
|
||||
|
||||
# Writing the visibility items:
|
||||
y = 3
|
||||
worksheet_visibility.write(y, 0, 'ID', format_bold_left)
|
||||
worksheet_visibility.write(y, 1, 'Description', format_bold_left)
|
||||
worksheet_visibility.write(y, 2, 'Tactic', format_bold_left)
|
||||
worksheet_visibility.write(y, 3, 'Applicable to', format_bold_left)
|
||||
worksheet_visibility.write(y, 4, 'Score', format_bold_left)
|
||||
worksheet_visibility.write(y, 5, 'Comment', format_bold_left)
|
||||
worksheet_visibility.set_column(0, 0, 14)
|
||||
worksheet_visibility.set_column(1, 1, 40)
|
||||
worksheet_visibility.set_column(2, 2, 50)
|
||||
worksheet_visibility.set_column(3, 9, 18)
|
||||
worksheet_visibility.set_column(4, 10, 8)
|
||||
worksheet_visibility.set_column(5, 11, 40)
|
||||
y = 4
|
||||
for technique_id, technique_data in my_techniques.items():
|
||||
# Add row for every visibility that is defined:
|
||||
for visibility in technique_data['visibility']:
|
||||
worksheet_visibility.write(y, 0, technique_id, valign_top)
|
||||
worksheet_visibility.write(y, 1, get_technique(mitre_techniques, technique_id)['technique'], valign_top)
|
||||
worksheet_visibility.write(y, 2, ', '.join(t.capitalize() for t in get_technique(mitre_techniques, technique_id)['tactic']), valign_top)
|
||||
worksheet_visibility.write(y, 3, ', '.join(visibility['applicable_to']), wrap_text)
|
||||
vs = visibility['score']
|
||||
worksheet_visibility.write(y, 4, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score)
|
||||
worksheet_visibility.write(y, 5, visibility['comment'][:-1] if visibility['comment'].endswith('\n') else visibility['comment'], wrap_text)
|
||||
y += 1
|
||||
worksheet_visibility.autofilter(3, 0, 3, 5)
|
||||
worksheet_visibility.freeze_panes(4, 0)
|
||||
|
||||
try:
|
||||
workbook.close()
|
||||
print("File written: " + excel_filename)
|
||||
except Exception as e:
|
||||
print('[!] Error while writing Excel file: %s' % str(e))
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
import re
|
||||
import os
|
||||
import shutil
|
||||
from constants import *
|
||||
|
||||
|
||||
def _create_upgrade_text(file_type, file_version):
|
||||
"""
|
||||
Create text on the upgrades to be performed on the YAML file.
|
||||
:param file_type: YAML file type
|
||||
:param file_version: version of the YAML file
|
||||
:return: upgrade text to be displayed in the console
|
||||
"""
|
||||
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
text = 'You are using an old version of the YAML file.\n' \
|
||||
'The following upgrades will be performed on the techniques administration file:\n'
|
||||
for version in FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT:
|
||||
if file_version < version:
|
||||
text += FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT[version] + '\n'
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def _ask_to_upgrade(filename):
|
||||
"""
|
||||
Ask the user to upgrade the YAML file or not.
|
||||
:param filename: YAML administration file
|
||||
:return: boolean value indicating if the upgrade can be performed
|
||||
"""
|
||||
yes_no = ''
|
||||
while not re.match('^(y|yes|n|no)$', yes_no, re.IGNORECASE):
|
||||
yes_no = input('Do you want to upgrade the below file. A backup will be created of the current file.\n'
|
||||
'[!] Not upgrading the file will brake some functionality within DeTT&CT.\n'
|
||||
' - ' + filename + '\n >> y(yes)/n(no): ')
|
||||
|
||||
if re.match('^(y|yes)$', yes_no, re.IGNORECASE):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def upgrade_yaml_file(filename, file_type, file_version, attack_tech_data):
|
||||
"""
|
||||
Main function to upgrade the YAML file to a new version
|
||||
:param filename: YAML administration file
|
||||
:param file_type: YAML file type
|
||||
:param file_version: version of the YAML file
|
||||
:param attack_tech_data: ATT&CK data on techniques
|
||||
:return:
|
||||
"""
|
||||
|
||||
is_upgraded = False
|
||||
tech_upgrade_func = {}
|
||||
tech_upgrade_func[1.1] = _upgrade_technique_yaml_10_to_11
|
||||
|
||||
with open(filename, 'r') as file:
|
||||
file_new_lines = file.readlines()
|
||||
|
||||
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
if file_version != FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION:
|
||||
upgrade_text = _create_upgrade_text(file_type, file_version)
|
||||
print(upgrade_text)
|
||||
if _ask_to_upgrade(filename):
|
||||
is_upgraded = True
|
||||
# create backup of the non-upgraded file
|
||||
backup_filename = _get_backup_filename(filename)
|
||||
shutil.copy2(filename, backup_filename)
|
||||
print('Written backup file: ' + backup_filename)
|
||||
|
||||
for tech_f in tech_upgrade_func.keys():
|
||||
if file_version < tech_f:
|
||||
file_new_lines = tech_upgrade_func[tech_f](file_new_lines, attack_tech_data)
|
||||
else:
|
||||
print('Upgrade cancelled\n')
|
||||
print('-' * 80)
|
||||
return
|
||||
|
||||
if is_upgraded:
|
||||
# write the upgraded file to disk
|
||||
with open(filename, 'w') as f:
|
||||
f.writelines(file_new_lines)
|
||||
print('Written upgraded file: ' + filename)
|
||||
|
||||
print('\nUpgrade complete')
|
||||
print('-'*80)
|
||||
|
||||
|
||||
def _get_technique(techniques, technique_id):
|
||||
"""
|
||||
Generic function to lookup a specific technique_id in a list of dictionaries with techniques.
|
||||
:param techniques: list with all techniques
|
||||
:param technique_id: technique_id to look for
|
||||
:return: the technique you're searching for. None if not found.
|
||||
"""
|
||||
for t in techniques:
|
||||
if technique_id == t['technique_id']:
|
||||
return t
|
||||
return None
|
||||
|
||||
|
||||
def _get_backup_filename(filename):
|
||||
"""
|
||||
Create a filename to be used for backup of the YAML file
|
||||
:param filename: existing YAML filename
|
||||
:return: a name for the backup file
|
||||
"""
|
||||
suffix = 1
|
||||
backup_filename = filename.replace('.yaml', '_backup_' + str(suffix) + '.yaml')
|
||||
while os.path.exists(backup_filename):
|
||||
backup_filename = backup_filename.replace('_backup_' + str(suffix) + '.yaml', '_backup_' + str(suffix+1) + '.yaml')
|
||||
suffix += 1
|
||||
|
||||
return backup_filename
|
||||
|
||||
|
||||
def _upgrade_technique_yaml_10_to_11(file_lines, attack_tech_data):
|
||||
"""
|
||||
Upgrade the YAML technique administration file from 1.0 to 1.1.
|
||||
:param file_lines: array containing the lines within the tech. admin. file
|
||||
:param attack_tech_data: ATT&CK data on techniques
|
||||
:return: array with new lines to be written to disk
|
||||
"""
|
||||
regex_version = re.compile(r'^\s*version:\s+1\.0\s*$', re.IGNORECASE)
|
||||
regex_tech = re.compile(r'^-\s+technique_id:\s+T[0-9]{4}\s*$', re.IGNORECASE)
|
||||
regex_tech_id = re.compile(r'^-\s+technique_id:\s+(T[0-9]{4})\s*$', re.IGNORECASE)
|
||||
regex_detection = re.compile(r'^\s+detection:\s*$', re.IGNORECASE)
|
||||
regex_visibility = re.compile(r'^\s+visibility:\s*$', re.IGNORECASE)
|
||||
|
||||
file_new_lines = []
|
||||
x = 0
|
||||
for l in file_lines:
|
||||
if regex_version.match(l):
|
||||
file_new_lines.append(l.replace('1.0', '1.1'))
|
||||
elif regex_tech.match(l):
|
||||
file_new_lines.append(l)
|
||||
|
||||
tech_id = regex_tech_id.search(l).group(1)
|
||||
tech_name = _get_technique(attack_tech_data, tech_id)['technique']
|
||||
file_new_lines.append(' technique_name: ' + tech_name+'\n')
|
||||
elif regex_detection.match(l):
|
||||
file_new_lines.append(l)
|
||||
file_new_lines.append(" applicable_to: ['all']\n")
|
||||
elif regex_visibility.match(l):
|
||||
file_new_lines.append(l)
|
||||
file_new_lines.append(" applicable_to: ['all']\n")
|
||||
else:
|
||||
file_new_lines.append(l)
|
||||
x += 1
|
||||
|
||||
return file_new_lines
|
Loading…
Reference in New Issue