"""Poll the NVD CVE API on a schedule and post new or updated CVEs to a webhook."""
import requests
|
|
import time
|
|
import schedule
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
# Path of the JSON file used to persist the record of already-seen CVEs.
FILE_PATH = 'previous_cves.json'

# In-memory record of CVEs that have already been seen. Each key is a
# (CVE ID, lastModified) tuple and each value repeats that lastModified string.
previous_cves = {}
|
|
|
|
def load_previous_cves():
    """Load previously fetched CVEs from ``FILE_PATH`` into ``previous_cves``.

    The module-level ``previous_cves`` is always rebound to a fresh dict
    first, so a missing, undecodable or wrongly shaped file leaves it empty.
    The file is expected to hold a JSON list of objects with an ``id`` and
    an optional ``lastModified``; each one becomes an entry keyed by
    ``(id, lastModified)`` whose value is the ``lastModified`` string.
    """
    global previous_cves
    previous_cves = {}

    if not os.path.exists(FILE_PATH):
        return

    try:
        with open(FILE_PATH, 'r', encoding='utf-8') as file:
            data = json.load(file)
    except json.JSONDecodeError:
        print("Error: Failed to decode JSON from the file.")
        return

    if not isinstance(data, list):
        print("Error: JSON data is not in the expected format.")
        return

    for entry in data:
        # A stray non-object element (number, string, null) must not abort
        # the whole load; only well-formed records with an 'id' are kept.
        if not isinstance(entry, dict) or 'id' not in entry:
            continue
        last_modified = entry.get('lastModified', 'Not available')
        previous_cves[(entry['id'], last_modified)] = last_modified
|
|
|
|
def save_previous_cves():
    """Save previously fetched CVEs to ``FILE_PATH`` as a JSON list.

    Each ``(id, lastModified)`` key of ``previous_cves`` is written as an
    object ``{"id": ..., "lastModified": ...}``.
    """
    # The keys already carry both pieces of data, so iterate them directly.
    data_to_save = [
        {'id': cve_id, 'lastModified': last_modified}
        for cve_id, last_modified in previous_cves
    ]

    # Write to a sibling temp file and swap it into place, so an interrupted
    # write cannot leave FILE_PATH holding truncated, undecodable JSON.
    temp_path = f'{FILE_PATH}.tmp'
    with open(temp_path, 'w', encoding='utf-8') as file:
        json.dump(data_to_save, file, indent=2)
    os.replace(temp_path, FILE_PATH)
|
|
|
|
def send_webhook(embed_data):
    """Send an embed to a webhook URL.

    The embed is posted as JSON in the form ``{'embeds': [embed_data]}``.
    Failures are reported on stdout and never raised, so one failed delivery
    does not stop the caller.

    Args:
        embed_data: JSON-serialisable dict describing a single embed.
    """
    webhook_url = ''  # Replace with your webhook URL

    # Without a URL there is nothing to post to; say so directly rather than
    # relying on the HTTP library to reject the empty string.
    if not webhook_url:
        print("Error sending webhook: no webhook URL configured")
        return

    try:
        # A timeout keeps a stalled connection from blocking the caller forever.
        response = requests.post(
            webhook_url,
            json={'embeds': [embed_data]},
            timeout=10,
        )
        response.raise_for_status()
        print(f"Webhook sent successfully: {response.status_code}")
    except requests.RequestException as e:
        print(f"Error sending webhook: {e}")
|
|
|
|
# Text-size caps applied to the embed. The {'embeds': [...]} payload matches
# Discord's webhook format (presumably the target -- confirm). Discord
# documents limits of 4096 characters for a description, 1024 per field value
# and 6000 for the whole embed, so the description is capped below its own
# limit to leave room for two full-size fields within the combined budget.
_EMBED_DESCRIPTION_LIMIT = 3000
_EMBED_FIELD_VALUE_LIMIT = 1024
_NOT_AVAILABLE = 'Not available'


def _truncate(text, limit):
    """Return *text* cut to at most *limit* characters, ending in '...' if cut."""
    if len(text) <= limit:
        return text
    return text[:limit - 3] + '...'


