- Added new functionality for Mitigations statistics.
- Moved multiple functions.master
parent
cf4a55081c
commit
3d11aa5835
133
generic.py
133
generic.py
|
@ -57,9 +57,9 @@ def load_attack_data(data_type):
|
|||
attack_data = None
|
||||
if data_type == DATA_TYPE_STIX_ALL_RELATIONSHIPS:
|
||||
attack_data = mitre.get_relationships()
|
||||
if data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE:
|
||||
elif data_type == DATA_TYPE_STIX_ALL_TECH_ENTERPRISE:
|
||||
attack_data = mitre.get_enterprise_techniques()
|
||||
if data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP:
|
||||
elif data_type == DATA_TYPE_CUSTOM_TECH_BY_GROUP:
|
||||
# First we need to know which technique references (STIX Object type 'attack-pattern') we have for all
|
||||
# groups. This results in a dict: {group_id: Gxxxx, technique_ref/attack-pattern_ref: ...}
|
||||
groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
|
||||
|
@ -166,6 +166,12 @@ def load_attack_data(data_type):
|
|||
})
|
||||
attack_data = all_group_use
|
||||
|
||||
elif data_type == DATA_TYPE_STIX_ALL_ENTERPRISE_MITIGATIONS:
|
||||
attack_data = mitre.get_enterprise_mitigations()
|
||||
|
||||
elif data_type == DATA_TYPE_STIX_ALL_MOBILE_MITIGATIONS:
|
||||
attack_data = mitre.get_mobile_mitigations()
|
||||
|
||||
_save_attack_data(attack_data, "cache/" + data_type)
|
||||
|
||||
return attack_data
|
||||
|
@ -937,3 +943,126 @@ def check_file(filename, file_type=None, health_is_called=False):
|
|||
return yaml_content['file_type']
|
||||
|
||||
return yaml_content # value is None
|
||||
|
||||
def get_updates(update_type, sort='modified'):
|
||||
"""
|
||||
Print a list of updates for a techniques, groups or software. Sort by modified or creation date.
|
||||
:param update_type: the type of update: techniques, groups or software
|
||||
:param sort: sort the list by modified or creation date
|
||||
:return:
|
||||
"""
|
||||
if update_type[:-1] == 'technique':
|
||||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
|
||||
sorted_techniques = sorted(techniques, key=lambda k: k[sort])
|
||||
|
||||
for t in sorted_techniques:
|
||||
print(get_attack_id(t) + ' ' + t['name'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + t['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + t['external_references'][0]['source_name'][6:])
|
||||
tactics = get_tactics(t)
|
||||
if tactics:
|
||||
print(' ' * 6 + 'tactic: ' + ', '.join(tactics))
|
||||
else:
|
||||
print(' ' * 6 + 'tactic: None')
|
||||
print('')
|
||||
|
||||
elif update_type[:-1] == 'group':
|
||||
groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
|
||||
sorted_groups = sorted(groups, key=lambda k: k[sort])
|
||||
|
||||
for g in sorted_groups:
|
||||
print(get_attack_id(g) + ' ' + g['name'])
|
||||
print(' ' * 6 + 'created: ' + g['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + g['modified'].strftime('%Y-%m-%d'))
|
||||
print('')
|
||||
|
||||
elif update_type == 'software':
|
||||
software = load_attack_data(DATA_TYPE_STIX_ALL_SOFTWARE)
|
||||
sorted_software = sorted(software, key=lambda k: k[sort])
|
||||
|
||||
for s in sorted_software:
|
||||
print(get_attack_id(s) + ' ' + s['name'])
|
||||
print(' ' * 6 + 'created: ' + s['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + s['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + s['external_references'][0]['source_name'][6:])
|
||||
print(' ' * 6 + 'type: ' + s['type'])
|
||||
if 'x_mitre_platforms' in s:
|
||||
print(' ' * 6 + 'platform: ' + ', '.join(s['x_mitre_platforms']))
|
||||
else:
|
||||
print(' ' * 6 + 'platform: None')
|
||||
print('')
|
||||
|
||||
|
||||
def get_statistics_mitigations(matrix):
|
||||
"""
|
||||
Print out statistics related to mitigations and how many techniques they cover
|
||||
:return:
|
||||
"""
|
||||
|
||||
if matrix == 'enterprise':
|
||||
mitigations = load_attack_data(DATA_TYPE_STIX_ALL_ENTERPRISE_MITIGATIONS)
|
||||
elif matrix == 'mobile':
|
||||
mitigations = load_attack_data(DATA_TYPE_STIX_ALL_MOBILE_MITIGATIONS)
|
||||
|
||||
mitigations_dict = dict()
|
||||
for m in mitigations:
|
||||
if m['external_references'][0]['external_id'].startswith('M'):
|
||||
mitigations_dict[m['id']] = {'mID': m['external_references'][0]['external_id'], 'name': m['name']}
|
||||
|
||||
relationships = load_attack_data(DATA_TYPE_STIX_ALL_RELATIONSHIPS)
|
||||
relationships_mitigates = [r for r in relationships if r['relationship_type'] == 'mitigates']
|
||||
|
||||
# {id: {name: ..., count: ..., name: ...} }
|
||||
count_dict = dict()
|
||||
for r in relationships_mitigates:
|
||||
src_ref = r['source_ref']
|
||||
if src_ref.startswith('course-of-action') \
|
||||
and r['target_ref'].startswith('attack-pattern') \
|
||||
and src_ref in mitigations_dict:
|
||||
|
||||
m = mitigations_dict[src_ref]
|
||||
if m['mID'] not in count_dict:
|
||||
count_dict[m['mID']] = dict()
|
||||
count_dict[m['mID']]['count'] = 1
|
||||
count_dict[m['mID']]['name'] = m['name']
|
||||
else:
|
||||
count_dict[m['mID']]['count'] += 1
|
||||
|
||||
count_dict_sorted = dict(sorted(count_dict.items(), key=lambda kv: kv[1]['count'], reverse=True))
|
||||
|
||||
str_format = '{:<6s} {:<14s} {:s}'
|
||||
print(str_format.format('Count', 'Mitigation ID', 'Name'))
|
||||
print('-' * 60)
|
||||
for k, v in count_dict_sorted.items():
|
||||
print(str_format.format(str(v['count']), k, v['name']))
|
||||
|
||||
|
||||
def get_statistics_data_sources():
|
||||
"""
|
||||
Print out statistics related to data sources and how many techniques they cover.
|
||||
:return:
|
||||
"""
|
||||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
|
||||
|
||||
# {data_source: {techniques: [T0001, ...}, count: ...}
|
||||
data_sources_dict = {}
|
||||
for tech in techniques:
|
||||
tech_id = get_attack_id(tech)
|
||||
# Not every technique has a data source listed
|
||||
data_sources = try_get_key(tech, 'x_mitre_data_sources')
|
||||
if data_sources:
|
||||
for ds in data_sources:
|
||||
if ds not in data_sources_dict:
|
||||
data_sources_dict[ds] = {'techniques': [tech_id], 'count': 1}
|
||||
else:
|
||||
data_sources_dict[ds]['techniques'].append(tech_id)
|
||||
data_sources_dict[ds]['count'] += 1
|
||||
|
||||
# sort the dict on the value of 'count'
|
||||
data_sources_dict_sorted = dict(sorted(data_sources_dict.items(), key=lambda kv: kv[1]['count'], reverse=True))
|
||||
str_format = '{:<6s} {:s}'
|
||||
print(str_format.format('Count', 'Data Source'))
|
||||
print('-'*50)
|
||||
for k, v in data_sources_dict_sorted.items():
|
||||
print(str_format.format(str(v['count']), k))
|
|
@ -551,82 +551,3 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
f.write(json_string)
|
||||
print('Written layer: ' + filename)
|
||||
|
||||
|
||||
def get_updates(update_type, sort='modified'):
|
||||
"""
|
||||
Print a list of updates for a techniques, groups or software. Sort by modified or creation date.
|
||||
:param update_type: the type of update: techniques, groups or software
|
||||
:param sort: sort the list by modified or creation date
|
||||
:return:
|
||||
"""
|
||||
if update_type[:-1] == 'technique':
|
||||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
|
||||
sorted_techniques = sorted(techniques, key=lambda k: k[sort])
|
||||
|
||||
for t in sorted_techniques:
|
||||
print(get_attack_id(t) + ' ' + t['name'])
|
||||
print(' ' * 6 + 'created: ' + t['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + t['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + t['external_references'][0]['source_name'][6:])
|
||||
tactics = get_tactics(t)
|
||||
if tactics:
|
||||
print(' ' * 6 + 'tactic: ' + ', '.join(tactics))
|
||||
else:
|
||||
print(' ' * 6 + 'tactic: None')
|
||||
print('')
|
||||
|
||||
elif update_type[:-1] == 'group':
|
||||
groups = load_attack_data(DATA_TYPE_STIX_ALL_GROUPS)
|
||||
sorted_groups = sorted(groups, key=lambda k: k[sort])
|
||||
|
||||
for g in sorted_groups:
|
||||
print(get_attack_id(g) + ' ' + g['name'])
|
||||
print(' ' * 6 + 'created: ' + g['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + g['modified'].strftime('%Y-%m-%d'))
|
||||
print('')
|
||||
|
||||
elif update_type == 'software':
|
||||
software = load_attack_data(DATA_TYPE_STIX_ALL_SOFTWARE)
|
||||
sorted_software = sorted(software, key=lambda k: k[sort])
|
||||
|
||||
for s in sorted_software:
|
||||
print(get_attack_id(s) + ' ' + s['name'])
|
||||
print(' ' * 6 + 'created: ' + s['created'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'modified: ' + s['modified'].strftime('%Y-%m-%d'))
|
||||
print(' ' * 6 + 'matrix: ' + s['external_references'][0]['source_name'][6:])
|
||||
print(' ' * 6 + 'type: ' + s['type'])
|
||||
if 'x_mitre_platforms' in s:
|
||||
print(' ' * 6 + 'platform: ' + ', '.join(s['x_mitre_platforms']))
|
||||
else:
|
||||
print(' ' * 6 + 'platform: None')
|
||||
print('')
|
||||
|
||||
|
||||
def get_statistics():
|
||||
"""
|
||||
Print out statistics related to data sources and how many techniques they cover.
|
||||
:return:
|
||||
"""
|
||||
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
|
||||
|
||||
# {data_source: {techniques: [T0001, ...}, count: ...}
|
||||
data_sources_dict = {}
|
||||
for tech in techniques:
|
||||
tech_id = get_attack_id(tech)
|
||||
# Not every technique has a data source listed
|
||||
data_sources = try_get_key(tech, 'x_mitre_data_sources')
|
||||
if data_sources:
|
||||
for ds in data_sources:
|
||||
if ds not in data_sources_dict:
|
||||
data_sources_dict[ds] = {'techniques': [tech_id], 'count': 1}
|
||||
else:
|
||||
data_sources_dict[ds]['techniques'].append(tech_id)
|
||||
data_sources_dict[ds]['count'] += 1
|
||||
|
||||
# sort the dict on the value of 'count'
|
||||
data_sources_dict_sorted = dict(sorted(data_sources_dict.items(), key=lambda kv: kv[1]['count'], reverse=True))
|
||||
str_format = '{:<6s} {:s}'
|
||||
print(str_format.format('Count', 'Data Source'))
|
||||
print('-'*50)
|
||||
for k, v in data_sources_dict_sorted.items():
|
||||
print(str_format.format(str(v['count']), k))
|
||||
|
|
Loading…
Reference in New Issue