celinanperalta/OAP-58, OAP-59: Update suggestion task, remove unnecessary collections from harvest (#42)

* Each thread inserts into the DB using one synchronized connection (see the sketch below)

* Fix formatting for get_empty query

* OAP-59: Filter out unnecessary collections from harvest

* Add endpoints table check

* Fix typo in get_empty description
Celina Peralta 2023-03-22 13:15:53 -04:00 committed by GitHub
parent 15f801a19a
commit 884872cf60
6 changed files with 73 additions and 58 deletions
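
The first bullet is the core behavioral change: suggestion workers no longer append into one shared list that the main thread later writes out; each worker builds its rows locally and takes a lock only for the insert on the single shared connection. A minimal sketch of that pattern, where FakeDB and the handle lists are illustrative stand-ins for the repo's OapenDB and NgramRow data:

    import concurrent.futures
    from threading import Lock


    class FakeDB:
        def add_many_suggestions(self, rows):
            print(f"wrote {len(rows)} rows on one shared connection")


    def worker(items, db_mutex, db):
        # compute suggestions locally -- no shared state touched here
        suggestions = [(handle, handle, []) for handle in items]
        db_mutex.acquire()  # serialize: one thread at a time uses the connection
        try:
            db.add_many_suggestions(suggestions)
        finally:
            db_mutex.release()
        return len(suggestions)


    db, db_mutex = FakeDB(), Lock()
    chunks = [["h1", "h2"], ["h3", "h4"], ["h5"]]
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(worker, chunk, db_mutex, db) for chunk in chunks]
        total = sum(f.result() for f in concurrent.futures.as_completed(futures))
    print(f"{total} items updated")

Holding the lock only around add_many_suggestions keeps the scoring work fully parallel while guaranteeing the shared connection sees one writer at a time.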


@@ -3,7 +3,7 @@ services:
   oapen-engine :
     build: ./oapen-engine/
     environment:
-      - RUN_CLEAN=1
+      - RUN_CLEAN=0
       - COLLECTION_IMPORT_LIMIT=0 # Set to 0 for full harvest
       - REFRESH_PERIOD=86400 # daily
       - HARVEST_PERIOD=604800 # weekly


@@ -147,12 +147,17 @@ class OapenDB:
         finally:
             cursor.close()
 
-    def get_all_ngrams(self, ngram_limit=None) -> List[NgramRow]:
+    # get_empty = True -> Include rows with no ngrams in result
+    def get_all_ngrams(self, get_empty=True) -> List[NgramRow]:
         cursor = self.connection.cursor()
         query = """
             SELECT handle, CAST (ngrams AS oapen_suggestions.ngram[]), created_at, updated_at
             FROM oapen_suggestions.ngrams
             """
+        if not get_empty:
+            query += """
+                WHERE ngrams != \'{}\'
+                """
         ret = None
         try:
             cursor.execute(query)
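
A usage sketch of the new flag, assuming the repo's modules and a reachable Postgres instance. '{}' is Postgres's empty-array literal, so get_empty=False pushes the empty-row filter into SQL instead of Python:

    from data.connection import close_connection, get_connection
    from data.oapen_db import OapenDB

    connection = get_connection()
    db = OapenDB(connection)

    all_rows = db.get_all_ngrams()                  # includes rows where ngrams == '{}'
    non_empty = db.get_all_ngrams(get_empty=False)  # adds WHERE ngrams != '{}'

    close_connection(connection)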


@@ -85,7 +85,15 @@ def seed_endpoints(connection):
 
     COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"])
 
+    SKIPPED_COLLECTIONS = [
+        "1f7c8abd-677e-4275-8b4e-3d8da49f7b36",
+        "93223e33-3c7c-47bd-9356-a7878b2814a0",
+    ]
+
     for collection in collections:
+        if collection["uuid"] in SKIPPED_COLLECTIONS:
+            continue
+
         num_items = (
             collection["numberItems"]
             if COLLECTION_IMPORT_LIMIT == 0


@@ -16,8 +16,8 @@ SCORE_THRESHOLD = 1
 TOP_K_NGRAMS_COUNT = 30
 
 # Number of threads to generate suggestions
-SUGGESTIONS_MAX_WORKERS = 250
-SUGGESTIONS_MAX_ITEMS = 25
+SUGGESTIONS_MAX_WORKERS = 10
+SUGGESTIONS_MAX_ITEMS = 50
 
 # Update items that were modified since X days ago
 UPDATE_DAYS_BEFORE = 30
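
Rough sizing under the new values, assuming the n used to build chunks in the suggestion task is SUGGESTIONS_MAX_ITEMS (the diff does not show n being assigned, so that mapping is an assumption):

    SUGGESTIONS_MAX_WORKERS = 10  # thread pool size
    SUGGESTIONS_MAX_ITEMS = 50    # items handed to each worker

    total_items = 20_000  # hypothetical corpus size
    tasks = -(-total_items // SUGGESTIONS_MAX_ITEMS)  # ceiling division -> 400 chunks
    print(f"{tasks} tasks queued, at most {SUGGESTIONS_MAX_WORKERS} running at once")

Fewer, fatter workers should also cut contention on the shared connection's lock compared to the old 250-thread pool.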


@@ -38,7 +38,9 @@ def signal_handler(signal, frame):
 
 signal.signal(signal.SIGINT, signal_handler)
 
 if int(os.environ["RUN_CLEAN"]) == 1 or (
-    not db.table_exists("suggestions") or not db.table_exists("ngrams")
+    not db.table_exists("suggestions")
+    or not db.table_exists("ngrams")
+    or not db.table_exists("endpoints")
 ):
     run_clean()
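
table_exists() itself is not shown in this diff; a hypothetical Postgres implementation, for orientation only (the signature and schema name are assumptions, not the repo's code):

    def table_exists(cursor, name, schema="oapen_suggestions"):
        # information_schema lookup; psycopg2-style %s placeholders
        cursor.execute(
            """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
            """,
            (schema, name),
        )
        return cursor.fetchone()[0]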


@@ -1,18 +1,15 @@
 import concurrent.futures
 import time
+from collections import Counter
 from threading import Lock
 from typing import List
 
 import config
-import tqdm
 from data.connection import close_connection, get_connection
 from data.oapen_db import OapenDB
 from logger.base_logger import logger
 from model.oapen_types import NgramRow, SuggestionRow
-
-# for each item in ngrams
-#   get suggestions for item
-#   store in database
+from tqdm.auto import tqdm
 
 # initial seed -> get suggestions on everything n^2
 # weekly update ->
@@ -21,58 +18,55 @@ from model.oapen_types import NgramRow, SuggestionRow
 # optimization: only suggest once per pair
 
 
-def suggestion_task(items, all_items, mutex, suggestions):
-    for item_a in items:
-        handle_a = item_a[0]
-        ngrams_a = [
-            x[0] for x in item_a[1][0 : min(len(item_a[1]), config.TOP_K_NGRAMS_COUNT)]
-        ]
+def get_ngrams_list(arr: List[NgramRow]):
+    return [x[0] for x in arr[1][0 : min(len(arr[1]), config.TOP_K_NGRAMS_COUNT)]]
+
+
+def suggestion_task(items, all_items, db_mutex, db):
+    suggestions: List[SuggestionRow] = []
+    for item_a in items:
         item_suggestions = []
+        handle_a = item_a[0]
 
         for item_b in all_items:
             handle_b = item_b[0]
-            ngrams_b = [
-                x[0]
-                for x in item_b[1][0 : min(len(item_b[1]), config.TOP_K_NGRAMS_COUNT)]
-            ]
             if handle_a == handle_b:
                 continue
 
-            repeated = len(list(filter(lambda x: x in ngrams_b, ngrams_a)))
+            ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1])))
 
-            if repeated >= config.SCORE_THRESHOLD:
-                item_suggestions.append((handle_b, repeated))
+            if ngrams_shared >= config.SCORE_THRESHOLD:
+                item_suggestions.append((handle_b, ngrams_shared))
 
-        mutex.acquire()
         item_suggestions.sort(key=lambda x: x[1], reverse=True)
-        mutex.release()
         suggestions.append((handle_a, handle_a, item_suggestions))
 
+    count = len(suggestions)
+
+    db_mutex.acquire()
+    db.add_many_suggestions(suggestions)
+    db_mutex.release()
+
+    return count
+
 
 def run():
-    mutex = Lock()
+    db_mutex = Lock()
 
     connection = get_connection()
     db = OapenDB(connection)
 
-    all_items: List[NgramRow] = db.get_all_ngrams()
-    suggestions: List[SuggestionRow] = []
-
-    # Remove any empty entries
-    all_items = list(filter(lambda item: len(item[1]) != 0, all_items))
-
-    logger.info("Generating suggestions for {0} items.".format(str(len(all_items))))
+    all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False)
+
+    logger.info("Getting suggestions for {0} items...".format(str(len(all_items))))
 
     futures = []
 
     # Get only top k ngrams for all items before processing
     for item in all_items:
-        item = (
-            item[0],
-            [x[0] for x in item[1]][0 : min(len(item[1]), config.TOP_K_NGRAMS_COUNT)],
-        )
+        ngrams = get_ngrams_list(item)
+        item = (item[0], ngrams)
 
     time_start = time.perf_counter()
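
The scoring change is easy to miss in the noise: run() now trims every item to its top-k ngram strings once (via get_ngrams_list), so the inner loop simply counts how many of item A's ngrams appear in item B's. A worked toy example with hypothetical handles and ngrams:

    item_a = ("handle/a", ["open access", "monograph", "peer review"])
    item_b = ("handle/b", ["peer review", "monograph", "licensing"])

    # count of A's (already top-k-trimmed) ngrams that also occur in B's
    ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1])))
    assert ngrams_shared == 2  # "monograph" and "peer review"

    SCORE_THRESHOLD = 1  # as in the repo's config
    if ngrams_shared >= SCORE_THRESHOLD:
        print((item_b[0], ngrams_shared))  # ("handle/b", 2)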
@@ -80,39 +74,45 @@ def run():
     chunks = [all_items[i : i + n] for i in range(0, len(all_items), n)]
 
-    with concurrent.futures.ThreadPoolExecutor(
+    counter = Counter(items_updated=0)
+
+    executor = concurrent.futures.ThreadPoolExecutor(
         max_workers=config.SUGGESTIONS_MAX_WORKERS
-    ) as executor:
-        for chunk in chunks:
-            future = executor.submit(
-                suggestion_task, chunk, all_items, mutex, suggestions
-            )
-            futures.append(future)
+    )
+
+    def refresh(future, counter, pbar):
+        pbar.update(future.result())
+        counter["items_updated"] += future.result()
+        pbar.refresh()
 
-        with tqdm.tqdm(
-            total=len(futures),
-            mininterval=0,
-            miniters=1,
-            leave=True,
-            position=0,
-            initial=0,
-        ) as pbar:
-            for future in concurrent.futures.as_completed(futures):
-                future.result()
-                pbar.update(1)
+    pbar = tqdm(
+        total=len(all_items),
+        mininterval=0,
+        miniters=1,
+        leave=True,
+        position=0,
+        initial=0,
+    )
 
-    db.add_many_suggestions(suggestions)
+    for chunk in chunks:
+        future = executor.submit(suggestion_task, chunk, all_items, db_mutex, db)
+        future.add_done_callback(lambda x: refresh(x, counter, pbar))
+        futures.append(future)
+
+    for future in concurrent.futures.as_completed(futures):
+        pass
 
     logger.info(
         "Updated suggestions for "
-        + str(len(all_items))
+        + str(counter["items_updated"])
         + " items in "
         + str(time.perf_counter() - time_start)
         + "s."
     )
 
+    executor.shutdown(wait=True)
+    pbar.close()
     close_connection(connection)
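
Finally, progress reporting moves from a blocking as_completed loop that ticked once per future to add_done_callback: each future returns how many items it processed, and the callback advances both the bar and a Counter. A standalone sketch of the pattern with a toy task and numbers:

    import concurrent.futures
    import time
    from collections import Counter

    from tqdm.auto import tqdm


    def task(n):
        time.sleep(0.01)
        return n  # number of items this task handled


    counter = Counter(items_updated=0)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    pbar = tqdm(total=100)


    def refresh(future, counter, pbar):
        counter["items_updated"] += future.result()
        pbar.update(future.result())
        pbar.refresh()


    futures = []
    for chunk in [10, 20, 30, 40]:
        future = executor.submit(task, chunk)
        future.add_done_callback(lambda f: refresh(f, counter, pbar))
        futures.append(future)

    for _ in concurrent.futures.as_completed(futures):
        pass  # drain completions

    executor.shutdown(wait=True)
    pbar.close()
    print(counter["items_updated"])  # 100

The final as_completed loop only blocks until all futures resolve; executor.shutdown(wait=True) then drains the worker threads that run the callbacks before the bar is closed.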