- The health function now checks for very similar values within the key-value pair 'applicable_to'. E.g. 'server' and 'servers'.
- The health function is now always called for technique admin files. Showing a generic error message if possible errors are found. - Created new function 'check_file' to separate the functionality from 'check_file_type'.master
parent
3de186c96e
commit
f10e4ea9ab
26
dettact.py
26
dettact.py
|
@ -145,7 +145,7 @@ def menu(menu_parser):
|
|||
interactive_menu()
|
||||
|
||||
elif args.subparser in ['datasource', 'ds']:
|
||||
if check_file_type(args.file, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if check_file(args.file, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
|
||||
if args.layer:
|
||||
generate_data_sources_layer(args.file)
|
||||
if args.excel:
|
||||
|
@ -162,19 +162,18 @@ def menu(menu_parser):
|
|||
'administration YAML file (\'--file-ds\')')
|
||||
quit()
|
||||
|
||||
if check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) and \
|
||||
check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health) and \
|
||||
check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
|
||||
if args.layer:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, False, args.applicable)
|
||||
if args.overlay:
|
||||
generate_visibility_layer(args.file_tech, args.file_ds, True, args.applicable)
|
||||
|
||||
if args.excel and check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION) and args.applicable == 'all':
|
||||
export_techniques_list_to_excel(args.file_tech)
|
||||
if args.excel and args.applicable != 'all':
|
||||
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
|
||||
if args.health:
|
||||
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
|
||||
if check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health):
|
||||
if args.excel and args.applicable == 'all':
|
||||
export_techniques_list_to_excel(args.file_tech)
|
||||
if args.excel and args.applicable != 'all':
|
||||
print('[!] Filtering on \'applicable_to\' is not supported for Excel output')
|
||||
|
||||
elif args.subparser in ['group', 'g']:
|
||||
generate_group_heat_map(args.groups, args.overlay, args.overlay_type, args.stage, args.platform, args.software_group, args.applicable)
|
||||
|
@ -184,13 +183,13 @@ def menu(menu_parser):
|
|||
if not args.file_ds:
|
||||
print('[!] Doing an overlay requires adding the data source administration YAML file (\'--file-ds\')')
|
||||
quit()
|
||||
if not check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if not check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
|
||||
quit()
|
||||
|
||||
if check_file_type(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION):
|
||||
if check_file(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION, args.health):
|
||||
if args.layer:
|
||||
generate_detection_layer(args.file_tech, args.file_ds, False, args.applicable)
|
||||
if args.overlay and check_file_type(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION):
|
||||
if args.overlay and check_file(args.file_ds, FILE_TYPE_DATA_SOURCE_ADMINISTRATION, args.health):
|
||||
generate_detection_layer(args.file_tech, args.file_ds, True, args.applicable)
|
||||
if args.graph:
|
||||
plot_detection_graph(args.file_tech, args.applicable)
|
||||
|
@ -198,8 +197,6 @@ def menu(menu_parser):
|
|||
export_techniques_list_to_excel(args.file_tech)
|
||||
if args.excel and args.applicable != 'all':
|
||||
print("[!] Filtering on 'applicable_to' is not supported for Excel output")
|
||||
if args.health:
|
||||
check_yaml_file_health(args.file_tech, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
|
||||
|
||||
elif args.subparser in ['generic', 'ge']:
|
||||
if args.statistics:
|
||||
|
@ -210,6 +207,7 @@ def menu(menu_parser):
|
|||
else:
|
||||
menu_parser.print_help()
|
||||
|
||||
|
||||
def prepare_folders():
|
||||
"""
|
||||
Create the folders 'cache' and 'output' if they do not exist.
|
||||
|
|
178
generic.py
178
generic.py
|
@ -4,6 +4,7 @@ from datetime import datetime as dt
|
|||
import yaml
|
||||
from upgrade import upgrade_yaml_file
|
||||
from constants import *
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# Due to performance reasons the import of attackcti is within the function that makes use of this library.
|
||||
|
||||
|
@ -283,45 +284,6 @@ def get_all_mitre_data_sources():
|
|||
return sorted(data_sources)
|
||||
|
||||
|
||||
def check_file_type(filename, file_type=None):
|
||||
"""
|
||||
Check if the provided YAML file has the key 'file_type' and possible if that key matches a specific value.
|
||||
:param filename: path to a YAML file
|
||||
:param file_type: value to check against the 'file_type' key in the YAML file
|
||||
:return: the file_type if present, else None is returned.
|
||||
"""
|
||||
if not os.path.exists(filename):
|
||||
print('[!] File: \'' + filename + '\' does not exist')
|
||||
return None
|
||||
with open(filename, 'r') as yaml_file:
|
||||
try:
|
||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
except Exception as e:
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
print(' ' + str(e)) # print more detailed error information to help the user in fixing the error.
|
||||
return None
|
||||
|
||||
# This check is performed because a text file will also be considered to be valid YAML. But, we are using
|
||||
# key-value pairs within the YAML files.
|
||||
if not hasattr(yaml_content, 'keys'):
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
return None
|
||||
|
||||
if 'file_type' not in yaml_content.keys():
|
||||
print('[!] File: \'' + filename + '\' does not contain a file_type key.')
|
||||
return None
|
||||
elif file_type:
|
||||
if file_type != yaml_content['file_type']:
|
||||
print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
|
||||
return None
|
||||
else:
|
||||
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||
return yaml_content['file_type']
|
||||
else:
|
||||
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||
return yaml_content['file_type']
|
||||
|
||||
|
||||
def calculate_score(l, zero_value=0):
|
||||
"""
|
||||
Calculates the average score in the given list which contains dictionaries with 'score' field.
|
||||
|
@ -393,13 +355,22 @@ def load_techniques(filename, detection_or_visibility='all', filter_applicable_t
|
|||
return my_techniques, name, platform
|
||||
|
||||
|
||||
def check_yaml_file_health(filename, file_type):
|
||||
def _print_error_msg(msg, print_error):
|
||||
if print_error:
|
||||
print(msg)
|
||||
return True
|
||||
|
||||
|
||||
def check_yaml_file_health(filename, file_type, health_is_called):
|
||||
"""
|
||||
Check on error in the provided YAML file.
|
||||
:param filename: YAML file location
|
||||
:param file_type: currenlty only 'FILE_TYPE_TECHNIQUE_ADMINISTRATION' is being supported
|
||||
:param file_type: currently only 'FILE_TYPE_TECHNIQUE_ADMINISTRATION' is being supported
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed and then quit()
|
||||
:return:
|
||||
"""
|
||||
|
||||
has_error = False
|
||||
if file_type == FILE_TYPE_TECHNIQUE_ADMINISTRATION:
|
||||
# check for duplicate tech IDs
|
||||
with open(filename, 'r') as yaml_file:
|
||||
|
@ -411,69 +382,156 @@ def check_yaml_file_health(filename, file_type):
|
|||
if tech not in tech_dup:
|
||||
tech_dup.append(tech)
|
||||
else:
|
||||
print('[!] Duplicate technique ID: ' + tech)
|
||||
has_error = _print_error_msg('[!] Duplicate technique ID: ' + tech, health_is_called)
|
||||
|
||||
# checks on:
|
||||
# - empty key-value pairs: 'date_implemented', 'date_registered', 'location', 'applicable_to', 'score'
|
||||
# - invalid date format for: 'date_implemented', 'date_registered'
|
||||
# - detection or visibility score out-of-range
|
||||
# - missing key-value pairs: 'applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment'
|
||||
# - check on 'applicable_to' values which are very similar
|
||||
|
||||
all_applicable_to = set()
|
||||
techniques = load_techniques(filename)
|
||||
for tech, v in techniques[0].items():
|
||||
|
||||
for key in ['detection', 'visibility']:
|
||||
if key not in v:
|
||||
print('[!] Technique ID: ' + tech + ' is MISSING ' + key)
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING ' + key, health_is_called)
|
||||
else:
|
||||
# create at set containing all values for 'applicable_to'
|
||||
all_applicable_to.update([a for v in v[key] for a in v['applicable_to']])
|
||||
|
||||
for detection in v['detection']:
|
||||
for key in ['applicable_to', 'date_registered', 'date_implemented', 'score', 'location', 'comment']:
|
||||
if key not in detection:
|
||||
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key)
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in detection: ' + key, health_is_called)
|
||||
|
||||
try:
|
||||
if detection['score'] is None:
|
||||
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score')
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: score', health_is_called)
|
||||
|
||||
elif not (detection['score'] >= -1 and detection['score'] <= 5):
|
||||
print('[!] Technique ID: ' + tech + ' has an INVALID detection score: ' + str(detection['score']) +
|
||||
' (should be between -1 and 5)')
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID detection score: '
|
||||
+ str(detection['score']) + ' (should be between -1 and 5)', health_is_called)
|
||||
|
||||
elif detection['score'] > -1:
|
||||
for key in ['date_implemented', 'date_registered']:
|
||||
if not detection[key]:
|
||||
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
||||
break
|
||||
try:
|
||||
detection[key].year
|
||||
detection[key].month
|
||||
detection[key].day
|
||||
except AttributeError:
|
||||
print('[!] Technique ID: ' + tech + ' has an INVALID data format for the key-value pair '
|
||||
'in detection: ' + key + ' (should be YYYY-MM-DD)')
|
||||
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech +
|
||||
' has an INVALID data format for the key-value pair in detection: ' +
|
||||
key + ' (should be YYYY-MM-DD)', health_is_called)
|
||||
for key in ['location', 'applicable_to']:
|
||||
try:
|
||||
if detection[key][0] is None:
|
||||
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
|
||||
except TypeError:
|
||||
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key)
|
||||
if not isinstance(detection[key], list):
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has for the key-value pair \''
|
||||
+ key + '\' a string value assigned (should be a list)', health_is_called)
|
||||
else:
|
||||
try:
|
||||
if detection[key][0] is None:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
||||
except TypeError:
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in detection: ' + key, health_is_called)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
for visibility in v['visibility']:
|
||||
for key in ['applicable_to', 'score', 'comment']:
|
||||
if key not in visibility:
|
||||
print('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key)
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is MISSING the key-value pair in visibility: ' + key, health_is_called)
|
||||
|
||||
try:
|
||||
if visibility['score'] is None:
|
||||
print('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score')
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' is has an EMPTY key-value pair in visibility: score', health_is_called)
|
||||
elif not (visibility['score'] >= 0 and visibility['score'] <= 4):
|
||||
print('[!] Technique ID: ' + tech + ' has an INVALID visibility score: ' + str(detection['score']) +
|
||||
' (should be between 0 and 4)')
|
||||
has_error = _print_error_msg('[!] Technique ID: ' + tech + ' has an INVALID visibility score: '
|
||||
+ str(detection['score']) + ' (should be between 0 and 4)', health_is_called)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# get values within the key-value pair 'applicable_to' which are a very close match
|
||||
similar = set()
|
||||
for i1 in all_applicable_to:
|
||||
for i2 in all_applicable_to:
|
||||
match_value = SequenceMatcher(None, i1, i2).ratio()
|
||||
if match_value > 0.8 and match_value != 1:
|
||||
print(match_value)
|
||||
similar.add(i1)
|
||||
similar.add(i2)
|
||||
|
||||
if len(similar) > 0:
|
||||
has_error = _print_error_msg('[!] There are values in the key-value pair \'applicable_to\' which are very similar. Correct where necessary:', health_is_called)
|
||||
for s in similar:
|
||||
_print_error_msg(' - ' + s, health_is_called)
|
||||
|
||||
if has_error and not health_is_called:
|
||||
print('[!] The below YAML file contains possible errors. It\'s recommended to check via the \'--health\' '
|
||||
'argument or using the option in the interactive menu: \n - ' + filename)
|
||||
|
||||
if has_error:
|
||||
print('') # print a newline
|
||||
|
||||
|
||||
def check_file_type(filename, file_type=None):
|
||||
"""
|
||||
Check if the provided YAML file has the key 'file_type' and possible if that key matches a specific value.
|
||||
:param filename: path to a YAML file
|
||||
:param file_type: value to check against the 'file_type' key in the YAML file
|
||||
:return: the file_type if present, else None is returned
|
||||
"""
|
||||
if not os.path.exists(filename):
|
||||
print('[!] File: \'' + filename + '\' does not exist')
|
||||
return None
|
||||
with open(filename, 'r') as yaml_file:
|
||||
try:
|
||||
yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader)
|
||||
except Exception as e:
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
print(' ' + str(e)) # print more detailed error information to help the user in fixing the error.
|
||||
return None
|
||||
|
||||
# This check is performed because a text file will also be considered to be valid YAML. But, we are using
|
||||
# key-value pairs within the YAML files.
|
||||
if not hasattr(yaml_content, 'keys'):
|
||||
print('[!] File: \'' + filename + '\' is not a valid YAML file.')
|
||||
return None
|
||||
|
||||
if 'file_type' not in yaml_content.keys():
|
||||
print('[!] File: \'' + filename + '\' does not contain a file_type key.')
|
||||
return None
|
||||
elif file_type:
|
||||
if file_type != yaml_content['file_type']:
|
||||
print('[!] File: \'' + filename + '\' is not a file type of: \'' + file_type + '\'')
|
||||
return None
|
||||
else:
|
||||
return yaml_content
|
||||
else:
|
||||
return yaml_content
|
||||
|
||||
|
||||
def check_file(filename, file_type=None, health_is_called=False):
|
||||
"""
|
||||
Calls three functions to perform the following checks: is the file a valid YAML file, needs the file to be upgrade,
|
||||
does the file contain errors.
|
||||
:param filename: path to a YAML file
|
||||
:param file_type: value to check against the 'file_type' key in the YAML file
|
||||
:param health_is_called: boolean that specifies if detailed errors in the file will be printed by the function 'check_yaml_file_health' and then quit()
|
||||
:return: the file_type if present, else None is returned
|
||||
"""
|
||||
|
||||
yaml_content = check_file_type(filename, file_type)
|
||||
|
||||
# if the file is a valid YAML, continue. Else, return None
|
||||
if yaml_content:
|
||||
upgrade_yaml_file(filename, file_type, yaml_content['version'], load_attack_data(DATATYPE_ALL_TECH))
|
||||
check_yaml_file_health(filename, file_type, health_is_called)
|
||||
|
||||
return yaml_content['file_type']
|
||||
|
||||
return yaml_content # value is None
|
||||
|
|
|
@ -468,7 +468,7 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
|
||||
groups_file_type = None
|
||||
if os.path.isfile(groups):
|
||||
groups_file_type = check_file_type(groups, file_type=FILE_TYPE_GROUP_ADMINISTRATION)
|
||||
groups_file_type = check_file(groups, file_type=FILE_TYPE_GROUP_ADMINISTRATION)
|
||||
if not groups_file_type:
|
||||
return
|
||||
else:
|
||||
|
@ -482,7 +482,7 @@ def generate_group_heat_map(groups, overlay, overlay_type, stage, platform, soft
|
|||
expected_file_type = FILE_TYPE_GROUP_ADMINISTRATION if overlay_type == OVERLAY_TYPE_GROUP \
|
||||
else FILE_TYPE_TECHNIQUE_ADMINISTRATION \
|
||||
if overlay_type in [OVERLAY_TYPE_VISIBILITY, OVERLAY_TYPE_DETECTION] else None
|
||||
overlay_file_type = check_file_type(overlay, expected_file_type)
|
||||
overlay_file_type = check_file(overlay, expected_file_type)
|
||||
if not overlay_file_type:
|
||||
return
|
||||
else:
|
||||
|
|
|
@ -136,7 +136,7 @@ def select_file(title, what, expected_file_type, b_clear=True):
|
|||
else:
|
||||
if choice.isdigit() and int(choice) < n:
|
||||
filename = files[int(choice) - 1]
|
||||
file_type = check_file_type(filename, file_type=expected_file_type)
|
||||
file_type = check_file(filename, file_type=expected_file_type)
|
||||
if file_type:
|
||||
print('Selected file: ' + filename)
|
||||
wait()
|
||||
|
@ -294,7 +294,7 @@ def menu_detection(filename_t):
|
|||
wait()
|
||||
elif choice == '6':
|
||||
print('Checking the technique YAML file for errors...')
|
||||
check_yaml_file_health(filename_t, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
|
||||
check_yaml_file_health(filename_t, FILE_TYPE_TECHNIQUE_ADMINISTRATION, health_is_called=True)
|
||||
wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
|
@ -345,7 +345,7 @@ def menu_visibility(filename_t, filename_ds):
|
|||
wait()
|
||||
elif choice == '5':
|
||||
print('Checking the technique YAML file for errors...')
|
||||
check_yaml_file_health(filename_t, FILE_TYPE_TECHNIQUE_ADMINISTRATION)
|
||||
check_yaml_file_health(filename_t, FILE_TYPE_TECHNIQUE_ADMINISTRATION, health_is_called=True)
|
||||
wait()
|
||||
elif choice == '9':
|
||||
interactive_menu()
|
||||
|
|
Loading…
Reference in New Issue