Added new functionality that provides the capability to search over custom key-value pairs within a technique administration YAML file.
parent
40657c4e06
commit
ee98d1bd8a
94
eql_yaml.py
94
eql_yaml.py
|
@ -58,36 +58,26 @@ def _techniques_to_events(techniques, obj_type, include_all_score_objs):
|
|||
techniques = techniques['techniques']
|
||||
|
||||
for tech in techniques:
|
||||
tech_id = tech['technique_id']
|
||||
tech_name = tech['technique_name']
|
||||
|
||||
# first we will make events from detections
|
||||
if not isinstance(tech[obj_type], list):
|
||||
tech[obj_type] = [tech[obj_type]]
|
||||
|
||||
# loop over all visibility or detection objects
|
||||
for d in tech[obj_type]:
|
||||
d = set_yaml_dv_comments(d)
|
||||
app_to = d['applicable_to']
|
||||
g_comment = d['comment']
|
||||
if obj_type == 'detection':
|
||||
location = d['location']
|
||||
for obj in tech[obj_type]:
|
||||
obj = set_yaml_dv_comments(obj)
|
||||
|
||||
# latest can be set by the user using the '--latest' argument
|
||||
if not isinstance(d['score_logbook'], list):
|
||||
d['score_logbook'] = [d['score_logbook']]
|
||||
if not isinstance(obj['score_logbook'], list):
|
||||
obj['score_logbook'] = [obj['score_logbook']]
|
||||
if not include_all_score_objs:
|
||||
d['score_logbook'] = [get_latest_score_obj(d)]
|
||||
obj['score_logbook'] = [get_latest_score_obj(obj)]
|
||||
|
||||
# loop over all scores (if we have multiple) create the actual events for EQL
|
||||
for scr_log in d['score_logbook']:
|
||||
event_lvl_3 = {'comment': scr_log['comment'], 'date': scr_log['date'], 'score': scr_log['score']}
|
||||
event_lvl_2 = {'applicable_to': app_to, 'comment': g_comment, 'score_logbook': event_lvl_3}
|
||||
if obj_type == 'detection':
|
||||
# noinspection PyUnboundLocalVariable
|
||||
event_lvl_2['location'] = location
|
||||
event_lvl_1 = {'technique_id': tech_id, 'technique_name': tech_name,
|
||||
obj_type: event_lvl_2}
|
||||
for scr_log in obj['score_logbook']:
|
||||
event_lvl_2 = deepcopy(obj)
|
||||
event_lvl_2['score_logbook'] = scr_log
|
||||
event_lvl_1 = deepcopy(tech)
|
||||
del event_lvl_1['visibility']
|
||||
del event_lvl_1['detection']
|
||||
event_lvl_1[obj_type] = event_lvl_2
|
||||
|
||||
technique_events.append(event_lvl_1)
|
||||
|
||||
|
@ -96,30 +86,29 @@ def _techniques_to_events(techniques, obj_type, include_all_score_objs):
|
|||
|
||||
def _object_in_technique(obj_event, technique_yaml, obj_type):
|
||||
"""
|
||||
Check if the detection/visibility object already exists within the provided technique object ('technique_yaml')
|
||||
:param obj_event: visibility or detection EQL 'event'
|
||||
:param technique_yaml: technique object
|
||||
- Check if the detection/visibility object already exists within the provided technique object ('technique_yaml')
|
||||
- If it exists return the object's index in the list of other detection/visibility objects to which the
|
||||
'score_logbook' should be added. This is needed for techniques which have multiple visibility or detection objects
|
||||
due to 'applicable_to'
|
||||
:param obj_event: visibility or detection EQL event
|
||||
:param technique_yaml: the technique object that's being reconstructing from the EQL events
|
||||
:param obj_type: 'visibility' or 'detection'
|
||||
:return: -1 if it does not exists, otherwise the index within the list (this is needed for techniques which have
|
||||
multiple vicinities or detection objects due to applicable_to)
|
||||
:return: -1 if it does not exists, otherwise the index within the list
|
||||
"""
|
||||
app_to = obj_event['applicable_to']
|
||||
comment = obj_event['comment']
|
||||
if obj_type == 'detection':
|
||||
location = obj_event['location']
|
||||
|
||||
idx = 0
|
||||
for obj in technique_yaml[obj_type]:
|
||||
if obj_type == 'detection':
|
||||
# noinspection PyUnboundLocalVariable
|
||||
if obj['applicable_to'] == app_to and obj['comment'] == comment and obj['location'] == location:
|
||||
return idx
|
||||
else:
|
||||
if obj['applicable_to'] == app_to and obj['comment'] == comment:
|
||||
return idx
|
||||
match = True
|
||||
for k, v in obj_event.items():
|
||||
# we need to skip the score_logbook in the comparison this will not match as we are still re-creating the object
|
||||
if (k in obj and obj[k] == v) or k == 'score_logbook':
|
||||
continue
|
||||
else:
|
||||
match = False
|
||||
break
|
||||
if match:
|
||||
return idx
|
||||
idx += 1
|
||||
|
||||
# detection not in technique object
|
||||
return -1
|
||||
|
||||
|
||||
|
@ -191,10 +180,6 @@ def _events_to_yaml(query_results, obj_type):
|
|||
tech_name = tech_event['technique_name']
|
||||
obj_event = tech_event[obj_type]
|
||||
score_logbook_event = tech_event[obj_type]['score_logbook']
|
||||
if score_logbook_event['date'] and isinstance(score_logbook_event['date'], str):
|
||||
score_date = datetime.datetime.strptime(score_logbook_event['date'], '%Y-%m-%d')
|
||||
else:
|
||||
score_date = None
|
||||
|
||||
# create the technique dict if not already created
|
||||
if not _value_in_dict_list(techniques_yaml, 'technique_id', tech_id):
|
||||
|
@ -209,21 +194,20 @@ def _events_to_yaml(query_results, obj_type):
|
|||
# figure out if the detection/visibility dict already exists
|
||||
obj_idx = _object_in_technique(obj_event, tech_yaml, obj_type)
|
||||
|
||||
score_obj_yaml = {'date': score_date, 'score': score_logbook_event['score'],
|
||||
'comment': score_logbook_event['comment']}
|
||||
# create the score object
|
||||
score_obj_yaml = {}
|
||||
for k, v in score_logbook_event.items():
|
||||
value = v
|
||||
if isinstance(v, str) and REGEX_YAML_VALID_DATE.match(value):
|
||||
value = datetime.datetime.strptime(v, '%Y-%m-%d')
|
||||
score_obj_yaml[k] = value
|
||||
|
||||
# The detection/visibility dict is missing. Create it.
|
||||
if obj_idx == -1:
|
||||
yaml_object = {
|
||||
'applicable_to': obj_event['applicable_to'], 'comment': obj_event['comment'],
|
||||
'score_logbook': [score_obj_yaml]
|
||||
}
|
||||
if obj_type == 'detection':
|
||||
yaml_object['location'] = obj_event['location']
|
||||
|
||||
tech_yaml[obj_type].append(yaml_object)
|
||||
obj_event['score_logbook'] = [obj_event['score_logbook']]
|
||||
tech_yaml[obj_type].append(obj_event)
|
||||
else:
|
||||
# add the a score object to the score_logbook within the proper detection object using 'obj_idx'
|
||||
# add the score object to the score_logbook within the proper detection/visibility object using 'obj_idx'
|
||||
tech_yaml[obj_type][obj_idx]['score_logbook'].append(score_obj_yaml)
|
||||
|
||||
return techniques_yaml
|
||||
|
|
Loading…
Reference in New Issue