def _join_within_limit(lines, limit):
    """Join *lines* with newlines, dropping trailing lines that do not fit in *limit*."""
    joined = '\n'.join(lines)
    if len(joined) <= limit:
        return joined

    marker_reserve = 20  # room for the trailing "... (+N more)" marker
    kept = []
    used = 0
    for line in lines:
        cost = len(line) + 1  # the line plus its newline separator
        if used + cost > limit - marker_reserve:
            break
        kept.append(line)
        used += cost

    kept.append(f'... (+{len(lines) - len(kept)} more)')
    return _truncate('\n'.join(kept), limit)


def _format_timestamp(value):
    """Return an ISO timestamp as 'YYYY-MM-DD HH:MM:SS UTC', or the input if unparseable."""
    if not isinstance(value, str):
        return _NOT_AVAILABLE
    try:
        return datetime.fromisoformat(value).strftime('%Y-%m-%d %H:%M:%S UTC')
    except ValueError:
        return value


def format_embed(cve):
    """Format CVE data into an embed.

    Args:
        cve: One vulnerability entry; the data is read from its ``'cve'``
            sub-dict. Every key is optional and falls back to a placeholder.

    Returns:
        A dict with ``title``, ``description``, ``color``, ``fields``,
        ``footer`` and ``url`` keys. ``fields`` holds the CVSS details, the
        two dates and the vulnerable CPE criteria, plus a trailing
        ``References`` field when the CVE lists any reference URLs.
    """
    record = cve.get('cve', {})
    cve_id = record.get('id', 'Unknown CVE ID')

    # First English description with an actual value, else a placeholder.
    description = next(
        (
            desc['value']
            for desc in record.get('descriptions', [])
            if desc.get('lang') == 'en' and desc.get('value')
        ),
        'No description available',
    )
    description = _truncate(str(description), _EMBED_DESCRIPTION_LIMIT)

    # Prefer the metric published by NIST; otherwise take the first one listed.
    cvss_metrics = record.get('metrics', {}).get('cvssMetricV31', [])
    nist_metric = next(
        (metric for metric in cvss_metrics if metric.get('source') == 'nvd@nist.gov'),
        None,
    )
    chosen_metric = nist_metric or (cvss_metrics[0] if cvss_metrics else {})
    cvss_data = chosen_metric.get('cvssData', {})

    base_severity = cvss_data.get('baseSeverity', _NOT_AVAILABLE)
    severity_colors = {
        'LOW': 0x00FF00,       # Green
        'MEDIUM': 0xFFFF00,    # Yellow
        'HIGH': 0xFF0000,      # Red
        'CRITICAL': 0xFF0000,  # Red
    }
    color = severity_colors.get(base_severity, 0x0000FF)  # Blue when unknown

    # Each date is parsed on its own so one malformed value cannot leave the
    # other unformatted.
    published_date = _format_timestamp(record.get('published', _NOT_AVAILABLE))
    last_modified_date = _format_timestamp(record.get('lastModified', _NOT_AVAILABLE))

    # Collect the CPE criteria of every match flagged as vulnerable.
    vulnerable_criteria = []
    for config in record.get('configurations', []):
        for node in config.get('nodes', []):
            for cpe_match in node.get('cpeMatch', []):
                if cpe_match.get('vulnerable', False):
                    vulnerable_criteria.append(cpe_match.get('criteria', 'N/A'))

    references = [ref.get('url') for ref in record.get('references', []) if ref.get('url')]

    inline_fields = [
        ('CVSS Score', str(cvss_data.get('baseScore', _NOT_AVAILABLE))),
        ('CVSS Severity', base_severity),
        ('Attack Vector', cvss_data.get('attackVector', _NOT_AVAILABLE)),
        ('Attack Complexity', cvss_data.get('attackComplexity', _NOT_AVAILABLE)),
        ('Privileges Required', cvss_data.get('privilegesRequired', _NOT_AVAILABLE)),
        ('User Interaction', cvss_data.get('userInteraction', _NOT_AVAILABLE)),
        ('CVSS Vector', cvss_data.get('vectorString', _NOT_AVAILABLE)),
        ('Published Date', published_date),
        ('Last Modified Date', last_modified_date),
    ]
    fields = [
        {'name': name, 'value': value, 'inline': True}
        for name, value in inline_fields
    ]

    if vulnerable_criteria:
        vulnerable_text = _join_within_limit(vulnerable_criteria, _EMBED_FIELD_VALUE_LIMIT)
    else:
        vulnerable_text = 'No vulnerable systems specified'
    fields.append({'name': 'Vulnerable Systems', 'value': vulnerable_text, 'inline': False})

    # The references field is only present when there is something to list.
    if references:
        fields.append({
            'name': 'References',
            'value': _join_within_limit(references, _EMBED_FIELD_VALUE_LIMIT),
            'inline': False,
        })

    return {
        'title': f'CVE Details: {cve_id}',
        'description': description,
        'color': color,
        'fields': fields,
        'footer': {
            'text': 'Source: NVD'
        },
        'url': f'https://nvd.nist.gov/vuln/detail/{cve_id}',
    }
