Pruning now reharvests full text, reruns ngram tasks, and regenerates suggestions with new stopwords filtered

peterrauscher/oap-41
Peter Rauscher 2023-03-31 01:00:08 -04:00
parent 87d610f325
commit 149f2cbdc0
11 changed files with 300 additions and 106 deletions

.gitignore

@@ -1,3 +1,4 @@
.vercel
oapen-engine/lib/
.python-version
.python-version
*.code-workspace


@@ -19,7 +19,7 @@ GET_COLLECTION_BY_LABEL = (
GET_WEEKLY_ITEMS = "/rest/search?query=dc.date.accessioned_dt:[NOW-7DAY/DAY+TO+NOW]&expand=bitstreams,metadata"
GET_UPDATED_ITEMS = (
"/rest/search?query=lastModified%3E{date}&expand=metadata,bitsteams" # YYYY-MM-DD
"/rest/search?query=lastModified%3E{date}&expand=metadata,bitstreams" # YYYY-MM-DD
)
# This is the only community we care about right now
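For context, GET_UPDATED_ITEMS is a str.format template; a quick illustration of the corrected endpoint (the date value here is invented):

    url = GET_UPDATED_ITEMS.format(date="2023-03-31")
    # -> "/rest/search?query=lastModified%3E2023-03-31&expand=metadata,bitstreams"
    # With the old "bitsteams" spelling, the expand parameter presumably never
    # matched, so updated items came back without their bitstream metadata.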

data/oapen_db.py

@@ -1,3 +1,4 @@
import re
from typing import List, Union
import psycopg2
@@ -217,7 +218,7 @@ class OapenDB:
args
)
cursor.execute(query)
records = cursor.fetchall
records = cursor.fetchall()
ret = records
except (Exception, psycopg2.Error) as error:
logger.error(error)
@@ -322,7 +323,7 @@
finally:
cursor.close()
def get_new_stopwords(self, stopwords):
def get_new_stopwords(self, stopwords) -> List[str]:
ret = None
try:
cursor = self.connection.cursor()
@@ -331,12 +332,12 @@
DROP TABLE IF EXISTS temp_stopwords;
CREATE TEMPORARY TABLE temp_stopwords (stopword text);
INSERT INTO temp_stopwords (stopword) VALUES {};
SELECT DISTINCT temp_stopwords.stopword
FROM (
temp_stopwords LEFT OUTER JOIN oapen_suggestions.stopwords
ON temp_stopwords.stopword=oapen_suggestions.stopwords.stopword
)
WHERE oapen_suggestions.stopwords.stopword IS NULL;
SELECT stopword FROM temp_stopwords
WHERE NOT EXISTS (
SELECT 1
FROM oapen_suggestions.stopwords
WHERE oapen_suggestions.stopwords.stopword = temp_stopwords.stopword
)
""".format(
args
)
@@ -350,14 +351,15 @@
cursor.close()
return ret
def update_stopwords(self, stopwords):
def update_stopwords(self, stopwords) -> None:
try:
cursor = self.connection.cursor()
args = self.mogrify_stopwords(stopwords)
query = """
DELETE FROM oapen_suggestions.stopwords;
INSERT INTO oapen_suggestions.stopwords (stopword)
VALUES {args};
VALUES {}
ON CONFLICT (stopword) DO NOTHING;
""".format(
args
)
@@ -368,20 +370,20 @@
finally:
cursor.close()
def get_all_items_containing_stopwords(self, stopwords):
def get_all_items_containing_stopwords(self, stopwords) -> List[str]:
ret = None
try:
cursor = self.connection.cursor()
query = """
SELECT handle
FROM oapen_suggestions.ngrams
WHERE EXISTS (
WHERE ngrams != '{{}}' AND EXISTS (
SELECT 1
FROM UNNEST(ngrams) AS ng
WHERE ng.ngram ~ '(^|\s)({})($|\s)'
WHERE ng.ngram ~ '(^| )({})($| )'
);
""".format(
"|".join(stopwords)
"|".join([sw.replace("'", "''") for sw in stopwords])
)
cursor.execute(query)
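Taken together, the rewritten queries give pruning its contract: get_new_stopwords anti-joins the candidate list against the stored stopwords, update_stopwords replaces the stored list, and get_all_items_containing_stopwords flags the handles whose ngrams must be regenerated. A minimal sketch of the anti-join semantics in plain Python (function and variable names are illustrative, not from the codebase):

    def new_stopwords(candidates, stored):
        stored = set(stored)
        return [sw for sw in candidates if sw not in stored]

    new_stopwords(["the", "der", "oapen"], ["the", "der"])  # -> ["oapen"]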

model/stopwords_processor.py

@@ -1,7 +1,8 @@
import os
from functools import reduce
import nltk
from nltk.corpus import stopwords
from functools import reduce
import os
# This is run as a precaution in case of the error "NLTK stop words not found",
# which makes sure to download the stop words after installing nltk
@@ -23,11 +24,7 @@ custom_stopwords_in_use = [
]
# For a list of available languages, see https://pypi.org/project/stop-words/
enabled_languages = [
"english",
"german",
"dutch"
]
enabled_languages = ["english", "german", "dutch"]
# the combined stopwords of all enabled languages
nltk_stopwords = []
@@ -37,8 +34,10 @@ for language in enabled_languages:
# get the custom lists
custom_stopwords = []
for custom_list in custom_stopwords_in_use:
with open(custom_lists_folder + custom_list + ".txt", "r") as file: # specify folder name
with open(
custom_lists_folder + custom_list + ".txt", "r"
) as file: # specify folder name
custom_stopwords += [line.rstrip() for line in file]
# add languages and custom stopwords for final stopwords var
STOPWORDS = (nltk_stopwords + custom_stopwords)
STOPWORDS = set(nltk_stopwords + custom_stopwords)
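Turning STOPWORDS into a set deduplicates words shared between language lists and custom lists, and makes the membership tests used during ngram filtering O(1). A small illustration (the word lists are invented):

    nltk_stopwords = ["the", "a", "der", "die"]
    custom_stopwords = ["the", "oapen"]
    STOPWORDS = set(nltk_stopwords + custom_stopwords)
    assert "the" in STOPWORDS   # constant-time lookup
    assert len(STOPWORDS) == 5  # the duplicate "the" collapses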

prune.py

@@ -1,130 +1,209 @@
import concurrent.futures
import multiprocessing
import signal
import sys
import threading
import time
from collections import Counter
from threading import Lock
from typing import List
import config
import tqdm
from data.connection import close_connection, get_connection
from data.oapen_db import OapenDB
from logger.base_logger import logger
from model.oapen_types import NgramRow, SuggestionRow
from model.stopwords import get_all_stopwords
from model.stopwords_processor import STOPWORDS
from prune_tasks.db_task import db_task
from prune_tasks.ngrams_task import ngrams_task
from prune_tasks.reharvest_task import reharvest_task
from tasks.generate_suggestions import get_ngrams_list, suggestion_task
from tqdm.auto import tqdm
connection = get_connection()
db = OapenDB(connection)
def suggestion_task(items, all_items, mutex, suggestions):
for item_a in items:
handle_a = item_a[0]
ngrams_a = [
x[0] for x in item_a[1][0 : min(len(item_a[1]), config.TOP_K_NGRAMS_COUNT)]
]
def shutdown():
logger.info("Stopping import.")
close_connection(connection)
item_suggestions = []
for item_b in all_items:
handle_b = item_b[0]
ngrams_b = [
x[0]
for x in item_b[1][0 : min(len(item_b[1]), config.TOP_K_NGRAMS_COUNT)]
]
if handle_a == handle_b:
continue
repeated = len(list(filter(lambda x: x in ngrams_b, ngrams_a)))
if repeated >= config.SCORE_THRESHOLD:
item_suggestions.append((handle_b, repeated))
mutex.acquire()
item_suggestions.sort(key=lambda x: x[1], reverse=True)
mutex.release()
suggestions.append((handle_a, handle_a, item_suggestions))
def signal_handler(signal, frame):
logger.warning("Received shutdown for prune.py.")
shutdown()
sys.exit(0)
def run():
connection = get_connection()
db = OapenDB(connection)
db_mutex = Lock()
all_stopwords = get_all_stopwords()
print(all_stopwords)
refresh_handles = []
# Find the new stopwords since previous run
new_stopwords = [sw[0] for sw in db.get_new_stopwords(STOPWORDS)]
# Update the stopwords table
db.update_stopwords(STOPWORDS)
# Get the list of (only) the new stopwords
logger.info("Searching existing items for added stopwords")
new_stopwords = list(db.get_new_stopwords(all_stopwords))
# Update the stopwords in the database
# (used to determine which are new next run)
db.update_stopwords(all_stopwords)
if new_stopwords == None:
logger.info("No new stopwords! Aborting suggestion re-run")
else:
logger.info("Added new stopwords: " + ", ".join(new_stopwords))
if new_stopwords is not None and len(new_stopwords) > 0:
logger.info("Added {} new stopwords.".format(len(new_stopwords)))
# Get the handles of all items with ngrams containing the new stopwords
logger.info("Searching existing items for added stopwords...")
refresh_handles = db.get_all_items_containing_stopwords(new_stopwords)
logger.info(
"Found {} items with ngrams containing stopwords.".format(
len(refresh_handles)
)
)
# Re-harvest the bitstreams for those items
manager = multiprocessing.Manager()
item_queue = multiprocessing.JoinableQueue(config.IO_MAX_WORKERS * 2)
db_queue = multiprocessing.JoinableQueue()
ngrams_event = manager.Event()
db_event = threading.Event()
counter = Counter(items_found=0, ngrams_generated=0, items_inserted=0)
producers_done, consumers_done = 0, 0
producer_futures = []
consumer_futures = []
db_futures = []
signal.signal(signal.SIGINT, signal_handler)
pbar = tqdm(
total=len(refresh_handles),
desc="Re-harvest Threads Completed",
mininterval=0,
miniters=1,
initial=0,
)
pbar.set_postfix({"items found": counter["items_found"]})
time_start = time.perf_counter()
ngrams_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
)
io_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.IO_MAX_WORKERS
)
db_executor = concurrent.futures.ThreadPoolExecutor()
for _ in range(multiprocessing.cpu_count()):
consumer_futures.append(
ngrams_executor.submit(ngrams_task, item_queue, db_queue, ngrams_event)
)
logger.info("Spawned {} ngrams processes.".format(len(consumer_futures)))
db_futures.append(db_executor.submit(db_task, db, db_queue, db_event))
def refresh(future, pbar, counter):
pbar.update(1)
counter["items_found"] += future.result()
pbar.set_postfix({"items found": counter["items_found"]})
pbar.refresh()
for handle in refresh_handles:
future = io_executor.submit(reharvest_task, handle, item_queue)
future.add_done_callback(lambda x: refresh(x, pbar, counter))
producer_futures.append(future)
time.sleep(config.HARVEST_THREAD_SPAWN_DELAY)
for future in concurrent.futures.as_completed(producer_futures):
producers_done += 1
if producers_done == len(producer_futures) or io_executor._shutdown:
item_queue.join()
ngrams_event.set()
db_event.set()
for future in concurrent.futures.as_completed(consumer_futures):
res = future.result()
consumers_done += 1
counter["ngrams_generated"] += res
db_queue.join()
ngrams_executor.shutdown(wait=True)
db_executor.shutdown(wait=True)
io_executor.shutdown()
pbar.close()
logger.info("Completed {0} items".format(counter["ngrams_generated"]))
logger.info(
"Reharvest finished in " + str(time.perf_counter() - time_start) + "s."
)
# Rerun generate_suggestions on the flagged items
refresh_items: List[NgramRow] = db.get_ngrams_with_handles(refresh_handles)
refreshed_suggestions: List[SuggestionRow] = []
# Remove empty entries
refresh_items = [item for item in refresh_items if len(item[1]) != 0]
logger.info(
"Regenerating suggestions for {0} items".format(len(refresh_handles))
"Regenerating suggestions for {} items...".format(len(refresh_handles))
)
# Get only top k ngrams for all items before processing
for item in refresh_items:
item = (
item[0],
[x[0] for x in item[1]][
0 : min(len(item[1]), config.TOP_K_NGRAMS_COUNT)
],
)
# Necessary to get all items, to run comparisons
all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False)
suggestions_futures = []
# Get only top k ngrams for all items before processing
all_items = [(item[0], get_ngrams_list(item)) for item in all_items]
time_start_suggestions = time.perf_counter()
time_start = time.perf_counter()
n = config.SUGGESTIONS_MAX_ITEMS
chunks = [refresh_items[i : i + n] for i in range(0, len(refresh_items), n)]
with concurrent.futures.ThreadPoolExecutor(
suggestions_counter = Counter(items_updated=0)
suggestions_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.SUGGESTIONS_MAX_WORKERS
) as executor:
)
for chunk in chunks:
future = executor.submit(
suggestion_task, chunk, refresh_items, mutex, refreshed_suggestions
)
futures.append(future)
def refresh_suggestion_pbar(future, counter, pbar):
pbar.update(future.result())
counter["items_updated"] += future.result()
pbar.refresh()
with tqdm.tqdm(
total=len(futures),
mininterval=0,
miniters=1,
leave=True,
position=0,
initial=0,
) as pbar:
pbar = tqdm(
total=len(refresh_items),
mininterval=0,
miniters=1,
leave=True,
position=0,
initial=0,
)
for future in concurrent.futures.as_completed(futures):
future.result()
pbar.update(1)
for chunk in chunks:
future = suggestions_executor.submit(
suggestion_task, chunk, all_items, db_mutex, db
)
future.add_done_callback(
lambda x: refresh_suggestion_pbar(x, suggestions_counter, pbar)
)
suggestions_futures.append(future)
db.add_many_suggestions(refreshed_suggestions)
for future in concurrent.futures.as_completed(suggestions_futures):
pass
logger.info(
"Updated suggestions for "
+ str(len(refresh_items))
+ str(suggestions_counter["items_updated"])
+ " items in "
+ str(time.perf_counter() - time_start)
+ str(time.perf_counter() - time_start_suggestions)
+ "s."
)
# Close the database connection
suggestions_executor.shutdown(wait=True)
pbar.close()
else:
logger.info("No new stopwords, aborting suggestion re-run.")
# Close the database connection no matter what
close_connection(connection)
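The rewritten run() reads most easily as a three-stage pipeline; an informal sketch of the topology wired up above (a summary, not code from the repo):

    # io_executor     : reharvest_task(handle)   -> item_queue
    # ngrams_executor : ngrams_task(item_queue)  -> db_queue
    # db_executor     : db_task(db_queue)        -> batched add_many_ngrams()
    #
    # item_queue.join() / db_queue.join() enforce the stage boundaries, and
    # ngrams_event / db_event tell each consumer that its producers are done.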

prune_tasks/db_task.py

@@ -0,0 +1,45 @@
import multiprocessing
from threading import Event
import config
from data.oapen_db import OapenDB
from logger.base_logger import logger
ENTRIES_PER_INSERT = int(config.NGRAMS_PER_INSERT / config.ITEMS_PER_IMPORT_THREAD)
def db_task(db: OapenDB, db_queue: multiprocessing.Queue, event: Event):
logger.info("(DB) - Starting DB task")
def insert_items(entries):
try:
logger.debug("(DB) - Inserting {0} item(s).".format(len(entries)))
db.add_many_ngrams(entries)
logger.debug("(DB) - Inserted {0} item(s).".format(len(entries)))
return len(entries)
except Exception as e:
logger.error(e)
return -1
while not event.is_set():
if db_queue.qsize() >= ENTRIES_PER_INSERT:
entries = [db_queue.get() for _ in range(ENTRIES_PER_INSERT)]
count = len(entries)
insert_items(entries)
for _ in range(count):
db_queue.task_done()
if not db_queue.empty():
entries = []
while not db_queue.empty():
entries.append(db_queue.get())
count = len(entries)
insert_items(entries)
for _ in range(count):
db_queue.task_done()
logger.info("(DB) - Exiting...")
return
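db_task batches its writes: it polls until ENTRIES_PER_INSERT entries are queued, inserts them in a single add_many_ngrams call, and drains any remainder once the shutdown event fires. To get a feel for the batch size (these config values are assumptions, not taken from the repo):

    NGRAMS_PER_INSERT = 1000         # assumed config value
    ITEMS_PER_IMPORT_THREAD = 20     # assumed config value
    ENTRIES_PER_INSERT = int(NGRAMS_PER_INSERT / ITEMS_PER_IMPORT_THREAD)  # 50
    # One bulk insert per 50 queued entries, followed by 50 task_done() calls
    # so that db_queue.join() in prune.py can eventually return.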

prune_tasks/ngrams_task.py

@@ -0,0 +1,31 @@
import multiprocessing
import queue
import model.ngrams as OapenEngine
from logger.base_logger import logger
def ngrams_task(
item_queue: multiprocessing.JoinableQueue,
db_queue: multiprocessing.JoinableQueue,
event: multiprocessing.Event,
):
count = 0
while True:
try:
item = item_queue.get_nowait()
ngrams = OapenEngine.get_ngrams_for_items([item])
db_queue.put(ngrams)
item_queue.task_done()
count += 1
except queue.Empty:
if event.is_set():
break
else:
continue
except Exception as e:
logger.error(e)
return count
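Note the exit condition: an empty queue alone is not enough to stop, because producers may simply be slow; the task only exits when the queue is empty and the producers' event is set. Condensed to its skeleton (process is a stand-in for get_ngrams_for_items):

    while True:
        try:
            item = item_queue.get_nowait()
            db_queue.put(process(item))  # stand-in for get_ngrams_for_items([item])
            item_queue.task_done()
        except queue.Empty:
            if event.is_set():
                break  # producers finished and the queue is drained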

prune_tasks/reharvest_task.py

@@ -0,0 +1,37 @@
import errno
import multiprocessing
import time
import data.oapen as OapenAPI
from logger.base_logger import logger
MAX_RETRIES = 3
RETRY_DELAY = 10
def reharvest_task(handle: str, items: multiprocessing.JoinableQueue) -> int:
ret = 0
handle = handle[0]
logger.info("(IO) Starting - " + str(handle))
for i in range(MAX_RETRIES):
logger.debug("(IO) {}/{} - {}".format(i, MAX_RETRIES, handle))
try:
item = OapenAPI.get_item(handle)
if item is not None:
logger.debug("(IO) DONE")
items.put(item)
ret = 1
break
except IOError as e:
if e.errno == errno.EPIPE:
logger.warning("(IO) (will retry) - " + str(e))
else:
logger.error("(IO) (will retry) - " + str(e))
continue
except Exception as e:
logger.error("(IO) (will retry) - " + str(e))
time.sleep(RETRY_DELAY)
return ret
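reharvest_task returns 1 on a successful harvest and 0 after exhausting its retries, which is exactly what the refresh() callback in prune.py adds to the "items found" counter. A condensed view of that hand-off (simplified from the prune.py code above):

    future = io_executor.submit(reharvest_task, handle, item_queue)
    future.add_done_callback(lambda f: counter.update(items_found=f.result()))
    # With MAX_RETRIES = 3 and RETRY_DELAY = 10, a permanently failing handle
    # costs at most ~30 seconds before the task gives up and returns 0.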