- Replaced PyYAML with ruamel.yaml.

- Multiple functions made "private".
master
Marcus Bakker 2019-07-31 10:19:51 +02:00
parent d0f2a4946b
commit f4121bf4d0
1 changed files with 37 additions and 34 deletions

View File

@ -4,7 +4,7 @@ from generic import *
CG_GROUPS = {}
def is_in_group(json_groups, argument_groups):
def _is_in_group(json_groups, argument_groups):
"""
Check if the two dicts (json_groups and argument_groups) have any groups in common based on their names/aliases.
:param json_groups: group aliases from ATT&CK
@ -20,7 +20,7 @@ def is_in_group(json_groups, argument_groups):
return False
def is_group_found(groups_found, argument_groups):
def _is_group_found(groups_found, argument_groups):
"""
Check if a group that has been provided using '-g/--groups'/'-o/--overlay' is present within MITRE ATT&CK.
:param groups_found: groups that are found in the ATT&CK data
@ -51,7 +51,7 @@ def is_group_found(groups_found, argument_groups):
return True
def get_software_techniques(groups, stage, platform):
def _get_software_techniques(groups, stage, platform):
"""
Get all techniques (in a dict) from the provided list of groups in relation to the software these groups use,
and hence techniques they support.
@ -69,18 +69,20 @@ def get_software_techniques(groups, stage, platform):
software_dict = {}
for tech in tech_by_software:
if tech['software_id'] not in software_dict:
# noinspection PySetFunctionToLiteral
software_dict[tech['software_id']] = set([tech['technique_id']])
else:
software_dict[tech['software_id']].add(tech['technique_id'])
# groups is a YAML file
if os.path.isfile(str(groups)):
_yaml = init_yaml()
with open(groups, 'r') as yaml_file:
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
config = _yaml.load(yaml_file)
for group in config['groups']:
if group['enabled']:
group_id = generate_group_id(group['group_name'], group['campaign'])
group_id = _generate_group_id(group['group_name'], group['campaign'])
groups_dict[group_id] = dict()
groups_dict[group_id]['group_name'] = group['group_name']
@ -104,7 +106,7 @@ def get_software_techniques(groups, stage, platform):
# and the group is a group we are interested in
if s['x_mitre_platforms']: # their is some software that do not have a platform, skip those
if s['matrix'] == 'mitre-'+stage and (platform in s['x_mitre_platforms'] or platform == 'all') and \
(groups[0] == 'all' or s['group_id'].lower() in groups or is_in_group(s['aliases'], groups)):
(groups[0] == 'all' or s['group_id'].lower() in groups or _is_in_group(s['aliases'], groups)):
if s['group_id'] not in groups_dict:
groups_dict[s['group_id']] = {'group_name': s['name']}
groups_dict[s['group_id']]['techniques'] = set()
@ -113,7 +115,7 @@ def get_software_techniques(groups, stage, platform):
return groups_dict
def generate_group_id(group_name, campaign):
def _generate_group_id(group_name, campaign):
# CG_GROUPS = { group_name+campaign: id } }
"""
Generate a custom group id.
@ -144,7 +146,7 @@ def generate_group_id(group_name, campaign):
return CG_GROUPS[group_name + campaign]
def get_group_techniques(groups, stage, platform, file_type):
def _get_group_techniques(groups, stage, platform, file_type):
"""
Get all techniques (in a dict) from the provided list of groups
:param groups: group ID, group name/alias or a YAML file with group(s) data
@ -159,20 +161,21 @@ def get_group_techniques(groups, stage, platform, file_type):
# groups is a YAML file
if file_type == FILE_TYPE_GROUP_ADMINISTRATION:
_yaml = init_yaml()
with open(groups, 'r') as yaml_file:
config = yaml.load(yaml_file, Loader=yaml.FullLoader)
config = _yaml.load(yaml_file)
for group in config['groups']:
if group['enabled']:
campaign = group['campaign'] if group['campaign'] else ''
group_id = generate_group_id(group['group_name'], campaign)
group_id = _generate_group_id(group['group_name'], campaign)
groups_dict[group_id] = dict()
groups_dict[group_id]['group_name'] = group['group_name']
if type(group['technique_id']) == list:
if isinstance(group['technique_id'], list):
groups_dict[group_id]['techniques'] = set(group['technique_id'])
groups_dict[group_id]['weight'] = dict((i, 1) for i in group['technique_id'])
elif type(group['technique_id']) == dict:
elif isinstance(group['technique_id'], dict):
groups_dict[group_id]['techniques'] = set(group['technique_id'].keys())
groups_dict[group_id]['weight'] = group['technique_id']
groups_dict[group_id]['campaign'] = group['campaign']
@ -189,7 +192,7 @@ def get_group_techniques(groups, stage, platform, file_type):
# group matches the: matrix/stage, platform and the group(s) we are interested in
if gr['matrix'] == 'mitre-'+stage and (platform in platforms or platform == 'all') and \
(groups[0] == 'all' or gr['group_id'].lower() in groups or is_in_group(gr['aliases'], groups)):
(groups[0] == 'all' or gr['group_id'].lower() in groups or _is_in_group(gr['aliases'], groups)):
if gr['group_id'] not in groups_dict:
groups_found.add(gr['group_id'])
groups_dict[gr['group_id']] = {'group_name': gr['name']}
@ -199,17 +202,17 @@ def get_group_techniques(groups, stage, platform, file_type):
groups_dict[gr['group_id']]['techniques'].add(gr['technique_id'])
groups_dict[gr['group_id']]['weight'][gr['technique_id']] = 1
# do not call 'is_group_found' when groups is a YAML file
# do not call '_is_group_found' when groups is a YAML file
# (this could contain groups that do not exists within ATT&CK)
if not os.path.isfile(str(groups)):
found = is_group_found(groups_found, groups)
found = _is_group_found(groups_found, groups)
if not found:
return -1
return groups_dict
def get_detection_techniques(filename, filter_applicable_to):
def _get_detection_techniques(filename, filter_applicable_to):
"""
Get all techniques (in a dict) from the detection administration
:param filename: path to the YAML technique administration file
@ -235,9 +238,9 @@ def get_detection_techniques(filename, filter_applicable_to):
return groups_dict, detection_techniques
def get_visibility_techniques(filename, filter_applicable_to):
def _get_visibility_techniques(filename, filter_applicable_to):
"""
Get all techniques (in a dict) from the detections administration
Get all techniques (in a dict) from the technique administration
:param filename: path to the YAML technique administration file
:param filter_applicable_to: filter techniques based on applicable_to field in techniques administration YAML file
:return: dictionary
@ -261,7 +264,7 @@ def get_visibility_techniques(filename, filter_applicable_to):
return groups_dict, visibility_techniques
def get_technique_count(groups, groups_overlay, groups_software, overlay_type, all_techniques):
def _get_technique_count(groups, groups_overlay, groups_software, overlay_type, all_techniques):
"""
Create a dict with all involved techniques and their relevant count/score
:param groups: a dict with data on groups
@ -288,7 +291,7 @@ def get_technique_count(groups, groups_overlay, groups_software, overlay_type, a
techniques_dict[tech]['count'] += v['weight'][tech]
techniques_dict[tech]['groups'].add(group)
max_count = max(techniques_dict.values(), key=lambda v: v['count'])['count']
max_count = max(techniques_dict.values(), key=lambda k: k['count'])['count']
# create dict {tech_id: score+max_tech_count} to be used for when doing an overlay of the type visibility or detection
if overlay_type != OVERLAY_TYPE_GROUP:
@ -339,8 +342,8 @@ def get_technique_count(groups, groups_overlay, groups_software, overlay_type, a
return techniques_dict, max_count
def get_technique_layer(techniques_count, groups, overlay, groups_software, overlay_file_type, overlay_type,
all_techniques):
def _get_technique_layer(techniques_count, groups, overlay, groups_software, overlay_file_type, overlay_type,
all_techniques):
"""
Create the technique layer that will be part of the ATT&CK navigator json file
:param techniques_count: involved techniques with count (to be used within the scores)
@ -430,7 +433,7 @@ def get_technique_layer(techniques_count, groups, overlay, groups_software, over
return techniques_layer
def get_group_list(groups, file_type):
def _get_group_list(groups, file_type):
"""
Make a list of group names for the involved groups.
:param groups: a dict with data on groups
@ -495,15 +498,15 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
all_techniques = None
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
if overlay_type == OVERLAY_TYPE_VISIBILITY:
overlay_dict, all_techniques = get_visibility_techniques(overlay, filter_applicable_to)
overlay_dict, all_techniques = _get_visibility_techniques(overlay, filter_applicable_to)
elif overlay_type == OVERLAY_TYPE_DETECTION:
overlay_dict, all_techniques = get_detection_techniques(overlay, filter_applicable_to)
overlay_dict, all_techniques = _get_detection_techniques(overlay, filter_applicable_to)
elif len(overlay) > 0:
overlay_dict = get_group_techniques(overlay, stage, platform, overlay_file_type)
overlay_dict = _get_group_techniques(overlay, stage, platform, overlay_file_type)
if not overlay_dict:
return
groups_dict = get_group_techniques(groups, stage, platform, groups_file_type)
groups_dict = _get_group_techniques(groups, stage, platform, groups_file_type)
if groups_dict == -1:
return
if len(groups_dict) == 0:
@ -514,20 +517,20 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
if software_groups and overlay: # TODO add support for campaign info in layer metadata
if overlay_type not in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION]:
# if a group overlay is provided, get the software techniques for the overlay
groups_software_dict = get_software_techniques(overlay, stage, platform)
groups_software_dict = _get_software_techniques(overlay, stage, platform)
elif software_groups:
groups_software_dict = get_software_techniques(groups, stage, platform)
groups_software_dict = _get_software_techniques(groups, stage, platform)
technique_count, max_count = get_technique_count(groups_dict, overlay_dict, groups_software_dict, overlay_type, all_techniques)
technique_layer = get_technique_layer(technique_count, groups_dict, overlay_dict, groups_software_dict,
overlay_file_type, overlay_type, all_techniques)
technique_count, max_count = _get_technique_count(groups_dict, overlay_dict, groups_software_dict, overlay_type, all_techniques)
technique_layer = _get_technique_layer(technique_count, groups_dict, overlay_dict, groups_software_dict,
overlay_file_type, overlay_type, all_techniques)
# make a list group names for the involved groups.
if groups == ['all']:
groups_list = ['all']
else:
groups_list = get_group_list(groups_dict, groups_file_type)
overlay_list = get_group_list(overlay_dict, overlay_file_type)
groups_list = _get_group_list(groups_dict, groups_file_type)
overlay_list = _get_group_list(overlay_dict, overlay_file_type)
desc = 'stage: ' + stage + ' | platform: ' + platform + ' | group(s): ' + ', '.join(groups_list) + \
' | overlay group(s): ' + ', '.join(overlay_list)