- Made compatible with the latest version of attackcti (v0.2.6)
- Fixed a bug that caused the campaign name not to be displayed for a group (part of the Group YAML file)master
parent
20408b9411
commit
f3407f6ec4
120
group_mapping.py
120
group_mapping.py
|
@ -25,9 +25,9 @@ def is_group_found(groups_found, argument_groups):
|
|||
Check if a group that has been provided using '-g/--groups'/'-o/--overlay' is present within MITRE ATT&CK.
|
||||
:param groups_found: groups that are found in the ATT&CK data
|
||||
:param argument_groups: groups provided via the command line by the user
|
||||
:return: returns boolean that incidates if the group is found
|
||||
:return: returns boolean that indicates if the group is found
|
||||
"""
|
||||
groups_json = load_attack_data(DATATYPE_ALL_GROUPS)
|
||||
groups = load_attack_data(DATATYPE_ALL_GROUPS)
|
||||
|
||||
for group_arg in argument_groups:
|
||||
if group_arg == 'all': # this one will be ignored as it does not make any sense for this function
|
||||
|
@ -35,11 +35,11 @@ def is_group_found(groups_found, argument_groups):
|
|||
|
||||
group_id = None
|
||||
|
||||
for group in groups_json: # is the group provided via the command line known in ATT&CK?
|
||||
if group['group_aliases']:
|
||||
group_aliases_lower = list(map(lambda x: x.lower(), group['group_aliases']))
|
||||
if group_arg in group_aliases_lower or group_arg == group['group_id'].lower():
|
||||
group_id = group['group_id']
|
||||
for group in groups: # is the group provided via the command line known in ATT&CK?
|
||||
if 'aliases' in group:
|
||||
group_aliases_lower = list(map(lambda x: x.lower(), group['aliases']))
|
||||
if group_arg in group_aliases_lower or group_arg == get_attack_id(group).lower():
|
||||
group_id = get_attack_id(group)
|
||||
|
||||
if not group_id: # the group that has been provided through the command line cannot be found in ATT&CK
|
||||
print('[!] Unknown group: ' + group_arg)
|
||||
|
@ -63,11 +63,11 @@ def get_software_techniques(groups, stage, platform):
|
|||
# { group_id: {group_name: NAME, techniques: set{id, ...} } }
|
||||
groups_dict = {}
|
||||
|
||||
tech_by_software_json = load_attack_data(DATATYPE_TECH_BY_SOFTWARE)
|
||||
tech_by_software = load_attack_data(DATATYPE_TECH_BY_SOFTWARE)
|
||||
|
||||
# { software_id: [technique, ...] }
|
||||
software_dict = {}
|
||||
for tech in tech_by_software_json:
|
||||
for tech in tech_by_software:
|
||||
if tech['software_id'] not in software_dict:
|
||||
software_dict[tech['software_id']] = set([tech['technique_id']])
|
||||
else:
|
||||
|
@ -80,7 +80,7 @@ def get_software_techniques(groups, stage, platform):
|
|||
|
||||
for group in config['groups']:
|
||||
if group['enabled']:
|
||||
group_id = get_group_id(group['group_name'], group['campaign'])
|
||||
group_id = generate_group_id(group['group_name'], group['campaign'])
|
||||
groups_dict[group_id] = dict()
|
||||
|
||||
groups_dict[group_id]['group_name'] = group['group_name']
|
||||
|
@ -97,22 +97,23 @@ def get_software_techniques(groups, stage, platform):
|
|||
|
||||
# groups are provided as arguments via the command line
|
||||
else:
|
||||
software_by_group_json = load_attack_data(DATATYPE_SOFTWARE_BY_GROUP)
|
||||
software_by_group = load_attack_data(DATATYPE_SOFTWARE_BY_GROUP)
|
||||
|
||||
for s in software_by_group_json:
|
||||
# group matches the: matrix/stage, platform and the group(s) we are interested in
|
||||
if s['software_platform']: # their is some software that do not have a platform, skip those
|
||||
if s['matrix'] == 'mitre-'+stage and (platform in s['software_platform'] or platform == 'all') and \
|
||||
(groups[0] == 'all' or s['group_id'].lower() in groups or is_in_group(s['group_aliases'], groups)):
|
||||
for s in software_by_group:
|
||||
# software matches the ATT&CK Matrix and platform
|
||||
# and the group is a group we are interested in
|
||||
if s['x_mitre_platforms']: # their is some software that do not have a platform, skip those
|
||||
if s['matrix'] == 'mitre-'+stage and (platform in s['x_mitre_platforms'] or platform == 'all') and \
|
||||
(groups[0] == 'all' or s['group_id'].lower() in groups or is_in_group(s['aliases'], groups)):
|
||||
if s['group_id'] not in groups_dict:
|
||||
groups_dict[s['group_id']] = {'group_name': s['group']}
|
||||
groups_dict[s['group_id']] = {'group_name': s['name']}
|
||||
groups_dict[s['group_id']]['techniques'] = set()
|
||||
groups_dict[s['group_id']]['techniques'].update(software_dict[s['software_id']])
|
||||
|
||||
return groups_dict
|
||||
|
||||
|
||||
def get_group_id(group_name, campaign):
|
||||
def generate_group_id(group_name, campaign):
|
||||
# CG_GROUPS = { group_name+campaign: id } }
|
||||
"""
|
||||
Generate a custom group id.
|
||||
|
@ -164,7 +165,7 @@ def get_group_techniques(groups, stage, platform, file_type):
|
|||
for group in config['groups']:
|
||||
if group['enabled']:
|
||||
campaign = group['campaign'] if group['campaign'] else ''
|
||||
group_id = get_group_id(group['group_name'], campaign)
|
||||
group_id = generate_group_id(group['group_name'], campaign)
|
||||
groups_dict[group_id] = dict()
|
||||
|
||||
groups_dict[group_id]['group_name'] = group['group_name']
|
||||
|
@ -178,25 +179,25 @@ def get_group_techniques(groups, stage, platform, file_type):
|
|||
groups_dict[group_id]['software'] = group['software_id']
|
||||
else:
|
||||
# groups are provided as arguments via the command line
|
||||
groups_json = load_attack_data(DATATYPE_TECH_BY_GROUP)
|
||||
all_groups_tech = load_attack_data(DATATYPE_TECH_BY_GROUP)
|
||||
|
||||
for e in groups_json:
|
||||
json_platform = e['platform']
|
||||
if not json_platform:
|
||||
for gr in all_groups_tech:
|
||||
platforms = gr['x_mitre_platforms']
|
||||
if not platforms:
|
||||
# we just set this to an random legit value, because for pre-attack 'platform' is not used
|
||||
json_platform = 'Windows'
|
||||
platforms = 'Windows'
|
||||
|
||||
# group matches the: matrix/stage, platform and the group(s) we are interested in
|
||||
if e['matrix'] == 'mitre-'+stage and (platform in json_platform or platform == 'all') and \
|
||||
(groups[0] == 'all' or e['group_id'].lower() in groups or is_in_group(e['group_aliases'], groups)):
|
||||
if e['group_id'] not in groups_dict:
|
||||
groups_found.add(e['group_id'])
|
||||
groups_dict[e['group_id']] = {'group_name': e['group']}
|
||||
groups_dict[e['group_id']]['techniques'] = set()
|
||||
groups_dict[e['group_id']]['weight'] = dict()
|
||||
if gr['matrix'] == 'mitre-'+stage and (platform in platforms or platform == 'all') and \
|
||||
(groups[0] == 'all' or gr['group_id'].lower() in groups or is_in_group(gr['aliases'], groups)):
|
||||
if gr['group_id'] not in groups_dict:
|
||||
groups_found.add(gr['group_id'])
|
||||
groups_dict[gr['group_id']] = {'group_name': gr['name']}
|
||||
groups_dict[gr['group_id']]['techniques'] = set()
|
||||
groups_dict[gr['group_id']]['weight'] = dict()
|
||||
|
||||
groups_dict[e['group_id']]['techniques'].add(e['technique_id'])
|
||||
groups_dict[e['group_id']]['weight'][e['technique_id']] = 1
|
||||
groups_dict[gr['group_id']]['techniques'].add(gr['technique_id'])
|
||||
groups_dict[gr['group_id']]['weight'][gr['technique_id']] = 1
|
||||
|
||||
# do not call 'is_group_found' when groups is a YAML file
|
||||
# (this could contain groups that do not exists within ATT&CK)
|
||||
|
@ -368,9 +369,9 @@ def get_technique_layer(techniques_count, groups, overlay, groups_software, over
|
|||
metadata_dict['Groups'] = set()
|
||||
metadata_dict['Groups'].add(values['group_name'])
|
||||
|
||||
# this will only be effective when loading a YAML files that has a value for the key 'campaign'
|
||||
if 'Campaign' in values and values['campaign'] is not None:
|
||||
if 'CAMPAIGN' not in metadata_dict:
|
||||
# this will only be effective when loading a YAML files that have a value for the key 'campaign'
|
||||
if 'campaign' in values:
|
||||
if 'Campaign' not in metadata_dict:
|
||||
metadata_dict['Campaign'] = set()
|
||||
metadata_dict['Campaign'].add(values['campaign'])
|
||||
|
||||
|
@ -402,7 +403,7 @@ def get_technique_layer(techniques_count, groups, overlay, groups_software, over
|
|||
metadata_dict['Overlay'].add(values['group_name'])
|
||||
|
||||
# this will only be effective when loading a YAML files that has a value for the key 'campaign'
|
||||
if 'campaign' in values and values['campaign'] is not None:
|
||||
if 'campaign' in values:
|
||||
if 'Campaign' not in metadata_dict:
|
||||
metadata_dict['Campaign'] = set()
|
||||
metadata_dict['Campaign'].add(values['campaign'])
|
||||
|
@ -440,7 +441,7 @@ def get_group_list(groups, file_type):
|
|||
groups_list = []
|
||||
for group, values in groups.items():
|
||||
# if YAML file contains campaign key with a legit value
|
||||
if 'campaign' in values and values['campaign'] is not None:
|
||||
if 'campaign' in values:
|
||||
groups_list.append(values['group_name'] + ' (' + values['campaign'] + ')')
|
||||
else:
|
||||
groups_list.append(values['group_name'])
|
||||
|
@ -558,12 +559,13 @@ def get_updates(update_type, sort='modified'):
|
|||
sorted_techniques = sorted(techniques, key=lambda k: k[sort])
|
||||
|
||||
for t in sorted_techniques:
|
||||
print(t['technique_id'] + ' ' + t['technique'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].split(' ')[0])
|
||||
print(' ' * 6 + 'modified: ' + t['modified'][:10])
|
||||
print(' ' * 6 + 'matrix: ' + t['matrix'][6:])
|
||||
if t['tactic']:
|
||||
print(' ' * 6 + 'tactic: ' + ' '.join(t['tactic']))
|
||||
print(get_attack_id(t) + ' ' + t['name'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + t['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + t['external_references'][0]['source_name'][6:])
|
||||
tactics = get_tactics(t)
|
||||
if tactics:
|
||||
print(' ' * 6 + 'tactic: ' + ', '.join(tactics))
|
||||
else:
|
||||
print(' ' * 6 + 'tactic: None')
|
||||
print('')
|
||||
|
@ -572,24 +574,24 @@ def get_updates(update_type, sort='modified'):
|
|||
groups = load_attack_data(DATATYPE_ALL_GROUPS)
|
||||
sorted_groups = sorted(groups, key=lambda k: k[sort])
|
||||
|
||||
for t in sorted_groups:
|
||||
print(t['group_id'] + ' ' + t['group'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].split(' ')[0])
|
||||
print(' ' * 6 + 'modified: ' + t['modified'][:10])
|
||||
for g in sorted_groups:
|
||||
print(get_attack_id(g) + ' ' + g['name'])
|
||||
print(' ' * 6 + 'created: ' + g['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + g['modified'].strftime('%Y-%m-%d'))
|
||||
print('')
|
||||
|
||||
elif update_type == 'software':
|
||||
software = load_attack_data(DATATYPE_ALL_SOFTWARE)
|
||||
sorted_software = sorted(software, key=lambda k: k[sort])
|
||||
|
||||
for t in sorted_software:
|
||||
print(t['software_id'] + ' ' + t['software'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].split(' ')[0])
|
||||
print(' ' * 6 + 'modified: ' + t['modified'][:10])
|
||||
print(' ' * 6 + 'matrix: ' + t['matrix'][6:])
|
||||
print(' ' * 6 + 'type: ' + t['type'])
|
||||
if t['software_platform']:
|
||||
print(' ' * 6 + 'platform: ' + ', '.join(t['software_platform']))
|
||||
for s in sorted_software:
|
||||
print(get_attack_id(s) + ' ' + s['name'])
|
||||
print(' ' * 6 + 'created: ' + s['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + s['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + s['external_references'][0]['source_name'][6:])
|
||||
print(' ' * 6 + 'type: ' + s['type'])
|
||||
if 'x_mitre_platforms' in s:
|
||||
print(' ' * 6 + 'platform: ' + ', '.join(s['x_mitre_platforms']))
|
||||
else:
|
||||
print(' ' * 6 + 'platform: None')
|
||||
print('')
|
||||
|
@ -605,9 +607,9 @@ def get_statistics():
|
|||
# {data_source: {techniques: [T0001, ...}, count: ...}
|
||||
data_sources_dict = {}
|
||||
for tech in techniques:
|
||||
tech_id = tech['technique_id']
|
||||
data_sources = tech['data_sources']
|
||||
|
||||
tech_id = get_attack_id(tech)
|
||||
# Not every technique has a data source listed
|
||||
data_sources = try_get_key(tech, 'x_mitre_data_sources')
|
||||
if data_sources:
|
||||
for ds in data_sources:
|
||||
if ds not in data_sources_dict:
|
||||
|
|
Loading…
Reference in New Issue