From 884872cf60f180cdd4b8b7c0b88c260f66d77081 Mon Sep 17 00:00:00 2001 From: Celina Peralta Date: Wed, 22 Mar 2023 13:15:53 -0400 Subject: [PATCH 1/5] celinanperalta/OAP-58, OAP-59: Update suggestion task, remove unnecessary collections from harvest (#42) * Each thread inserts into DB using one synchronized conn * Fix formatting for get_empty query * OAP-59: Filter out unnecessary collections from harvest * Add endpoints table check * Fix typo in get_empty description --- docker-compose.yml | 2 +- oapen-engine/src/data/oapen_db.py | 7 +- oapen-engine/src/tasks/clean.py | 8 ++ oapen-engine/src/tasks/config.py | 4 +- oapen-engine/src/tasks/daemon.py | 4 +- .../src/tasks/generate_suggestions.py | 106 +++++++++--------- 6 files changed, 73 insertions(+), 58 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index cdfb586..1b101f9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: oapen-engine : build: ./oapen-engine/ environment: - - RUN_CLEAN=1 + - RUN_CLEAN=0 - COLLECTION_IMPORT_LIMIT=0 # Set to 0 for full harvest - REFRESH_PERIOD=86400 # daily - HARVEST_PERIOD=604800 # weekly diff --git a/oapen-engine/src/data/oapen_db.py b/oapen-engine/src/data/oapen_db.py index 9987006..6835492 100644 --- a/oapen-engine/src/data/oapen_db.py +++ b/oapen-engine/src/data/oapen_db.py @@ -147,12 +147,17 @@ class OapenDB: finally: cursor.close() - def get_all_ngrams(self, ngram_limit=None) -> List[NgramRow]: + # get_empty = True -> Include rows with no ngrams in result + def get_all_ngrams(self, get_empty=True) -> List[NgramRow]: cursor = self.connection.cursor() query = """ SELECT handle, CAST (ngrams AS oapen_suggestions.ngram[]), created_at, updated_at FROM oapen_suggestions.ngrams """ + if not get_empty: + query += """ + WHERE ngrams != \'{}\' + """ ret = None try: cursor.execute(query) diff --git a/oapen-engine/src/tasks/clean.py b/oapen-engine/src/tasks/clean.py index 1210a6e..7900b71 100644 --- a/oapen-engine/src/tasks/clean.py +++ b/oapen-engine/src/tasks/clean.py @@ -85,7 +85,15 @@ def seed_endpoints(connection): COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"]) + SKIPPED_COLLECTIONS = [ + "1f7c8abd-677e-4275-8b4e-3d8da49f7b36", + "93223e33-3c7c-47bd-9356-a7878b2814a0", + ] + for collection in collections: + if collection["uuid"] in SKIPPED_COLLECTIONS: + continue + num_items = ( collection["numberItems"] if COLLECTION_IMPORT_LIMIT == 0 diff --git a/oapen-engine/src/tasks/config.py b/oapen-engine/src/tasks/config.py index 4978a3f..a90b08a 100644 --- a/oapen-engine/src/tasks/config.py +++ b/oapen-engine/src/tasks/config.py @@ -16,8 +16,8 @@ SCORE_THRESHOLD = 1 TOP_K_NGRAMS_COUNT = 30 # Number of threads to generate suggestions -SUGGESTIONS_MAX_WORKERS = 250 -SUGGESTIONS_MAX_ITEMS = 25 +SUGGESTIONS_MAX_WORKERS = 10 +SUGGESTIONS_MAX_ITEMS = 50 # Update items that were modifed since X days ago UPDATE_DAYS_BEFORE = 30 diff --git a/oapen-engine/src/tasks/daemon.py b/oapen-engine/src/tasks/daemon.py index 8f4b77f..507c785 100644 --- a/oapen-engine/src/tasks/daemon.py +++ b/oapen-engine/src/tasks/daemon.py @@ -38,7 +38,9 @@ def signal_handler(signal, frame): signal.signal(signal.SIGINT, signal_handler) if int(os.environ["RUN_CLEAN"]) == 1 or ( - not db.table_exists("suggestions") or not db.table_exists("ngrams") + not db.table_exists("suggestions") + or not db.table_exists("ngrams") + or not db.table_exists("endpoints") ): run_clean() diff --git a/oapen-engine/src/tasks/generate_suggestions.py b/oapen-engine/src/tasks/generate_suggestions.py index 5270770..68f6c12 100644 --- a/oapen-engine/src/tasks/generate_suggestions.py +++ b/oapen-engine/src/tasks/generate_suggestions.py @@ -1,18 +1,15 @@ import concurrent.futures import time +from collections import Counter from threading import Lock from typing import List import config -import tqdm from data.connection import close_connection, get_connection from data.oapen_db import OapenDB from logger.base_logger import logger from model.oapen_types import NgramRow, SuggestionRow - -# for each item in ngrams -# get suggestions for item -# store in database +from tqdm.auto import tqdm # initial seed -> get suggestions on everything n^2 # weekly update -> @@ -21,58 +18,55 @@ from model.oapen_types import NgramRow, SuggestionRow # optimization: only suggest once per pair -def suggestion_task(items, all_items, mutex, suggestions): - for item_a in items: - handle_a = item_a[0] - ngrams_a = [ - x[0] for x in item_a[1][0 : min(len(item_a[1]), config.TOP_K_NGRAMS_COUNT)] - ] +def get_ngrams_list(arr: List[NgramRow]): + return [x[0] for x in arr[1][0 : min(len(arr[1]), config.TOP_K_NGRAMS_COUNT)]] + +def suggestion_task(items, all_items, db_mutex, db): + suggestions: List[SuggestionRow] = [] + for item_a in items: item_suggestions = [] + handle_a = item_a[0] + for item_b in all_items: handle_b = item_b[0] - ngrams_b = [ - x[0] - for x in item_b[1][0 : min(len(item_b[1]), config.TOP_K_NGRAMS_COUNT)] - ] + if handle_a == handle_b: continue - repeated = len(list(filter(lambda x: x in ngrams_b, ngrams_a))) + ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1]))) - if repeated >= config.SCORE_THRESHOLD: - item_suggestions.append((handle_b, repeated)) + if ngrams_shared >= config.SCORE_THRESHOLD: + item_suggestions.append((handle_b, ngrams_shared)) - mutex.acquire() item_suggestions.sort(key=lambda x: x[1], reverse=True) - mutex.release() - suggestions.append((handle_a, handle_a, item_suggestions)) + count = len(suggestions) + + db_mutex.acquire() + db.add_many_suggestions(suggestions) + db_mutex.release() + + return count + def run(): - - mutex = Lock() + db_mutex = Lock() connection = get_connection() db = OapenDB(connection) - all_items: List[NgramRow] = db.get_all_ngrams() - suggestions: List[SuggestionRow] = [] + all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False) - # Remove any empty entries - all_items = list(filter(lambda item: len(item[1]) != 0, all_items)) - - logger.info("Generating suggestions for {0} items.".format(str(len(all_items)))) + logger.info("Getting suggestions for {0} items...".format(str(len(all_items)))) futures = [] # Get only top k ngrams for all items before processing for item in all_items: - item = ( - item[0], - [x[0] for x in item[1]][0 : min(len(item[1]), config.TOP_K_NGRAMS_COUNT)], - ) + ngrams = get_ngrams_list(item) + item = (item[0], ngrams) time_start = time.perf_counter() @@ -80,39 +74,45 @@ def run(): chunks = [all_items[i : i + n] for i in range(0, len(all_items), n)] - with concurrent.futures.ThreadPoolExecutor( + counter = Counter(items_updated=0) + + executor = concurrent.futures.ThreadPoolExecutor( max_workers=config.SUGGESTIONS_MAX_WORKERS - ) as executor: + ) - for chunk in chunks: - future = executor.submit( - suggestion_task, chunk, all_items, mutex, suggestions - ) - futures.append(future) + def refresh(future, counter, pbar): + pbar.update(future.result()) + counter["items_updated"] += future.result() + pbar.refresh() - with tqdm.tqdm( - total=len(futures), - mininterval=0, - miniters=1, - leave=True, - position=0, - initial=0, - ) as pbar: + pbar = tqdm( + total=len(all_items), + mininterval=0, + miniters=1, + leave=True, + position=0, + initial=0, + ) - for future in concurrent.futures.as_completed(futures): - future.result() - pbar.update(1) + for chunk in chunks: + future = executor.submit(suggestion_task, chunk, all_items, db_mutex, db) + future.add_done_callback(lambda x: refresh(x, counter, pbar)) + futures.append(future) - db.add_many_suggestions(suggestions) + for future in concurrent.futures.as_completed(futures): + pass logger.info( "Updated suggestions for " - + str(len(all_items)) + + str(counter["items_updated"]) + " items in " + str(time.perf_counter() - time_start) + "s." ) + executor.shutdown(wait=True) + + pbar.close() close_connection(connection) From 376545450dffe0664a3015fd20c41f6d992b904e Mon Sep 17 00:00:00 2001 From: Justin O'Boyle Date: Wed, 22 Mar 2023 14:52:38 -0400 Subject: [PATCH 2/5] Basic testing (#45) * finished changes to stopwords and langauges * final changes to stopwords * basic testing * add tests * Remove formatter for now * fix merge * cd * touch __init__ * Relative path issue \? * run tests before app * Move tests to inside docker * exit when any command fails --------- Co-authored-by: Max Zaremba --- .github/workflows/engine.yml | 27 ---------- .github/workflows/lint-web.yml | 2 +- .github/workflows/test-containers.yml | 2 +- oapen-engine/Dockerfile | 2 +- oapen-engine/Makefile | 42 +++++++++++++++ oapen-engine/Pipfile | 3 ++ oapen-engine/scripts/run.sh | 3 -- oapen-engine/scripts/test-and-run.sh | 9 ++++ oapen-engine/src/data/__init__.py | 0 oapen-engine/src/model/__init__.py | 0 oapen-engine/src/model/ngrams.py | 32 +++--------- .../broken.txt} | 0 .../dutch.txt} | 0 .../filter.txt} | 0 .../publisher.txt} | 0 .../stopwords_full_list.py} | 0 oapen-engine/src/model/stopwords_processor.py | 44 ++++++++++++++++ oapen-engine/src/test/data/run_tests.py | 41 +++++++++++++++ oapen-engine/src/test/data/test_ngrams.py | 51 +++++++++++++++++++ oapen-engine/src/test/data/test_oapen.py | 2 +- oapen-engine/src/test/data/test_stopwords.py | 23 +++++++++ 21 files changed, 224 insertions(+), 59 deletions(-) delete mode 100644 .github/workflows/engine.yml create mode 100644 oapen-engine/Makefile delete mode 100644 oapen-engine/scripts/run.sh create mode 100644 oapen-engine/scripts/test-and-run.sh create mode 100644 oapen-engine/src/data/__init__.py create mode 100644 oapen-engine/src/model/__init__.py rename oapen-engine/src/model/{stopwords_broken.txt => stopwords/broken.txt} (100%) rename oapen-engine/src/model/{stopwords_dutch.txt => stopwords/dutch.txt} (100%) rename oapen-engine/src/model/{stopwords_filter.txt => stopwords/filter.txt} (100%) rename oapen-engine/src/model/{stopwords_publisher.txt => stopwords/publisher.txt} (100%) rename oapen-engine/src/model/{stopwords.py => stopwords/stopwords_full_list.py} (100%) create mode 100644 oapen-engine/src/model/stopwords_processor.py create mode 100644 oapen-engine/src/test/data/run_tests.py create mode 100644 oapen-engine/src/test/data/test_ngrams.py create mode 100644 oapen-engine/src/test/data/test_stopwords.py diff --git a/.github/workflows/engine.yml b/.github/workflows/engine.yml deleted file mode 100644 index dd55e76..0000000 --- a/.github/workflows/engine.yml +++ /dev/null @@ -1,27 +0,0 @@ -name: OAPEN Engine - -on: [push] - -jobs: - build: - runs-on: ubuntu-latest - env: - working-directory: ./oapen-engine - steps: - - uses: actions/checkout@v3 - - - name: Setup Python - uses: actions/setup-python@v4 - with: - python-version: '3.10' - - - name: Install dependencies with pipenv - working-directory: ${{env.working-directory}} - run: | - pip install pipenv - pipenv install --deploy --dev - pipenv run isort --profile black src/ - pipenv run black --check src/ --exclude="lib/*" - pipenv run flake8 src/ --ignore="lib/*, W, E203, E266, E501, W503, F403, F401" - - \ No newline at end of file diff --git a/.github/workflows/lint-web.yml b/.github/workflows/lint-web.yml index 29301ed..0afcd41 100644 --- a/.github/workflows/lint-web.yml +++ b/.github/workflows/lint-web.yml @@ -1,4 +1,4 @@ -name: Web lint checker +name: Build and test web on: push jobs: test: diff --git a/.github/workflows/test-containers.yml b/.github/workflows/test-containers.yml index 88dede5..c97188e 100644 --- a/.github/workflows/test-containers.yml +++ b/.github/workflows/test-containers.yml @@ -1,4 +1,4 @@ -name: Test Containers +name: Build and test containers on: push diff --git a/oapen-engine/Dockerfile b/oapen-engine/Dockerfile index 4629d9e..b0eccde 100644 --- a/oapen-engine/Dockerfile +++ b/oapen-engine/Dockerfile @@ -48,4 +48,4 @@ RUN chmod -R +x scripts USER appuser # Run the application -ENTRYPOINT ["./scripts/run.sh"] \ No newline at end of file +ENTRYPOINT ["./scripts/test-and-run.sh"] \ No newline at end of file diff --git a/oapen-engine/Makefile b/oapen-engine/Makefile new file mode 100644 index 0000000..180016b --- /dev/null +++ b/oapen-engine/Makefile @@ -0,0 +1,42 @@ +PYTHONEX ?= "python" +PYTHONPATH = "$(CURDIR)/src" +PYTHON = PYTHONPATH="$(PYTHONPATH)" $(PYTHONEX) + +setup-env: +ifeq ($(OS),Windows_NT) + py -m pip install --upgrade pip +else + $(PYTHON) -m pip install --upgrade pip +endif + $(PYTHON) -m pip install pipenv + $(PYTHON) -m pipenv install --skip-lock + $(PYTHON) -m pipenv shell + +seed_db: + cd src && $(PYTHON) -m pipenv run python tasks/seed.py + +clean_db: + cd src && $(PYTHON) -m pipenv run python tasks/clean.py + +clean_and_seed: + $(MAKE) clean_db + $(MAKE) seed_db + +generate_suggestions: + cd src && $(PYTHON) -m pipenv run python tasks/generate_suggestions.py + +run: + $(MAKE) clean_and_seed + $(MAKE) generate_suggestions + +run-tests: + cd src && $(PYTHON) -m pipenv run pytest + +refresh-items: + cd src && $(PYTHON) -m pipenv run python tasks/refresh_items.py + +run-daemon: + cd src && $(PYTHON) -m pipenv run python tasks/daemon.py + +run-unit-tests: + cd src && $(PYTHON) -m pipenv run python test/data/run_tests.py \ No newline at end of file diff --git a/oapen-engine/Pipfile b/oapen-engine/Pipfile index bec27fc..0627da4 100644 --- a/oapen-engine/Pipfile +++ b/oapen-engine/Pipfile @@ -10,6 +10,9 @@ psycopg2 = "2.9.3" pandas = "*" scikit-learn = "*" lxml = "*" +charset_normalizer = "*" +idna = "*" +certifi = "*" [dev-packages] pytest = "*" diff --git a/oapen-engine/scripts/run.sh b/oapen-engine/scripts/run.sh deleted file mode 100644 index b4b02fc..0000000 --- a/oapen-engine/scripts/run.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -python src/tasks/daemon.py \ No newline at end of file diff --git a/oapen-engine/scripts/test-and-run.sh b/oapen-engine/scripts/test-and-run.sh new file mode 100644 index 0000000..a4a6002 --- /dev/null +++ b/oapen-engine/scripts/test-and-run.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +# exit when any command fails +set -e + +echo "Running tests..." && \ +python src/test/data/run_tests.py && \ +echo "Running app" && \ +python src/tasks/daemon.py \ No newline at end of file diff --git a/oapen-engine/src/data/__init__.py b/oapen-engine/src/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/oapen-engine/src/model/__init__.py b/oapen-engine/src/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/oapen-engine/src/model/ngrams.py b/oapen-engine/src/model/ngrams.py index 0e3b45b..ffb7547 100644 --- a/oapen-engine/src/model/ngrams.py +++ b/oapen-engine/src/model/ngrams.py @@ -1,37 +1,19 @@ import string from typing import List -import pandas as pd # pylint: disable=import-error -from nltk import word_tokenize # pylint: disable=import-error -from nltk.corpus import stopwords # pylint: disable=import-error +import nltk +from nltk import word_tokenize +from .stopwords_processor import STOPWORDS +import pandas as pd -from .oapen_types import ( # pylint: disable=relative-beyond-top-level +nltk.download('punkt') + +from .oapen_types import ( NgramDict, NgramRowWithoutDate, OapenItem, ) -stopword_paths = [ - "src/model/stopwords_broken.txt", - "src/model/stopwords_dutch.txt", - "src/model/stopwords_filter.txt", - "src/model/stopwords_publisher.txt", -] - -stopwords_list = [] - -for p in stopword_paths: - with open(p, "r") as f: - stopwords_list += [line.rstrip() for line in f] - -STOPWORDS = ( - stopwords.words("english") - + stopwords.words("german") - + stopwords.words("dutch") - + stopwords_list -) - - def process_text(text): l_text = text.lower() p_text = "".join([c for c in l_text if c not in string.punctuation]) diff --git a/oapen-engine/src/model/stopwords_broken.txt b/oapen-engine/src/model/stopwords/broken.txt similarity index 100% rename from oapen-engine/src/model/stopwords_broken.txt rename to oapen-engine/src/model/stopwords/broken.txt diff --git a/oapen-engine/src/model/stopwords_dutch.txt b/oapen-engine/src/model/stopwords/dutch.txt similarity index 100% rename from oapen-engine/src/model/stopwords_dutch.txt rename to oapen-engine/src/model/stopwords/dutch.txt diff --git a/oapen-engine/src/model/stopwords_filter.txt b/oapen-engine/src/model/stopwords/filter.txt similarity index 100% rename from oapen-engine/src/model/stopwords_filter.txt rename to oapen-engine/src/model/stopwords/filter.txt diff --git a/oapen-engine/src/model/stopwords_publisher.txt b/oapen-engine/src/model/stopwords/publisher.txt similarity index 100% rename from oapen-engine/src/model/stopwords_publisher.txt rename to oapen-engine/src/model/stopwords/publisher.txt diff --git a/oapen-engine/src/model/stopwords.py b/oapen-engine/src/model/stopwords/stopwords_full_list.py similarity index 100% rename from oapen-engine/src/model/stopwords.py rename to oapen-engine/src/model/stopwords/stopwords_full_list.py diff --git a/oapen-engine/src/model/stopwords_processor.py b/oapen-engine/src/model/stopwords_processor.py new file mode 100644 index 0000000..720ddd5 --- /dev/null +++ b/oapen-engine/src/model/stopwords_processor.py @@ -0,0 +1,44 @@ +import nltk +from nltk.corpus import stopwords +from functools import reduce +import os + +# This is run as a precaution in case of the error "NLTK stop words not found", +# which makes sure to download the stop words after installing nltk +nltk.download("stopwords") + +# add additional custom stopwords to ./custom_lists/ folder and update the reference here +# print working directory +print("Working directory: " + os.getcwd()) + +current_dir = os.path.realpath(os.path.dirname(__file__)) +print("Local script directory: " + current_dir) + +custom_lists_folder = current_dir + "/stopwords/" +custom_stopwords_in_use = [ + "broken", + "dutch", + "filter", + "publisher", +] + +# For reference on available languages, please reference https://pypi.org/project/stop-words/ +enabled_languages = [ + "english", + "german", + "dutch" +] + +# the combined stopwords of all enabled langauges +nltk_stopwords = [] +for language in enabled_languages: + nltk_stopwords += stopwords.words(language) + +# get the custom lists +custom_stopwords = [] +for custom_list in custom_stopwords_in_use: + with open(custom_lists_folder + custom_list + ".txt", "r") as file: # specify folder name + custom_stopwords += [line.rstrip() for line in file] + +# add languages and custom stopwords for final stopwords var +STOPWORDS = (nltk_stopwords + custom_stopwords) \ No newline at end of file diff --git a/oapen-engine/src/test/data/run_tests.py b/oapen-engine/src/test/data/run_tests.py new file mode 100644 index 0000000..67ec7c5 --- /dev/null +++ b/oapen-engine/src/test/data/run_tests.py @@ -0,0 +1,41 @@ +import test_oapen +import test_stopwords +import test_ngrams + +def run_test(run_msg, func): + print(run_msg, end = " ") + func() + print("OK") # will throw on fail + +def main(): + print("Testing connection to OAPEN.") + try: + run_test("Attempting to get item [Embodying Contagion]:", test_oapen.test_get_item) + run_test("Attempting to get null item:", test_oapen.test_get_item_404) + run_test("Attempting to get collection limit by label [Knowledge Unlatched (KU)]:", + test_oapen.test_get_collection_limit) + run_test("Attempting to get null collection:", test_oapen.test_get_collection_404) + except Exception as e: + print("\nFailed:") + print(e) + + print("\nTesting stopwords generation.") + try: + run_test("Testing stopwords correctly generated:", + test_stopwords.test_stopwords_contains_all) + except Exception as e: + print("Failed:") + print(e) + + print("\nTesting ngrams functionality.") + try: + run_test("Testing process_text:", test_ngrams.test_process_text) + run_test("Testing ngram generation:", test_ngrams.test_generate_ngram) + run_test("Testing similarity score:", test_ngrams.test_similarity_score) + + except Exception as e: + print("Failed:") + print(e) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/oapen-engine/src/test/data/test_ngrams.py b/oapen-engine/src/test/data/test_ngrams.py new file mode 100644 index 0000000..31e186c --- /dev/null +++ b/oapen-engine/src/test/data/test_ngrams.py @@ -0,0 +1,51 @@ +import model.ngrams as ngrams + +test_text1 = "Foxes are cunning animals. There was a quick, red fox known to avoid crossing roads during the day, doing so only at night." +test_text2 = "The quick red fox jumped over the lazy brown dog. It had a fantastic time doing so, as it felt finally free. The fox had been in the zoo for far too long, held in captivity." + +processed_text1 = ['foxes', 'cunning', 'animals', 'quick', 'red', 'fox', 'known', 'avoid', 'crossing', 'roads', 'day', 'night'] +processed_text2 = ['quick', 'red', 'fox', 'jumped', 'lazy', 'brown', 'dog', 'fantastic', 'time', 'felt', 'finally', 'free', 'fox', 'zoo', 'far', 'long', 'held', 'captivity'] + +ngrams1 = { + 'foxes cunning animals': 1, + 'cunning animals quick': 1, + 'animals quick red': 1, + 'quick red fox': 1, + 'red fox known': 1, + 'fox known avoid': 1, + 'known avoid crossing': 1, + 'avoid crossing roads': 1, + 'crossing roads day': 1, + 'roads day night': 1 +} +ngrams2 = { + 'quick red fox': 1, + 'red fox jumped': 1, + 'fox jumped lazy': 1, + 'jumped lazy brown': 1, + 'lazy brown dog': 1, + 'brown dog fantastic': 1, + 'dog fantastic time': 1, + 'fantastic time felt': 1, + 'time felt finally': 1, + 'felt finally free': 1, + 'finally free fox': 1, + 'free fox zoo': 1, + 'fox zoo far': 1, + 'zoo far long': 1, + 'far long held': 1, + 'long held captivity': 1 +} + +def test_process_text(): + assert(ngrams.process_text(test_text1) == processed_text1) + assert(ngrams.process_text(test_text2) == processed_text2) + +def test_generate_ngram(): + assert(ngrams.generate_ngram(processed_text1) == ngrams1) + assert(ngrams.generate_ngram(processed_text2) == ngrams2) + +def test_similarity_score(): + assert(ngrams.get_similarity_score(ngrams1, ngrams2, n=5, as_percent=False) == 1) + assert(ngrams.get_similarity_score(ngrams1, ngrams2, n=5, as_percent=True) == 0.2) + \ No newline at end of file diff --git a/oapen-engine/src/test/data/test_oapen.py b/oapen-engine/src/test/data/test_oapen.py index f57d0c1..b78c8c4 100644 --- a/oapen-engine/src/test/data/test_oapen.py +++ b/oapen-engine/src/test/data/test_oapen.py @@ -1,6 +1,6 @@ from typing import List -import src.data.oapen as OapenAPI +import data.oapen as OapenAPI from model.oapen_types import OapenItem diff --git a/oapen-engine/src/test/data/test_stopwords.py b/oapen-engine/src/test/data/test_stopwords.py new file mode 100644 index 0000000..210fb5a --- /dev/null +++ b/oapen-engine/src/test/data/test_stopwords.py @@ -0,0 +1,23 @@ +from model.stopwords_processor import STOPWORDS +import model.stopwords.stopwords_full_list as stopwords_full_list +# currently contains stopwords_filter, stopwords_publisher, stopwords_broken, stopwords_dutch_extra + +# tests all at once +def test_stopwords_contains_all(): + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_filter)) + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_publisher)) + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_broken)) + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_dutch_extra)) + +# individual tests provided if needed +def test_stopwords_contains_stopwords_filter(): + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_filter)) + +def test_stopwords_contains_stopwords_publisher(): + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_publisher)) + +def test_stopwords_contains_stopwords_broken(): + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_broken)) + +def test_stopwords_contains_stopwords_dutch_extra(): + assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_dutch_extra)) \ No newline at end of file From 966c336b689f62266aeeabbab798baed8f1ad166 Mon Sep 17 00:00:00 2001 From: Eric Hellman Date: Mon, 27 Mar 2023 12:53:06 -0400 Subject: [PATCH 3/5] improve documentation (#43) * fix permissions * add links to sub-READMEs --- README.md | 8 +++++++- all-dev.sh | 0 run-api.sh | 0 run-web.sh | 0 4 files changed, 7 insertions(+), 1 deletion(-) mode change 100644 => 100755 all-dev.sh mode change 100644 => 100755 run-api.sh mode change 100644 => 100755 run-web.sh diff --git a/README.md b/README.md index 5fb5cd7..5ceb096 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ The OAPEN Suggestion Engine will suggest ebooks based on other books with simila ## Running server -You can run all the servers together with `./all-dev.sh` -- after installing dependencies with `./setup.sh` +You can run all the servers together with `./all-dev.sh` -- after installing dependencies with `. ./setup.sh` ## Monorepo components @@ -17,6 +17,8 @@ Our suggestion service is centered around the trigram semantic inferencing algor You can find the code for the mining engine in `oapen-engine/`. +Information about running the mining engine is in [`oapen-engine/README.md`](oapen-engine/README.md). + **Base dependencies**: * Python v3.10 * PIP package manager @@ -44,6 +46,8 @@ This API server returns a list of recommended books from the database. You can find the code for the API engine in `api/`. +Configuration info for the API engine is in [`api/README.md`](api/README.md). + **Base dependencies**: * NodeJS 14.x+ * NPM package manager @@ -64,6 +68,8 @@ This is a web-app demo that can be used to query the API engine and see suggeste You can find the code for the web demo in `web/`. +Configuration info for the web demo is in [`web/README.md`](web/README.md). + **Base dependencies**: * NodeJS 14.x+ * NPM package manager diff --git a/all-dev.sh b/all-dev.sh old mode 100644 new mode 100755 diff --git a/run-api.sh b/run-api.sh old mode 100644 new mode 100755 diff --git a/run-web.sh b/run-web.sh old mode 100644 new mode 100755 From b80d745b300c41140c23cf2ca49e22ad0a5f9224 Mon Sep 17 00:00:00 2001 From: Celina Peralta Date: Fri, 31 Mar 2023 15:33:41 -0400 Subject: [PATCH 4/5] add schedule package to daemon (#48) --- oapen-engine/Pipfile | 1 + oapen-engine/src/tasks/clean.py | 11 ++++++---- oapen-engine/src/tasks/config.py | 2 +- oapen-engine/src/tasks/daemon.py | 36 +++++++++++++------------------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/oapen-engine/Pipfile b/oapen-engine/Pipfile index 0627da4..4ca7281 100644 --- a/oapen-engine/Pipfile +++ b/oapen-engine/Pipfile @@ -10,6 +10,7 @@ psycopg2 = "2.9.3" pandas = "*" scikit-learn = "*" lxml = "*" +schedule = "*" charset_normalizer = "*" idna = "*" certifi = "*" diff --git a/oapen-engine/src/tasks/clean.py b/oapen-engine/src/tasks/clean.py index 7900b71..c1fbfcc 100644 --- a/oapen-engine/src/tasks/clean.py +++ b/oapen-engine/src/tasks/clean.py @@ -71,16 +71,13 @@ def drop_schema(connection) -> None: cursor.close() -def seed_endpoints(connection): - +def get_endpoints(): collections = OapenAPI.get_all_collections() if collections is None: logger.error("Could not fetch collections from OAPEN server. Is it down?") sys.exit(1) - db = OapenDB(connection) - endpoints = [] COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"]) @@ -108,6 +105,12 @@ def seed_endpoints(connection): ) endpoints.append(x) + return endpoints + + +def seed_endpoints(connection): + db = OapenDB(connection) + endpoints = get_endpoints() db.add_urls(endpoints) diff --git a/oapen-engine/src/tasks/config.py b/oapen-engine/src/tasks/config.py index a90b08a..d68c6a6 100644 --- a/oapen-engine/src/tasks/config.py +++ b/oapen-engine/src/tasks/config.py @@ -21,4 +21,4 @@ SUGGESTIONS_MAX_ITEMS = 50 # Update items that were modifed since X days ago UPDATE_DAYS_BEFORE = 30 -REFRESH_IMPORT_LIMIT = 50 +REFRESH_IMPORT_LIMIT = 0 diff --git a/oapen-engine/src/tasks/daemon.py b/oapen-engine/src/tasks/daemon.py index 507c785..23346fb 100644 --- a/oapen-engine/src/tasks/daemon.py +++ b/oapen-engine/src/tasks/daemon.py @@ -4,7 +4,9 @@ import signal import sys import time +import schedule from clean import run as run_clean +from clean import seed_endpoints from data.connection import get_connection from data.oapen_db import OapenDB from generate_suggestions import run as run_generate_suggestions @@ -12,10 +14,17 @@ from logger.base_logger import logger from refresh_items import run as run_refresh_items from seed import run as run_seed +conn = get_connection() +db = OapenDB(conn) +logger.info("Daemon up") + def harvest(): - run_seed() - run_generate_suggestions() + seed_endpoints() + urls = db.get_incomplete_urls() + if len(urls) > 0: + run_seed() + run_generate_suggestions() def refresh(): @@ -23,12 +32,6 @@ def refresh(): run_generate_suggestions() -logger.info("Daemon up") - -conn = get_connection() -db = OapenDB(conn) - - def signal_handler(signal, frame): conn.close() logger.info("Daemon exiting.") @@ -46,22 +49,11 @@ if int(os.environ["RUN_CLEAN"]) == 1 or ( harvest() -harvest_acc = 0 -refresh_acc = 0 +schedule.every().day.at("20:00").do(refresh) +schedule.every().sunday.at("22:00").do(harvest) while True: - if harvest_acc >= int(os.environ["HARVEST_PERIOD"]): - urls = db.get_incomplete_urls() - if len(urls) > 0: - harvest() - harvest_acc = 0 - - if refresh_acc >= int(os.environ["REFRESH_PERIOD"]): - refresh() - refresh_acc = 0 - + schedule.run_pending() time.sleep(60) - refresh_acc += 60 - harvest_acc += 60 logger.info("Daemon down") From 7f92b17dc257c36d17a467e7c2a3d11aae30650b Mon Sep 17 00:00:00 2001 From: Celina Peralta Date: Mon, 3 Apr 2023 17:02:16 -0400 Subject: [PATCH 5/5] OAP-61: flatten suggestions database (#46) * OAP-61: flatten suggestions database * Remove limit on items to refresh * Delete run.sh --- api/db/data.js | 11 ++-- oapen-engine/scripts/clean.sh | 0 oapen-engine/scripts/refresh.sh | 0 oapen-engine/src/data/connection.py | 1 - oapen-engine/src/data/oapen_db.py | 42 ++++++++----- oapen-engine/src/logger/base_logger.py | 4 +- oapen-engine/src/model/oapen_types.py | 5 +- oapen-engine/src/tasks/clean.py | 16 +++-- oapen-engine/src/tasks/daemon.py | 5 ++ .../src/tasks/generate_suggestions.py | 59 ++++++++----------- 10 files changed, 77 insertions(+), 66 deletions(-) mode change 100644 => 100755 oapen-engine/scripts/clean.sh mode change 100644 => 100755 oapen-engine/scripts/refresh.sh diff --git a/api/db/data.js b/api/db/data.js index a50f0db..cf51812 100644 --- a/api/db/data.js +++ b/api/db/data.js @@ -8,11 +8,10 @@ async function querySuggestions(handle, threshold = 0) { await validate.checkHandle(handle); const query = new PQ({ - text: `SELECT s.* - FROM (SELECT handle, unnest(suggestions::oapen_suggestions.suggestion[]) AS suggestion - FROM oapen_suggestions.suggestions) s + text: `SELECT suggestion AS handle, score + FROM oapen_suggestions.suggestions WHERE handle = $1 - AND (s.suggestion).similarity >= $2`, + AND score >= $2`, values: [handle, threshold], }); @@ -22,10 +21,12 @@ async function querySuggestions(handle, threshold = 0) { if (result?.["error"]) return result; + + console.log(result); const data = { "handle": handle, - "suggestions": result.map((e) => {return e["suggestion"];}) + "suggestions": result }; return data; diff --git a/oapen-engine/scripts/clean.sh b/oapen-engine/scripts/clean.sh old mode 100644 new mode 100755 diff --git a/oapen-engine/scripts/refresh.sh b/oapen-engine/scripts/refresh.sh old mode 100644 new mode 100755 diff --git a/oapen-engine/src/data/connection.py b/oapen-engine/src/data/connection.py index e89678e..35ff2a4 100644 --- a/oapen-engine/src/data/connection.py +++ b/oapen-engine/src/data/connection.py @@ -20,7 +20,6 @@ def get_connection(): cur.close() - register_composite("oapen_suggestions.suggestion", conn, globally=True) register_composite("oapen_suggestions.ngram", conn, globally=True) return conn diff --git a/oapen-engine/src/data/oapen_db.py b/oapen-engine/src/data/oapen_db.py index 6835492..e88b527 100644 --- a/oapen-engine/src/data/oapen_db.py +++ b/oapen-engine/src/data/oapen_db.py @@ -32,10 +32,7 @@ class OapenDB: suggestions = self.deduplicate(suggestions) cursor = self.connection.cursor() args = ",".join( - cursor.mogrify("(%s,%s,%s::oapen_suggestions.suggestion[])", x).decode( - "utf-8" - ) - for x in suggestions + cursor.mogrify("(%s,%s,%s,%s)", x).decode("utf-8") for x in suggestions ) cursor.close() return args @@ -81,14 +78,13 @@ class OapenDB: cursor = self.connection.cursor() query = """ INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions) - VALUES (%s, %s, %s::oapen_suggestions.suggestion[]) - ON CONFLICT (handle) - DO - UPDATE SET suggestions = excluded.suggestions + VALUES (%s, %s, %s, %s) """ try: - cursor.execute(query, (suggestion[0], suggestion[1], suggestion[2])) + cursor.execute( + query, (suggestion[0], suggestion[1], suggestion[2], suggestion[3]) + ) except (Exception, psycopg2.Error) as error: logger.error(error) finally: @@ -98,11 +94,8 @@ class OapenDB: cursor = self.connection.cursor() args = self.mogrify_suggestions(suggestions) query = f""" - INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions) + INSERT INTO oapen_suggestions.suggestions (handle, name, suggestion, score) VALUES {args} - ON CONFLICT (handle) - DO - UPDATE SET suggestions = excluded.suggestions """ try: @@ -158,7 +151,6 @@ class OapenDB: query += """ WHERE ngrams != \'{}\' """ - ret = None try: cursor.execute(query) records = cursor.fetchall() @@ -173,8 +165,7 @@ class OapenDB: def get_all_suggestions(self) -> List[SuggestionRow]: cursor = self.connection.cursor() query = """ - SELECT handle, name, CAST (suggestions AS oapen_suggestions.suggestion[]), created_at, updated_at - FROM oapen_suggestions.suggestions + SELECT * FROM oapen_suggestions.suggestions """ ret = None try: @@ -189,6 +180,25 @@ class OapenDB: cursor.close() return ret + def get_suggestions_for_item(self, handle) -> List[SuggestionRow]: + cursor = self.connection.cursor() + query = """ + SELECT * FROM oapen_suggestions.suggestions + WHERE handle = \'%s\' + """ + ret = None + try: + cursor.execute(query, handle) + records = cursor.fetchall() + + ret = records + + except (Exception, psycopg2.Error) as error: + logger.error(error) + finally: + cursor.close() + return ret + def count_table(self, table_name) -> int or None: cursor = self.connection.cursor() query = "SELECT COUNT(*) FROM %s" diff --git a/oapen-engine/src/logger/base_logger.py b/oapen-engine/src/logger/base_logger.py index 696f7ff..44ffcf9 100644 --- a/oapen-engine/src/logger/base_logger.py +++ b/oapen-engine/src/logger/base_logger.py @@ -2,8 +2,6 @@ import logging logger = logging.getLogger(__name__) -file_handler = logging.FileHandler("debug.log") -file_handler.setLevel(logging.DEBUG) stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.INFO) @@ -11,5 +9,5 @@ logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(threadName)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - handlers=[file_handler, stream_handler], + handlers=[stream_handler], ) diff --git a/oapen-engine/src/model/oapen_types.py b/oapen-engine/src/model/oapen_types.py index 83e4cd3..adaf2f7 100644 --- a/oapen-engine/src/model/oapen_types.py +++ b/oapen-engine/src/model/oapen_types.py @@ -25,9 +25,8 @@ class OapenItem: return hash(self.handle, "handle") -Suggestion = Tuple[str, float] -SuggestionRowWithoutDate = Tuple[str, str, List[Suggestion]] -SuggestionRowWithDate = Tuple[str, str, List[Suggestion], datetime, datetime] +SuggestionRowWithoutDate = Tuple[str, str, str, int] +SuggestionRowWithDate = Tuple[str, str, str, int, datetime, datetime] SuggestionRow = Union[SuggestionRowWithDate, SuggestionRowWithoutDate] Ngram = Tuple[str, int] diff --git a/oapen-engine/src/tasks/clean.py b/oapen-engine/src/tasks/clean.py index c1fbfcc..4171955 100644 --- a/oapen-engine/src/tasks/clean.py +++ b/oapen-engine/src/tasks/clean.py @@ -14,7 +14,6 @@ def create_schema(connection) -> None: """ CREATE SCHEMA oapen_suggestions; - CREATE TYPE oapen_suggestions.suggestion AS (handle text, similarity float); CREATE TYPE oapen_suggestions.ngram AS (ngram text, count int); CREATE OR REPLACE FUNCTION update_modtime() @@ -26,11 +25,13 @@ def create_schema(connection) -> None: $$ language 'plpgsql'; CREATE TABLE IF NOT EXISTS oapen_suggestions.suggestions ( - handle text PRIMARY KEY, + handle text, name text, - suggestions oapen_suggestions.suggestion[], + suggestion text, + score int, created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp + updated_at timestamp default current_timestamp, + PRIMARY KEY (handle, suggestion) ); CREATE TABLE IF NOT EXISTS oapen_suggestions.ngrams ( @@ -49,6 +50,12 @@ def create_schema(connection) -> None: CREATE TRIGGER update_suggestion_modtime BEFORE UPDATE ON oapen_suggestions.suggestions FOR EACH ROW EXECUTE PROCEDURE update_modtime(); CREATE TRIGGER update_ngrams_modtime BEFORE UPDATE ON oapen_suggestions.ngrams FOR EACH ROW EXECUTE PROCEDURE update_modtime(); CREATE TRIGGER update_endpoint_modtime BEFORE UPDATE ON oapen_suggestions.endpoints FOR EACH ROW EXECUTE PROCEDURE update_modtime(); + + CREATE INDEX idx_suggestion + ON oapen_suggestions.suggestions(handle, suggestion); + + ALTER TABLE oapen_suggestions.suggestions + ADD CONSTRAINT uq_Suggestion UNIQUE(handle, suggestion); """ ) @@ -63,7 +70,6 @@ def drop_schema(connection) -> None: DROP TABLE IF EXISTS oapen_suggestions.suggestions CASCADE; DROP TABLE IF EXISTS oapen_suggestions.ngrams CASCADE; DROP TABLE IF EXISTS oapen_suggestions.endpoints CASCADE; - DROP TYPE IF EXISTS oapen_suggestions.suggestion CASCADE; DROP TYPE IF EXISTS oapen_suggestions.ngram CASCADE; """ ) diff --git a/oapen-engine/src/tasks/daemon.py b/oapen-engine/src/tasks/daemon.py index 23346fb..339d01c 100644 --- a/oapen-engine/src/tasks/daemon.py +++ b/oapen-engine/src/tasks/daemon.py @@ -40,6 +40,11 @@ def signal_handler(signal, frame): signal.signal(signal.SIGINT, signal_handler) +logger.info("Daemon up") + +conn = get_connection() +db = OapenDB(conn) + if int(os.environ["RUN_CLEAN"]) == 1 or ( not db.table_exists("suggestions") or not db.table_exists("ngrams") diff --git a/oapen-engine/src/tasks/generate_suggestions.py b/oapen-engine/src/tasks/generate_suggestions.py index 68f6c12..5ab442f 100644 --- a/oapen-engine/src/tasks/generate_suggestions.py +++ b/oapen-engine/src/tasks/generate_suggestions.py @@ -25,8 +25,6 @@ def get_ngrams_list(arr: List[NgramRow]): def suggestion_task(items, all_items, db_mutex, db): suggestions: List[SuggestionRow] = [] for item_a in items: - item_suggestions = [] - handle_a = item_a[0] for item_b in all_items: @@ -38,52 +36,34 @@ def suggestion_task(items, all_items, db_mutex, db): ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1]))) if ngrams_shared >= config.SCORE_THRESHOLD: - item_suggestions.append((handle_b, ngrams_shared)) - - item_suggestions.sort(key=lambda x: x[1], reverse=True) - suggestions.append((handle_a, handle_a, item_suggestions)) - - count = len(suggestions) + suggestions.append((handle_a, handle_a, handle_b, ngrams_shared)) db_mutex.acquire() db.add_many_suggestions(suggestions) db_mutex.release() - return count + return len(items) + + +def refresh(future, counter, pbar): + pbar.update(future.result()) + counter["items_updated"] += future.result() + pbar.refresh() def run(): - db_mutex = Lock() connection = get_connection() db = OapenDB(connection) all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False) - logger.info("Getting suggestions for {0} items...".format(str(len(all_items)))) - - futures = [] - - # Get only top k ngrams for all items before processing - for item in all_items: - ngrams = get_ngrams_list(item) - item = (item[0], ngrams) - - time_start = time.perf_counter() - - n = config.SUGGESTIONS_MAX_ITEMS - - chunks = [all_items[i : i + n] for i in range(0, len(all_items), n)] - - counter = Counter(items_updated=0) - executor = concurrent.futures.ThreadPoolExecutor( max_workers=config.SUGGESTIONS_MAX_WORKERS ) + futures = [] + db_mutex = Lock() - def refresh(future, counter, pbar): - pbar.update(future.result()) - counter["items_updated"] += future.result() - pbar.refresh() + counter = Counter(items_updated=0) pbar = tqdm( total=len(all_items), @@ -94,6 +74,19 @@ def run(): initial=0, ) + logger.info("Getting suggestions for {0} items...".format(str(len(all_items)))) + time_start = time.perf_counter() + + # Get only top k ngrams for all items before processing + for item in all_items: + ngrams = get_ngrams_list(item) + item = (item[0], ngrams) + + chunks = [ + all_items[i : i + config.SUGGESTIONS_MAX_ITEMS] + for i in range(0, len(all_items), config.SUGGESTIONS_MAX_ITEMS) + ] + for chunk in chunks: future = executor.submit(suggestion_task, chunk, all_items, db_mutex, db) future.add_done_callback(lambda x: refresh(x, counter, pbar)) @@ -103,9 +96,9 @@ def run(): pass logger.info( - "Updated suggestions for " + "Updated " + str(counter["items_updated"]) - + " items in " + + " suggestions in " + str(time.perf_counter() - time_start) + "s." )