Merge branch 'main' into celinanperalta/OAP-60
commit
dab7812da4
|
@ -1,27 +0,0 @@
|
|||
name: OAPEN Engine
|
||||
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
working-directory: ./oapen-engine
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
- name: Install dependencies with pipenv
|
||||
working-directory: ${{env.working-directory}}
|
||||
run: |
|
||||
pip install pipenv
|
||||
pipenv install --deploy --dev
|
||||
pipenv run isort --profile black src/
|
||||
pipenv run black --check src/ --exclude="lib/*"
|
||||
pipenv run flake8 src/ --ignore="lib/*, W, E203, E266, E501, W503, F403, F401"
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
name: Web lint checker
|
||||
name: Build and test web
|
||||
on: push
|
||||
jobs:
|
||||
test:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
name: Test Containers
|
||||
name: Build and test containers
|
||||
|
||||
on: push
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ The OAPEN Suggestion Engine will suggest ebooks based on other books with simila
|
|||
|
||||
## Running server
|
||||
|
||||
You can run all the servers together with `./all-dev.sh` -- after installing dependencies with `./setup.sh`
|
||||
You can run all the servers together with `./all-dev.sh` -- after installing dependencies with `. ./setup.sh`
|
||||
|
||||
## Monorepo components
|
||||
|
||||
|
@ -17,6 +17,8 @@ Our suggestion service is centered around the trigram semantic inferencing algor
|
|||
|
||||
You can find the code for the mining engine in `oapen-engine/`.
|
||||
|
||||
Information about running the mining engine is in [`oapen-engine/README.md`](oapen-engine/README.md).
|
||||
|
||||
**Base dependencies**:
|
||||
* Python v3.10
|
||||
* PIP package manager
|
||||
|
@ -44,6 +46,8 @@ This API server returns a list of recommended books from the database.
|
|||
|
||||
You can find the code for the API engine in `api/`.
|
||||
|
||||
Configuration info for the API engine is in [`api/README.md`](api/README.md).
|
||||
|
||||
**Base dependencies**:
|
||||
* NodeJS 14.x+
|
||||
* NPM package manager
|
||||
|
@ -64,6 +68,8 @@ This is a web-app demo that can be used to query the API engine and see suggeste
|
|||
|
||||
You can find the code for the web demo in `web/`.
|
||||
|
||||
Configuration info for the web demo is in [`web/README.md`](web/README.md).
|
||||
|
||||
**Base dependencies**:
|
||||
* NodeJS 14.x+
|
||||
* NPM package manager
|
||||
|
|
|
@ -8,11 +8,10 @@ async function querySuggestions(handle, threshold = 0) {
|
|||
await validate.checkHandle(handle);
|
||||
|
||||
const query = new PQ({
|
||||
text: `SELECT s.*
|
||||
FROM (SELECT handle, unnest(suggestions::oapen_suggestions.suggestion[]) AS suggestion
|
||||
FROM oapen_suggestions.suggestions) s
|
||||
text: `SELECT suggestion AS handle, score
|
||||
FROM oapen_suggestions.suggestions
|
||||
WHERE handle = $1
|
||||
AND (s.suggestion).similarity >= $2`,
|
||||
AND score >= $2`,
|
||||
values: [handle, threshold],
|
||||
});
|
||||
|
||||
|
@ -22,10 +21,12 @@ async function querySuggestions(handle, threshold = 0) {
|
|||
|
||||
if (result?.["error"])
|
||||
return result;
|
||||
|
||||
console.log(result);
|
||||
|
||||
const data = {
|
||||
"handle": handle,
|
||||
"suggestions": result.map((e) => {return e["suggestion"];})
|
||||
"suggestions": result
|
||||
};
|
||||
|
||||
return data;
|
||||
|
|
|
@ -3,7 +3,7 @@ services:
|
|||
oapen-engine :
|
||||
build: ./oapen-engine/
|
||||
environment:
|
||||
- RUN_CLEAN=1
|
||||
- RUN_CLEAN=0
|
||||
- COLLECTION_IMPORT_LIMIT=0 # Set to 0 for full harvest
|
||||
- REFRESH_PERIOD=86400 # daily
|
||||
- HARVEST_PERIOD=604800 # weekly
|
||||
|
|
|
@ -48,4 +48,4 @@ RUN chmod -R +x scripts
|
|||
USER appuser
|
||||
|
||||
# Run the application
|
||||
ENTRYPOINT ["./scripts/run.sh"]
|
||||
ENTRYPOINT ["./scripts/test-and-run.sh"]
|
|
@ -0,0 +1,42 @@
|
|||
PYTHONEX ?= "python"
|
||||
PYTHONPATH = "$(CURDIR)/src"
|
||||
PYTHON = PYTHONPATH="$(PYTHONPATH)" $(PYTHONEX)
|
||||
|
||||
setup-env:
|
||||
ifeq ($(OS),Windows_NT)
|
||||
py -m pip install --upgrade pip
|
||||
else
|
||||
$(PYTHON) -m pip install --upgrade pip
|
||||
endif
|
||||
$(PYTHON) -m pip install pipenv
|
||||
$(PYTHON) -m pipenv install --skip-lock
|
||||
$(PYTHON) -m pipenv shell
|
||||
|
||||
seed_db:
|
||||
cd src && $(PYTHON) -m pipenv run python tasks/seed.py
|
||||
|
||||
clean_db:
|
||||
cd src && $(PYTHON) -m pipenv run python tasks/clean.py
|
||||
|
||||
clean_and_seed:
|
||||
$(MAKE) clean_db
|
||||
$(MAKE) seed_db
|
||||
|
||||
generate_suggestions:
|
||||
cd src && $(PYTHON) -m pipenv run python tasks/generate_suggestions.py
|
||||
|
||||
run:
|
||||
$(MAKE) clean_and_seed
|
||||
$(MAKE) generate_suggestions
|
||||
|
||||
run-tests:
|
||||
cd src && $(PYTHON) -m pipenv run pytest
|
||||
|
||||
refresh-items:
|
||||
cd src && $(PYTHON) -m pipenv run python tasks/refresh_items.py
|
||||
|
||||
run-daemon:
|
||||
cd src && $(PYTHON) -m pipenv run python tasks/daemon.py
|
||||
|
||||
run-unit-tests:
|
||||
cd src && $(PYTHON) -m pipenv run python test/data/run_tests.py
|
|
@ -10,6 +10,10 @@ psycopg2 = "2.9.3"
|
|||
pandas = "*"
|
||||
scikit-learn = "*"
|
||||
lxml = "*"
|
||||
schedule = "*"
|
||||
charset_normalizer = "*"
|
||||
idna = "*"
|
||||
certifi = "*"
|
||||
|
||||
[dev-packages]
|
||||
pytest = "*"
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
python src/tasks/daemon.py
|
|
@ -0,0 +1,9 @@
|
|||
#!/bin/sh
|
||||
|
||||
# exit when any command fails
|
||||
set -e
|
||||
|
||||
echo "Running tests..." && \
|
||||
python src/test/data/run_tests.py && \
|
||||
echo "Running app" && \
|
||||
python src/tasks/daemon.py
|
|
@ -20,7 +20,6 @@ def get_connection():
|
|||
|
||||
cur.close()
|
||||
|
||||
register_composite("oapen_suggestions.suggestion", conn, globally=True)
|
||||
register_composite("oapen_suggestions.ngram", conn, globally=True)
|
||||
|
||||
return conn
|
||||
|
|
|
@ -32,10 +32,7 @@ class OapenDB:
|
|||
suggestions = self.deduplicate(suggestions)
|
||||
cursor = self.connection.cursor()
|
||||
args = ",".join(
|
||||
cursor.mogrify("(%s,%s,%s::oapen_suggestions.suggestion[])", x).decode(
|
||||
"utf-8"
|
||||
)
|
||||
for x in suggestions
|
||||
cursor.mogrify("(%s,%s,%s,%s)", x).decode("utf-8") for x in suggestions
|
||||
)
|
||||
cursor.close()
|
||||
return args
|
||||
|
@ -81,14 +78,13 @@ class OapenDB:
|
|||
cursor = self.connection.cursor()
|
||||
query = """
|
||||
INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions)
|
||||
VALUES (%s, %s, %s::oapen_suggestions.suggestion[])
|
||||
ON CONFLICT (handle)
|
||||
DO
|
||||
UPDATE SET suggestions = excluded.suggestions
|
||||
VALUES (%s, %s, %s, %s)
|
||||
"""
|
||||
|
||||
try:
|
||||
cursor.execute(query, (suggestion[0], suggestion[1], suggestion[2]))
|
||||
cursor.execute(
|
||||
query, (suggestion[0], suggestion[1], suggestion[2], suggestion[3])
|
||||
)
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
logger.error(error)
|
||||
finally:
|
||||
|
@ -98,11 +94,8 @@ class OapenDB:
|
|||
cursor = self.connection.cursor()
|
||||
args = self.mogrify_suggestions(suggestions)
|
||||
query = f"""
|
||||
INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions)
|
||||
INSERT INTO oapen_suggestions.suggestions (handle, name, suggestion, score)
|
||||
VALUES {args}
|
||||
ON CONFLICT (handle)
|
||||
DO
|
||||
UPDATE SET suggestions = excluded.suggestions
|
||||
"""
|
||||
|
||||
try:
|
||||
|
@ -147,13 +140,17 @@ class OapenDB:
|
|||
finally:
|
||||
cursor.close()
|
||||
|
||||
def get_all_ngrams(self, ngram_limit=None) -> List[NgramRow]:
|
||||
# get_empty = True -> Include rows with no ngrams in result
|
||||
def get_all_ngrams(self, get_empty=True) -> List[NgramRow]:
|
||||
cursor = self.connection.cursor()
|
||||
query = """
|
||||
SELECT handle, CAST (ngrams AS oapen_suggestions.ngram[]), created_at, updated_at
|
||||
FROM oapen_suggestions.ngrams
|
||||
"""
|
||||
ret = None
|
||||
if not get_empty:
|
||||
query += """
|
||||
WHERE ngrams != \'{}\'
|
||||
"""
|
||||
try:
|
||||
cursor.execute(query)
|
||||
records = cursor.fetchall()
|
||||
|
@ -168,8 +165,7 @@ class OapenDB:
|
|||
def get_all_suggestions(self) -> List[SuggestionRow]:
|
||||
cursor = self.connection.cursor()
|
||||
query = """
|
||||
SELECT handle, name, CAST (suggestions AS oapen_suggestions.suggestion[]), created_at, updated_at
|
||||
FROM oapen_suggestions.suggestions
|
||||
SELECT * FROM oapen_suggestions.suggestions
|
||||
"""
|
||||
ret = None
|
||||
try:
|
||||
|
@ -184,6 +180,25 @@ class OapenDB:
|
|||
cursor.close()
|
||||
return ret
|
||||
|
||||
def get_suggestions_for_item(self, handle) -> List[SuggestionRow]:
|
||||
cursor = self.connection.cursor()
|
||||
query = """
|
||||
SELECT * FROM oapen_suggestions.suggestions
|
||||
WHERE handle = \'%s\'
|
||||
"""
|
||||
ret = None
|
||||
try:
|
||||
cursor.execute(query, handle)
|
||||
records = cursor.fetchall()
|
||||
|
||||
ret = records
|
||||
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
logger.error(error)
|
||||
finally:
|
||||
cursor.close()
|
||||
return ret
|
||||
|
||||
def count_table(self, table_name) -> int or None:
|
||||
cursor = self.connection.cursor()
|
||||
query = "SELECT COUNT(*) FROM %s"
|
||||
|
|
|
@ -2,8 +2,6 @@ import logging
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
file_handler = logging.FileHandler("debug.log")
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(logging.INFO)
|
||||
|
||||
|
@ -11,5 +9,5 @@ logging.basicConfig(
|
|||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(threadName)s - %(funcName)s: %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
handlers=[file_handler, stream_handler],
|
||||
handlers=[stream_handler],
|
||||
)
|
||||
|
|
|
@ -1,37 +1,19 @@
|
|||
import string
|
||||
from typing import List
|
||||
|
||||
import pandas as pd # pylint: disable=import-error
|
||||
from nltk import word_tokenize # pylint: disable=import-error
|
||||
from nltk.corpus import stopwords # pylint: disable=import-error
|
||||
import nltk
|
||||
from nltk import word_tokenize
|
||||
from .stopwords_processor import STOPWORDS
|
||||
import pandas as pd
|
||||
|
||||
from .oapen_types import ( # pylint: disable=relative-beyond-top-level
|
||||
nltk.download('punkt')
|
||||
|
||||
from .oapen_types import (
|
||||
NgramDict,
|
||||
NgramRowWithoutDate,
|
||||
OapenItem,
|
||||
)
|
||||
|
||||
stopword_paths = [
|
||||
"src/model/stopwords_broken.txt",
|
||||
"src/model/stopwords_dutch.txt",
|
||||
"src/model/stopwords_filter.txt",
|
||||
"src/model/stopwords_publisher.txt",
|
||||
]
|
||||
|
||||
stopwords_list = []
|
||||
|
||||
for p in stopword_paths:
|
||||
with open(p, "r") as f:
|
||||
stopwords_list += [line.rstrip() for line in f]
|
||||
|
||||
STOPWORDS = (
|
||||
stopwords.words("english")
|
||||
+ stopwords.words("german")
|
||||
+ stopwords.words("dutch")
|
||||
+ stopwords_list
|
||||
)
|
||||
|
||||
|
||||
def process_text(text):
|
||||
l_text = text.lower()
|
||||
p_text = "".join([c for c in l_text if c not in string.punctuation])
|
||||
|
|
|
@ -25,9 +25,8 @@ class OapenItem:
|
|||
return hash(self.handle, "handle")
|
||||
|
||||
|
||||
Suggestion = Tuple[str, float]
|
||||
SuggestionRowWithoutDate = Tuple[str, str, List[Suggestion]]
|
||||
SuggestionRowWithDate = Tuple[str, str, List[Suggestion], datetime, datetime]
|
||||
SuggestionRowWithoutDate = Tuple[str, str, str, int]
|
||||
SuggestionRowWithDate = Tuple[str, str, str, int, datetime, datetime]
|
||||
SuggestionRow = Union[SuggestionRowWithDate, SuggestionRowWithoutDate]
|
||||
|
||||
Ngram = Tuple[str, int]
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
import nltk
|
||||
from nltk.corpus import stopwords
|
||||
from functools import reduce
|
||||
import os
|
||||
|
||||
# This is run as a precaution in case of the error "NLTK stop words not found",
|
||||
# which makes sure to download the stop words after installing nltk
|
||||
nltk.download("stopwords")
|
||||
|
||||
# add additional custom stopwords to ./custom_lists/ folder and update the reference here
|
||||
# print working directory
|
||||
print("Working directory: " + os.getcwd())
|
||||
|
||||
current_dir = os.path.realpath(os.path.dirname(__file__))
|
||||
print("Local script directory: " + current_dir)
|
||||
|
||||
custom_lists_folder = current_dir + "/stopwords/"
|
||||
custom_stopwords_in_use = [
|
||||
"broken",
|
||||
"dutch",
|
||||
"filter",
|
||||
"publisher",
|
||||
]
|
||||
|
||||
# For reference on available languages, please reference https://pypi.org/project/stop-words/
|
||||
enabled_languages = [
|
||||
"english",
|
||||
"german",
|
||||
"dutch"
|
||||
]
|
||||
|
||||
# the combined stopwords of all enabled langauges
|
||||
nltk_stopwords = []
|
||||
for language in enabled_languages:
|
||||
nltk_stopwords += stopwords.words(language)
|
||||
|
||||
# get the custom lists
|
||||
custom_stopwords = []
|
||||
for custom_list in custom_stopwords_in_use:
|
||||
with open(custom_lists_folder + custom_list + ".txt", "r") as file: # specify folder name
|
||||
custom_stopwords += [line.rstrip() for line in file]
|
||||
|
||||
# add languages and custom stopwords for final stopwords var
|
||||
STOPWORDS = (nltk_stopwords + custom_stopwords)
|
|
@ -14,7 +14,6 @@ def create_schema(connection) -> None:
|
|||
"""
|
||||
CREATE SCHEMA oapen_suggestions;
|
||||
|
||||
CREATE TYPE oapen_suggestions.suggestion AS (handle text, similarity float);
|
||||
CREATE TYPE oapen_suggestions.ngram AS (ngram text, count int);
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_modtime()
|
||||
|
@ -26,11 +25,13 @@ def create_schema(connection) -> None:
|
|||
$$ language 'plpgsql';
|
||||
|
||||
CREATE TABLE IF NOT EXISTS oapen_suggestions.suggestions (
|
||||
handle text PRIMARY KEY,
|
||||
handle text,
|
||||
name text,
|
||||
suggestions oapen_suggestions.suggestion[],
|
||||
suggestion text,
|
||||
score int,
|
||||
created_at timestamp default current_timestamp,
|
||||
updated_at timestamp default current_timestamp
|
||||
updated_at timestamp default current_timestamp,
|
||||
PRIMARY KEY (handle, suggestion)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS oapen_suggestions.ngrams (
|
||||
|
@ -49,6 +50,12 @@ def create_schema(connection) -> None:
|
|||
CREATE TRIGGER update_suggestion_modtime BEFORE UPDATE ON oapen_suggestions.suggestions FOR EACH ROW EXECUTE PROCEDURE update_modtime();
|
||||
CREATE TRIGGER update_ngrams_modtime BEFORE UPDATE ON oapen_suggestions.ngrams FOR EACH ROW EXECUTE PROCEDURE update_modtime();
|
||||
CREATE TRIGGER update_endpoint_modtime BEFORE UPDATE ON oapen_suggestions.endpoints FOR EACH ROW EXECUTE PROCEDURE update_modtime();
|
||||
|
||||
CREATE INDEX idx_suggestion
|
||||
ON oapen_suggestions.suggestions(handle, suggestion);
|
||||
|
||||
ALTER TABLE oapen_suggestions.suggestions
|
||||
ADD CONSTRAINT uq_Suggestion UNIQUE(handle, suggestion);
|
||||
"""
|
||||
)
|
||||
|
||||
|
@ -63,7 +70,6 @@ def drop_schema(connection) -> None:
|
|||
DROP TABLE IF EXISTS oapen_suggestions.suggestions CASCADE;
|
||||
DROP TABLE IF EXISTS oapen_suggestions.ngrams CASCADE;
|
||||
DROP TABLE IF EXISTS oapen_suggestions.endpoints CASCADE;
|
||||
DROP TYPE IF EXISTS oapen_suggestions.suggestion CASCADE;
|
||||
DROP TYPE IF EXISTS oapen_suggestions.ngram CASCADE;
|
||||
"""
|
||||
)
|
||||
|
@ -76,7 +82,15 @@ def get_endpoints(collections):
|
|||
|
||||
COLLECTION_IMPORT_LIMIT = int(os.environ["COLLECTION_IMPORT_LIMIT"])
|
||||
|
||||
SKIPPED_COLLECTIONS = [
|
||||
"1f7c8abd-677e-4275-8b4e-3d8da49f7b36",
|
||||
"93223e33-3c7c-47bd-9356-a7878b2814a0",
|
||||
]
|
||||
|
||||
for collection in collections:
|
||||
if collection["uuid"] in SKIPPED_COLLECTIONS:
|
||||
continue
|
||||
|
||||
num_items = (
|
||||
collection["numberItems"]
|
||||
if COLLECTION_IMPORT_LIMIT == 0
|
||||
|
|
|
@ -16,9 +16,9 @@ SCORE_THRESHOLD = 1
|
|||
TOP_K_NGRAMS_COUNT = 30
|
||||
|
||||
# Number of threads to generate suggestions
|
||||
SUGGESTIONS_MAX_WORKERS = 250
|
||||
SUGGESTIONS_MAX_ITEMS = 25
|
||||
SUGGESTIONS_MAX_WORKERS = 10
|
||||
SUGGESTIONS_MAX_ITEMS = 50
|
||||
|
||||
# Update items that were modifed since X days ago
|
||||
UPDATE_DAYS_BEFORE = 30
|
||||
REFRESH_IMPORT_LIMIT = 50
|
||||
REFRESH_IMPORT_LIMIT = 0
|
||||
|
|
|
@ -4,7 +4,9 @@ import signal
|
|||
import sys
|
||||
import time
|
||||
|
||||
import schedule
|
||||
from clean import run as run_clean
|
||||
from clean import seed_endpoints
|
||||
from data.connection import get_connection
|
||||
from data.oapen_db import OapenDB
|
||||
from generate_suggestions import run as run_generate_suggestions
|
||||
|
@ -12,10 +14,17 @@ from logger.base_logger import logger
|
|||
from refresh_items import run as run_refresh_items
|
||||
from seed import run as run_seed
|
||||
|
||||
conn = get_connection()
|
||||
db = OapenDB(conn)
|
||||
logger.info("Daemon up")
|
||||
|
||||
|
||||
def harvest():
|
||||
run_seed()
|
||||
run_generate_suggestions()
|
||||
seed_endpoints()
|
||||
urls = db.get_incomplete_urls()
|
||||
if len(urls) > 0:
|
||||
run_seed()
|
||||
run_generate_suggestions()
|
||||
|
||||
|
||||
def refresh():
|
||||
|
@ -23,12 +32,6 @@ def refresh():
|
|||
run_generate_suggestions()
|
||||
|
||||
|
||||
logger.info("Daemon up")
|
||||
|
||||
conn = get_connection()
|
||||
db = OapenDB(conn)
|
||||
|
||||
|
||||
def signal_handler(signal, frame):
|
||||
conn.close()
|
||||
logger.info("Daemon exiting.")
|
||||
|
@ -37,29 +40,25 @@ def signal_handler(signal, frame):
|
|||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
logger.info("Daemon up")
|
||||
|
||||
conn = get_connection()
|
||||
db = OapenDB(conn)
|
||||
|
||||
if int(os.environ["RUN_CLEAN"]) == 1 or (
|
||||
not db.table_exists("suggestions") or not db.table_exists("ngrams")
|
||||
not db.table_exists("suggestions")
|
||||
or not db.table_exists("ngrams")
|
||||
or not db.table_exists("endpoints")
|
||||
):
|
||||
run_clean()
|
||||
|
||||
harvest()
|
||||
|
||||
harvest_acc = 0
|
||||
refresh_acc = 0
|
||||
schedule.every().day.at("20:00").do(refresh)
|
||||
schedule.every().sunday.at("22:00").do(harvest)
|
||||
|
||||
while True:
|
||||
if harvest_acc >= int(os.environ["HARVEST_PERIOD"]):
|
||||
urls = db.get_incomplete_urls()
|
||||
if len(urls) > 0:
|
||||
harvest()
|
||||
harvest_acc = 0
|
||||
|
||||
if refresh_acc >= int(os.environ["REFRESH_PERIOD"]):
|
||||
refresh()
|
||||
refresh_acc = 0
|
||||
|
||||
schedule.run_pending()
|
||||
time.sleep(60)
|
||||
refresh_acc += 60
|
||||
harvest_acc += 60
|
||||
|
||||
logger.info("Daemon down")
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
import concurrent.futures
|
||||
import time
|
||||
from collections import Counter
|
||||
from threading import Lock
|
||||
from typing import List
|
||||
|
||||
import config
|
||||
import tqdm
|
||||
from data.connection import close_connection, get_connection
|
||||
from data.oapen_db import OapenDB
|
||||
from logger.base_logger import logger
|
||||
from model.oapen_types import NgramRow, SuggestionRow
|
||||
|
||||
# for each item in ngrams
|
||||
# get suggestions for item
|
||||
# store in database
|
||||
from tqdm.auto import tqdm
|
||||
|
||||
# initial seed -> get suggestions on everything n^2
|
||||
# weekly update ->
|
||||
|
@ -21,98 +18,94 @@ from model.oapen_types import NgramRow, SuggestionRow
|
|||
# optimization: only suggest once per pair
|
||||
|
||||
|
||||
def suggestion_task(items, all_items, mutex, suggestions):
|
||||
def get_ngrams_list(arr: List[NgramRow]):
|
||||
return [x[0] for x in arr[1][0 : min(len(arr[1]), config.TOP_K_NGRAMS_COUNT)]]
|
||||
|
||||
|
||||
def suggestion_task(items, all_items, db_mutex, db):
|
||||
suggestions: List[SuggestionRow] = []
|
||||
for item_a in items:
|
||||
handle_a = item_a[0]
|
||||
ngrams_a = [
|
||||
x[0] for x in item_a[1][0 : min(len(item_a[1]), config.TOP_K_NGRAMS_COUNT)]
|
||||
]
|
||||
|
||||
item_suggestions = []
|
||||
|
||||
for item_b in all_items:
|
||||
handle_b = item_b[0]
|
||||
ngrams_b = [
|
||||
x[0]
|
||||
for x in item_b[1][0 : min(len(item_b[1]), config.TOP_K_NGRAMS_COUNT)]
|
||||
]
|
||||
|
||||
if handle_a == handle_b:
|
||||
continue
|
||||
|
||||
repeated = len(list(filter(lambda x: x in ngrams_b, ngrams_a)))
|
||||
ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1])))
|
||||
|
||||
if repeated >= config.SCORE_THRESHOLD:
|
||||
item_suggestions.append((handle_b, repeated))
|
||||
if ngrams_shared >= config.SCORE_THRESHOLD:
|
||||
suggestions.append((handle_a, handle_a, handle_b, ngrams_shared))
|
||||
|
||||
mutex.acquire()
|
||||
item_suggestions.sort(key=lambda x: x[1], reverse=True)
|
||||
mutex.release()
|
||||
db_mutex.acquire()
|
||||
db.add_many_suggestions(suggestions)
|
||||
db_mutex.release()
|
||||
|
||||
suggestions.append((handle_a, handle_a, item_suggestions))
|
||||
return len(items)
|
||||
|
||||
|
||||
def refresh(future, counter, pbar):
|
||||
pbar.update(future.result())
|
||||
counter["items_updated"] += future.result()
|
||||
pbar.refresh()
|
||||
|
||||
|
||||
def run():
|
||||
|
||||
mutex = Lock()
|
||||
connection = get_connection()
|
||||
db = OapenDB(connection)
|
||||
|
||||
all_items: List[NgramRow] = db.get_all_ngrams()
|
||||
suggestions: List[SuggestionRow] = []
|
||||
|
||||
# Remove any empty entries
|
||||
all_items = list(filter(lambda item: len(item[1]) != 0, all_items))
|
||||
|
||||
logger.info("Generating suggestions for {0} items.".format(str(len(all_items))))
|
||||
all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False)
|
||||
|
||||
executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=config.SUGGESTIONS_MAX_WORKERS
|
||||
)
|
||||
futures = []
|
||||
db_mutex = Lock()
|
||||
|
||||
counter = Counter(items_updated=0)
|
||||
|
||||
pbar = tqdm(
|
||||
total=len(all_items),
|
||||
mininterval=0,
|
||||
miniters=1,
|
||||
leave=True,
|
||||
position=0,
|
||||
initial=0,
|
||||
)
|
||||
|
||||
logger.info("Getting suggestions for {0} items...".format(str(len(all_items))))
|
||||
time_start = time.perf_counter()
|
||||
|
||||
# Get only top k ngrams for all items before processing
|
||||
for item in all_items:
|
||||
item = (
|
||||
item[0],
|
||||
[x[0] for x in item[1]][0 : min(len(item[1]), config.TOP_K_NGRAMS_COUNT)],
|
||||
)
|
||||
ngrams = get_ngrams_list(item)
|
||||
item = (item[0], ngrams)
|
||||
|
||||
time_start = time.perf_counter()
|
||||
chunks = [
|
||||
all_items[i : i + config.SUGGESTIONS_MAX_ITEMS]
|
||||
for i in range(0, len(all_items), config.SUGGESTIONS_MAX_ITEMS)
|
||||
]
|
||||
|
||||
n = config.SUGGESTIONS_MAX_ITEMS
|
||||
for chunk in chunks:
|
||||
future = executor.submit(suggestion_task, chunk, all_items, db_mutex, db)
|
||||
future.add_done_callback(lambda x: refresh(x, counter, pbar))
|
||||
futures.append(future)
|
||||
|
||||
chunks = [all_items[i : i + n] for i in range(0, len(all_items), n)]
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=config.SUGGESTIONS_MAX_WORKERS
|
||||
) as executor:
|
||||
|
||||
for chunk in chunks:
|
||||
future = executor.submit(
|
||||
suggestion_task, chunk, all_items, mutex, suggestions
|
||||
)
|
||||
futures.append(future)
|
||||
|
||||
with tqdm.tqdm(
|
||||
total=len(futures),
|
||||
mininterval=0,
|
||||
miniters=1,
|
||||
leave=True,
|
||||
position=0,
|
||||
initial=0,
|
||||
) as pbar:
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
future.result()
|
||||
pbar.update(1)
|
||||
|
||||
db.add_many_suggestions(suggestions)
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
pass
|
||||
|
||||
logger.info(
|
||||
"Updated suggestions for "
|
||||
+ str(len(all_items))
|
||||
+ " items in "
|
||||
"Updated "
|
||||
+ str(counter["items_updated"])
|
||||
+ " suggestions in "
|
||||
+ str(time.perf_counter() - time_start)
|
||||
+ "s."
|
||||
)
|
||||
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
pbar.close()
|
||||
close_connection(connection)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
import test_oapen
|
||||
import test_stopwords
|
||||
import test_ngrams
|
||||
|
||||
def run_test(run_msg, func):
|
||||
print(run_msg, end = " ")
|
||||
func()
|
||||
print("OK") # will throw on fail
|
||||
|
||||
def main():
|
||||
print("Testing connection to OAPEN.")
|
||||
try:
|
||||
run_test("Attempting to get item [Embodying Contagion]:", test_oapen.test_get_item)
|
||||
run_test("Attempting to get null item:", test_oapen.test_get_item_404)
|
||||
run_test("Attempting to get collection limit by label [Knowledge Unlatched (KU)]:",
|
||||
test_oapen.test_get_collection_limit)
|
||||
run_test("Attempting to get null collection:", test_oapen.test_get_collection_404)
|
||||
except Exception as e:
|
||||
print("\nFailed:")
|
||||
print(e)
|
||||
|
||||
print("\nTesting stopwords generation.")
|
||||
try:
|
||||
run_test("Testing stopwords correctly generated:",
|
||||
test_stopwords.test_stopwords_contains_all)
|
||||
except Exception as e:
|
||||
print("Failed:")
|
||||
print(e)
|
||||
|
||||
print("\nTesting ngrams functionality.")
|
||||
try:
|
||||
run_test("Testing process_text:", test_ngrams.test_process_text)
|
||||
run_test("Testing ngram generation:", test_ngrams.test_generate_ngram)
|
||||
run_test("Testing similarity score:", test_ngrams.test_similarity_score)
|
||||
|
||||
except Exception as e:
|
||||
print("Failed:")
|
||||
print(e)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,51 @@
|
|||
import model.ngrams as ngrams
|
||||
|
||||
test_text1 = "Foxes are cunning animals. There was a quick, red fox known to avoid crossing roads during the day, doing so only at night."
|
||||
test_text2 = "The quick red fox jumped over the lazy brown dog. It had a fantastic time doing so, as it felt finally free. The fox had been in the zoo for far too long, held in captivity."
|
||||
|
||||
processed_text1 = ['foxes', 'cunning', 'animals', 'quick', 'red', 'fox', 'known', 'avoid', 'crossing', 'roads', 'day', 'night']
|
||||
processed_text2 = ['quick', 'red', 'fox', 'jumped', 'lazy', 'brown', 'dog', 'fantastic', 'time', 'felt', 'finally', 'free', 'fox', 'zoo', 'far', 'long', 'held', 'captivity']
|
||||
|
||||
ngrams1 = {
|
||||
'foxes cunning animals': 1,
|
||||
'cunning animals quick': 1,
|
||||
'animals quick red': 1,
|
||||
'quick red fox': 1,
|
||||
'red fox known': 1,
|
||||
'fox known avoid': 1,
|
||||
'known avoid crossing': 1,
|
||||
'avoid crossing roads': 1,
|
||||
'crossing roads day': 1,
|
||||
'roads day night': 1
|
||||
}
|
||||
ngrams2 = {
|
||||
'quick red fox': 1,
|
||||
'red fox jumped': 1,
|
||||
'fox jumped lazy': 1,
|
||||
'jumped lazy brown': 1,
|
||||
'lazy brown dog': 1,
|
||||
'brown dog fantastic': 1,
|
||||
'dog fantastic time': 1,
|
||||
'fantastic time felt': 1,
|
||||
'time felt finally': 1,
|
||||
'felt finally free': 1,
|
||||
'finally free fox': 1,
|
||||
'free fox zoo': 1,
|
||||
'fox zoo far': 1,
|
||||
'zoo far long': 1,
|
||||
'far long held': 1,
|
||||
'long held captivity': 1
|
||||
}
|
||||
|
||||
def test_process_text():
|
||||
assert(ngrams.process_text(test_text1) == processed_text1)
|
||||
assert(ngrams.process_text(test_text2) == processed_text2)
|
||||
|
||||
def test_generate_ngram():
|
||||
assert(ngrams.generate_ngram(processed_text1) == ngrams1)
|
||||
assert(ngrams.generate_ngram(processed_text2) == ngrams2)
|
||||
|
||||
def test_similarity_score():
|
||||
assert(ngrams.get_similarity_score(ngrams1, ngrams2, n=5, as_percent=False) == 1)
|
||||
assert(ngrams.get_similarity_score(ngrams1, ngrams2, n=5, as_percent=True) == 0.2)
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
from typing import List
|
||||
|
||||
import src.data.oapen as OapenAPI
|
||||
import data.oapen as OapenAPI
|
||||
from model.oapen_types import OapenItem
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
from model.stopwords_processor import STOPWORDS
|
||||
import model.stopwords.stopwords_full_list as stopwords_full_list
|
||||
# currently contains stopwords_filter, stopwords_publisher, stopwords_broken, stopwords_dutch_extra
|
||||
|
||||
# tests all at once
|
||||
def test_stopwords_contains_all():
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_filter))
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_publisher))
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_broken))
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_dutch_extra))
|
||||
|
||||
# individual tests provided if needed
|
||||
def test_stopwords_contains_stopwords_filter():
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_filter))
|
||||
|
||||
def test_stopwords_contains_stopwords_publisher():
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_publisher))
|
||||
|
||||
def test_stopwords_contains_stopwords_broken():
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_broken))
|
||||
|
||||
def test_stopwords_contains_stopwords_dutch_extra():
|
||||
assert(all(x in STOPWORDS for x in stopwords_full_list.stopwords_dutch_extra))
|
Loading…
Reference in New Issue