Merge branch 'development' of https://github.com/rabobank-cdc/DeTTACT into development

master
Ruben Bouman 2019-05-15 10:05:29 +02:00
commit e745b3bfe4
5 changed files with 157 additions and 86 deletions

View File

@ -1,13 +1,14 @@
<img src="https://github.com/rabobank-cdc/DeTTACT/wiki/images/logo.png" alt="DeTT&CT" width=30% height=30%>
#### Detect Tactics, Techniques & Combat Threats
Latest version: [1.1](https://github.com/rabobank-cdc/DeTTACT/wiki/Changelog#version-11)
To get started with DeTT&CT, check out the
[Wiki](https://github.com/rabobank-cdc/DeTTACT/wiki/Getting-started).
DeTT&CT will help blue teams in scoring and comparing data source quality, visibility coverage, detection coverage and threat actor behaviours. The DeTT&CT framework consists of a Python tool, YAML administration files and [scoring tables](https://github.com/rabobank-cdc/DeTTACT/raw/master/scoring_table.xlsx) for the different aspects.
DeTT&CT aims to assist blue teams using ATT&CK to score and compare data log source quality, visibility coverage, detection coverage and threat actor behaviours. All of which can help, in different ways, to get more resilient against attacks targeting your organisation. The DeTT&CT framework consists of a Python tool, YAML administration files and [scoring tables](https://github.com/rabobank-cdc/DeTTACT/raw/master/scoring_table.xlsx) for the different aspects.
DeTT&CT will help you to:
DeTT&CT provides the following functionality:
- Administrate and score the quality of your data sources.
- Get insight on the visibility you have on for example endpoints.
@ -36,33 +37,13 @@ of which can be visualised by loading JSON layer files into the [ATT&CK Navigato
See below an example of mapping your data sources to ATT&CK which gives you a rough overview of your visibility coverage:
<img src="https://github.com/rabobank-cdc/DeTTACT/wiki/images/example_data_sources.png" alt="DeTT&CT"><br>
<img src="https://github.com/rabobank-cdc/DeTTACT/wiki/images/example_data_sources.png" alt="DeTT&CT - Data quality">
## Installation and requirements
See our GitHub Wiki: [Installation and requirements](https://github.com/rabobank-cdc/DeTTACT/wiki/Installation-and-requirements).
## Future developments
- Add more graphs:
- [ ] Detections: improvement based on newly added detections and improvements on the level/score of existing detections. Possibly with a changelog.
- [ ] Visibility: improvement in the quality of an existing data source.
- Groups:
- [ ] Have a group YAML file type that contains a count on how popular a certain technique is. This can be very useful to map things such as Red Canary's [Threat Detection Report 2019](https://redcanary.com/resources/guides/threat-detection-report/).
- Excel output for:
- [ ] Techniques administration YAML file: visibility coverage.
- [ ] Techniques administration YAML file: detection coverage.
- Data quality Excel sheet:
- [ ] Add colours to the data quality scores in the Excel sheet.
- YAML files:
- [ ] Create an option within the tool to migrate an old administration YAML file version to a new version (such as adding specific key-value pairs).
- MITRE ATT&CK updates
- [ ] Have a smart way of knowing what to update in your data source and technique administration files once MITRE publishes updates.
- [ ] Data sources: check for missing data sources in data sources administration files.
- Minimal visibility
- [ ] Integrate information into the framework on what a minimal set of visibility for a technique should be, before you can say to have useful visibility (e.g. technique X requires at least to have visibility on process monitoring, process command line monitoring and DLL monitoring).
## License: GPL-3.0
[DeTT&CT's GNU General Public License v3.0](https://github.com/rabobank-cdc/DeTTACT/blob/master/LICENSE)

View File

@ -58,6 +58,7 @@ def init_menu():
action='store_true')
parser_visibility.add_argument('-o', '--overlay', help='generate a visibility layer overlayed with detections for '
'the ATT&CK navigator', action='store_true')
parser_visibility.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
# create the detection parser
parser_detection = subparsers.add_parser('detection', aliases=['d'],
@ -81,6 +82,7 @@ def init_menu():
'the ATT&CK navigator', action='store_true')
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
action='store_true')
parser_detection.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
# create the group parser
parser_group = subparsers.add_parser('group', aliases=['g'],
@ -171,6 +173,8 @@ def menu(menu_parser):
export_techniques_list_to_excel(args.file_tech)
if args.excel and args.applicable != 'all':
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
if args.health:
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
elif args.subparser in ['group', 'g']:
generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group, args.applicable)
@ -194,6 +198,8 @@ def menu(menu_parser):
export_techniques_list_to_excel(args.file_tech)
if args.excel and args.applicable != 'all':
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
if args.health:
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
elif args.subparser in ['generic', 'ge']:
if args.statistics:

View File

@ -337,3 +337,142 @@ def calculate_score(l, zero_value=0):
number += 1
s = int(round(s / number, 0) if number > 0 else zero_value)
return s
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
"""
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
list. If the dict[technique_id] doesn't exist yet, it will be created.
:param dict: the dictionary
:param technique_id: the id of the technique in the main dict
:param key: the key where the list in the dictionary resides
:param entry: the entry to add to the list
:return:
"""
if technique_id not in dict.keys():
dict[technique_id] = {}
if not key in dict[technique_id].keys():
dict[technique_id][key] = []
dict[technique_id][key].append(entry)
def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'):
"""
Loads the techniques (including detection and visibility properties) from the given yaml file.
:param filename: the filename of the yaml file containing the techniques administration
:param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When
using 'all' no filtering will be applied.
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: dictionary with techniques (incl. properties), name and platform
"""
my_techniques = {}
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
for d in yaml_content['techniques']:
# Add detection items:
if type(d['detection']) == dict: # There is just one detection entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif type(d['detection']) == list: # There are multiple detection entries
for de in d['detection']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if type(d['visibility']) == dict: # There is just one visibility entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif type(d['visibility']) == list: # There are multiple visibility entries
for de in d['visibility']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
name = yaml_content['name']
platform = yaml_content['platform']
return my_techniques, name, platform
def check_yaml_file_health(filename, file_type):
"""
Check on error in the provided YAML file.
:param filename: YAML file location
:param file_type: currenlty only 'FILE_TYPE_TECHNIQUE_ADMINISTRATION' is being supported
:return:
"""
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
# check for duplicate tech IDs
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques']))
tech_dup = []
for tech in tech_ids:
if tech not in tech_dup:
tech_dup.append(tech)
else:
print('[!] Duplicate technique ID: ' + tech)
# checks on:
# - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score'
# - invalid date format for: 'date_implemented', 'date_registered'
# - detection or visibility score out-of-range
# - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment'
techniques = load_techniques(filename)
for tech, v in techniques[0].items():
for key in ['detection', 'visibility']:
if key not in v:
print('[!] Technique ID: ' + tech + ' is MISSING ' + key)
for detection in v['detection']:
for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']:
if key not in detection:
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key)
try:
if detection['score'] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score')
elif not (detection['score'] >= -1 and detection['score'] <= 5):
print('[!] Technique ID: ' + tech + ' has an INVALID detection score: ' + str(detection['score']) +
' (should be between -1 and 5)')
elif detection['score'] > -1:
for key in ['date_implemented', 'date_registered']:
if not detection[key]:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
break
try:
detection[key].year
detection[key].month
detection[key].day
except AttributeError:
print('[!] Technique ID: ' + tech + ' has an INVALID data format for the key-value pair '
'in detection: ' + key + ' (should be YYYY-MM-DD)')
for key in ['location', 'applicable_to']:
try:
if detection[key][0] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
except TypeError:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
except KeyError:
pass
for visibility in v['visibility']:
for key in ['applicable_to', 'score', 'comment']:
if key not in visibility:
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key)
try:
if visibility['score'] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score')
elif not (visibility['score'] >= 0 and visibility['score'] <= 4):
print('[!] Technique ID: ' + tech + ' has an INVALID visibility score: ' + str(detection['score']) +
' (should be between 0 and 4)')
except KeyError:
pass

View File

@ -1,6 +1,5 @@
import simplejson
from generic import *
from technique_mapping import _load_techniques
CG_GROUPS = {}
@ -211,7 +210,7 @@ def get_detection_techniques(filename, filter_applicable_to):
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
groups_dict = {}
detection_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
detection_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to)
group_id = 'DETECTION'
groups_dict[group_id] = {}
@ -235,7 +234,7 @@ def get_visibility_techniques(filename, filter_applicable_to):
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
groups_dict = {}
visibility_techniques, name, platform = _load_techniques(filename, 'visibility', filter_applicable_to)
visibility_techniques, name, platform = load_techniques(filename, 'visibility', filter_applicable_to)
group_id = 'VISIBILITY'
groups_dict[group_id] = {}

View File

@ -14,12 +14,12 @@ def generate_detection_layer(filename_techniques, filename_data_sources, overlay
:return:
"""
if not overlay:
my_techniques, name, platform = _load_techniques(filename_techniques, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'detection', filter_applicable_to)
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
layer_detection = get_layer_template_detections('Detections ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', filter_applicable_to, name)
else:
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
@ -38,12 +38,12 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
my_data_sources = _load_data_sources(filename_data_sources)
if not overlay:
my_techniques, name, platform = _load_techniques(filename_techniques, 'visibility', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'visibility', filter_applicable_to)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
layer_visibility = get_layer_template_visibility('Visibility ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', filter_applicable_to, name)
else:
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
@ -56,7 +56,7 @@ def plot_detection_graph(filename, filter_applicable_to):
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return:
"""
my_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to)
graph_values = []
for t in my_techniques.values():
@ -80,60 +80,6 @@ def plot_detection_graph(filename, filter_applicable_to):
print("File written: " + output_filename)
def _load_techniques(filename, detection_or_visibility, filter_applicable_to='all'):
"""
Loads the techniques (including detection and visibility properties) from the given yaml file.
:param filename: the filename of the yaml file containing the techniques administration
:param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When
using 'all' no filtering will be applied.
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: dictionary with techniques (incl. properties), name and platform
"""
my_techniques = {}
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
for d in yaml_content['techniques']:
# Add detection items:
if type(d['detection']) == dict: # There is just one detection entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif type(d['detection']) == list: # There are multiple detection entries
for de in d['detection']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if type(d['visibility']) == dict: # There is just one visibility entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif type(d['visibility']) == list: # There are multiple visibility entries
for de in d['visibility']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
name = yaml_content['name']
platform = yaml_content['platform']
return my_techniques, name, platform
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
"""
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
list. If the dict[technique_id] doesn't exist yet, it will be created.
:param dict: the dictionary
:param technique_id: the id of the technique in the main dict
:param key: the key where the list in the dictionary resides
:param entry: the entry to add to the list
:return:
"""
if technique_id not in dict.keys():
dict[technique_id] = {}
if not key in dict[technique_id].keys():
dict[technique_id][key] = []
dict[technique_id][key].append(entry)
def _load_data_sources(filename):
"""
Loads the data sources (including all properties) from the given yaml file.
@ -384,7 +330,7 @@ def export_techniques_list_to_excel(filename):
:param filename: the filename of the yaml file containing the techniques administration
:return:
"""
my_techniques, name, platform = _load_techniques(filename, 'all')
my_techniques, name, platform = load_techniques(filename, 'all')
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
mitre_techniques = load_attack_data(DATATYPE_ALL_TECH)