|
|
|
|
def fetch_cves(severities=("LOW", "MEDIUM", "HIGH"), retry_attempts=3, timeout=30):
    """Fetch CVEs published in the last 7 days and report new or updated ones.

    For each severity, the NVD CVE API is queried (retrying on failure).
    Every returned CVE whose ``(id, lastModified)`` pair is not yet in
    ``previous_cves`` is formatted with ``format_embed``, sent with
    ``send_webhook`` and recorded; the record is then written out with
    ``save_previous_cves``.

    Args:
        severities: Values passed one at a time as ``cvssV3Severity``.
            NOTE(review): 'CRITICAL' is not in the default, so
            critical-severity CVEs are presumably never requested --
            confirm whether that is intentional.
        retry_attempts: Maximum number of requests made per severity.
        timeout: Seconds to wait for the API before treating the attempt
            as failed.
    """
    # Imported here because the file-level import only provides
    # datetime and timedelta.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted strings below are unchanged.
    now = datetime.now(timezone.utc)
    start_date = (now - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S.000')
    end_date = now.strftime('%Y-%m-%dT%H:%M:%S.000')

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
    }

    for severity in severities:
        url = f"https://services.nvd.nist.gov/rest/json/cves/2.0/?pubStartDate={start_date}&pubEndDate={end_date}&cvssV3Severity={severity}"

        for attempt in range(1, retry_attempts + 1):
            try:
                # Only the network call and body decoding are retried.
                response = requests.get(url, headers=headers, timeout=timeout)
                response.raise_for_status()
                data = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers an undecodable body on requests versions
                # whose JSON error is not a RequestException.
                print(f"Error fetching CVEs (attempt {attempt}/{retry_attempts}): {e}")
                # No point waiting after the final attempt.
                if attempt < retry_attempts:
                    time.sleep(5)
                continue

            cves = data.get('vulnerabilities', [])
            print(f"Number of CVEs found: {len(cves)}")

            # Pair each unseen CVE with the key it will be recorded under.
            pending = []
            for cve in cves:
                record = cve.get('cve', {})
                cve_id = record.get('id')
                if cve_id is None:
                    continue  # malformed entry: nothing to identify it by
                key = (cve_id, record.get('lastModified', 'Not available'))
                if key not in previous_cves:
                    pending.append((key, cve))

            if pending:
                print(f"Found {len(pending)} new or updated CVEs:")
                for key, cve in pending:
                    send_webhook(format_embed(cve))
                    previous_cves[key] = key[1]
                save_previous_cves()
            else:
                print("No new or updated CVEs found.")

            # This severity succeeded; move on to the next one.
            break
|
|
|
|
def main():
    """Run the CVE monitor: fetch once immediately, then once every hour.

    Loads the saved record of seen CVEs, registers ``fetch_cves`` with the
    scheduler and then loops forever, running due jobs once per second.
    """
    # Load previously fetched CVEs from file
    load_previous_cves()

    # Schedule the task to run every hour, and run it once right away
    schedule.every(1).hours.do(fetch_cves)
    fetch_cves()

    print("Starting CVE monitoring...")
    while True:
        schedule.run_pending()
        time.sleep(1)


# Guarded so that importing this module does not start network requests
# or the endless scheduler loop.
if __name__ == "__main__":
    main()
|