added a new option '--health' to check a technique administration YAML file on errors.

master
Marcus Bakker 2019-05-14 12:58:06 +02:00
parent a183280ca9
commit 5fdcb2376d
4 changed files with 153 additions and 63 deletions

View File

@ -58,6 +58,7 @@ def init_menu():
action='store_true')
parser_visibility.add_argument('-o', '--overlay', help='generate a visibility layer overlayed with detections for '
'the ATT&CK navigator', action='store_true')
parser_visibility.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
# create the detection parser
parser_detection = subparsers.add_parser('detection', aliases=['d'],
@ -81,6 +82,7 @@ def init_menu():
'the ATT&CK navigator', action='store_true')
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
action='store_true')
parser_detection.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
# create the group parser
parser_group = subparsers.add_parser('group', aliases=['g'],
@ -171,6 +173,8 @@ def menu(menu_parser):
export_techniques_list_to_excel(args.file_tech)
if args.excel and args.applicable != 'all':
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
if args.health:
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
elif args.subparser in ['group', 'g']:
generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group, args.applicable)
@ -194,6 +198,8 @@ def menu(menu_parser):
export_techniques_list_to_excel(args.file_tech)
if args.excel and args.applicable != 'all':
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
if args.health:
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
elif args.subparser in ['generic', 'ge']:
if args.statistics:

View File

@ -337,3 +337,142 @@ def calculate_score(l, zero_value=0):
number += 1
s = int(round(s / number, 0) if number > 0 else zero_value)
return s
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
"""
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
list. If the dict[technique_id] doesn't exist yet, it will be created.
:param dict: the dictionary
:param technique_id: the id of the technique in the main dict
:param key: the key where the list in the dictionary resides
:param entry: the entry to add to the list
:return:
"""
if technique_id not in dict.keys():
dict[technique_id] = {}
if not key in dict[technique_id].keys():
dict[technique_id][key] = []
dict[technique_id][key].append(entry)
def load_techniques(filename, detection_or_visibility='all', filter_applicable_to='all'):
"""
Loads the techniques (including detection and visibility properties) from the given yaml file.
:param filename: the filename of the yaml file containing the techniques administration
:param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When
using 'all' no filtering will be applied.
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: dictionary with techniques (incl. properties), name and platform
"""
my_techniques = {}
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
for d in yaml_content['techniques']:
# Add detection items:
if type(d['detection']) == dict: # There is just one detection entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif type(d['detection']) == list: # There are multiple detection entries
for de in d['detection']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if type(d['visibility']) == dict: # There is just one visibility entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif type(d['visibility']) == list: # There are multiple visibility entries
for de in d['visibility']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
name = yaml_content['name']
platform = yaml_content['platform']
return my_techniques, name, platform
def check_yaml_file_health(filename, file_type):
"""
Check on error in the provided YAML file.
:param filename: YAML file location
:param file_type: currenlty only 'FILE_TYPE_TECHNIQUE_ADMINISTRATION' is being supported
:return:
"""
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
# check for duplicate tech IDs
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
tech_ids = list(map(lambda x: x['technique_id'], yaml_content['techniques']))
tech_dup = []
for tech in tech_ids:
if tech not in tech_dup:
tech_dup.append(tech)
else:
print('[!] Duplicate technique ID: ' + tech)
# checks on:
# - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score'
# - invalid date format for: 'date_implemented', 'date_registered'
# - detection or visibility score out-of-range
# - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment'
techniques = load_techniques(filename)
for tech, v in techniques[0].items():
for key in ['detection', 'visibility']:
if key not in v:
print('[!] Technique ID: ' + tech + ' is MISSING ' + key)
for detection in v['detection']:
for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']:
if key not in detection:
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key)
try:
if detection['score'] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score')
elif not (detection['score'] >= -1 and detection['score'] <= 5):
print('[!] Technique ID: ' + tech + ' has an INVALID detection score: ' + str(detection['score']) +
' (should be between -1 and 5)')
elif detection['score'] > -1:
for key in ['date_implemented', 'date_registered']:
if not detection[key]:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
break
try:
detection[key].year
detection[key].month
detection[key].day
except AttributeError:
print('[!] Technique ID: ' + tech + ' has an INVALID data format for the key-value pair '
'in detection: ' + key + ' (should be YYYY-MM-DD)')
for key in ['location', 'applicable_to']:
try:
if detection[key][0] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
except TypeError:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
except KeyError:
pass
for visibility in v['visibility']:
for key in ['applicable_to', 'score', 'comment']:
if key not in detection:
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key)
try:
if visibility['score'] is None:
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score')
elif not (visibility['score'] >= 0 and visibility['score'] <= 4):
print('[!] Technique ID: ' + tech + ' has an INVALID visibility score: ' + str(detection['score']) +
' (should be between 0 and 4)')
except KeyError:
pass

