Implemented a health check for data source administration YAML files.
parent
248c6a07d8
commit
98067447c6
17
dettect.py
17
dettect.py
|
@ -51,6 +51,7 @@ def _init_menu():
|
|||
'not updated without your approval. The updated visibility '
|
||||
'scores are calculated in the same way as with the option: '
|
||||
'-y, --yaml', action='store_true')
|
||||
parser_data_sources.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
|
||||
|
||||
# create the visibility parser
|
||||
parser_visibility = subparsers.add_parser('visibility', aliases=['v'],
|
||||
|
@ -76,9 +77,9 @@ def _init_menu():
|
|||
action='store_true')
|
||||
parser_visibility.add_argument('-o', '--overlay', help='generate a visibility layer overlaid with detections for '
|
||||
'the ATT&CK navigator', action='store_true')
|
||||
parser_visibility.add_argument('-g', '--graph', help='generate a graph with visibility items added through time',
|
||||
action='store_true')
|
||||
parser_visibility.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
|
||||
parser_visibility.add_argument('-g', '--graph', help='generate a graph with visibility added through time',
|
||||
action='store_true')
|
||||
parser_visibility.add_argument('--health', help='check the YAML file for errors', action='store_true')
|
||||
|
||||
# create the detection parser
|
||||
parser_detection = subparsers.add_parser('detection', aliases=['d'],
|
||||
|
@ -106,9 +107,9 @@ def _init_menu():
|
|||
action='store_true')
|
||||
parser_detection.add_argument('-o', '--overlay', help='generate a detection layer overlaid with visibility for '
|
||||
'the ATT&CK navigator', action='store_true')
|
||||
parser_detection.add_argument('-g', '--graph', help='generate a graph with detection items added through time',
|
||||
parser_detection.add_argument('-g', '--graph', help='generate a graph with detections added through time',
|
||||
action='store_true')
|
||||
parser_detection.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
|
||||
parser_detection.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
|
||||
|
||||
# create the group parser
|
||||
parser_group = subparsers.add_parser('group', aliases=['g'],
|
||||
|
@ -145,7 +146,7 @@ def _init_menu():
|
|||
'the EQL search. The default behaviour is to only include the '
|
||||
'most recent \'score\' objects',
|
||||
action='store_true', default=False)
|
||||
parser_group.add_argument('--health', help='check the technique YAML file for errors', action='store_true')
|
||||
parser_group.add_argument('--health', help='check the YAML file(s) for errors', action='store_true')
|
||||
|
||||
# create the generic parser
|
||||
parser_generic = subparsers.add_parser('generic', description='Generic functions which will output to stdout.',
|
||||
|
@ -180,14 +181,14 @@ def _menu(menu_parser):
|
|||
interactive_menu()
|
||||
|
||||
elif args.subparser in ['datasource', 'ds']:
|
||||
if check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
|
||||
file_ds = args.file_ds
|
||||
|
||||
if args.search:
|
||||
file_ds = search(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.search)
|
||||
if not file_ds:
|
||||
quit() # something went wrong in executing the search or 0 results where returned
|
||||
if args.update and check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION):
|
||||
if args.update and check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health):
|
||||
update_technique_administration_file(file_ds, args.file_tech)
|
||||
if args.layer:
|
||||
generate_data_sources_layer(file_ds)
|
||||
|
|
|
@ -0,0 +1,307 @@
|
|||
import os
|
||||
import pickle
|
||||
from difflib import SequenceMatcher
|
||||
from constants import *
|
||||
|
||||
|
||||
def _print_error_msg(msg, print_error):
|
||||
if print_error:
|
||||
print(msg)
|
||||
return True
|
||||
|
||||
|
||||
def _update_health_state(current, update):
|
||||
if current or update:
|
||||
return True
|
||||
else:
|
||||
return update
|
||||
|
||||
|
||||
def _is_file_modified(filename):
|
||||
"""
|
||||
Check if the provided file was modified since the last check
|
||||
:param filename: file location
|
||||
:return: true when modified else false
|
||||
"""
|
||||
last_modified_file = 'cache/last-modified_' + os.path.basename(filename).rstrip('.yaml')
|
||||
|
||||
def _update_modified_date(date):
|
||||
with open(last_modified_file, 'wb') as fd:
|
||||
pickle.dump(date, fd)
|
||||
|
||||
if not os.path.exists(last_modified_file):
|
||||
last_modified = os.path.getmtime(filename)
|
||||
_update_modified_date(last_modified)
|
||||
|
||||
return True
|
||||
else:
|
||||
with open(last_modified_file, 'rb') as f:
|
||||
last_modified_cache = pickle.load(f)
|
||||
last_modified_current = os.path.getmtime(filename)
|
||||
|
||||
if last_modified_cache != last_modified_current:
|
||||
_update_modified_date(last_modified_current)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def _get_health_state_cache(filename):
|
||||
"""
|
||||
Get file health state from disk
|
||||
:param filename: file location
|
||||
:return: the cached error state
|
||||
"""
|
||||
last_error_file = 'cache/last-error-state_' + os.path.basename(filename).rstrip('.yaml')
|
||||
|
||||
if os.path.exists(last_error_file):
|
||||
with open(last_error_file, 'rb') as f:
|
||||
last_error_state_cache = pickle.load(f)
|
||||
|
||||
return last_error_state_cache
|
||||
|
||||
|
||||
def _update_health_state_cache(filename, has_error):
|
||||
"""
|
||||
Write the file health state to disk if changed
|
||||
:param filename: file location
|
||||
"""
|
||||
# the function 'check_health_data_sources' will call this function without providing a filename when
|
||||
# 'check_health_data_sources' is called from '_events_to_yaml' within 'eql_yaml.py'
|
||||
if filename:
|
||||
last_error_file = 'cache/last-error-state_' + os.path.basename(filename).rstrip('.yaml')
|
||||
|
||||
def _update(error):
|
||||
with open(last_error_file, 'wb') as fd:
|
||||
pickle.dump(error, fd)
|
||||
|
||||
if not os.path.exists(last_error_file):
|
||||
_update(has_error)
|
||||
else:
|
||||
error_state_cache = _get_health_state_cache(filename)
|
||||
if error_state_cache != has_error:
|
||||
_update(has_error)
|
||||
|
||||
|
||||
def check_health_data_sources(filename, ds_content, health_is_called, no_print=False):
|
||||
"""
|
||||
Check on errors in the provided data sources administration YAML file.
|
||||
:param filename: YAML file location
|
||||
:param ds_content: content of the YAML file in a list of dicts
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed to stdout
|
||||
:param no_print: specifies if the non-detailed error message is printed to stdout or not
|
||||
:return: False if no errors have been found, otherwise True
|
||||
"""
|
||||
has_error = False
|
||||
|
||||
for ds in ds_content['data_sources']:
|
||||
# check for missing keys
|
||||
for key in ['data_source_name', 'date_registered', 'date_connected', 'products', 'available_for_data_analytics', 'comment', 'data_quality']:
|
||||
if key not in ds:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair: ' + key, health_is_called)
|
||||
|
||||
for key in ['date_registered', 'date_connected']:
|
||||
if key in ds and not ds[key] is None:
|
||||
try:
|
||||
# noinspection PyStatementEffect
|
||||
ds[key].year
|
||||
# noinspection PyStatementEffect
|
||||
ds[key].month
|
||||
# noinspection PyStatementEffect
|
||||
ds[key].day
|
||||
except AttributeError:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data format for the dimension \'' + dimension
|
||||
+ '\': ' + ds[key] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
|
||||
|
||||
if 'available_for_data_analytics' in ds:
|
||||
if not isinstance(ds['available_for_data_analytics'], bool):
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID \'available_for_data_analytics\' value: should be set to \'true\' or \'false\'', health_is_called)
|
||||
|
||||
if 'data_quality' in ds:
|
||||
if isinstance(ds['data_quality'], dict):
|
||||
for dimension in ['device_completeness', 'data_field_completeness', 'timeliness', 'consistency', 'retention']:
|
||||
if dimension not in ds['data_quality']:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' is MISSING a key-value pair in \'data_quality\': ' + dimension, health_is_called)
|
||||
else:
|
||||
if isinstance(ds['data_quality'][dimension], int):
|
||||
if not 0 <= ds['data_quality'][dimension] <= 5:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data quality score for the dimension \''
|
||||
+ dimension + '\': ' + str(ds['data_quality'][dimension]) + ' (should be between 0 and 5)', health_is_called)
|
||||
else:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' has an INVALID data quality score for the dimension \'' +
|
||||
dimension + '\': ' + str(ds['data_quality'][dimension]) + ' (should be an an integer)', health_is_called)
|
||||
else:
|
||||
has_error = _print_error_msg('[!] Data source: \'' + ds['data_source_name'] + '\' the key-value pair \'data_quality\' is NOT a dictionary with data quality dimension scores', health_is_called)
|
||||
|
||||
if has_error and not health_is_called and not no_print:
|
||||
print(HEALTH_ERROR_TXT + filename)
|
||||
|
||||
_update_health_state_cache(filename, has_error)
|
||||
|
||||
return has_error
|
||||
|
||||
|
||||
def _check_health_score_object(yaml_object, object_type, tech_id, health_is_called):
|
||||
"""
|
||||
Check the health of a score_logbook inside a visibility or detection YAML object
|
||||
:param yaml_object: YAML file lines
|
||||
:param object_type: 'detection' or 'visibility'
|
||||
:param tech_id: ATT&CK technique ID
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed
|
||||
:return: True if the YAML file is unhealthy, otherwise False
|
||||
"""
|
||||
has_error = False
|
||||
min_score = None
|
||||
max_score = None
|
||||
|
||||
if object_type == 'detection':
|
||||
min_score = -1
|
||||
max_score = 5
|
||||
elif object_type == 'visibility':
|
||||
min_score = 0
|
||||
max_score = 4
|
||||
|
||||
if not isinstance(yaml_object['score_logbook'], list):
|
||||
yaml_object['score_logbook'] = [yaml_object['score_logbook']]
|
||||
|
||||
try:
|
||||
for score_obj in yaml_object['score_logbook']:
|
||||
for key in ['date', 'score', 'comment']:
|
||||
if key not in score_obj:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' is MISSING a key-value pair in a ' + object_type + ' score object within the \'score_logbook\': ' + key, health_is_called)
|
||||
|
||||
if score_obj['score'] is None:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': score', health_is_called)
|
||||
|
||||
elif not isinstance(score_obj['score'], int):
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID score format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['score'] + ' (should be an integer)', health_is_called)
|
||||
|
||||
if 'auto_generated' in score_obj:
|
||||
if not isinstance(score_obj['auto_generated'], bool):
|
||||
has_error = _print_error_msg(
|
||||
'[!] Technique ID: ' + tech_id + ' has an INVALID \'auto_generated\' value in a ' + object_type + ' score object within the \'score_logbook\': should be set to \'true\' or \'false\'', health_is_called)
|
||||
|
||||
if isinstance(score_obj['score'], int):
|
||||
if score_obj['date'] is None and score_obj['score'] > -1:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an EMPTY key-value pair in a ' + object_type + ' score object within the \'score_logbook\': date', health_is_called)
|
||||
|
||||
# noinspection PyChainedComparisons
|
||||
if not (score_obj['score'] >= min_score and score_obj['score'] <= max_score):
|
||||
has_error = _print_error_msg(
|
||||
'[!] Technique ID: ' + tech_id + ' has an INVALID ' + object_type + ' score in a score object within the \'score_logbook\': ' + str(score_obj['score']) + ' (should be between ' + str(min_score) + ' and ' + str(max_score) + ')', health_is_called)
|
||||
|
||||
if not score_obj['date'] is None:
|
||||
try:
|
||||
# noinspection PyStatementEffect
|
||||
score_obj['date'].year
|
||||
# noinspection PyStatementEffect
|
||||
score_obj['date'].month
|
||||
# noinspection PyStatementEffect
|
||||
score_obj['date'].day
|
||||
except AttributeError:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech_id + ' has an INVALID data format in a ' + object_type + ' score object within the \'score_logbook\': ' + score_obj['date'] + ' (should be YYYY-MM-DD without quotes)', health_is_called)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return has_error
|
||||
|
||||
|
||||
def _check_health_techniques(filename, technique_content, health_is_called):
|
||||
"""
|
||||
Check on errors in the provided technique administration YAML file.
|
||||
:param filename: YAML file location
|
||||
:param technique_content: content of the YAML file in a list of dicts
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed to stdout
|
||||
:return:
|
||||
"""
|
||||
from generic import load_techniques
|
||||
|
||||
has_error = False
|
||||
|
||||
# create a list of ATT&CK technique IDs and check for duplicates
|
||||
tech_ids = list(map(lambda x: x['technique_id'], technique_content['techniques']))
|
||||
tech_dup = set()
|
||||
for tech in tech_ids:
|
||||
if tech not in tech_dup:
|
||||
tech_dup.add(tech)
|
||||
else:
|
||||
has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech, health_is_called)
|
||||
|
||||
# check if the technique has a valid format
|
||||
if not REGEX_YAML_TECHNIQUE_ID_FORMAT.match(tech):
|
||||
has_error = _print_error_msg('[!] Invalid technique ID: ' + tech, health_is_called)
|
||||
|
||||
all_applicable_to = set()
|
||||
|
||||
techniques = load_techniques(filename)
|
||||
for tech, v in techniques[0].items():
|
||||
for obj_type in ['detection', 'visibility']:
|
||||
if obj_type not in v:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair: ' + obj_type, health_is_called)
|
||||
else:
|
||||
for obj in v[obj_type]:
|
||||
obj_keys = ['applicable_to', 'comment', 'score_logbook']
|
||||
obj_keys_list = ['applicable_to']
|
||||
if obj_type == 'detection':
|
||||
obj_keys.append('location')
|
||||
obj_keys_list.append('location')
|
||||
|
||||
for okey in obj_keys:
|
||||
if okey not in obj:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING a key-value pair in \'' + obj_type + '\': ' + okey, health_is_called)
|
||||
|
||||
for okey in obj_keys_list:
|
||||
if okey in obj:
|
||||
if not isinstance(obj[okey], list):
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' the key-value pair \'' + okey + '\' in \'' + obj_type + '\' is NOT a list', health_is_called)
|
||||
|
||||
health = _check_health_score_object(obj, obj_type, tech, health_is_called)
|
||||
has_error = _update_health_state(has_error, health)
|
||||
|
||||
if 'applicable_to' in obj and isinstance(obj['applicable_to'], list):
|
||||
all_applicable_to.update(obj['applicable_to'])
|
||||
|
||||
# get values within the key-value pair 'applicable_to' and 'location' which are a very close match
|
||||
similar = set()
|
||||
for i1 in all_applicable_to:
|
||||
for i2 in all_applicable_to:
|
||||
match_value = SequenceMatcher(None, i1, i2).ratio()
|
||||
if match_value > 0.8 and match_value != 1:
|
||||
similar.add(i1)
|
||||
similar.add(i2)
|
||||
|
||||
if len(similar) > 0:
|
||||
has_error = _print_error_msg('[!] There are values in the key-value pairs for \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
|
||||
for s in similar:
|
||||
_print_error_msg(' - ' + s, health_is_called)
|
||||
|
||||
if has_error and not health_is_called:
|
||||
print(HEALTH_ERROR_TXT + filename)
|
||||
|
||||
_update_health_state_cache(filename, has_error)
|
||||
|
||||
|
||||
def check_yaml_file_health(filename, file_type, health_is_called):
|
||||
"""
|
||||
Check on errors in the provided YAML file.
|
||||
:param filename: YAML file location
|
||||
:param file_type: currently FILE_TYPE_TECHNIQUE_ADMINISTRATION and FILE_TYPE_DATA_SOURCE_ADMINISTRATION is supported
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed to stdout
|
||||
:return:
|
||||
"""
|
||||
from generic import init_yaml
|
||||
|
||||
# first we check if the file was modified. Otherwise, the health check is skipped for performance reasons
|
||||
if _is_file_modified(filename) or health_is_called:
|
||||
|
||||
_yaml = init_yaml()
|
||||
with open(filename, 'r') as yaml_file:
|
||||
yaml_content = _yaml.load(yaml_file)
|
||||
|
||||
if file_type == FILE_TYPE_DATA_SOURCE_ADMINISTRATION:
|
||||
check_health_data_sources(filename, yaml_content, health_is_called)
|
||||
elif file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
_check_health_techniques(filename, yaml_content, health_is_called)
|
||||
|
||||
elif _get_health_state_cache(filename):
|
||||
print(HEALTH_ERROR_TXT + filename)
|
|
@ -251,6 +251,7 @@ def _menu_data_source(filename_ds):
|
|||
print('6. update the visibility scores within a technique administration YAML file based on changes within any of '
|
||||
'the data sources. \nPast visibility scores are preserved in the score_logbook, and manually assigned scores are '
|
||||
'not updated without your approval. \nThe updated visibility are based on the number of available data sources.')
|
||||
print('7. Check the data sources YAML file for errors.')
|
||||
print('9. Back to main menu.')
|
||||
choice = _ask_input()
|
||||
if choice == '1':
|
||||
|
@ -287,6 +288,10 @@ def _menu_data_source(filename_ds):
|
|||
print('Updating visibility scores...')
|
||||
update_technique_administration_file(filename_ds, filename_t)
|
||||
_wait()
|
||||
elif choice == '7':
|
||||
print('Checking the data source YAML for errors...')
|
||||
check_yaml_file_health(filename_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, health_is_called=True)
|
||||
_wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
elif choice == 'q':
|
||||
|
@ -318,7 +323,7 @@ def _menu_detection(filename_t):
|
|||
print('Select what you want to do:')
|
||||
print('4. Generate a layer for detection coverage for the ATT&CK Navigator.')
|
||||
print('5. Generate a layer for detection coverage overlaid with visibility for the ATT&CK Navigator.')
|
||||
print('6. Generate a graph with detection items added through time.')
|
||||
print('6. Generate a graph with detections added through time.')
|
||||
print('7. Generate an Excel sheet with all administrated techniques.')
|
||||
print('8. Check the technique YAML file for errors.')
|
||||
print('9. Back to main menu.')
|
||||
|
@ -360,7 +365,7 @@ def _menu_detection(filename_t):
|
|||
print('Generating Excel file...')
|
||||
export_techniques_list_to_excel(file_tech)
|
||||
_wait()
|
||||
elif choice == '8`x':
|
||||
elif choice == '8':
|
||||
print('Checking the technique YAML file for errors...')
|
||||
check_yaml_file_health(filename_t, FILE_TYPE_TECHNIQUE_ADMINISTRATION, health_is_called=True)
|
||||
_wait()
|
||||
|
@ -397,7 +402,7 @@ def _menu_visibility(filename_t, filename_ds):
|
|||
print('Select what you want to do:')
|
||||
print('4. Generate a layer for visibility for the ATT&CK Navigator.')
|
||||
print('5. Generate a layer for visibility overlaid with detection coverage for the ATT&CK Navigator.')
|
||||
print('6. Generate a graph with visibility items added through time.')
|
||||
print('6. Generate a graph with visibility added through time.')
|
||||
print('7. Generate an Excel sheet with all administrated techniques.')
|
||||
print('8. Check the technique YAML file for errors.')
|
||||
print('9. Back to main menu.')
|
||||
|
|
Loading…
Reference in New Issue