Mapped data sources to platforms

master
Marcus Bakker 2020-02-10 12:17:00 +01:00
parent 10f2c4bfd5
commit 363beab8a5
6 changed files with 217 additions and 90 deletions

README.md (View File)

@@ -1,7 +1,7 @@
<img src="https://github.com/rabobank-cdc/DeTTECT/wiki/images/logo.png" alt="DeTT&CT" width=30% height=30%>
#### Detect Tactics, Techniques & Combat Threats
Latest version: [1.2.6](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-126)
Latest version: [1.2.7](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-127)
To get started with DeTT&CT, check out this [page](https://github.com/rabobank-cdc/DeTTECT/wiki/Getting-started), our [talk](https://www.youtube.com/watch?v=_kWpekkhomU) at hack.lu 2019 and our blog on:
- [mbsecure.nl/blog/2019/5/dettact-mapping-your-blue-team-to-mitre-attack](https://www.mbsecure.nl/blog/2019/5/dettact-mapping-your-blue-team-to-mitre-attack) or

constants.py (View File)

@@ -2,7 +2,7 @@ import re
APP_NAME = 'DeTT&CT'
APP_DESC = 'Detect Tactics, Techniques & Combat Threats'
VERSION = '1.2.6'
VERSION = '1.2.7'
EXPIRE_TIME = 60 * 60 * 24
@@ -131,6 +131,7 @@ YAML_OBJ_VISIBILITY = {'applicable_to': ['all'],
'auto_generated': True}
]
}
YAML_OBJ_DETECTION = {'applicable_to': ['all'],
'location': [''],
'comment': '',
@@ -139,7 +140,7 @@ YAML_OBJ_DETECTION = {'applicable_to': ['all'],
{'date': None,
'score': -1,
'comment': ''}
]}
]}
YAML_OBJ_TECHNIQUE = {'technique_id': '',
'technique_name': '',
@@ -175,3 +176,42 @@ HEALTH_ERROR_TXT = '[!] The below YAML file contains possible errors. It\'s reco
PLATFORMS = {'windows': 'Windows', 'linux': 'Linux', 'macos': 'macOS', 'aws': 'AWS', 'gcp': 'GCP', 'azure': 'Azure',
'azure ad': 'Azure AD', 'office 365': 'Office 365', 'saas': 'SaaS'}
# Data sources applicable per platform
DATA_SOURCES = {'Windows': ['Access tokens', 'Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions',
'Component firmware', 'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DLL monitoring', 'DNS records', 'EFI', 'Email gateway',
'Environment variable', 'File monitoring', 'Host network interface', 'Kernel drivers', 'Loaded DLLs', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes',
'Netflow/Enclave netflow', 'Network device logs', 'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs',
'Process command-line parameters', 'Process monitoring', 'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls',
'Third-party application logs', 'User interface', 'VBR', 'Web application firewall logs', 'Web logs', 'Web proxy', 'Windows Error Reporting', 'Windows event logs',
'Windows Registry', 'WMI Objects'],
'Linux': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions', 'Component firmware',
'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DNS records', 'EFI', 'Email gateway', 'Environment variable', 'File monitoring',
'Host network interface', 'Kernel drivers', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes', 'Netflow/Enclave netflow', 'Network device logs',
'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs', 'Process command-line parameters', 'Process monitoring',
'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls', 'Third-party application logs', 'User interface', 'VBR',
'Web application firewall logs', 'Web logs', 'Web proxy'],
'macOS': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'BIOS', 'Browser extensions', 'Component firmware',
'Data loss prevention', 'Detonation chamber', 'Digital certificate logs', 'Disk forensics', 'DNS records', 'EFI', 'Email gateway', 'Environment variable', 'File monitoring',
'Host network interface', 'Kernel drivers', 'Mail server', 'Malware reverse engineering', 'MBR', 'Named Pipes', 'Netflow/Enclave netflow', 'Network device logs',
'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'PowerShell logs', 'Process command-line parameters', 'Process monitoring',
'Process use of network', 'Sensor health and status', 'Services', 'SSL/TLS inspection', 'System calls', 'Third-party application logs', 'User interface', 'VBR',
'Web application firewall logs', 'Web logs', 'Web proxy'],
'AWS': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'AWS CloudTrail logs', 'AWS OS logs', 'Binary file metadata', 'Data loss prevention',
'Detonation chamber', 'DNS records', 'Email gateway', 'File monitoring', 'Mail server', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs',
'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Third-party application logs',
'Web application firewall logs', 'Web logs', 'Web proxy'],
'GCP': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Binary file metadata', 'Data loss prevention', 'Detonation chamber',
'DNS records', 'Email gateway', 'File monitoring', 'Mail server', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs',
'Network intrusion detection system', 'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Stackdriver logs',
'Third-party application logs', 'Web application firewall logs', 'Web logs', 'Web proxy'],
'Azure': ['Anti-virus', 'API monitoring', 'Application logs', 'Asset management', 'Authentication logs', 'Azure activity logs', 'Azure OS logs', 'Binary file metadata',
'Data loss prevention', 'DNS records', 'File monitoring', 'Malware reverse engineering', 'Netflow/Enclave netflow', 'Network device logs', 'Network intrusion detection system',
'Network protocol analysis', 'Packet capture', 'Sensor health and status', 'SSL/TLS inspection', 'Third-party application logs', 'Web application firewall logs',
'Web logs', 'Web proxy'],
'Azure AD': ['API monitoring', 'Authentication logs', 'Azure activity logs', 'Malware reverse engineering', 'Sensor health and status'],
'Office 365': ['Anti-virus', 'API monitoring', 'Authentication logs', 'Azure activity logs', 'Data loss prevention', 'Detonation chamber', 'Email gateway', 'Mail server',
'Malware reverse engineering', 'Office 365 account logs', 'Office 365 audit logs', 'Office 365 trace logs', 'Sensor health and status'],
'SaaS': ['Anti-virus', 'API monitoring', 'Application logs', 'Authentication logs', 'Data loss prevention', 'Detonation chamber', 'Email gateway', 'Mail server',
'Malware reverse engineering', 'OAuth audit logs', 'Sensor health and status', 'Third-party application logs', 'Web application firewall logs', 'Web logs']
}
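The new DATA_SOURCES constant is keyed on the platform display names from PLATFORMS. A minimal sketch, assuming DATA_SOURCES is imported from constants.py, of how a platform list resolves to its applicable data sources (the new get_applicable_data_sources_platform() in generic.py further below implements the same idea):

# Sketch only (not part of this commit).
from constants import DATA_SOURCES

def applicable_data_sources(platforms):
    """Return the union of ATT&CK data sources for the given platform names."""
    if platforms == 'all' or 'all' in platforms:
        platforms = list(DATA_SOURCES.keys())
    result = set()
    for p in platforms:
        result.update(DATA_SOURCES[p])  # keys are the display names from PLATFORMS
    return sorted(result)

# e.g. applicable_data_sources(['Azure AD', 'Office 365']) returns the union of
# both platforms' data source lists.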

data_source_mapping.py (View File)

@@ -1,7 +1,7 @@
from copy import deepcopy
from datetime import datetime
import simplejson
import xlsxwriter
import simplejson
from generic import *
@@ -17,7 +17,7 @@ def generate_data_sources_layer(filename):
my_data_sources, name, platform, exceptions = _load_data_sources(filename)
# Do the mapping between my data sources and MITRE data sources:
my_techniques = _map_and_colorize_techniques(my_data_sources, exceptions)
my_techniques = _map_and_colorize_techniques(my_data_sources, platform, exceptions)
layer = get_layer_template_data_sources("Data sources " + name, 'description', 'attack', platform)
layer['techniques'] = my_techniques
@@ -66,7 +66,7 @@ def export_data_source_list_to_excel(filename, eql_search=False):
:return:
"""
# pylint: disable=unused-variable
my_data_sources, name, platform, exceptions = _load_data_sources(filename, filter_empty_scores=False)
my_data_sources, name, platforms, exceptions = _load_data_sources(filename, filter_empty_scores=False)
excel_filename = get_non_existing_filename('output/data_sources', 'xlsx')
workbook = xlsxwriter.Workbook(excel_filename)
@@ -121,7 +121,9 @@ def export_data_source_list_to_excel(filename, eql_search=False):
ds_miss_text = 'ATT&CK data source is missing from the YAML file'
# pylint: disable=consider-iterating-dictionary
my_ds_list = [ds.lower() for ds in my_data_sources.keys()]
for ds in get_all_mitre_data_sources():
applicable_data_sources = get_applicable_data_sources_platform(platforms)
for ds in applicable_data_sources:
if ds.lower() not in my_ds_list:
ds_obj = deepcopy(YAML_OBJ_DATA_SOURCE)
ds_obj['data_source_name'] = ds
@@ -157,9 +159,9 @@ def export_data_source_list_to_excel(filename, eql_search=False):
score += v
score_count += 1
if score > 0:
score = score/score_count
score = score / score_count
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score)
worksheet.write(y, 11, score, dq_score_1 if score < 2 else dq_score_2 if score < 3 else dq_score_3 if score < 4 else dq_score_4 if score < 5 else dq_score_5 if score < 6 else no_score) # noqa
y += 1
worksheet.autofilter(2, 0, 2, 11)
@@ -207,22 +209,40 @@ def _load_data_sources(file, filter_empty_scores=True):
return my_data_sources, name, platform, exceptions
def _map_and_colorize_techniques(my_ds, exceptions):
def _count_applicable_data_sources(technique, applicable_data_sources):
"""
Get the count of applicable data sources for the provided technique.
This takes into account which data sources are applicable to the provided platform(s).
:param technique: ATT&CK CTI technique object
:param applicable_data_sources: a list of applicable ATT&CK data sources
:return: a count of the applicable data sources for this technique
"""
applicable_ds_count = 0
for ds in technique['x_mitre_data_sources']:
if ds in applicable_data_sources:
applicable_ds_count += 1
return applicable_ds_count
def _map_and_colorize_techniques(my_ds, platforms, exceptions):
"""
Determine the color of the techniques based on how many data sources are available per technique.
:param my_ds: the configured data sources
:param platforms: the configured platform(s)
:param exceptions: the list of ATT&CK technique exceptions within the data source YAML file
:return: a dictionary with techniques that can be used in the layer's output file
"""
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
applicable_data_sources = get_applicable_data_sources_platform(platforms)
technique_colors = {}
# Color the techniques based on how many data sources are available.
for t in techniques:
if 'x_mitre_data_sources' in t:
total_ds_count = len(t['x_mitre_data_sources'])
total_ds_count = _count_applicable_data_sources(t, applicable_data_sources)
ds_count = 0
for ds in t['x_mitre_data_sources']:
if ds in my_ds.keys():
if ds in my_ds.keys() and ds in applicable_data_sources:
ds_count += 1
if total_ds_count > 0:
result = (float(ds_count) / float(total_ds_count)) * 100
@@ -234,7 +254,7 @@ def _map_and_colorize_techniques(my_ds, exceptions):
output_techniques = []
for t, v in my_techniques.items():
if t not in exceptions:
if t not in exceptions and t in technique_colors:
for tactic in v['tactics']:
d = dict()
d['techniqueID'] = t
@@ -243,8 +263,10 @@ def _map_and_colorize_techniques(my_ds, exceptions):
d['enabled'] = True
d['tactic'] = tactic.lower().replace(' ', '-')
d['metadata'] = [{'name': '-Available data sources', 'value': ', '.join(v['my_data_sources'])},
{'name': '-ATT&CK data sources', 'value': ', '.join(v['data_sources'])},
{'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(v['data_sources'],
applicable_data_sources))},
{'name': '-Products', 'value': ', '.join(v['products'])}]
d['metadata'] = make_layer_metadata_compliant(d['metadata'])
output_techniques.append(d)
@@ -335,7 +357,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
if new_tech['technique_id'] in tech_ids_new:
are_scores_updated = True
yaml_file_tech_admin['techniques'].append(new_tech)
tech_new_print.append(' - ' + new_tech['technique_id']+'\n')
tech_new_print.append(' - ' + new_tech['technique_id'] + '\n')
x += 1
print('The following new technique IDs are added to the technique administration file with a visibility '
@@ -353,7 +375,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
for cur_tech, cur_values in cur_visibility_scores.items():
new_tech = _get_technique_yaml_obj(new_visibility_scores['techniques'], cur_tech)
if new_tech: # new_tech will be None if technique_id is part of the 'exception' list within the
# data source administration file
# data source administration file
new_score = new_tech['visibility']['score_logbook'][0]['score']
for cur_obj in cur_values['visibility']:
@@ -377,7 +399,8 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
# ask how the score should be updated
answer = 0
if mix_scores:
answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4, V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4,
V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
elif manually_scored:
answer = ask_multiple_choice(V_UPDATE_Q_ALL_MANUAL, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
elif auto_scored:
@@ -442,7 +465,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
elif update_action == V_UPDATE_ACTION_DIFF:
print('-' * 80)
tmp_txt = '[updates remaining: ' + str(updated_vis_score_cnt - score_updates_handled) + ']'
print(' ' * (80-len(tmp_txt)) + tmp_txt)
print(' ' * (80 - len(tmp_txt)) + tmp_txt)
print('')
print('Visibility object:')
print(' - ATT&CK ID/name ' + tech_id + ' / ' + tech_name)
@@ -458,7 +481,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
print(' - Date: ' + new_score_obj['date'])
print(' - Score: ' + str(new_score_obj['score']))
print(' - Visibility comment: ' + _indent_comment(new_score_obj['comment'], 23))
print(' - Auto generated: true')
print(' - Auto generated: True')
print('')
if ask_yes_no('Update the score?'):
are_scores_updated = True
@@ -484,6 +507,8 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
print('No visibility scores have been updated.')
# pylint: disable=redefined-outer-name
def generate_technique_administration_file(filename, write_file=True, all_techniques=False):
"""
Generate a technique administration file based on the data source administration YAML file
@@ -496,6 +521,7 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
my_data_sources, name, platform, exceptions = _load_data_sources(filename)
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH_ENTERPRISE)
applicable_data_sources = get_applicable_data_sources_platform(platform)
yaml_file = dict()
yaml_file['version'] = FILE_TYPE_TECHNIQUE_ADMINISTRATION_VERSION
@@ -511,10 +537,10 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
if platform == 'all' or len(set(platforms).intersection(set(platform))) > 0:
# not every technique has data sources listed
if 'x_mitre_data_sources' in t:
total_ds_count = len(t['x_mitre_data_sources'])
total_ds_count = _count_applicable_data_sources(t, applicable_data_sources)
ds_count = 0
for ds in t['x_mitre_data_sources']:
if ds in my_data_sources.keys():
if ds in my_data_sources.keys() and ds in applicable_data_sources:
ds_count += 1
if total_ds_count > 0:
result = (float(ds_count) / float(total_ds_count)) * 100
@@ -549,7 +575,8 @@ def generate_technique_administration_file(filename, write_file=True, all_techni
# remove the single quotes from the date
yaml_file_lines = fix_date_and_remove_null(file_lines, today, input_type='list')
output_filename = get_non_existing_filename('output/techniques-administration-' + normalize_name_to_filename(name + '-' + platform_to_name(platform)), 'yaml')
output_filename = get_non_existing_filename('output/techniques-administration-' +
normalize_name_to_filename(name + '-' + platform_to_name(platform)), 'yaml')
with open(output_filename, 'w') as f:
f.writelines(yaml_file_lines)
print("File written: " + output_filename)

generic.py (View File)

@@ -2,8 +2,8 @@ import os
import shutil
import pickle
from io import StringIO
from ruamel.yaml import YAML
from datetime import datetime as dt
from ruamel.yaml import YAML
from upgrade import upgrade_yaml_file
from constants import *
from health import check_yaml_file_health
@@ -173,6 +173,10 @@ def load_attack_data(data_type):
def init_yaml():
"""
Initialize ruamel.yaml with the correct settings
:return: a ruamel.yaml object
"""
_yaml = YAML()
_yaml.Representer.ignore_aliases = lambda *args: True # disable anchors/aliases
return _yaml
@@ -268,7 +272,7 @@ def get_layer_template_detections(name, description, stage, platform):
{'label': 'Detection score 3: Good', 'color': COLOR_D_3},
{'label': 'Detection score 4: Very good', 'color': COLOR_D_4},
{'label': 'Detection score 5: Excellent', 'color': COLOR_D_5}
]
]
return layer
@@ -291,7 +295,7 @@ def get_layer_template_data_sources(name, description, stage, platform):
{'label': '51-75% of data sources available', 'color': COLOR_DS_75p},
{'label': '76-99% of data sources available', 'color': COLOR_DS_99p},
{'label': '100% of data sources available', 'color': COLOR_DS_100p}
]
]
return layer
@@ -313,7 +317,7 @@ def get_layer_template_visibility(name, description, stage, platform):
{'label': 'Visibility score 2: Medium', 'color': COLOR_V_2},
{'label': 'Visibility score 3: Good', 'color': COLOR_V_3},
{'label': 'Visibility score 4: Excellent', 'color': COLOR_V_4}
]
]
return layer
@@ -334,7 +338,7 @@ def get_layer_template_layered(name, description, stage, platform):
{'label': 'Visibility', 'color': COLOR_OVERLAY_VISIBILITY},
{'label': 'Detection', 'color': COLOR_OVERLAY_DETECTION},
{'label': 'Visibility and detection', 'color': COLOR_OVERLAY_BOTH}
]
]
return layer
@@ -601,6 +605,39 @@ def platform_to_name(platform, separator='-'):
return ''
def get_applicable_data_sources_platform(platforms):
"""
Get the applicable ATT&CK data sources for the provided platform(s)
:param platforms: the ATT&CK platform(s)
:return: a list of applicable ATT&CK data sources
"""
applicable_data_sources = set()
if platforms == 'all' or 'all' in platforms:
# pylint: disable=unused-variable
for k, v in DATA_SOURCES.items():
applicable_data_sources.update(v)
else:
for p in platforms:
applicable_data_sources.update(DATA_SOURCES[p])
return list(applicable_data_sources)
def get_applicable_data_sources_technique(technique_data_sources, platform_applicable_data_sources):
"""
Get the applicable ATT&CK data sources for the provided technique's data sources (for which the source is ATT&CK CTI)
:param technique_data_sources: the ATT&CK technique's data sources
:param platform_applicable_data_sources: a list of applicable ATT&CK data sources based on 'DATA_SOURCES'
:return: a list of applicable data sources
"""
applicable_data_sources = set()
for ds in technique_data_sources:
if ds in platform_applicable_data_sources:
applicable_data_sources.add(ds)
return list(applicable_data_sources)
def map_techniques_to_data_sources(techniques, my_data_sources):
"""
This function maps the MITRE ATT&CK techniques to your data sources.
@@ -806,6 +843,20 @@ def check_file(filename, file_type=None, health_is_called=False):
return yaml_content # value is None
def make_layer_metadata_compliant(metadata):
"""
Make sure the metadata values in the Navigator layer file are compliant with the expected data structure
from the latest version on: https://github.com/mitre-attack/attack-navigator/tree/master/layers
:param metadata: list of metadata dictionaries
:return: compliant list of metadata dictionaries
"""
for md_item in metadata:
if not md_item['value'] or md_item['value'] == '':
md_item['value'] = '-'
return metadata
def get_updates(update_type, sort='modified'):
"""
Print a list of updates for techniques, groups or software. Sort by modified or creation date.
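A hypothetical round trip through the two helpers added above (illustration only; the data source names are taken from the DATA_SOURCES constant in constants.py):

platform_ds = get_applicable_data_sources_platform(['Windows', 'Azure'])
technique_ds = get_applicable_data_sources_technique(
    ['Process monitoring', 'AWS CloudTrail logs'], platform_ds)
# 'Process monitoring' is kept (applicable to Windows); 'AWS CloudTrail logs'
# is dropped, because AWS is not among the selected platforms.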

health.py (View File)

@@ -94,7 +94,7 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
ATT&CK Platform is not part of the EQL search result
:return: False if no errors have been found, otherwise True
"""
from generic import get_all_mitre_data_sources
from generic import get_applicable_data_sources_platform
has_error = False
platform = ds_content.get('platform', None)
@@ -113,8 +113,8 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
health_is_called)
ds_list = [kv['data_source_name'].lower() for kv in ds_content['data_sources']]
ds_list_mitre = get_all_mitre_data_sources()
for ds in ds_list_mitre:
applicable_data_sources = get_applicable_data_sources_platform(platform)
for ds in applicable_data_sources:
if ds.lower() not in ds_list:
has_error = _print_error_msg('[!] Data source: \'' + ds + '\' is MISSING from the YAML file', health_is_called)
@@ -122,30 +122,34 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
# check for missing keys
for key in ['data_source_name', 'date_registered', 'date_connected', 'products', 'available_for_data_analytics', 'comment', 'data_quality']:
if key not in ds:
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair: ' + key, health_is_called)
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
'\' is MISSING a key-value pair: ' + key, health_is_called)
for key in ['date_registered', 'date_connected']:
if key in ds and not ds[key] is None:
try:
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
ds[key].year
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
ds[key].month
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
ds[key].day
except AttributeError:
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data format for the dimension \'' + dimension
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data format for the key-value pair \'' + key
+ '\': ' + ds[key] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
if 'available_for_data_analytics' in ds:
if not isinstance(ds['available_for_data_analytics'], bool):
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID \'available_for_data_analytics\' value: should be set to \'true\' or \'false\'', health_is_called)
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
'\' has an INVALID \'available_for_data_analytics\' value: should be set to \'true\' or \'false\'', health_is_called)
if 'data_quality' in ds:
if isinstance(ds['data_quality'], dict):
for dimension in ['device_completeness', 'data_field_completeness', 'timeliness', 'consistency', 'retention']:
if dimension not in ds['data_quality']:
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair in \'data_quality\': ' + dimension, health_is_called)
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
'\' is MISSING a key-value pair in \'data_quality\': ' + dimension, health_is_called)
else:
if isinstance(ds['data_quality'][dimension], int):
if not 0 <= ds['data_quality'][dimension] <= 5:
@@ -155,14 +159,16 @@ def check_health_data_sources(filename, ds_content, health_is_called, no_print=F
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data quality score for the dimension \'' +
dimension + '\': ' + str(ds['data_quality'][dimension]) + ' (should be an integer)', health_is_called)
else:
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' the key-value pair \'data_quality\' is NOT a dictionary with data quality dimension scores', health_is_called)
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] +
'\' the key-value pair \'data_quality\' is NOT a dictionary with data quality dimension scores', health_is_called)
if 'exceptions' in ds_content:
for tech in ds_content['exceptions']:
tech_id = str(tech['technique_id'])
if not REGEX_YAML_TECHNIQUE_ID_FORMAT.match(tech_id) and tech_id != 'None':
has_error = _print_error_msg('[!] INVALID technique ID in the \'exceptions\' list of data source admin. file: ' + tech_id, health_is_called)
has_error = _print_error_msg(
'[!] INVALID technique ID in the \'exceptions\' list of data source admin. file: ' + tech_id, health_is_called)
if has_error and not health_is_called and not no_print:
print(HEALTH_ERROR_TXT + filename)
@@ -199,13 +205,16 @@ def _check_health_score_object(yaml_object, object_type, tech_id, health_is_call
for score_obj in yaml_object['score_logbook']:
for key in ['date', 'score', 'comment']:
if key not in score_obj:
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' + object_type + ' score object within the \'score_logbook\': ' + key, health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' +
object_type + ' score object within the \'score_logbook\': ' + key, health_is_called)
if score_obj['score'] is None:
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': score', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' +
object_type + ' score object within the \'score_logbook\': score', health_is_called)
elif not isinstance(score_obj['score'], int):
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['score'] + ' (should be an integer)', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type +
' score object within the \'score_logbook\': ' + score_obj['score'] + ' (should be an integer)', health_is_called)
if 'auto_generated' in score_obj:
if not isinstance(score_obj['auto_generated'], bool):
@@ -214,23 +223,24 @@ def _check_health_score_object(yaml_object, object_type, tech_id, health_is_call
if isinstance(score_obj['score'], int):
if score_obj['date'] is None and ((score_obj['score'] > -1 and object_type == 'detection') or (score_obj['score'] > 0 and object_type == 'visibility')):
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': date', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' +
object_type + ' score object within the \'score_logbook\': date', health_is_called)
# noinspection PyChainedComparisons
if not (score_obj['score'] >= min_score and score_obj['score'] <= max_score):
has_error = _print_error_msg(
'[!] Technique ID: ' + tech_id + ' has an INVALID ' + object_type + ' score in a score object within the \'score_logbook\': ' + str(score_obj['score']) + ' (should be between ' + str(min_score) + ' and ' + str(max_score) + ')', health_is_called)
if not score_obj['date'] is None:
try:
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
score_obj['date'].year
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
score_obj['date'].month
# noinspection PyStatementEffect
# pylint: disable=pointless-statement
score_obj['date'].day
except AttributeError:
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['date'] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type +
' score object within the \'score_logbook\': ' + score_obj['date'] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
except KeyError:
pass
@@ -296,12 +306,14 @@ def _check_health_techniques(filename, technique_content, health_is_called):
for okey in obj_keys:
if okey not in obj:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in \'' + obj_type + '\': ' + okey, health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech +
' is MISSING a key-value pair in \'' + obj_type + '\': ' + okey, health_is_called)
for okey in obj_keys_list:
if okey in obj:
if not isinstance(obj[okey], list):
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' is NOT a list', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey +
'\' in \'' + obj_type + '\' is NOT a list', health_is_called)
for okey in obj_keys_not_none:
if okey in obj:
@@ -310,9 +322,11 @@ def _check_health_techniques(filename, technique_content, health_is_called):
if item is None:
none_count += 1
if none_count == 1:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' has an EMPTY value (an empty string is allowed: \'\')', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' +
obj_type + '\' has an EMPTY value (an empty string is allowed: \'\')', health_is_called)
elif none_count > 1:
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' has multiple EMPTY values (an empty string is allowed: \'\')', health_is_called)
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type +
'\' has multiple EMPTY values (an empty string is allowed: \'\')', health_is_called)
health = _check_health_score_object(obj, obj_type, tech, health_is_called)
has_error = _update_health_state(has_error, health)
@@ -330,7 +344,8 @@ def _check_health_techniques(filename, technique_content, health_is_called):
similar.add(i2)
if len(similar) > 0:
has_error = _print_error_msg('[!] There are values in the key-value pairs for \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
has_error = _print_error_msg(
'[!] There are values in the key-value pairs for \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
for s in similar:
_print_error_msg(' - ' + s, health_is_called)
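The repeated .year/.month/.day probes above rely on ruamel.yaml's parsing behaviour: an unquoted YYYY-MM-DD value is loaded as a date object, while a quoted value stays a string and raises AttributeError. A self-contained sketch of that check:

from datetime import date

def is_yaml_date(value):
    """True if value behaves like the date object ruamel.yaml produces."""
    try:
        value.year, value.month, value.day  # the same probes as above
        return True
    except AttributeError:
        return False

assert is_yaml_date(date(2020, 2, 10))   # unquoted YAML date -> date object
assert not is_yaml_date('2020-02-10')    # quoted YAML date -> str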

technique_mapping.py (View File)

@@ -1,6 +1,6 @@ import simplejson
import simplejson
from generic import *
import xlsxwriter
from generic import *
from datetime import datetime
# Imports for pandas and plotly are because of performance reasons in the function that uses these libraries.
@@ -21,7 +21,7 @@ def generate_detection_layer(filename_techniques, filename_data_sources, overlay
else:
my_techniques, name, platform = load_techniques(filename_techniques)
my_data_sources = _load_data_sources(filename_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
@@ -38,28 +38,29 @@ def generate_visibility_layer(filename_techniques, filename_data_sources, overla
if not overlay:
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources)
mapped_techniques_visibility = _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platform)
layer_visibility = get_layer_template_visibility('Visibility ' + name, 'description', 'attack', platform)
_write_layer(layer_visibility, mapped_techniques_visibility, 'visibility', name)
else:
my_techniques, name, platform = load_techniques(filename_techniques)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources)
mapped_techniques_both = _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platform)
layer_both = get_layer_template_layered('Visibility and Detection ' + name, 'description', 'attack', platform)
_write_layer(layer_both, mapped_techniques_both, 'visibility_and_detection', name)
def plot_graph(filename, type):
def plot_graph(filename, type_graph):
"""
Generates a line graph which shows the improvements in detections over time.
:param filename: the filename of the YAML file containing the techniques administration
:param type: indicates the type of the graph: detection or visibility
:param type_graph: indicates the type of the graph: detection or visibility
:return:
"""
# pylint: disable=unused-variable
my_techniques, name, platform = load_techniques(filename)
graph_values = []
for t in my_techniques.values():
for item in t[type]:
for item in t[type_graph]:
date = get_latest_date(item)
if date:
yyyymm = date.strftime('%Y-%m')
@@ -69,13 +70,13 @@ def plot_graph(filename, type):
df = pd.DataFrame(graph_values).groupby('date', as_index=False)[['count']].sum()
df['cumcount'] = df['count'].cumsum()
output_filename = get_non_existing_filename('output/graph_%s' % type, 'html')
output_filename = get_non_existing_filename('output/graph_%s' % type_graph, 'html')
import plotly
import plotly.graph_objs as go
plotly.offline.plot(
{'data': [go.Scatter(x=df['date'], y=df['cumcount'])],
'layout': go.Layout(title="# of %s items for %s" % (type, name))},
'layout': go.Layout(title="# of %s items for %s" % (type_graph, name))},
filename=output_filename, auto_open=False
)
print("File written: " + output_filename)
@@ -122,20 +123,6 @@ def _write_layer(layer, mapped_techniques, filename_prefix, name):
write_file(filename_prefix, name, json_string)
def _layer_metadata_make_compliant(metadata):
"""
Make sure the metadata values in the Navigator layer file are compliant with the expected data structure
from the latest version on: https://github.com/mitre-attack/attack-navigator/tree/master/layers
:param metadata: list of metadata dictionaries
:return: compliant list of metadata dictionaries
"""
for md_item in metadata:
if not md_item['value'] or md_item['value'] == '':
md_item['value'] = '-'
return metadata
def _map_and_colorize_techniques_for_detections(my_techniques):
"""
Determine the color of the techniques based on the detection score in the given YAML file.
@@ -153,7 +140,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
if s != -1:
color = COLOR_D_0 if s == 0 else COLOR_D_1 if s == 1 else COLOR_D_2 if s == 2 else COLOR_D_3 \
if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
if s == 3 else COLOR_D_4 if s == 4 else COLOR_D_5 if s == 5 else ''
technique = get_technique(techniques, technique_id)
for tactic in get_tactics(technique):
@@ -180,7 +167,7 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
if cnt != tcnt:
x['metadata'].append({'name': '---', 'value': '---'})
cnt += 1
x['metadata'] = _layer_metadata_make_compliant(x['metadata'])
x['metadata'] = make_layer_metadata_compliant(x['metadata'])
mapped_techniques.append(x)
except Exception as e:
print('[!] Possible error in YAML file at: %s. Error: %s' % (technique_id, str(e)))
@@ -189,14 +176,16 @@ def _map_and_colorize_techniques_for_detections(my_techniques):
return mapped_techniques
def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources, platforms):
"""
Determine the color of the techniques based on the visibility score in the given YAML file.
:param my_techniques: the configured techniques
:param my_data_sources: the configured data sources
:param platforms: the configured platform(s)
:return: a dictionary with techniques that can be used in the layer's output file
"""
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
applicable_data_sources = get_applicable_data_sources_platform(platforms)
technique_ds_mapping = map_techniques_to_data_sources(techniques, my_data_sources)
@@ -208,7 +197,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
if s == 0:
s = None
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else ''
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' # noqa
technique = get_technique(techniques, technique_id)
color = COLOR_V_1 if s == 1 else COLOR_V_2 if s == 2 else COLOR_V_3 if s == 3 else COLOR_V_4 if s == 4 else ''
@@ -221,7 +210,8 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
x['tactic'] = tactic.lower().replace(' ', '-')
x['metadata'] = []
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(technique['x_mitre_data_sources'],
applicable_data_sources))})
x['metadata'].append({'name': '---', 'value': '---'})
x['score'] = s
@@ -237,7 +227,7 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
x['metadata'].append({'name': '---', 'value': '---'})
cnt += 1
x['metadata'] = _layer_metadata_make_compliant(x['metadata'])
x['metadata'] = make_layer_metadata_compliant(x['metadata'])
mapped_techniques.append(x)
for t in techniques:
@@ -251,23 +241,25 @@ def _map_and_colorize_techniques_for_visibility(my_techniques, my_data_sources):
x['comment'] = ''
x['enabled'] = True
x['tactic'] = tactic.lower().replace(' ', '-')
ds = ', '.join(t['x_mitre_data_sources']) if 'x_mitre_data_sources' in t else ''
ds = ', '.join(get_applicable_data_sources_technique(t['x_mitre_data_sources'], applicable_data_sources)) if 'x_mitre_data_sources' in t else '' # noqa
x['metadata'] = [{'name': '-ATT&CK data sources', 'value': ds}]
x['metadata'] = _layer_metadata_make_compliant(x['metadata'])
x['metadata'] = make_layer_metadata_compliant(x['metadata'])
mapped_techniques.append(x)
return mapped_techniques
def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources):
def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources, platforms):
"""
Determine the color of the techniques based on both detection and visibility.
:param my_techniques: the configured techniques
:param my_data_sources: the configured data sources
:param platforms: the configured platform(s)
:return: a dictionary with techniques that can be used in the layer's output file
"""
techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
applicable_data_sources = get_applicable_data_sources_platform(platforms)
technique_ds_mapping = map_techniques_to_data_sources(techniques, my_data_sources)
@@ -290,7 +282,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources):
else:
color = COLOR_WHITE
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else ''
my_ds = ', '.join(technique_ds_mapping[technique_id]['my_data_sources']) if technique_id in technique_ds_mapping.keys() and technique_ds_mapping[technique_id]['my_data_sources'] else '' # noqa
technique = get_technique(techniques, technique_id)
for tactic in get_tactics(technique):
@@ -302,7 +294,8 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources):
x['tactic'] = tactic.lower().replace(' ', '-')
x['metadata'] = []
x['metadata'].append({'name': '-Available data sources', 'value': my_ds})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(technique['x_mitre_data_sources'])})
x['metadata'].append({'name': '-ATT&CK data sources', 'value': ', '.join(get_applicable_data_sources_technique(technique['x_mitre_data_sources'],
applicable_data_sources))})
x['metadata'].append({'name': '---', 'value': '---'})
# Metadata for detection:
@@ -337,7 +330,7 @@ def _map_and_colorize_techniques_for_overlaid(my_techniques, my_data_sources):
x['metadata'].append({'name': '---', 'value': '---'})
cnt += 1
x['metadata'] = _layer_metadata_make_compliant(x['metadata'])
x['metadata'] = make_layer_metadata_compliant(x['metadata'])
mapped_techniques.append(x)
return mapped_techniques
@@ -349,6 +342,7 @@ def export_techniques_list_to_excel(filename):
:param filename: the filename of the YAML file containing the techniques administration
:return:
"""
# pylint: disable=unused-variable
my_techniques, name, platform = load_techniques(filename)
my_techniques = dict(sorted(my_techniques.items(), key=lambda kv: kv[0], reverse=False))
mitre_techniques = load_attack_data(DATA_TYPE_STIX_ALL_TECH)
@@ -422,7 +416,7 @@ def export_techniques_list_to_excel(filename):
tmp_date = tmp_date.strftime('%Y-%m-%d')
worksheet_detections.write(y, 4, str(tmp_date).replace('None', ''), valign_top)
ds = get_latest_score(detection)
worksheet_detections.write(y, 5, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score)
worksheet_detections.write(y, 5, ds, detection_score_0 if ds == 0 else detection_score_1 if ds == 1 else detection_score_2 if ds == 2 else detection_score_3 if ds == 3 else detection_score_4 if ds == 4 else detection_score_5 if ds == 5 else no_score) # noqa
worksheet_detections.write(y, 6, '\n'.join(detection['location']), wrap_text)
worksheet_detections.write(y, 7, detection['comment'][:-1] if detection['comment'].endswith('\n') else detection['comment'], wrap_text)
d_comment = get_latest_comment(detection)
@@ -463,7 +457,7 @@ def export_techniques_list_to_excel(filename):
tmp_date = tmp_date.strftime('%Y-%m-%d')
worksheet_visibility.write(y, 4, str(tmp_date).replace('None', ''), valign_top)
vs = get_latest_score(visibility)
worksheet_visibility.write(y, 5, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score)
worksheet_visibility.write(y, 5, vs, visibility_score_1 if vs == 1 else visibility_score_2 if vs == 2 else visibility_score_3 if vs == 3 else visibility_score_4 if vs == 4 else no_score) # noqa
v_comment = get_latest_comment(visibility)
worksheet_visibility.write(y, 6, visibility['comment'][:-1] if visibility['comment'].endswith('\n') else visibility['comment'], wrap_text)
worksheet_visibility.write(y, 7, v_comment[:-1] if v_comment.endswith('\n') else v_comment, wrap_text)
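The chained conditional expressions that pick a cell format per score (now marked # noqa) could also be written as a dict lookup; a hedged sketch, assuming the detection_score_0..detection_score_5 and no_score xlsxwriter formats defined earlier in export_techniques_list_to_excel (not visible in this diff):

# Equivalent score-to-format selection via a dict with a default.
detection_formats = {0: detection_score_0, 1: detection_score_1,
                     2: detection_score_2, 3: detection_score_3,
                     4: detection_score_4, 5: detection_score_5}
worksheet_detections.write(y, 5, ds, detection_formats.get(ds, no_score))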