Made compatible with the latest version of attackcti (v0.2.6)

master
Marcus Bakker 2019-07-13 14:40:24 +02:00
parent 679fd81d1a
commit e251c6157c
5 changed files with 234 additions and 73 deletions

View File

@ -6,6 +6,7 @@ EXPIRE_TIME = 60*60*24
DATATYPE_TECH_BY_GROUP = 'mitre_techniques_used_by_group'
DATATYPE_ALL_TECH = 'mitre_all_techniques'
DATATYPE_ALL_TECH_ENTERPRISE = 'mitre_all_techniques_enterprise'
DATATYPE_ALL_GROUPS = 'mitre_all_groups'
DATATYPE_ALL_SOFTWARE = 'mitre_all_software'
DATATYPE_TECH_BY_SOFTWARE = 'mitre_techniques_used_by_software'

View File

@ -177,17 +177,17 @@ def _map_and_colorize_techniques(my_ds, exceptions):
# Color the techniques based on how many data sources are available.
for t in techniques:
if t['data_sources']:
total_ds_count = len(t['data_sources'])
if 'x_mitre_data_sources' in t:
total_ds_count = len(t['x_mitre_data_sources'])
ds_count = 0
for ds in t['data_sources']:
for ds in t['x_mitre_data_sources']:
if ds in my_ds.keys():
ds_count += 1
if total_ds_count > 0:
result = (float(ds_count) / float(total_ds_count)) * 100
color = COLOR_DS_25p if result <= 25 else COLOR_DS_50p if result <= 50 else COLOR_DS_75p \
if result <= 75 else COLOR_DS_99p if result <= 99 else COLOR_DS_100p
technique_colors[t['technique_id']] = color
technique_colors[get_attack_id(t)] = color
my_techniques = map_techniques_to_data_sources(techniques, my_ds)
@ -219,7 +219,7 @@ def generate_technique_administration_file(filename):
"""
my_data_sources, name, platform, exceptions = _load_data_sources(filename)
techniques = load_attack_data(DATATYPE_ALL_TECH)
techniques = load_attack_data(DATATYPE_ALL_TECH_ENTERPRISE)
# This is part of the techniques administration YAML file and is used as a template
dict_tech = {'technique_id': '', 'technique_name': '', 'detection': {'applicable_to': ['all'],
@ -237,30 +237,31 @@ def generate_technique_administration_file(filename):
# Score visibility based on the number of available data sources and the exceptions
for t in techniques:
if t['matrix'] == 'mitre-attack':
platforms_lower = list(map(lambda x: x.lower(), t['platform']))
if platform in platforms_lower:
if t['data_sources']:
total_ds_count = len(t['data_sources'])
ds_count = 0
for ds in t['data_sources']:
if ds in my_data_sources.keys():
ds_count += 1
if total_ds_count > 0:
result = (float(ds_count) / float(total_ds_count)) * 100
platforms_lower = list(map(lambda x: x.lower(), try_get_key(t, 'x_mitre_platforms')))
if platform in platforms_lower:
# not every technique has data source listed
if 'x_mitre_data_sources' in t:
total_ds_count = len(t['x_mitre_data_sources'])
ds_count = 0
for ds in t['x_mitre_data_sources']:
if ds in my_data_sources.keys():
ds_count += 1
if total_ds_count > 0:
result = (float(ds_count) / float(total_ds_count)) * 100
score = 0 if result == 0 else 1 if result <= 49 else 2 if result <= 74 else 3 if result <= 99 else 4
else:
score = 0
score = 0 if result == 0 else 1 if result <= 49 else 2 if result <= 74 else 3 if result <= 99 else 4
else:
score = 0
# Do not add technique if score == 0 or part of the exception list\
techniques_upper = list(map(lambda x: x.upper(), exceptions))
if score > 0 and t['technique_id'] not in techniques_upper:
tech = copy.deepcopy(dict_tech)
tech['technique_id'] = t['technique_id']
tech['technique_name'] = get_technique(techniques, t['technique_id'])['technique']
tech['visibility']['score'] = score
yaml_file['techniques'].append(tech)
# Do not add technique if score == 0 or part of the exception list
techniques_upper = list(map(lambda x: x.upper(), exceptions))
tech_id = get_attack_id(t)
if score > 0 and tech_id not in techniques_upper:
tech = copy.deepcopy(dict_tech)
tech['technique_id'] = tech_id
tech['technique_name'] = t['name']
tech['visibility']['score'] = score
yaml_file['techniques'].append(tech)
yaml_string = '%YAML 1.2\n---\n' + yaml.dump(yaml_file, sort_keys=False).replace('null', '')
output_filename = 'output/techniques-administration-' + normalize_name_to_filename(name+'-'+platform) + '.yaml'

View File

@ -9,6 +9,34 @@ from difflib import SequenceMatcher
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
def try_get_key(dictionary, key):
"""
Return None if the key does not exists within the provided dict
:param dictionary: dictionary
:param key: key
:return: key value or None
"""
if key in dictionary:
return dictionary[key]
return None
def try_except(self, stix_objects, object_type, nested_value=None):
if object_type in stix_objects:
specific_stix_object = stix_objects[object_type]
if isinstance(specific_stix_object, list):
if nested_value is None:
lists = self.handle_list(stix_objects, object_type)
return lists
else:
nested_result = self.handle_nested(stix_objects, object_type, nested_value)
return nested_result
else:
return stix_objects[object_type]
else:
return None
def save_attack_data(data, path):
"""
Save ATT&CK data to disk for the purpose of caching.
@ -39,23 +67,120 @@ def load_attack_data(data_type):
from attackcti import attack_client
mitre = attack_client()
json_data = None
stix_objects = None
if data_type == DATATYPE_ALL_TECH_ENTERPRISE:
stix_objects = mitre.get_all_enterprise_techniques()
if data_type == DATATYPE_TECH_BY_GROUP:
json_data = mitre.get_techniques_used_by_group()
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
# groups. This results in a dict: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...}
groups = mitre.get_all_groups()
relationships = mitre.get_all_relationships()
all_groups_relationships = []
for g in groups:
for r in relationships:
if g['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
r['target_ref'].startswith('attack-pattern--'):
# much more information on the group can be added. Only the minimal required data is now added.
all_groups_relationships.append(
{
'group_id': get_attack_id(g),
'name': g['name'],
'aliases': try_get_key(g, 'aliases'),
'technique_ref': r['target_ref']
})
# Now we start resolving this part of the dict created above: 'technique_ref/attack-pattern_ref'.
# and we add some more data to the final result.
all_group_use = []
techniques = mitre.get_all_techniques()
for gr in all_groups_relationships:
for t in techniques:
if t['id'] == gr['technique_ref']:
all_group_use.append(
{
'group_id': gr['group_id'],
'name': gr['name'],
'aliases': gr['aliases'],
'technique_id': get_attack_id(t),
'x_mitre_platforms': try_get_key(t, 'x_mitre_platforms'),
'matrix': t['external_references'][0]['source_name']
})
stix_objects = all_group_use
elif data_type == DATATYPE_ALL_TECH:
json_data = mitre.get_all_techniques()
stix_objects = mitre.get_all_techniques()
elif data_type == DATATYPE_ALL_GROUPS:
json_data = mitre.get_all_groups()
stix_objects = mitre.get_all_groups()
elif data_type == DATATYPE_ALL_SOFTWARE:
json_data = mitre.get_all_software()
stix_objects = mitre.get_all_software()
elif data_type == DATATYPE_TECH_BY_SOFTWARE:
json_data = mitre.get_techniques_used_by_software()
# TODO cache the stix objects
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all software
# This results in a dict: {software_id: Sxxxx, technique_ref/attack-pattern_ref: ...}
software = mitre.get_all_software()
relationships = mitre.get_all_relationships()
all_software_relationships = []
for s in software:
for r in relationships:
if s['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
r['target_ref'].startswith('attack-pattern--'):
# much more information (e.g. description, aliases, platform) on the software can be added to the
# dict if necessary. Only the minimal required data is now added.
all_software_relationships.append({'software_id': get_attack_id(s), 'technique_ref': r['target_ref']})
# Now we start resolving this part of the dict created above: 'technique_ref/attack-pattern_ref'
techniques = mitre.get_all_techniques()
all_software_use = []
for sr in all_software_relationships:
for t in techniques:
if t['id'] == sr['technique_ref']:
# much more information on the technique can be added to the dict. Only the minimal required data
# is now added (i.e. resolving the technique ref to an actual ATT&CK ID)
all_software_use.append({'software_id': sr['software_id'], 'technique_id': get_attack_id(t)})
stix_objects = all_software_use
elif data_type == DATATYPE_SOFTWARE_BY_GROUP:
json_data = mitre.get_software_used_by_group()
# First we need to know which software references (STIX Object type 'malware' or 'tool') we have for all
# groups. This results in a dict: {group_id: Gxxxx, software_ref/malware-tool_ref: ...}
groups = mitre.get_all_groups()
relationships = mitre.get_all_relationships()
all_groups_relationships = []
for g in groups:
for r in relationships:
if g['id'] == r['source_ref'] and r['relationship_type'] == 'uses' and \
(r['target_ref'].startswith('tool--') or r['target_ref'].startswith('malware--')):
# much more information on the group can be added. Only the minimal required data is now added.
all_groups_relationships.append(
{
'group_id': get_attack_id(g),
'name': g['name'],
'aliases': try_get_key(g, 'aliases'),
'software_ref': r['target_ref']
})
save_attack_data(json_data, "cache/" + data_type)
# Now we start resolving this part of the dict created above: 'software_ref/malware-tool_ref'.
# and we add some more data to the final result.
all_group_use = []
software = mitre.get_all_software()
for gr in all_groups_relationships:
for s in software:
if s['id'] == gr['software_ref']:
all_group_use.append(
{
'group_id': gr['group_id'],
'name': gr['name'],
'aliases': gr['aliases'],
'software_id': get_attack_id(s),
'x_mitre_platforms': try_get_key(s, 'x_mitre_platforms'),
'matrix': s['external_references'][0]['source_name']
})
stix_objects = all_group_use
return json_data
save_attack_data(stix_objects, "cache/" + data_type)
return stix_objects
def _get_base_template(name, description, stage, platform, sorting):
@ -221,6 +346,31 @@ def get_layer_template_layered(name, description, stage, platform):
return layer
def get_attack_id(stix_obj):
"""
Get the Technique, Group or Software ID from the STIX object
:param stix_obj: STIX object (Technique, Software or Group)
:return: ATT&CK ID
"""
for ext_ref in stix_obj['external_references']:
if ext_ref['source_name'] in ['mitre-attack', 'mitre-mobile-attack', 'mitre-pre-attack']:
return ext_ref['external_id']
def get_tactics(technique):
"""
Get all tactics from a given technique
:param technique: technique STIX object
:return: list with tactics
"""
tactics = []
if 'kill_chain_phases' in technique:
for phase in technique['kill_chain_phases']:
tactics.append(phase['phase_name'])
return tactics
def get_technique(techniques, technique_id):
"""
Generic function to lookup a specific technique_id in a list of dictionaries with techniques.
@ -228,9 +378,9 @@ def get_technique(techniques, technique_id):
:param technique_id: technique_id to look for
:return: the technique you're searching for. None if not found.
"""
for t in techniques:
if technique_id == t['technique_id']:
return t
for tech in techniques:
if technique_id == get_attack_id(tech):
return tech
return None
@ -256,15 +406,18 @@ def map_techniques_to_data_sources(techniques, my_data_sources):
for t in techniques:
# If your data source is in the list of data sources for this technique AND if the
# technique isn't added yet (by an other data source):
if t['data_sources'] and i_ds in t['data_sources'] and t['technique_id'] not in my_techniques.keys():
my_techniques[t['technique_id']] = {}
my_techniques[t['technique_id']]['my_data_sources'] = [i_ds, ]
my_techniques[t['technique_id']]['data_sources'] = t['data_sources']
my_techniques[t['technique_id']]['tactics'] = t['tactic']
my_techniques[t['technique_id']]['products'] = set(my_data_sources[i_ds]['products'])
elif t['data_sources'] and i_ds in t['data_sources'] and t['technique_id'] in my_techniques.keys():
my_techniques[t['technique_id']]['my_data_sources'].append(i_ds)
my_techniques[t['technique_id']]['products'].update(my_data_sources[i_ds]['products'])
tech_id = get_attack_id(t)
if 'x_mitre_data_sources' in t:
if i_ds in t['x_mitre_data_sources'] and tech_id not in my_techniques.keys():
my_techniques[tech_id] = {}
my_techniques[tech_id]['my_data_sources'] = [i_ds, ]
my_techniques[tech_id]['data_sources'] = t['x_mitre_data_sources']
# create a list of tactics
my_techniques[tech_id]['tactics'] = list(map(lambda k: k['phase_name'], try_get_key(t, 'kill_chain_phases')))
my_techniques[tech_id]['products'] = set(my_data_sources[i_ds]['products'])
elif t['x_mitre_data_sources'] and i_ds in t['x_mitre_data_sources'] and tech_id in my_techniques.keys():
my_techniques[tech_id]['my_data_sources'].append(i_ds)
my_techniques[tech_id]['products'].update(my_data_sources[i_ds]['products'])
return my_techniques
@ -275,12 +428,12 @@ def get_all_mitre_data_sources():
:return: a sorted list with all data sources
"""
techniques = load_attack_data(DATATYPE_ALL_TECH)
data_sources = set()
for t in techniques:
if t['data_sources']:
for ds in t['data_sources']:
if ds not in data_sources:
data_sources.add(ds)
if 'x_mitre_data_sources' in t.keys():
for ds in t['x_mitre_data_sources']:
data_sources.add(ds)
return sorted(data_sources)
@ -333,19 +486,19 @@ def load_techniques(filename, detection_or_visibility='all', filter_applicable_t
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
for d in yaml_content['techniques']:
# Add detection items:
if type(d['detection']) == dict: # There is just one detection entry
if type(d['detection']) == dict: # There is just one detection entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', d['detection'])
elif type(d['detection']) == list: # There are multiple detection entries
elif type(d['detection']) == list: # There are multiple detection entries
for de in d['detection']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'detection', de)
# Add visibility items
if type(d['visibility']) == dict: # There is just one visibility entry
if type(d['visibility']) == dict: # There is just one visibility entry
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in d[detection_or_visibility]['applicable_to'] or 'all' in d[detection_or_visibility]['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', d['visibility'])
elif type(d['visibility']) == list: # There are multiple visibility entries
elif type(d['visibility']) == list: # There are multiple visibility entries
for de in d['visibility']:
if detection_or_visibility == 'all' or filter_applicable_to == 'all' or filter_applicable_to in de['applicable_to'] or 'all' in de['applicable_to']:
_add_entry_to_list_in_dictionary(my_techniques, d['technique_id'], 'visibility', de)
@ -426,7 +579,7 @@ def check_yaml_file_health(filename, file_type, health_is_called):
detection[key].day
except AttributeError:
has_error = _print_error_msg('[!] Technique ID: ' + tech +
' has an INVALID data format for the key-value pair in detection: ' +
' has an INVALID data format for the key-value pair in detection: ' +
key + ' (should be YYYY-MM-DD)', health_is_called)
for key in ['location', 'applicable_to']:
if not isinstance(detection[key], list):

View File

@ -136,7 +136,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
technique = get_technique(techniques, technique_id)
for tactic in technique['tactic']:
for tactic in get_tactics(technique):
x = {}
x['techniqueID'] = technique_id
x['color'] = color
@ -189,7 +189,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
technique = get_technique(techniques, technique_id)
color = COLOR_V_1 if s == 1 else COLOR_V_2 if s == 2 else COLOR_V_3 if s == 3 else COLOR_V_4 if s == 4 else ''
for tactic in technique['tactic']:
for tactic in get_tactics(technique):
x = {}
x['techniqueID'] = technique_id
x['color'] = color
@ -198,7 +198,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
x['tactic'] = tactic.lower().replace(' ', '-')
x['metadata'] = []
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])})
x['metadata'].append({'name': '---', 'value': '---'})
x['score'] = s
@ -217,15 +217,17 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
mapped_techniques.append(x)
for t in techniques:
if t['technique_id'] not in my_techniques.keys():
if t['tactic']:
for tactic in t['tactic']:
tech_id = get_attack_id(t)
if tech_id not in my_techniques.keys():
tactics = get_tactics(t)
if tactics:
for tactic in tactics:
x = {}
x['techniqueID'] = t['technique_id']
x['techniqueID'] = tech_id
x['comment'] = ''
x['enabled'] = True
x['tactic'] = tactic.lower().replace(' ', '-')
ds = ', '.join(t['data_sources']) if t['data_sources'] else '-'
ds = ', '.join(t['x_mitre_data_sources']) if 'x_mitre_data_sources' in t else '-'
x['metadata'] = [{'name': '-ATT&CK data sources', 'value': ds}]
mapped_techniques.append(x)
@ -274,7 +276,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '-'
technique = get_technique(techniques, technique_id)
for tactic in technique['tactic']:
for tactic in get_tactics(technique):
x = {}
x['techniqueID'] = technique_id
x['color'] = color
@ -283,7 +285,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, fi
x['tactic'] = tactic.lower().replace(' ', '-')
x['metadata'] = []
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['data_sources'])})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])})
x['metadata'].append({'name': '---', 'value': '---'})
# Metadata for detection:
@ -394,13 +396,15 @@ def export_techniques_list_to_excel(filename):
# Add row for every detection that is defined:
for detection in technique_data['detection']:
worksheet_detections.write(y, 0, technique_id, valign_top)
worksheet_detections.write(y, 1, get_technique(mitre_techniques, technique_id)['technique'], valign_top)
worksheet_detections.write(y, 2, ', '.join(t.capitalize() for t in get_technique(mitre_techniques, technique_id)['tactic']), valign_top)
worksheet_detections.write(y, 1, get_technique(mitre_techniques, technique_id)['name'], valign_top)
worksheet_detections.write(y, 2, ', '.join(t.capitalize() for t in
get_tactics(get_technique(mitre_techniques, technique_id))),
valign_top)
worksheet_detections.write(y, 3, ', '.join(detection['applicable_to']), wrap_text)
worksheet_detections.write(y, 4, str(detection['date_registered']).replace('None', ''), valign_top)
worksheet_detections.write(y, 5, str(detection['date_implemented']).replace('None', ''), valign_top)
ds = detection['score']
worksheet_detections.write(y, 6, ds, detection_score_0 if ds == 0 else detection_score_1 if ds ==1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score)
worksheet_detections.write(y, 6, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score)
worksheet_detections.write(y, 7, '\n'.join(detection['location']), wrap_text)
worksheet_detections.write(y, 8, detection['comment'][:-1] if detection['comment'].endswith('\n') else detection['comment'], wrap_text)
y += 1
@ -426,8 +430,10 @@ def export_techniques_list_to_excel(filename):
# Add row for every visibility that is defined:
for visibility in technique_data['visibility']:
worksheet_visibility.write(y, 0, technique_id, valign_top)
worksheet_visibility.write(y, 1, get_technique(mitre_techniques, technique_id)['technique'], valign_top)
worksheet_visibility.write(y, 2, ', '.join(t.capitalize() for t in get_technique(mitre_techniques, technique_id)['tactic']), valign_top)
worksheet_visibility.write(y, 1, get_technique(mitre_techniques, technique_id)['name'], valign_top)
worksheet_visibility.write(y, 2, ', '.join(t.capitalize() for t in
get_tactics(get_technique(mitre_techniques, technique_id))),
valign_top)
worksheet_visibility.write(y, 3, ', '.join(visibility['applicable_to']), wrap_text)
vs = visibility['score']
worksheet_visibility.write(y, 4, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score)

View File

@ -135,7 +135,7 @@ def _upgrade_technique_yaml_10_to_11(file_lines, attack_tech_data):
file_new_lines.append(l)
tech_id = regex_tech_id.search(l).group(1)
tech_name = _get_technique(attack_tech_data, tech_id)['technique']
tech_name = _get_technique(attack_tech_data, tech_id)['name']
file_new_lines.append(' technique_name: ' + tech_name+'\n')
elif regex_detection.match(l):
file_new_lines.append(l)