163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
|
import re
import time
from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
|
||
|
|
||
|
# Discord webhook that receives the notifications; fill this in before running.
DISCORD_WEBHOOK_URL = ''  # Replace with your actual webhook URL

# Text file holding one already-announced video ID per line.
SEEN_VIDEOS_FILE = 'seen_videos.txt'

# Pause between two scans of the channel list, in seconds (3600 s = 1 hour).
CHECK_INTERVAL = 1 * 60 * 60

# In-memory set of video IDs that were already handled.
seen_videos = set()

# Chrome settings used for every browser session the script starts.
options = Options()
# options.add_argument("--headless")  # Uncomment for headless mode
for _chrome_flag in (
    '--no-sandbox',
    '--disable-dev-shm-usage',
    '--disable-gpu',
    '--disable-infobars',
):
    options.add_argument(_chrome_flag)
# NOTE(review): presumably 1 means "allow" for the notification prompt -- confirm
# against the Chrome content-settings documentation.
options.add_experimental_option(
    "prefs",
    {"profile.default_content_setting_values.notifications": 1},
)
|
||
|
|
||
|
def load_seen_videos():
    """Return the set of video IDs stored in ``SEEN_VIDEOS_FILE``.

    Each line is stripped of surrounding whitespace and blank lines are
    ignored. When the file does not exist an empty set is returned.
    """
    try:
        handle = open(SEEN_VIDEOS_FILE, 'r')
    except FileNotFoundError:
        # First run: nothing has been recorded yet.
        return set()
    with handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return {entry for entry in stripped_lines if entry}
|
||
|
|
||
|
def save_seen_video(video_id):
    """Append ``video_id`` on its own line to ``SEEN_VIDEOS_FILE``."""
    record = f"{video_id}\n"
    # Append mode creates the file on first use and keeps earlier entries.
    with open(SEEN_VIDEOS_FILE, 'a') as handle:
        handle.write(record)
|
||
|
|
||
|
def send_discord_notification(video_title, video_link, upload_time, thumbnail_url, creator_name):
    """Post a Discord embed announcing one video to ``DISCORD_WEBHOOK_URL``.

    Args:
        video_title: Text used as the embed title.
        video_link: URL the embed title links to.
        upload_time: ``datetime`` of the upload; rendered as
            ``YYYY-MM-DD HH:MM:SS`` in the embed description.
        thumbnail_url: Image URL for the embed thumbnail. Left out of the
            payload when empty or ``None`` so the embed carries no null URL.
        creator_name: Channel name shown in the embed description.

    Delivery is best-effort: request errors and non-success HTTP statuses are
    printed rather than raised, so one failed webhook call does not stop the
    caller.
    """
    embed = {
        "title": video_title,
        "url": video_link,
        "description": f"Creator: {creator_name}\nUploaded on: {upload_time.strftime('%Y-%m-%d %H:%M:%S')}",
        "color": 16711680,  # 0xFF0000
    }
    if thumbnail_url:
        embed["thumbnail"] = {"url": thumbnail_url}

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(DISCORD_WEBHOOK_URL, json={"embeds": [embed]}, timeout=10)
    except requests.RequestException as error:
        print(f"Discord notification failed for {video_link}: {error}")
        return

    if not response.ok:
        print(f"Discord webhook returned HTTP {response.status_code} for {video_link}: {response.text}")
|
||
|
|
||
|
# Length of one of each supported unit; years and months are approximated
# as 365 and 30 days respectively.
_UNIT_SPANS = {
    'year': timedelta(days=365),
    'month': timedelta(days=30),
    'week': timedelta(weeks=1),
    'day': timedelta(days=1),
    'hour': timedelta(hours=1),
    'minute': timedelta(minutes=1),
    'second': timedelta(seconds=1),
}

# Matches "<number> <unit>" pairs; matching the singular stem also covers the
# plural form, and IGNORECASE accepts capitalised text such as "3 Days ago".
_RELATIVE_AGE_PATTERN = re.compile(
    r'(\d+)\s*(year|month|week|day|hour|minute|second)',
    re.IGNORECASE,
)


def parse_time(upload_time_str):
    """Convert a relative age such as ``"3 days ago"`` into a ``datetime``.

    Every ``<number> <unit>`` pair found in ``upload_time_str`` is added up
    (so ``"1 hour 30 minutes ago"`` is 90 minutes) and the total is subtracted
    from the current local time.

    Args:
        upload_time_str: Text containing zero or more relative-age pairs.

    Returns:
        A naive ``datetime``: ``datetime.now()`` minus the summed age. Text
        with no recognisable pair yields the current time.
    """
    total_age = timedelta()
    for amount, unit in _RELATIVE_AGE_PATTERN.findall(upload_time_str):
        total_age += int(amount) * _UNIT_SPANS[unit.lower()]
    return datetime.now() - total_age