View File

@ -1,6 +1,5 @@
import simplejson
from generic import *
from technique_mapping import _load_techniques
CG_GROUPS = {}
@ -211,7 +210,7 @@ def get_detection_techniques(filename, filter_applicable_to):
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
groups_dict = {}
detection_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
detection_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to)
group_id = 'DETECTION'
groups_dict[group_id] = {}
@ -235,7 +234,7 @@ def get_visibility_techniques(filename, filter_applicable_to):
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
groups_dict = {}
visibility_techniques, name, platform = _load_techniques(filename, 'visibility', filter_applicable_to)
visibility_techniques, name, platform = load_techniques(filename, 'visibility', filter_applicable_to)
group_id = 'VISIBILITY'
groups_dict[group_id] = {}

View File

@ -14,12 +14,12 @@ def generate_detection_layer(filename_techniques, filename_data_sources, overlay
:return:
"""
if not overlay:
my_techniques, name, platform = _load_techniques(filename_techniques, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'detection', filter_applicable_to)
mapped_techniques_detection = _map_and_colorize_techniques_for_detections(my_techniques)
layer_detection = get_layer_template_detections('Detections ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_detection, mapped_techniques_detection, 'detection', filter_applicable_to, name)
else:
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
@ -38,12 +38,12 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
my_data_sources = _load_data_sources(filename_data_sources)
if not overlay:
my_techniques, name, platform = _load_techniques(filename_techniques, 'visibility', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'visibility', filter_applicable_to)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
layer_visibility = get_layer_template_visibility('Visibility ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', filter_applicable_to, name)
else:
my_techniques, name, platform = _load_techniques(filename_techniques, 'all', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename_techniques, 'all', filter_applicable_to)
mapped_techniques_both = _map_and_colorize_techniques_for_overlayed(my_techniques, my_data_sources, filter_applicable_to)
layer_both = get_layer_template_layered('Visibility and Detection ' + name + ' ' + filter_applicable_to, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', filter_applicable_to, name)
@ -56,7 +56,7 @@ def plot_detection_graph(filename, filter_applicable_to):
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return:
"""
my_techniques, name, platform = _load_techniques(filename, 'detection', filter_applicable_to)
my_techniques, name, platform = load_techniques(filename, 'detection', filter_applicable_to)
graph_values = []
for t in my_techniques.values():
@ -80,60 +80,6 @@ def plot_detection_graph(filename, filter_applicable_to):
print("File written: " + output_filename)
def _load_techniques(filename, detection_or_visibility, filter_applicable_to='all'):
"""
Loads the techniques (including detection and visibility properties) from the given yaml file.
:param filename: the filename of the yaml file containing the techniques administration
:param detection_or_visibility: used to indicate to filter applicable_to field for detection or visibility. When
using 'all' no filtering will be applied.
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: dictionary with techniques (incl. properties), name and platform
"""
my_techniques = {}
with open(filename, 'r') as yaml_file:
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
for d in yaml_content['techniques']:
# Add detection items:
if type(d['detection']) == dict: # There is just one detection entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif type(d['detection']) == list: # There are multiple detection entries
for de in d['detection']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if type(d['visibility']) == dict: # There is just one visibility entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif type(d['visibility']) == list: # There are multiple visibility entries
for de in d['visibility']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
name = yaml_content['name']
platform = yaml_content['platform']
return my_techniques, name, platform
def _add_entry_to_list_in_dictionary(dict, technique_id, key, entry):
"""
Ensures a list will be created if it doesn't exist in the given dict[technique_id][key] and adds the entry to the
list. If the dict[technique_id] doesn't exist yet, it will be created.
:param dict: the dictionary
:param technique_id: the id of the technique in the main dict
:param key: the key where the list in the dictionary resides
:param entry: the entry to add to the list
:return:
"""
if technique_id not in dict.keys():
dict[technique_id] = {}
if not key in dict[technique_id].keys():
dict[technique_id][key] = []
dict[technique_id][key].append(entry)
def _load_data_sources(filename):
"""
Loads the data sources (including all properties) from the given yaml file.
@ -384,7 +330,7 @@ def export_techniques_list_to_excel(filename):
:param filename: the filename of the yaml file containing the techniques administration
:return:
"""
my_techniques, name, platform = _load_techniques(filename, 'all')
my_techniques, name, platform = load_techniques(filename, 'all')
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
mitre_techniques = load_attack_data(DATATYPE_ALL_TECH)