"""Poll the NVD CVE API and forward new or updated CVEs to a webhook.

Every CVE that has been delivered is remembered (by ID and ``lastModified``
timestamp) in a JSON file so that restarts do not re-send old notifications.
"""

import json
import os
import re
import time
from datetime import datetime, timedelta, timezone

import requests
import schedule

# CVEs already delivered, keyed by (CVE ID, lastModified) -> lastModified.
previous_cves = {}
FILE_PATH = 'previous_cves.json'

WEBHOOK_URL = ''  # Replace with your webhook URL

_NOT_AVAILABLE = 'Not available'

# NOTE(review): 'CRITICAL' is not polled although format_embed() has a colour
# for it -- confirm whether that omission is intentional.
_SEVERITIES = ("LOW", "MEDIUM", "HIGH")
# NOTE(review): only the first page of each NVD response is processed; the
# API paginates large result sets (startIndex / resultsPerPage).

_NVD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
}
_NVD_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.000'
_RETRY_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 5
# Without a timeout a stalled connection would block the monitor forever.
_NVD_TIMEOUT_SECONDS = 60
_WEBHOOK_TIMEOUT_SECONDS = 15

# The payload shape ({'embeds': [...]}) is the Discord webhook format.
# Discord rejects embeds whose field values exceed 1024 characters or whose
# text totals more than 6000 characters; the description cap is chosen so two
# full-length fields plus the short ones stay under that total.
_FIELD_VALUE_LIMIT = 1024
_DESCRIPTION_LIMIT = 3400

_SEVERITY_COLORS = {
    'LOW': 0x00FF00,       # Green
    'MEDIUM': 0xFFFF00,    # Yellow
    'HIGH': 0xFF0000,      # Red
    'CRITICAL': 0xFF0000,  # Red
}
_UNKNOWN_SEVERITY_COLOR = 0x0000FF  # Blue


def load_previous_cves():
    """Load previously delivered CVEs from ``FILE_PATH`` into ``previous_cves``.

    The global dictionary is always reset first. A missing file leaves it
    empty; undecodable JSON or a non-list payload prints an error and leaves
    it empty. List entries that are not dictionaries with an ``id`` key are
    ignored.
    """
    global previous_cves
    previous_cves = {}

    if not os.path.exists(FILE_PATH):
        return

    try:
        with open(FILE_PATH, 'r', encoding='utf-8') as file:
            data = json.load(file)
    except json.JSONDecodeError:
        print("Error: Failed to decode JSON from the file.")
        return

    if not isinstance(data, list):
        print("Error: JSON data is not in the expected format.")
        return

    for entry in data:
        # Skip anything that is not a {'id': ..., 'lastModified': ...} record.
        if isinstance(entry, dict) and 'id' in entry:
            last_modified = entry.get('lastModified', _NOT_AVAILABLE)
            previous_cves[(entry['id'], last_modified)] = last_modified


def save_previous_cves():
    """Write ``previous_cves`` to ``FILE_PATH`` as a list of dictionaries.

    The data is written to a temporary file and then moved into place, so an
    interruption mid-write cannot leave a truncated state file behind.
    """
    data_to_save = [
        {'id': cve_id, 'lastModified': last_modified}
        for cve_id, last_modified in previous_cves
    ]
    temp_path = FILE_PATH + '.tmp'
    with open(temp_path, 'w', encoding='utf-8') as file:
        json.dump(data_to_save, file, indent=2)
    os.replace(temp_path, FILE_PATH)


