2019-03-29 14:26:25 +00:00
|
|
|
import simplejson
|
|
|
|
from generic import *
|
2019-08-08 12:46:30 +00:00
|
|
|
from eql_yaml import techniques_search
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
CG_GROUPS = {}
|
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _is_in_group(json_groups, argument_groups):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Check if the two dicts (json_groups and argument_groups) have any groups in common based on their names/aliases.
|
|
|
|
:param json_groups: group aliases from ATT&CK
|
|
|
|
:param argument_groups: group names provided via the command line by the user
|
|
|
|
:return: true or false
|
|
|
|
"""
|
|
|
|
json_groups = list(map(lambda x: x.lower(), json_groups))
|
|
|
|
|
|
|
|
for group in argument_groups:
|
|
|
|
if group in json_groups:
|
|
|
|
return True
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
2020-10-15 09:04:07 +00:00
|
|
|
def _are_groups_found(groups_found, argument_groups):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
2020-10-15 09:04:07 +00:00
|
|
|
Check if the groups that are provided using '-g/--groups'/'-o/--overlay' are present within MITRE ATT&CK.
|
|
|
|
:param groups_attack_subset: groups that are found in the ATT&CK data
|
2019-03-29 14:26:25 +00:00
|
|
|
:param argument_groups: groups provided via the command line by the user
|
2020-10-15 09:04:07 +00:00
|
|
|
:return: returns boolean that indicates if all of the groups are found
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
2020-10-15 09:04:07 +00:00
|
|
|
group_attack_data = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
|
|
|
|
group_found = True
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
for group_arg in argument_groups:
|
|
|
|
if group_arg == 'all': # this one will be ignored as it does not make any sense for this function
|
|
|
|
return True
|
|
|
|
|
|
|
|
group_id = None
|
2020-10-15 09:04:07 +00:00
|
|
|
for group in group_attack_data: # is the group provided via the command line known in ATT&CK?
|
2019-07-13 12:41:59 +00:00
|
|
|
if 'aliases' in group:
|
|
|
|
group_aliases_lower = list(map(lambda x: x.lower(), group['aliases']))
|
|
|
|
if group_arg in group_aliases_lower or group_arg == get_attack_id(group).lower():
|
|
|
|
group_id = get_attack_id(group)
|
2019-03-29 14:26:25 +00:00
|
|
|
if not group_id: # the group that has been provided through the command line cannot be found in ATT&CK
|
2020-10-15 09:04:07 +00:00
|
|
|
print('[!] Unknown ATT&CK group: ' + group_arg)
|
|
|
|
group_found = False
|
2019-03-29 14:26:25 +00:00
|
|
|
elif group_id not in groups_found: # group not present in filtered data sate (i.e. platform and stage)
|
|
|
|
print('[!] Group not part of the data set: ' + group_arg)
|
2020-10-15 09:04:07 +00:00
|
|
|
group_found = False
|
|
|
|
|
|
|
|
return group_found
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _get_software_techniques(groups, stage, platform):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Get all techniques (in a dict) from the provided list of groups in relation to the software these groups use,
|
|
|
|
and hence techniques they support.
|
|
|
|
:param groups: ATT&CK groups
|
|
|
|
:param stage: attack or pre-attack
|
2020-10-15 09:04:07 +00:00
|
|
|
:param platform: one or multiple values from PLATFORMS constant
|
2019-03-29 14:26:25 +00:00
|
|
|
:return: dictionary with info on groups
|
|
|
|
"""
|
|
|
|
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
|
|
|
groups_dict = {}
|
|
|
|
|
2019-07-15 12:55:39 +00:00
|
|
|
tech_by_software = load_attack_data(DATA_TYPE_CUSTOM_TECH_BY_SOFTWARE)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
# { software_id: [technique, ...] }
|
|
|
|
software_dict = {}
|
2019-07-13 12:41:59 +00:00
|
|
|
for tech in tech_by_software:
|
2019-03-29 14:26:25 +00:00
|
|
|
if tech['software_id'] not in software_dict:
|
2019-07-31 08:19:51 +00:00
|
|
|
# noinspection PySetFunctionToLiteral
|
2019-03-29 14:26:25 +00:00
|
|
|
software_dict[tech['software_id']] = set([tech['technique_id']])
|
|
|
|
else:
|
|
|
|
software_dict[tech['software_id']].add(tech['technique_id'])
|
|
|
|
|
|
|
|
# groups is a YAML file
|
|
|
|
if os.path.isfile(str(groups)):
|
2019-07-31 08:19:51 +00:00
|
|
|
_yaml = init_yaml()
|
2019-03-29 14:26:25 +00:00
|
|
|
with open(groups, 'r') as yaml_file:
|
2019-07-31 08:19:51 +00:00
|
|
|
config = _yaml.load(yaml_file)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
for group in config['groups']:
|
|
|
|
if group['enabled']:
|
2019-12-06 12:50:16 +00:00
|
|
|
campaign = group.get('campaign', None)
|
|
|
|
campaign = str(campaign) if campaign else ''
|
|
|
|
group_id = _generate_group_id(str(group['group_name']), campaign)
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[group_id] = dict()
|
|
|
|
|
2019-08-09 08:51:57 +00:00
|
|
|
groups_dict[group_id]['group_name'] = str(group['group_name'])
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[group_id]['techniques'] = set()
|
2019-12-06 12:50:16 +00:00
|
|
|
if campaign != '':
|
|
|
|
groups_dict[group_id]['campaign'] = str(campaign)
|
2020-01-30 15:28:46 +00:00
|
|
|
groups_dict[group_id]['software'] = group.get('software_id', None)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2020-01-30 15:28:46 +00:00
|
|
|
if 'software_id' in group and group['software_id']:
|
2019-03-29 14:26:25 +00:00
|
|
|
for soft_id in group['software_id']:
|
|
|
|
try:
|
|
|
|
groups_dict[group_id]['techniques'].update(software_dict[soft_id])
|
|
|
|
except KeyError:
|
|
|
|
print('[!] unknown ATT&CK software ID: ' + soft_id)
|
|
|
|
|
|
|
|
# groups are provided as arguments via the command line
|
|
|
|
else:
|
2019-07-15 12:55:39 +00:00
|
|
|
software_by_group = load_attack_data(DATA_TYPE_CUSTOM_SOFTWARE_BY_GROUP)
|
2019-07-13 12:41:59 +00:00
|
|
|
|
|
|
|
for s in software_by_group:
|
|
|
|
# software matches the ATT&CK Matrix and platform
|
|
|
|
# and the group is a group we are interested in
|
2019-12-06 12:50:16 +00:00
|
|
|
if s['x_mitre_platforms']: # there is software that do not have a platform, skip those
|
2020-10-15 09:04:07 +00:00
|
|
|
if s['matrix'] == 'mitre-' + stage and len(set(s['x_mitre_platforms']).intersection(set(platform))) > 0 and \
|
2019-07-31 08:19:51 +00:00
|
|
|
(groups[0] == 'all' or s['group_id'].lower() in groups or _is_in_group(s['aliases'], groups)):
|
2019-03-29 14:26:25 +00:00
|
|
|
if s['group_id'] not in groups_dict:
|
2019-07-13 12:41:59 +00:00
|
|
|
groups_dict[s['group_id']] = {'group_name': s['name']}
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[s['group_id']]['techniques'] = set()
|
|
|
|
groups_dict[s['group_id']]['techniques'].update(software_dict[s['software_id']])
|
|
|
|
|
|
|
|
return groups_dict
|
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _generate_group_id(group_name, campaign):
|
2019-03-29 14:26:25 +00:00
|
|
|
# CG_GROUPS = { group_name+campaign: id } }
|
|
|
|
"""
|
|
|
|
Generate a custom group id.
|
|
|
|
:param group_name: group name as used within the YAML file
|
|
|
|
:param campaign: campaign as used within the YAML file
|
|
|
|
:return: custom group identifier string (e.g. CG0001)
|
|
|
|
"""
|
|
|
|
global CG_GROUPS
|
|
|
|
|
|
|
|
if not CG_GROUPS:
|
|
|
|
new_id = 1
|
|
|
|
elif group_name + campaign not in CG_GROUPS:
|
|
|
|
new_id = len(CG_GROUPS) + 1
|
|
|
|
|
|
|
|
if group_name + campaign not in CG_GROUPS:
|
|
|
|
length = len(str(new_id))
|
|
|
|
if length > 9:
|
|
|
|
cg_id = 'CG00' + str(new_id)
|
|
|
|
elif length > 99:
|
|
|
|
cg_id = 'CG0' + str(new_id)
|
|
|
|
elif length > 999:
|
|
|
|
cg_id = 'CG' + str(new_id)
|
|
|
|
else:
|
|
|
|
cg_id = 'CG000' + str(new_id)
|
|
|
|
|
|
|
|
CG_GROUPS[group_name + campaign] = cg_id
|
|
|
|
|
|
|
|
return CG_GROUPS[group_name + campaign]
|
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _get_group_techniques(groups, stage, platform, file_type):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Get all techniques (in a dict) from the provided list of groups
|
|
|
|
:param groups: group ID, group name/alias or a YAML file with group(s) data
|
|
|
|
:param stage: attack or pre-attack
|
2020-10-15 09:04:07 +00:00
|
|
|
:param platform: one or multiple values from PLATFORMS constant
|
2019-03-29 14:26:25 +00:00
|
|
|
:param file_type: the file type of the YAML file as present in the key 'file_type'
|
|
|
|
:return: returns dictionary with all techniques from the provided list of groups or -1 when group is not found
|
|
|
|
"""
|
|
|
|
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
|
|
|
groups_dict = {}
|
|
|
|
groups_found = set()
|
|
|
|
|
|
|
|
# groups is a YAML file
|
2020-10-15 09:23:29 +00:00
|
|
|
if file_type == FILE_TYPE_GROUP_ADMINISTRATION:
|
2019-07-31 08:19:51 +00:00
|
|
|
_yaml = init_yaml()
|
2019-03-29 14:26:25 +00:00
|
|
|
with open(groups, 'r') as yaml_file:
|
2019-07-31 08:19:51 +00:00
|
|
|
config = _yaml.load(yaml_file)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
for group in config['groups']:
|
|
|
|
if group['enabled']:
|
2019-12-06 12:50:16 +00:00
|
|
|
campaign = group.get('campaign', None)
|
|
|
|
campaign = str(campaign) if campaign else ''
|
2019-08-09 08:51:57 +00:00
|
|
|
group_id = _generate_group_id(str(group['group_name']), campaign)
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[group_id] = dict()
|
|
|
|
|
2019-08-09 08:51:57 +00:00
|
|
|
groups_dict[group_id]['group_name'] = str(group['group_name'])
|
2019-07-31 08:19:51 +00:00
|
|
|
if isinstance(group['technique_id'], list):
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['techniques'] = set(group['technique_id'])
|
|
|
|
groups_dict[group_id]['weight'] = dict((i, 1) for i in group['technique_id'])
|
2019-07-31 08:19:51 +00:00
|
|
|
elif isinstance(group['technique_id'], dict):
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['techniques'] = set(group['technique_id'].keys())
|
|
|
|
groups_dict[group_id]['weight'] = group['technique_id']
|
2019-12-06 12:50:16 +00:00
|
|
|
if campaign != '':
|
|
|
|
groups_dict[group_id]['campaign'] = str(campaign)
|
2020-01-27 18:35:57 +00:00
|
|
|
groups_dict[group_id]['software'] = group.get('software_id', None)
|
2019-03-29 14:26:25 +00:00
|
|
|
else:
|
|
|
|
# groups are provided as arguments via the command line
|
2019-07-15 12:55:39 +00:00
|
|
|
all_groups_tech = load_attack_data(DATA_TYPE_CUSTOM_TECH_BY_GROUP)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-07-13 12:41:59 +00:00
|
|
|
for gr in all_groups_tech:
|
|
|
|
platforms = gr['x_mitre_platforms']
|
|
|
|
if not platforms:
|
2019-03-29 14:26:25 +00:00
|
|
|
# we just set this to an random legit value, because for pre-attack 'platform' is not used
|
2020-10-15 09:04:07 +00:00
|
|
|
platforms = ['Windows']
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
# group matches the: matrix/stage, platform and the group(s) we are interested in
|
2020-10-15 09:04:07 +00:00
|
|
|
if gr['matrix'] == 'mitre-' + stage and len(set(platforms).intersection(set(platform))) > 0 and \
|
2019-07-31 08:19:51 +00:00
|
|
|
(groups[0] == 'all' or gr['group_id'].lower() in groups or _is_in_group(gr['aliases'], groups)):
|
2019-07-13 12:41:59 +00:00
|
|
|
if gr['group_id'] not in groups_dict:
|
|
|
|
groups_found.add(gr['group_id'])
|
|
|
|
groups_dict[gr['group_id']] = {'group_name': gr['name']}
|
|
|
|
groups_dict[gr['group_id']]['techniques'] = set()
|
|
|
|
groups_dict[gr['group_id']]['weight'] = dict()
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-07-13 12:41:59 +00:00
|
|
|
groups_dict[gr['group_id']]['techniques'].add(gr['technique_id'])
|
|
|
|
groups_dict[gr['group_id']]['weight'][gr['technique_id']] = 1
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2020-10-15 09:04:07 +00:00
|
|
|
# do not call '_are_groups_found' when groups is a YAML file
|
2019-03-29 14:26:25 +00:00
|
|
|
# (this could contain groups that do not exists within ATT&CK)
|
|
|
|
if not os.path.isfile(str(groups)):
|
2020-10-15 09:04:07 +00:00
|
|
|
found = _are_groups_found(groups_found, groups)
|
2019-03-29 14:26:25 +00:00
|
|
|
if not found:
|
|
|
|
return -1
|
|
|
|
|
|
|
|
return groups_dict
|
|
|
|
|
|
|
|
|
2019-08-08 12:46:30 +00:00
|
|
|
def _get_detection_techniques(filename):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Get all techniques (in a dict) from the detection administration
|
|
|
|
:param filename: path to the YAML technique administration file
|
2019-04-24 14:15:04 +00:00
|
|
|
:return: groups dictionary, loaded techniques from administration YAML file
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
|
|
|
groups_dict = {}
|
|
|
|
|
2019-08-08 12:46:30 +00:00
|
|
|
detection_techniques, name, platform = load_techniques(filename)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
group_id = 'DETECTION'
|
|
|
|
groups_dict[group_id] = {}
|
|
|
|
groups_dict[group_id]['group_name'] = 'Detection'
|
|
|
|
groups_dict[group_id]['techniques'] = set()
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['weight'] = dict()
|
2019-03-29 14:26:25 +00:00
|
|
|
for t, v in detection_techniques.items():
|
2020-06-18 15:01:16 +00:00
|
|
|
s = calculate_score(v['detection'], zero_value=-1)
|
|
|
|
if s >= 0:
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[group_id]['techniques'].add(t)
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['weight'][t] = 1
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-04-24 14:15:04 +00:00
|
|
|
return groups_dict, detection_techniques
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
|
2019-08-08 12:46:30 +00:00
|
|
|
def _get_visibility_techniques(filename):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
2019-07-31 08:19:51 +00:00
|
|
|
Get all techniques (in a dict) from the technique administration
|
2019-03-29 14:26:25 +00:00
|
|
|
:param filename: path to the YAML technique administration file
|
|
|
|
:return: dictionary
|
|
|
|
"""
|
|
|
|
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
|
|
|
groups_dict = {}
|
|
|
|
|
2019-08-08 12:46:30 +00:00
|
|
|
visibility_techniques, name, platform = load_techniques(filename)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
group_id = 'VISIBILITY'
|
|
|
|
groups_dict[group_id] = {}
|
|
|
|
groups_dict[group_id]['group_name'] = 'Visibility'
|
|
|
|
groups_dict[group_id]['techniques'] = set()
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['weight'] = dict()
|
2019-04-24 14:15:04 +00:00
|
|
|
for t, v in visibility_techniques.items():
|
2019-05-02 11:21:01 +00:00
|
|
|
s = calculate_score(v['visibility'])
|
|
|
|
if s > 0:
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_dict[group_id]['techniques'].add(t)
|
2019-05-15 12:43:25 +00:00
|
|
|
groups_dict[group_id]['weight'][t] = 1
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-04-24 14:15:04 +00:00
|
|
|
return groups_dict, visibility_techniques
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _get_technique_count(groups, groups_overlay, groups_software, overlay_type, all_techniques):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Create a dict with all involved techniques and their relevant count/score
|
|
|
|
:param groups: a dict with data on groups
|
|
|
|
:param groups_overlay: a dict with data on the groups to overlay
|
|
|
|
:param groups_software: a dict with with data on which techniques are used within related software
|
2019-05-02 17:54:32 +00:00
|
|
|
:param overlay_type: group, visibility or detection
|
|
|
|
:param all_techniques: dict containing all technique data for visibility or detection
|
2019-05-15 12:43:25 +00:00
|
|
|
:return: dictionary, max_count
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
# { technique_id: {count: ..., groups: set{} }
|
|
|
|
techniques_dict = {}
|
|
|
|
|
|
|
|
for group, v in groups.items():
|
|
|
|
for tech in v['techniques']:
|
|
|
|
if tech not in techniques_dict:
|
|
|
|
techniques_dict[tech] = dict()
|
|
|
|
techniques_dict[tech]['groups'] = set()
|
2019-05-15 12:43:25 +00:00
|
|
|
techniques_dict[tech]['count'] = v['weight'][tech]
|
2019-05-02 17:54:32 +00:00
|
|
|
|
|
|
|
# We only want to increase the score when comparing groups and not for visibility or detection.
|
|
|
|
# This allows to have proper sorting of the heat map, which in turn improves the ability to visually
|
|
|
|
# compare this heat map with the detection/visibility ATT&CK Navigator layers.
|
2019-05-03 08:25:11 +00:00
|
|
|
else:
|
2019-05-15 12:43:25 +00:00
|
|
|
techniques_dict[tech]['count'] += v['weight'][tech]
|
2019-03-29 14:26:25 +00:00
|
|
|
techniques_dict[tech]['groups'].add(group)
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
max_count = max(techniques_dict.values(), key=lambda k: k['count'])['count']
|
2019-05-03 08:25:11 +00:00
|
|
|
|
|
|
|
# create dict {tech_id: score+max_tech_count} to be used for when doing an overlay of the type visibility or detection
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type != OVERLAY_TYPE_GROUP:
|
2019-05-02 17:54:32 +00:00
|
|
|
dict_tech_score = {}
|
|
|
|
list_tech = groups_overlay[overlay_type.upper()]['techniques']
|
|
|
|
for tech in list_tech:
|
2020-06-18 15:01:16 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
|
|
|
dict_tech_score[tech] = calculate_score(all_techniques[tech]['visibility']) + max_count
|
|
|
|
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
|
|
|
dict_tech_score[tech] = calculate_score(all_techniques[tech]['detection'], zero_value=-1) + max_count
|
2019-05-02 17:54:32 +00:00
|
|
|
|
2019-03-29 14:26:25 +00:00
|
|
|
for group, v in groups_overlay.items():
|
|
|
|
for tech in v['techniques']:
|
|
|
|
if tech not in techniques_dict:
|
|
|
|
techniques_dict[tech] = dict()
|
|
|
|
techniques_dict[tech]['groups'] = set()
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_GROUP:
|
2019-05-15 12:43:25 +00:00
|
|
|
techniques_dict[tech]['count'] = v['weight'][tech]
|
2019-05-02 17:54:32 +00:00
|
|
|
else:
|
|
|
|
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
2019-03-29 14:26:25 +00:00
|
|
|
elif group in groups:
|
|
|
|
if tech not in groups[group]['techniques']:
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_GROUP:
|
2019-05-15 12:43:25 +00:00
|
|
|
techniques_dict[tech]['count'] += v['weight'][tech]
|
2019-05-02 17:54:32 +00:00
|
|
|
else:
|
|
|
|
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
2019-05-15 12:43:25 +00:00
|
|
|
# Only do this when it was not already counted by being part of 'groups'.
|
2019-03-29 14:26:25 +00:00
|
|
|
# Meaning the group in 'groups_overlay' was also part of 'groups' (match on Group ID) and the
|
|
|
|
# technique was already counted for that group / it is not a new technique for that group coming
|
|
|
|
# from a YAML file
|
|
|
|
else:
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_GROUP:
|
2019-05-02 17:54:32 +00:00
|
|
|
# increase count when the group in the YAML file is a custom group
|
2019-05-15 12:43:25 +00:00
|
|
|
techniques_dict[tech]['count'] += v['weight'][tech]
|
2019-05-02 17:54:32 +00:00
|
|
|
else:
|
|
|
|
techniques_dict[tech]['count'] = dict_tech_score[tech]
|
|
|
|
|
2019-03-29 14:26:25 +00:00
|
|
|
techniques_dict[tech]['groups'].add(group)
|
|
|
|
|
|
|
|
for group, v in groups_software.items():
|
|
|
|
for tech in v['techniques']:
|
|
|
|
if tech not in techniques_dict:
|
|
|
|
techniques_dict[tech] = dict()
|
|
|
|
techniques_dict[tech]['count'] = 0
|
|
|
|
# we will not adjust the scoring for groups_software. We will just set the the score to 0.
|
|
|
|
# This will later be used for the colouring of the heat map.
|
|
|
|
if 'groups' not in techniques_dict[tech]:
|
|
|
|
techniques_dict[tech]['groups'] = set()
|
|
|
|
techniques_dict[tech]['groups'].add(group)
|
|
|
|
|
2019-05-15 12:43:25 +00:00
|
|
|
return techniques_dict, max_count
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _get_technique_layer(techniques_count, groups, overlay, groups_software, overlay_file_type, overlay_type,
|
|
|
|
all_techniques):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Create the technique layer that will be part of the ATT&CK navigator json file
|
2019-05-02 17:54:32 +00:00
|
|
|
:param techniques_count: involved techniques with count (to be used within the scores)
|
2019-03-29 14:26:25 +00:00
|
|
|
:param groups: a dict with data on groups
|
|
|
|
:param overlay: a dict with data on the groups to overlay
|
|
|
|
:param groups_software: a dict with with data on which techniques are used within related software
|
|
|
|
:param overlay_file_type: the file type of the YAML file as present in the key 'file_type'
|
|
|
|
:param overlay_type: group, visibility or detection
|
2019-04-24 14:15:04 +00:00
|
|
|
:param all_techniques: dictionary with all techniques loaded from techniques administration YAML file
|
2019-03-29 14:26:25 +00:00
|
|
|
:return: dictionary
|
|
|
|
"""
|
|
|
|
techniques_layer = []
|
|
|
|
|
|
|
|
# { technique_id: {count: ..., groups: set{} }
|
|
|
|
# add the technique count/scoring
|
2019-05-02 17:54:32 +00:00
|
|
|
for tech, v in techniques_count.items():
|
2019-03-29 14:26:25 +00:00
|
|
|
t = dict()
|
|
|
|
t['techniqueID'] = tech
|
|
|
|
t['score'] = v['count']
|
|
|
|
t['metadata'] = []
|
|
|
|
metadata_dict = dict()
|
|
|
|
|
|
|
|
for group, values in groups.items():
|
|
|
|
if tech in values['techniques']: # we do not color this one because that's done using the scoring
|
2020-07-13 12:20:36 +00:00
|
|
|
if 'Group' not in metadata_dict:
|
2020-05-29 10:16:54 +00:00
|
|
|
metadata_dict['Group'] = set()
|
|
|
|
metadata_dict['Group'].add(values['group_name'])
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-07-13 12:41:59 +00:00
|
|
|
# this will only be effective when loading a YAML files that have a value for the key 'campaign'
|
|
|
|
if 'campaign' in values:
|
|
|
|
if 'Campaign' not in metadata_dict:
|
2019-03-29 14:26:25 +00:00
|
|
|
metadata_dict['Campaign'] = set()
|
|
|
|
metadata_dict['Campaign'].add(values['campaign'])
|
|
|
|
|
|
|
|
# change the color and add metadata to make the groups overlay visible
|
|
|
|
for group, values in overlay.items():
|
|
|
|
if tech in values['techniques']:
|
2019-04-24 14:15:04 +00:00
|
|
|
# Determine color:
|
2019-03-29 14:26:25 +00:00
|
|
|
if len(v['groups'].intersection(set(groups.keys()))) > 0:
|
|
|
|
# if the technique is both present in the group (-g/--groups) and the groups overlay (-o/--overlay)
|
2020-05-29 10:16:54 +00:00
|
|
|
metadata_dict['Group'].add(values['group_name'])
|
2020-06-18 15:01:16 +00:00
|
|
|
|
|
|
|
# determine the color of the overlay:
|
|
|
|
# - using groups, it's normal orange
|
|
|
|
# - using detections, it's 6 variations or orange (score 0 to 5)
|
|
|
|
# - using visibility, it's 4 variations of orange (score 1 to 4)
|
|
|
|
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
|
|
|
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
|
|
|
s = calculate_score(all_techniques[tech]['visibility'])
|
|
|
|
t['color'] = COLOR_O_1 if s == 1 else COLOR_O_2 if s == 2 else COLOR_O_3 if s == 3 else COLOR_O_4 if s == 4 else ''
|
|
|
|
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
|
|
|
s = calculate_score(all_techniques[tech]['detection'], zero_value=-1)
|
|
|
|
t['color'] = COLOR_O_0 if s == 0 else COLOR_O_1 if s == 1 else COLOR_O_2 if s == 2 else COLOR_O_3 if s == 3 else COLOR_O_4 if s == 4 else COLOR_O_5 if s == 5 else ''
|
|
|
|
else:
|
|
|
|
t['color'] = COLOR_GROUP_OVERLAY_MATCH
|
2019-03-29 14:26:25 +00:00
|
|
|
else:
|
|
|
|
# the technique is only present in the overlay and not in the provided groups (-g/--groups)
|
|
|
|
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
2020-06-18 15:01:16 +00:00
|
|
|
s = calculate_score(all_techniques[tech]['visibility'])
|
|
|
|
t['color'] = COLOR_V_1 if s == 1 else COLOR_V_2 if s == 2 else COLOR_V_3 if s == 3 else COLOR_V_4 if s == 4 else ''
|
2019-05-02 18:15:43 +00:00
|
|
|
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
2020-06-18 15:01:16 +00:00
|
|
|
s = calculate_score(all_techniques[tech]['detection'], zero_value=-1)
|
|
|
|
t['color'] = COLOR_D_0 if s == 0 else COLOR_D_1 if s == 1 else COLOR_D_2 if s == 2 else COLOR_D_3 if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
|
2019-03-29 14:26:25 +00:00
|
|
|
else:
|
|
|
|
t['color'] = COLOR_GROUP_OVERLAY_NO_MATCH
|
2020-07-13 12:31:23 +00:00
|
|
|
if 'Group' not in metadata_dict:
|
2020-05-29 10:16:54 +00:00
|
|
|
metadata_dict['Group'] = set()
|
|
|
|
metadata_dict['Group'].add(values['group_name'])
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-04-24 14:15:04 +00:00
|
|
|
# Add applicable_to to metadata in case of overlay for detection/visibility:
|
|
|
|
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
2020-07-02 14:30:36 +00:00
|
|
|
t['metadata'].append({'name': 'Overlay', 'value': overlay_type})
|
2020-05-29 10:16:54 +00:00
|
|
|
for obj_type in ['detection', 'visibility']:
|
2020-06-12 08:54:41 +00:00
|
|
|
t['metadata'] = add_metadata_technique_object(all_techniques[tech], obj_type, t['metadata'])
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
# change the color and add metadata to make the groups software overlay visible
|
2019-12-06 12:50:16 +00:00
|
|
|
for group, values in groups_software.items():
|
2019-03-29 14:26:25 +00:00
|
|
|
if tech in values['techniques']:
|
|
|
|
if t['score'] > 0:
|
|
|
|
t['color'] = COLOR_GROUP_AND_SOFTWARE
|
|
|
|
else:
|
|
|
|
t['color'] = COLOR_SOFTWARE
|
|
|
|
|
|
|
|
if 'Software groups' not in metadata_dict:
|
|
|
|
metadata_dict['Software groups'] = set()
|
|
|
|
metadata_dict['Software groups'].add(values['group_name'])
|
2019-12-06 12:50:16 +00:00
|
|
|
if 'campaign' in values:
|
|
|
|
if 'Software campaign' not in metadata_dict:
|
|
|
|
metadata_dict['Software campaign'] = set()
|
|
|
|
metadata_dict['Software campaign'].add(values['campaign'])
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
# create the metadata based on the dict 'metadata_dict'
|
2020-05-29 10:16:54 +00:00
|
|
|
i = 0
|
2019-03-29 14:26:25 +00:00
|
|
|
for metadata, values in metadata_dict.items():
|
2020-07-02 14:30:36 +00:00
|
|
|
tmp_dict = {'name': metadata, 'value': ', '.join(values)}
|
2020-05-29 10:16:54 +00:00
|
|
|
t['metadata'].insert(i, tmp_dict)
|
|
|
|
i += 1
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2020-05-29 10:16:54 +00:00
|
|
|
t['metadata'] = make_layer_metadata_compliant(t['metadata'])
|
2019-03-29 14:26:25 +00:00
|
|
|
techniques_layer.append(t)
|
|
|
|
|
2020-07-10 09:33:56 +00:00
|
|
|
determine_and_set_show_sub_techniques(techniques_layer)
|
|
|
|
|
2019-03-29 14:26:25 +00:00
|
|
|
return techniques_layer
|
|
|
|
|
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
def _get_group_list(groups, file_type):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Make a list of group names for the involved groups.
|
|
|
|
:param groups: a dict with data on groups
|
|
|
|
:param file_type: the file type of the YAML file as present in the key 'file_type'
|
|
|
|
:return: list
|
|
|
|
"""
|
|
|
|
if file_type == FILE_TYPE_GROUP_ADMINISTRATION:
|
|
|
|
groups_list = []
|
|
|
|
for group, values in groups.items():
|
2019-12-06 12:50:16 +00:00
|
|
|
if 'campaign' in values and values['campaign'] != '':
|
2019-03-29 14:26:25 +00:00
|
|
|
groups_list.append(values['group_name'] + ' (' + values['campaign'] + ')')
|
|
|
|
else:
|
|
|
|
groups_list.append(values['group_name'])
|
|
|
|
|
|
|
|
return groups_list
|
|
|
|
else:
|
|
|
|
return groups
|
|
|
|
|
|
|
|
|
2020-06-08 14:56:56 +00:00
|
|
|
def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, software_groups, search_visibility,
|
|
|
|
search_detection, health_is_called, output_filename, layer_name, include_all_score_objs=False):
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
Calls all functions that are necessary for the generation of the heat map and write a json layer to disk.
|
|
|
|
:param groups: threat actor groups
|
|
|
|
:param overlay: group(s), visibility or detections to overlay (group ID, group name/alias, YAML file with
|
|
|
|
group(s), detections or visibility)
|
|
|
|
:param overlay_type: group, visibility or detection
|
|
|
|
:param stage: attack or pre-attack
|
2020-10-15 09:04:07 +00:00
|
|
|
:param platform: one or multiple the values from PLATFORMS constant or None (default = Windows)
|
2019-08-08 12:46:30 +00:00
|
|
|
:param software_groups: specify if techniques from related software should be included
|
|
|
|
:param search_visibility: visibility EQL search query
|
|
|
|
:param search_detection: detection EQL search query
|
|
|
|
:param health_is_called: boolean that specifies if detailed errors in the file will be printed
|
2020-05-25 09:44:13 +00:00
|
|
|
:param output_filename: output filename defined by the user
|
2020-06-08 14:56:56 +00:00
|
|
|
:param layer_name: the name of the Navigator layer
|
2019-08-08 12:46:30 +00:00
|
|
|
:param include_all_score_objs: include all score objects within the score_logbook for the EQL query
|
2020-06-19 07:08:18 +00:00
|
|
|
:return: returns None when something went wrong
|
2019-03-29 14:26:25 +00:00
|
|
|
"""
|
|
|
|
overlay_dict = {}
|
|
|
|
groups_software_dict = {}
|
|
|
|
|
|
|
|
groups_file_type = None
|
2020-10-15 09:04:07 +00:00
|
|
|
if groups == None:
|
|
|
|
groups = ['all']
|
|
|
|
elif os.path.isfile(groups[0]):
|
|
|
|
groups = groups[0]
|
2019-08-08 12:46:30 +00:00
|
|
|
groups_file_type = check_file(groups, file_type=FILE_TYPE_GROUP_ADMINISTRATION,
|
|
|
|
health_is_called=health_is_called)
|
2019-03-29 14:26:25 +00:00
|
|
|
if not groups_file_type:
|
2020-06-19 07:08:18 +00:00
|
|
|
return None # the groups_file_type is not of the type FILE_TYPE_GROUP_ADMINISTRATION
|
2020-10-15 09:04:07 +00:00
|
|
|
elif isinstance(groups, str):
|
|
|
|
# reached when the groups are provided via the interactive menu
|
2019-03-29 14:26:25 +00:00
|
|
|
groups = groups.split(',')
|
|
|
|
groups = list(map(lambda x: x.strip().lower(), groups))
|
2020-10-15 09:04:07 +00:00
|
|
|
else: # reached when the groups are provided via CLI arguments
|
|
|
|
groups = list(map(lambda x: x.strip().lower(), groups))
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-12-06 12:50:16 +00:00
|
|
|
# set the correct value for platform
|
2020-10-15 09:04:07 +00:00
|
|
|
platform_yaml = None
|
2019-12-06 12:50:16 +00:00
|
|
|
if groups_file_type == FILE_TYPE_GROUP_ADMINISTRATION:
|
|
|
|
_yaml = init_yaml()
|
|
|
|
with open(groups, 'r') as yaml_file:
|
|
|
|
group_file = _yaml.load(yaml_file)
|
|
|
|
|
|
|
|
platform_yaml = get_platform_from_yaml(group_file)
|
2020-10-15 09:04:07 +00:00
|
|
|
|
|
|
|
if platform == None and platform_yaml != None:
|
|
|
|
platform = platform_yaml
|
|
|
|
elif platform == None:
|
|
|
|
platform = ['Windows']
|
|
|
|
elif 'all' in platform:
|
|
|
|
platform = list(PLATFORMS.values())
|
2019-12-06 12:50:16 +00:00
|
|
|
|
2019-03-29 14:26:25 +00:00
|
|
|
overlay_file_type = None
|
|
|
|
if overlay:
|
2020-10-15 09:04:07 +00:00
|
|
|
if os.path.isfile(overlay[0]):
|
|
|
|
overlay = overlay[0]
|
2019-05-02 18:15:43 +00:00
|
|
|
expected_file_type = FILE_TYPE_GROUP_ADMINISTRATION if overlay_type == OVERLAY_TYPE_GROUP \
|
|
|
|
else FILE_TYPE_TECHNIQUE_ADMINISTRATION \
|
|
|
|
if overlay_type in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION] else None
|
2019-08-08 12:46:30 +00:00
|
|
|
overlay_file_type = check_file(overlay, expected_file_type, health_is_called=health_is_called)
|
2019-03-29 14:26:25 +00:00
|
|
|
if not overlay_file_type:
|
2020-06-19 07:08:18 +00:00
|
|
|
return None # the overlay_file_type is not of the expected type
|
2020-10-15 09:04:07 +00:00
|
|
|
elif isinstance(overlay, str):
|
|
|
|
# reached when the overlay is provided via the interactive menu
|
2019-03-29 14:26:25 +00:00
|
|
|
overlay = overlay.split(',')
|
|
|
|
overlay = list(map(lambda x: x.strip().lower(), overlay))
|
2020-10-15 09:04:07 +00:00
|
|
|
else: # reached when the groups are provided via CLI arguments
|
|
|
|
overlay = list(map(lambda x: x.strip().lower(), overlay))
|
2019-03-29 14:26:25 +00:00
|
|
|
else:
|
|
|
|
overlay = []
|
|
|
|
|
2019-08-08 12:46:30 +00:00
|
|
|
# load the techniques (visibility or detection) from the YAML file
|
2019-04-24 14:15:04 +00:00
|
|
|
all_techniques = None
|
2019-03-29 14:26:25 +00:00
|
|
|
if overlay_file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
2019-08-08 12:46:30 +00:00
|
|
|
# filter out visibility and/or detection objects using EQL
|
|
|
|
if search_detection or search_visibility:
|
|
|
|
overlay = techniques_search(overlay, search_visibility, search_detection,
|
|
|
|
include_all_score_objs=include_all_score_objs)
|
2019-08-13 12:28:43 +00:00
|
|
|
if not overlay:
|
|
|
|
return None # something went wrong in executing the search or 0 results where returned
|
2019-08-08 12:46:30 +00:00
|
|
|
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type == OVERLAY_TYPE_VISIBILITY:
|
2019-08-08 12:46:30 +00:00
|
|
|
overlay_dict, all_techniques = _get_visibility_techniques(overlay)
|
2019-05-02 18:15:43 +00:00
|
|
|
elif overlay_type == OVERLAY_TYPE_DETECTION:
|
2019-08-08 12:46:30 +00:00
|
|
|
overlay_dict, all_techniques = _get_detection_techniques(overlay)
|
2020-06-19 07:08:18 +00:00
|
|
|
|
2019-12-06 12:50:16 +00:00
|
|
|
# we are not overlaying visibility or detection, overlay group will therefore contain information on another group
|
2019-03-29 14:26:25 +00:00
|
|
|
elif len(overlay) > 0:
|
2019-07-31 08:19:51 +00:00
|
|
|
overlay_dict = _get_group_techniques(overlay, stage, platform, overlay_file_type)
|
2019-11-14 14:12:26 +00:00
|
|
|
if overlay_dict == -1:
|
2020-06-19 07:08:18 +00:00
|
|
|
return None # returns None when the provided Group(s) to be overlaid, contains Groups not part of ATT&CK
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
groups_dict = _get_group_techniques(groups, stage, platform, groups_file_type)
|
2019-03-29 14:26:25 +00:00
|
|
|
if groups_dict == -1:
|
2020-06-19 07:08:18 +00:00
|
|
|
return None # returns None when the provided Group contains Groups not part of ATT&CK
|
2019-03-29 14:26:25 +00:00
|
|
|
if len(groups_dict) == 0:
|
|
|
|
print('[!] Empty layer.') # the provided groups dit not result in any techniques
|
2020-06-19 07:08:18 +00:00
|
|
|
return None
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-04-23 11:21:27 +00:00
|
|
|
# check if we are doing a software group overlay
|
2019-12-06 12:50:16 +00:00
|
|
|
if software_groups and overlay:
|
2019-05-02 18:15:43 +00:00
|
|
|
if overlay_type not in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION]:
|
2019-04-23 11:21:27 +00:00
|
|
|
# if a group overlay is provided, get the software techniques for the overlay
|
2019-07-31 08:19:51 +00:00
|
|
|
groups_software_dict = _get_software_techniques(overlay, stage, platform)
|
2019-03-29 14:26:25 +00:00
|
|
|
elif software_groups:
|
2019-07-31 08:19:51 +00:00
|
|
|
groups_software_dict = _get_software_techniques(groups, stage, platform)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-07-31 08:19:51 +00:00
|
|
|
technique_count, max_count = _get_technique_count(groups_dict, overlay_dict, groups_software_dict, overlay_type, all_techniques)
|
|
|
|
technique_layer = _get_technique_layer(technique_count, groups_dict, overlay_dict, groups_software_dict,
|
|
|
|
overlay_file_type, overlay_type, all_techniques)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
|
|
|
# make a list group names for the involved groups.
|
|
|
|
if groups == ['all']:
|
|
|
|
groups_list = ['all']
|
|
|
|
else:
|
2019-07-31 08:19:51 +00:00
|
|
|
groups_list = _get_group_list(groups_dict, groups_file_type)
|
|
|
|
overlay_list = _get_group_list(overlay_dict, overlay_file_type)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2019-12-06 12:50:16 +00:00
|
|
|
desc = 'stage: ' + stage + ' | platform(s): ' + platform_to_name(platform, separator=', ') + ' | group(s): ' \
|
2020-05-29 10:16:54 +00:00
|
|
|
+ ', '.join(groups_list) + ' | overlay group(s): ' + ', '.join(overlay_list)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2020-06-08 14:56:56 +00:00
|
|
|
if not layer_name:
|
|
|
|
layer_name = stage[0].upper() + stage[1:] + ' - ' + platform_to_name(platform, separator=', ')
|
|
|
|
|
|
|
|
layer = get_layer_template_groups(layer_name, max_count, desc, stage, platform, overlay_type)
|
2019-03-29 14:26:25 +00:00
|
|
|
layer['techniques'] = technique_layer
|
|
|
|
|
|
|
|
json_string = simplejson.dumps(layer).replace('}, ', '},\n')
|
|
|
|
|
2020-05-25 09:44:13 +00:00
|
|
|
if not output_filename:
|
|
|
|
if stage == 'pre-attack':
|
|
|
|
filename = '_'.join(groups_list)
|
|
|
|
elif overlay:
|
|
|
|
filename = platform_to_name(platform) + '_' + '_'.join(groups_list) + '-overlay_' + '_'.join(overlay_list)
|
|
|
|
else:
|
|
|
|
filename = platform_to_name(platform) + '_' + '_'.join(groups_list)
|
2019-03-29 14:26:25 +00:00
|
|
|
|
2020-05-25 09:44:13 +00:00
|
|
|
filename = create_output_filename(stage, filename)
|
|
|
|
write_file(filename, json_string)
|
|
|
|
else:
|
|
|
|
write_file(output_filename, json_string)
|