|
||
|
|
||
|
def _extract_video_id(href):
    """Return the ``v`` query parameter of ``href``, without trailing parameters.

    When the URL carries no ``v`` parameter the text after the last ``v=``
    (or the whole string) is returned instead.
    """
    video_ids = parse_qs(urlparse(href).query).get('v')
    if video_ids:
        return video_ids[0]
    return href.split('v=')[-1]


def fetch_latest_videos(channel_url, driver, max_age=timedelta(days=7)):
    """Collect not-yet-seen videos younger than ``max_age`` from a channel page.

    Args:
        channel_url: URL of the channel's videos page to open.
        driver: Selenium WebDriver used to load and query the page.
        max_age: Maximum age of a video, measured from the estimated upload
            time returned by ``parse_time``; older videos are skipped.

    Returns:
        A list of ``(title, link, upload_time, thumbnail_url, creator_name)``
        tuples. ``thumbnail_url`` is ``None`` when no thumbnail image is found.

    Side effects:
        Every returned video's ID is added to the module-level ``seen_videos``
        set and appended to the seen-videos file via ``save_seen_video``. This
        happens before the caller sends any notification, so a notification
        that fails later is not retried.
    """
    driver.get(channel_url)
    time.sleep(3)  # Wait for the page to load

    # Dismiss the cookie dialog first. Missing button is the normal case and a
    # failed click is reported, not raised, because the step is best-effort.
    consent_buttons = driver.find_elements(
        By.CSS_SELECTOR,
        "button.VfPpkd-LgbsSe.VfPpkd-LgbsSe-OWXEXe-k8QpJ.VfPpkd-LgbsSe-OWXEXe-dgl2Hf.nCP5yc.AjY5Oe.DuMIQc.Gu558e",
    )
    if consent_buttons:
        try:
            consent_buttons[0].click()
        except WebDriverException as error:
            print(f"Could not dismiss the cookie dialog on {channel_url}: {error}")
        else:
            time.sleep(3)  # Give the channel page time to load after the click

    # Read the title only now, so it belongs to the channel page rather than
    # to whatever page showed the cookie dialog.
    creator_name = driver.title.replace(" - YouTube", "")

    new_videos = []
    for video in driver.find_elements(By.CSS_SELECTOR, 'ytd-rich-grid-media'):
        title_links = video.find_elements(By.ID, 'video-title-link')
        metadata_items = video.find_elements(By.CSS_SELECTOR, 'span.inline-metadata-item')
        if not title_links or not metadata_items:
            # Incomplete grid entry: skip it instead of aborting the whole scan.
            continue

        title_element = title_links[0]
        video_link = title_element.get_attribute('href')
        if not video_link:
            continue

        video_id = _extract_video_id(video_link)
        if video_id in seen_videos:
            continue

        # The last metadata item is taken to be the relative upload time.
        upload_time = parse_time(metadata_items[-1].text)
        if datetime.now() - upload_time >= max_age:
            continue

        thumbnails = video.find_elements(By.CSS_SELECTOR, 'img.yt-core-image')
        thumbnail_url = thumbnails[0].get_attribute('src') if thumbnails else None

        new_videos.append((
            title_element.get_attribute('title'),
            video_link,
            upload_time,
            thumbnail_url,
            creator_name,
        ))
        seen_videos.add(video_id)
        save_seen_video(video_id)

    return new_videos
|
||
|
|
||
|
def _scan_channels(channels):
    """Check every channel once in a fresh browser session and always close it."""
    driver = webdriver.Chrome(options=options)
    try:
        for channel in channels:
            # Strip a trailing slash so the URL never contains "//videos".
            channel_url = f"{channel.rstrip('/')}/videos"
            new_videos = fetch_latest_videos(channel_url, driver)
            for video_title, video_link, upload_time, thumbnail_url, creator_name in new_videos:
                print(f"New video found: {video_title}\nLink: {video_link}\nUploaded on: {upload_time}\nThumbnail: {thumbnail_url}\nCreator: {creator_name}")
                send_discord_notification(video_title, video_link, upload_time, thumbnail_url, creator_name)  # Notify
    finally:
        # Quit exactly the driver created above, even when a channel fails.
        driver.quit()


def main():
    """Run the monitor: scan all channels, notify, sleep, repeat forever.

    Loads the previously seen video IDs into the module-level ``seen_videos``
    set, reads the channel URLs (one per non-blank line) from ``youtubes.txt``
    and then loops: scan every channel, wait ``CHECK_INTERVAL`` seconds.

    Raises:
        FileNotFoundError: If ``youtubes.txt`` does not exist.
    """
    global seen_videos
    seen_videos = load_seen_videos()

    with open('youtubes.txt', 'r') as file:
        channels = [line.strip() for line in file if line.strip()]

    while True:
        _scan_channels(channels)
        time.sleep(CHECK_INTERVAL)
|
||
|
|
||
|
|
||
|
# Start the monitor only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|