def send_webhook(embed_data):
    """Send an embed to the webhook URL.

    Args:
        embed_data: Embed dictionary as produced by ``format_embed``.

    Returns:
        True when the webhook accepted the request, False when the request
        failed (the error is printed, not raised).
    """
    try:
        response = requests.post(
            WEBHOOK_URL,
            json={'embeds': [embed_data]},
            timeout=_WEBHOOK_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error sending webhook: {e}")
        return False

    print(f"Webhook sent successfully: {response.status_code}")
    return True


def _truncate(text, limit):
    """Return *text* as a string of at most *limit* characters."""
    text = str(text)
    if len(text) <= limit:
        return text
    return text[:limit - 3] + '...'


def _format_timestamp(value):
    """Reformat an ISO-8601 timestamp; return *value* unchanged if unparsable."""
    try:
        return datetime.fromisoformat(value).strftime('%Y-%m-%d %H:%M:%S UTC')
    except (TypeError, ValueError):
        # Best effort: placeholders such as 'Not available' pass through.
        return value


def _field(name, value, inline=True):
    """Build one embed field dictionary."""
    return {'name': name, 'value': value, 'inline': inline}


def format_embed(cve):
    """Format one NVD vulnerability record into a webhook embed.

    Args:
        cve: One item of the NVD response's ``vulnerabilities`` list, i.e. a
            dictionary with the CVE details under the ``'cve'`` key.

    Returns:
        A dictionary with ``title``, ``description``, ``color``, ``fields``,
        ``footer`` and ``url`` keys. Missing data is shown as placeholder
        text; long text is truncated to fit the embed size limits.
    """
    details = cve.get('cve', {})
    cve_id = details.get('id', 'Unknown CVE ID')

    # First English description, or a placeholder when none is usable.
    description = next(
        (
            desc.get('value')
            for desc in details.get('descriptions', [])
            if desc.get('lang') == 'en'
        ),
        None,
    ) or 'No description available'

    # Prefer CVSS v3.1 metrics; v3.0 uses the same cvssData layout and is also
    # matched by the cvssV3Severity filter used when fetching.
    metrics = details.get('metrics', {})
    cvss_metrics = (
        metrics.get('cvssMetricV31') or metrics.get('cvssMetricV30') or []
    )

    # Use NIST's own assessment when present, otherwise the first metric.
    chosen_metric = next(
        (m for m in cvss_metrics if m.get('source') == 'nvd@nist.gov'),
        None,
    )
    if not chosen_metric:
        chosen_metric = cvss_metrics[0] if cvss_metrics else {}
    cvss_data = chosen_metric.get('cvssData', {})

    base_severity = cvss_data.get('baseSeverity', _NOT_AVAILABLE)
    color = _SEVERITY_COLORS.get(base_severity, _UNKNOWN_SEVERITY_COLOR)

    published_date = _format_timestamp(
        details.get('published', _NOT_AVAILABLE)
    )
    last_modified_date = _format_timestamp(
        details.get('lastModified', _NOT_AVAILABLE)
    )

    # CPE criteria of every match that is flagged as vulnerable.
    vulnerable_criteria = [
        cpe_match.get('criteria', 'N/A')
        for config in details.get('configurations', [])
        for node in config.get('nodes', [])
        for cpe_match in node.get('cpeMatch', [])
        if cpe_match.get('vulnerable', False)
    ]
    if vulnerable_criteria:
        vulnerable_text = _truncate(
            '\n'.join(vulnerable_criteria), _FIELD_VALUE_LIMIT
        )
    else:
        vulnerable_text = 'No vulnerable systems specified'

    references = [
        ref.get('url')
        for ref in details.get('references', [])
        if ref.get('url')
    ]

    embed = {
        'title': f'CVE Details: {cve_id}',
        'description': _truncate(description, _DESCRIPTION_LIMIT),
        'color': color,
        'fields': [
            _field('CVSS Score',
                   str(cvss_data.get('baseScore', _NOT_AVAILABLE))),
            _field('CVSS Severity', base_severity),
            _field('Attack Vector',
                   cvss_data.get('attackVector', _NOT_AVAILABLE)),
            _field('Attack Complexity',
                   cvss_data.get('attackComplexity', _NOT_AVAILABLE)),
            _field('Privileges Required',
                   cvss_data.get('privilegesRequired', _NOT_AVAILABLE)),
            _field('User Interaction',
                   cvss_data.get('userInteraction', _NOT_AVAILABLE)),
            _field('CVSS Vector',
                   cvss_data.get('vectorString', _NOT_AVAILABLE)),
            _field('Published Date', published_date),
            _field('Last Modified Date', last_modified_date),
            _field('Vulnerable Systems', vulnerable_text, inline=False),
        ],
        'footer': {
            'text': 'Source: NVD'
        },
        'url': f'https://nvd.nist.gov/vuln/detail/{cve_id}'
    }

    # The references field is only present when there is something to show.
    if references:
        embed['fields'].append(
            _field(
                'References',
                _truncate('\n'.join(references), _FIELD_VALUE_LIMIT),
                inline=False,
            )
        )

    return embed


def _cve_key(cve):
    """Return the (CVE ID, lastModified) key used in ``previous_cves``."""
    details = cve['cve']
    return details['id'], details.get('lastModified', _NOT_AVAILABLE)


def _notify_new_or_updated(cves):
    """Send a webhook for every CVE not yet recorded and persist the result.

    A CVE is recorded only after its webhook was delivered, so a failed
    delivery is retried on the next poll instead of being lost.
    """
    print(f"Number of CVEs found: {len(cves)}")

    new_or_updated_cves = [
        cve for cve in cves if _cve_key(cve) not in previous_cves
    ]
    if not new_or_updated_cves:
        print("No new or updated CVEs found.")
        return

    print(f"Found {len(new_or_updated_cves)} new or updated CVEs:")
    recorded_any = False
    try:
        for cve in new_or_updated_cves:
            if send_webhook(format_embed(cve)):
                cve_id, last_modified = _cve_key(cve)
                previous_cves[(cve_id, last_modified)] = last_modified
                recorded_any = True
    finally:
        # One write per batch; still runs if the batch is interrupted.
        if recorded_any:
            save_previous_cves()


def fetch_cves():
    """Query NVD for CVEs published in the last 7 days and notify on new ones.

    One request is made per severity in ``_SEVERITIES``. Each request is
    attempted up to ``_RETRY_ATTEMPTS`` times; request errors are printed and
    retried after a short delay rather than raised.
    """
    now = datetime.now(timezone.utc)
    start_date = (now - timedelta(days=7)).strftime(_NVD_DATE_FORMAT)
    end_date = now.strftime(_NVD_DATE_FORMAT)

    for severity in _SEVERITIES:
        url = f"https://services.nvd.nist.gov/rest/json/cves/2.0/?pubStartDate={start_date}&pubEndDate={end_date}&cvssV3Severity={severity}"

        for attempt in range(1, _RETRY_ATTEMPTS + 1):
            try:
                response = requests.get(
                    url, headers=_NVD_HEADERS, timeout=_NVD_TIMEOUT_SECONDS
                )
                response.raise_for_status()
                data = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a non-JSON body on older requests versions.
                print(f"Error fetching CVEs (attempt {attempt}/{_RETRY_ATTEMPTS}): {e}")
                if attempt < _RETRY_ATTEMPTS:
                    time.sleep(_RETRY_DELAY_SECONDS)
                continue

            _notify_new_or_updated(data.get('vulnerabilities', []))
            break


def main():
    """Load saved state, poll once immediately, then poll every hour."""
    load_previous_cves()

    schedule.every(1).hours.do(fetch_cves)
    fetch_cves()

    print("Starting CVE monitoring...")
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()