add schedule package to daemon (#48)

pull/51/head
Celina Peralta 2023-03-31 15:33:41 -04:00 committed by GitHub
parent 966c336b68
commit b80d745b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 27 deletions

View File

@ -10,6 +10,7 @@ psycopg2 = "2.9.3"
pandas = "*" pandas = "*"
scikit-learn = "*" scikit-learn = "*"
lxml = "*" lxml = "*"
schedule = "*"
charset_normalizer = "*" charset_normalizer = "*"
idna = "*" idna = "*"
certifi = "*" certifi = "*"

View File

@ -71,16 +71,13 @@ def drop_schema(connection) -> None:
cursor.close() cursor.close()
def seed_endpoints(connection): def get_endpoints():
collections = OapenAPI.get_all_collections() collections = OapenAPI.get_all_collections()
if collections is None: if collections is None:
logger.error("Could not fetch collections from OAPEN server. Is it down?") logger.error("Could not fetch collections from OAPEN server. Is it down?")
sys.exit(1) sys.exit(1)
db = OapenDB(connection)
endpoints = [] endpoints = []
COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"]) COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"])
@ -108,6 +105,12 @@ def seed_endpoints(connection):
) )
endpoints.append(x) endpoints.append(x)
return endpoints
def seed_endpoints(connection):
db = OapenDB(connection)
endpoints = get_endpoints()
db.add_urls(endpoints) db.add_urls(endpoints)

View File

@ -21,4 +21,4 @@ SUGGESTIONS_MAX_ITEMS = 50
# Update items that were modifed since X days ago # Update items that were modifed since X days ago
UPDATE_DAYS_BEFORE = 30 UPDATE_DAYS_BEFORE = 30
REFRESH_IMPORT_LIMIT = 50 REFRESH_IMPORT_LIMIT = 0

View File

@ -4,7 +4,9 @@ import signal
import sys import sys
import time import time
import schedule
from clean import run as run_clean from clean import run as run_clean
from clean import seed_endpoints
from data.connection import get_connection from data.connection import get_connection
from data.oapen_db import OapenDB from data.oapen_db import OapenDB
from generate_suggestions import run as run_generate_suggestions from generate_suggestions import run as run_generate_suggestions
@ -12,10 +14,17 @@ from logger.base_logger import logger
from refresh_items import run as run_refresh_items from refresh_items import run as run_refresh_items
from seed import run as run_seed from seed import run as run_seed
conn = get_connection()
db = OapenDB(conn)
logger.info("Daemon up")
def harvest(): def harvest():
run_seed() seed_endpoints()
run_generate_suggestions() urls = db.get_incomplete_urls()
if len(urls) > 0:
run_seed()
run_generate_suggestions()
def refresh(): def refresh():
@ -23,12 +32,6 @@ def refresh():
run_generate_suggestions() run_generate_suggestions()
logger.info("Daemon up")
conn = get_connection()
db = OapenDB(conn)
def signal_handler(signal, frame): def signal_handler(signal, frame):
conn.close() conn.close()
logger.info("Daemon exiting.") logger.info("Daemon exiting.")
@ -46,22 +49,11 @@ if int(os.environ["RUN_CLEAN"]) == 1 or (
harvest() harvest()
harvest_acc = 0 schedule.every().day.at("20:00").do(refresh)
refresh_acc = 0 schedule.every().sunday.at("22:00").do(harvest)
while True: while True:
if harvest_acc >= int(os.environ["HARVEST_PERIOD"]): schedule.run_pending()
urls = db.get_incomplete_urls()
if len(urls) > 0:
harvest()
harvest_acc = 0
if refresh_acc >= int(os.environ["REFRESH_PERIOD"]):
refresh()
refresh_acc = 0
time.sleep(60) time.sleep(60)
refresh_acc += 60
harvest_acc += 60
logger.info("Daemon down") logger.info("Daemon down")