commit
0317b982c9
|
@ -1,6 +1,6 @@
|
|||
FROM python:3.7-alpine
|
||||
|
||||
LABEL version="1.1.2"
|
||||
LABEL version="1.2.1"
|
||||
|
||||
# update repository and install Linux packages
|
||||
RUN apk update && \
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
<img src="https://github.com/rabobank-cdc/DeTTECT/wiki/images/logo.png" alt="DeTT&CT" width=30% height=30%>
|
||||
|
||||
#### Detect Tactics, Techniques & Combat Threats
|
||||
Latest version: [1.2.0](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-120)
|
||||
Latest version: [1.2.1](https://github.com/rabobank-cdc/DeTTECT/wiki/Changelog#version-121)
|
||||
|
||||
To get started with DeTT&CT, check out this [page](https://github.com/rabobank-cdc/DeTTECT/wiki/Getting-started) and our [blog](https://split.to/FkqwE7U).
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ import re
|
|||
|
||||
APP_NAME = 'DeTT&CT'
|
||||
APP_DESC = 'Detect Tactics, Techniques & Combat Threats'
|
||||
VERSION = '1.2.0'
|
||||
VERSION = '1.2.1'
|
||||
|
||||
EXPIRE_TIME = 60 * 60 * 24
|
||||
|
||||
|
@ -88,9 +88,9 @@ FILE_TYPE_TECHNIQUE_ADMINISTRATION_UPGRADE_TEXT = {
|
|||
" The primary purpose of doing this is to allow you to keep track of changes in the score."}
|
||||
|
||||
# visibility update questions and answers
|
||||
V_UPDATE_Q_ALL_MANUAL = 'For all most recent visibility score objects that are eligible for an update. The key-value pair \'auto-generated\' is set to \'false\' or is not present.\n' \
|
||||
V_UPDATE_Q_ALL_MANUAL = 'For all most recent visibility score objects that are eligible for an update, the key-value pair \'auto-generated\' is set to \'false\' or is not present.\n' \
|
||||
'This implies that these scores are manually assigned. How do you want to proceed?:'
|
||||
V_UPDATE_Q_ALL_AUTO = 'For all most recent visibility score objects that are eligible for an update. The key-value pair \'auto-generated\' is set to \'true\'. \n' \
|
||||
V_UPDATE_Q_ALL_AUTO = 'For all most recent visibility score objects that are eligible for an update, the key-value pair \'auto-generated\' is set to \'true\'. \n' \
|
||||
'This implies that these scores are auto-generated. How do you want to proceed?:'
|
||||
V_UPDATE_Q_MIXED = 'You have visibility scores that are eligible for an update, which are manually assigned and which are calculated based on the nr. of data sources (i.e. auto-generated = true)\n' \
|
||||
'How do you want to proceed?'
|
||||
|
|
|
@ -355,9 +355,6 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
|
|||
|
||||
if manually_scored and auto_scored:
|
||||
mix_scores = True
|
||||
manually_scored = False
|
||||
auto_scored = False
|
||||
break
|
||||
|
||||
# stop if none of the present visibility scores are eligible for an update
|
||||
if not mix_scores and not manually_scored and not auto_scored:
|
||||
|
@ -366,12 +363,12 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
|
|||
print('\nA total of ' + str(updated_vis_score_cnt) + ' visibility scores are eligible for an update.\n')
|
||||
# ask how the score should be updated
|
||||
answer = 0
|
||||
if manually_scored:
|
||||
if mix_scores:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4, V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
elif manually_scored:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_ALL_MANUAL, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
elif auto_scored:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_ALL_AUTO, [V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
elif mix_scores:
|
||||
answer = ask_multiple_choice(V_UPDATE_Q_MIXED, [V_UPDATE_ANSWER_3, V_UPDATE_ANSWER_4, V_UPDATE_ANSWER_1, V_UPDATE_ANSWER_2, V_UPDATE_ANSWER_CANCEL])
|
||||
if answer == V_UPDATE_ANSWER_CANCEL:
|
||||
return
|
||||
|
||||
|
@ -443,6 +440,7 @@ def update_technique_administration_file(file_data_sources, file_tech_admin):
|
|||
print(' - Date: ' + get_latest_date(old_vis_obj[obj_idx]).strftime('%Y-%m-%d'))
|
||||
print(' - Score: ' + str(get_latest_score(old_vis_obj[obj_idx])))
|
||||
print(' - Visibility comment: ' + _indent_comment(get_latest_comment(old_vis_obj[obj_idx]), 23))
|
||||
print(' - Auto generated: ' + str(get_latest_score_obj(old_vis_obj[obj_idx]).get('auto_generated', 'False')))
|
||||
print('NEW score object:')
|
||||
print(' - Date: ' + new_score_obj['date'])
|
||||
print(' - Score: ' + str(new_score_obj['score']))
|
||||
|
|
21
eql_yaml.py
21
eql_yaml.py
|
@ -86,7 +86,7 @@ def _techniques_to_events(techniques, obj_type, include_all_score_objs):
|
|||
if obj_type == 'detection':
|
||||
# noinspection PyUnboundLocalVariable
|
||||
event_lvl_2['location'] = location
|
||||
event_lvl_1 = {'event_type': 'techniques', 'technique_id': tech_id, 'technique_name': tech_name,
|
||||
event_lvl_1 = {'technique_id': tech_id, 'technique_name': tech_name,
|
||||
obj_type: event_lvl_2}
|
||||
|
||||
technique_events.append(event_lvl_1)
|
||||
|
@ -161,9 +161,7 @@ def _events_to_yaml(query_results, obj_type):
|
|||
|
||||
if obj_type == 'data_sources':
|
||||
try:
|
||||
# Remove the event_type key. We no longer need this.
|
||||
for r in query_results:
|
||||
del r['event_type']
|
||||
if r['date_registered'] and isinstance(r['date_registered'], str):
|
||||
r['date_registered'] = datetime.datetime.strptime(r['date_registered'], '%Y-%m-%d')
|
||||
if r['date_connected'] and isinstance(r['date_connected'], str):
|
||||
|
@ -304,18 +302,20 @@ def _prepare_yaml_file(filename, obj_type, include_all_score_objs):
|
|||
yaml_content = _yaml.load(yaml_file)
|
||||
|
||||
yaml_content_eql = _traverse_modify_date(yaml_content)
|
||||
yaml_eql_events = []
|
||||
|
||||
# add the event type for EQL
|
||||
# create EQL events from the list of dictionaries
|
||||
if obj_type == 'data_sources':
|
||||
for item in yaml_content_eql[obj_type]:
|
||||
item['event_type'] = obj_type
|
||||
yaml_content_eql = yaml_content_eql['data_sources']
|
||||
yaml_eql_events.append(eql.Event(obj_type, 0, item))
|
||||
|
||||
# flatten the technique administration file to events
|
||||
# flatten the technique administration file to EQL events
|
||||
elif obj_type in ['visibility', 'detection']:
|
||||
yaml_content_eql = _techniques_to_events(yaml_content_eql, obj_type, include_all_score_objs)
|
||||
for e in yaml_content_eql:
|
||||
yaml_eql_events.append(eql.Event('techniques', 0, e))
|
||||
|
||||
return yaml_content_eql, yaml_content
|
||||
return yaml_eql_events, yaml_content
|
||||
|
||||
|
||||
def _check_query_results(query_results, obj_type):
|
||||
|
@ -353,7 +353,6 @@ def _execute_eql_query(events, query):
|
|||
"""
|
||||
# learn and load the schema
|
||||
schema = eql.Schema.learn(events)
|
||||
schema.default(schema)
|
||||
|
||||
query_results = []
|
||||
|
||||
|
@ -363,14 +362,14 @@ def _execute_eql_query(events, query):
|
|||
|
||||
# create the engine and parse the query
|
||||
engine = eql.PythonEngine()
|
||||
with engine.schema:
|
||||
with schema:
|
||||
try:
|
||||
eql_query = eql.parse_query(query, implied_any=True, implied_base=True)
|
||||
engine.add_query(eql_query)
|
||||
except eql.EqlError as e:
|
||||
print(e, file=sys.stderr)
|
||||
print('\nTake into account the following schema:')
|
||||
pprint(eql.Schema.current().schema)
|
||||
pprint(schema.schema)
|
||||
# when using an EQL query that does not match the schema, return None.
|
||||
return None
|
||||
engine.add_output_hook(callback)
|
||||
|
|
|
@ -32,10 +32,10 @@ data_sources:
|
|||
consistency: 0
|
||||
retention: 0
|
||||
- data_source_name: Process command-line parameters
|
||||
date_registered: 2019-03-01
|
||||
date_connected: 2017-01-01
|
||||
products: [Windows event log]
|
||||
available_for_data_analytics: True
|
||||
date_registered:
|
||||
date_connected:
|
||||
products: [None]
|
||||
available_for_data_analytics: False
|
||||
comment: ''
|
||||
data_quality:
|
||||
device_completeness: 0
|
||||
|
|
Loading…
Reference in New Issue