OAP-61: flatten suggestions database (#46)

* OAP-61: flatten suggestions database

* Remove limit on items to refresh

* Delete run.sh
Celina Peralta 2023-04-03 17:02:16 -04:00 committed by GitHub
parent b80d745b30
commit 7f92b17dc2
10 changed files with 77 additions and 66 deletions
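
For orientation, here is a minimal sketch of how a client could read scored suggestions from the flattened table this commit introduces. It is an illustration only, not code from the changeset: the connection string and function name are placeholders, while the table and column names (handle, name, suggestion, score) come from the schema change below.

    # Sketch only: query the flattened oapen_suggestions.suggestions table,
    # where each row is a single (handle, suggestion, score) pair rather than
    # a composite suggestion[] array.
    import psycopg2  # the engine already depends on psycopg2

    def fetch_suggestions(handle, threshold=0):
        conn = psycopg2.connect("dbname=oapen user=postgres")  # placeholder DSN
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT suggestion, score
                    FROM oapen_suggestions.suggestions
                    WHERE handle = %s AND score >= %s
                    ORDER BY score DESC
                    """,
                    (handle, threshold),
                )
                return cur.fetchall()
        finally:
            conn.close()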


@@ -8,11 +8,10 @@ async function querySuggestions(handle, threshold = 0) {
   await validate.checkHandle(handle);
   const query = new PQ({
-    text: `SELECT s.*
-           FROM (SELECT handle, unnest(suggestions::oapen_suggestions.suggestion[]) AS suggestion
-                 FROM oapen_suggestions.suggestions) s
+    text: `SELECT suggestion AS handle, score
+           FROM oapen_suggestions.suggestions
            WHERE handle = $1
-             AND (s.suggestion).similarity >= $2`,
+             AND score >= $2`,
     values: [handle, threshold],
   });
@@ -23,9 +22,11 @@ async function querySuggestions(handle, threshold = 0) {
   if (result?.["error"])
     return result;
   console.log(result);
   const data = {
     "handle": handle,
-    "suggestions": result.map((e) => {return e["suggestion"];})
+    "suggestions": result
   };
   return data;

oapen-engine/scripts/clean.sh Normal file → Executable file

oapen-engine/scripts/refresh.sh Normal file → Executable file


@@ -20,7 +20,6 @@ def get_connection():
     cur.close()
-    register_composite("oapen_suggestions.suggestion", conn, globally=True)
     register_composite("oapen_suggestions.ngram", conn, globally=True)
     return conn


@@ -32,10 +32,7 @@ class OapenDB:
         suggestions = self.deduplicate(suggestions)
         cursor = self.connection.cursor()
         args = ",".join(
-            cursor.mogrify("(%s,%s,%s::oapen_suggestions.suggestion[])", x).decode(
-                "utf-8"
-            )
-            for x in suggestions
+            cursor.mogrify("(%s,%s,%s,%s)", x).decode("utf-8") for x in suggestions
        )
         cursor.close()
         return args
@@ -81,14 +78,13 @@ class OapenDB:
         cursor = self.connection.cursor()
         query = """
             INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions)
-            VALUES (%s, %s, %s::oapen_suggestions.suggestion[])
-            ON CONFLICT (handle)
-            DO
-                UPDATE SET suggestions = excluded.suggestions
+            VALUES (%s, %s, %s, %s)
             """
         try:
-            cursor.execute(query, (suggestion[0], suggestion[1], suggestion[2]))
+            cursor.execute(
+                query, (suggestion[0], suggestion[1], suggestion[2], suggestion[3])
+            )
         except (Exception, psycopg2.Error) as error:
             logger.error(error)
         finally:
@@ -98,11 +94,8 @@ class OapenDB:
         cursor = self.connection.cursor()
         args = self.mogrify_suggestions(suggestions)
         query = f"""
-            INSERT INTO oapen_suggestions.suggestions (handle, name, suggestions)
+            INSERT INTO oapen_suggestions.suggestions (handle, name, suggestion, score)
             VALUES {args}
-            ON CONFLICT (handle)
-            DO
-                UPDATE SET suggestions = excluded.suggestions
             """
         try:
@@ -158,7 +151,6 @@ class OapenDB:
         query += """
             WHERE ngrams != \'{}\'
             """
-        ret = None
         try:
             cursor.execute(query)
             records = cursor.fetchall()
@@ -173,8 +165,7 @@ class OapenDB:
     def get_all_suggestions(self) -> List[SuggestionRow]:
         cursor = self.connection.cursor()
         query = """
-            SELECT handle, name, CAST (suggestions AS oapen_suggestions.suggestion[]), created_at, updated_at
-            FROM oapen_suggestions.suggestions
+            SELECT * FROM oapen_suggestions.suggestions
             """
         ret = None
         try:
@@ -189,6 +180,25 @@ class OapenDB:
             cursor.close()
         return ret

+    def get_suggestions_for_item(self, handle) -> List[SuggestionRow]:
+        cursor = self.connection.cursor()
+        query = """
+            SELECT * FROM oapen_suggestions.suggestions
+            WHERE handle = \'%s\'
+            """
+        ret = None
+        try:
+            cursor.execute(query, handle)
+            records = cursor.fetchall()
+            ret = records
+        except (Exception, psycopg2.Error) as error:
+            logger.error(error)
+        finally:
+            cursor.close()
+        return ret
+
     def count_table(self, table_name) -> int or None:
         cursor = self.connection.cursor()
         query = "SELECT COUNT(*) FROM %s"


@@ -2,8 +2,6 @@ import logging
 logger = logging.getLogger(__name__)
-file_handler = logging.FileHandler("debug.log")
-file_handler.setLevel(logging.DEBUG)
 stream_handler = logging.StreamHandler()
 stream_handler.setLevel(logging.INFO)
@@ -11,5 +9,5 @@ logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s %(levelname)s %(threadName)s - %(funcName)s: %(message)s",
     datefmt="%Y-%m-%d %H:%M:%S",
-    handlers=[file_handler, stream_handler],
+    handlers=[stream_handler],
 )


@@ -25,9 +25,8 @@ class OapenItem:
         return hash(self.handle, "handle")

-Suggestion = Tuple[str, float]
-SuggestionRowWithoutDate = Tuple[str, str, List[Suggestion]]
-SuggestionRowWithDate = Tuple[str, str, List[Suggestion], datetime, datetime]
+SuggestionRowWithoutDate = Tuple[str, str, str, int]
+SuggestionRowWithDate = Tuple[str, str, str, int, datetime, datetime]
 SuggestionRow = Union[SuggestionRowWithDate, SuggestionRowWithoutDate]

 Ngram = Tuple[str, int]


@@ -14,7 +14,6 @@ def create_schema(connection) -> None:
         """
         CREATE SCHEMA oapen_suggestions;
-        CREATE TYPE oapen_suggestions.suggestion AS (handle text, similarity float);
         CREATE TYPE oapen_suggestions.ngram AS (ngram text, count int);

         CREATE OR REPLACE FUNCTION update_modtime()
@@ -26,11 +25,13 @@ def create_schema(connection) -> None:
         $$ language 'plpgsql';

         CREATE TABLE IF NOT EXISTS oapen_suggestions.suggestions (
-            handle text PRIMARY KEY,
+            handle text,
             name text,
-            suggestions oapen_suggestions.suggestion[],
+            suggestion text,
+            score int,
             created_at timestamp default current_timestamp,
-            updated_at timestamp default current_timestamp
+            updated_at timestamp default current_timestamp,
+            PRIMARY KEY (handle, suggestion)
         );

         CREATE TABLE IF NOT EXISTS oapen_suggestions.ngrams (
@@ -49,6 +50,12 @@ def create_schema(connection) -> None:
         CREATE TRIGGER update_suggestion_modtime BEFORE UPDATE ON oapen_suggestions.suggestions FOR EACH ROW EXECUTE PROCEDURE update_modtime();
         CREATE TRIGGER update_ngrams_modtime BEFORE UPDATE ON oapen_suggestions.ngrams FOR EACH ROW EXECUTE PROCEDURE update_modtime();
         CREATE TRIGGER update_endpoint_modtime BEFORE UPDATE ON oapen_suggestions.endpoints FOR EACH ROW EXECUTE PROCEDURE update_modtime();
+
+        CREATE INDEX idx_suggestion
+        ON oapen_suggestions.suggestions(handle, suggestion);
+
+        ALTER TABLE oapen_suggestions.suggestions
+        ADD CONSTRAINT uq_Suggestion UNIQUE(handle, suggestion);
         """
     )
@@ -63,7 +70,6 @@ def drop_schema(connection) -> None:
         DROP TABLE IF EXISTS oapen_suggestions.suggestions CASCADE;
         DROP TABLE IF EXISTS oapen_suggestions.ngrams CASCADE;
         DROP TABLE IF EXISTS oapen_suggestions.endpoints CASCADE;
-        DROP TYPE IF EXISTS oapen_suggestions.suggestion CASCADE;
         DROP TYPE IF EXISTS oapen_suggestions.ngram CASCADE;
         """
     )
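
Because the new table carries a composite primary key on (handle, suggestion), a refresh could upsert against that key instead of relying on plain inserts. The commit itself removes the ON CONFLICT clauses, so the following is only a sketch of that option; the helper name and the use of execute_values are assumptions, not part of this changeset.

    # Sketch only (not in this commit): upsert flattened rows against the
    # composite key (handle, suggestion) so a refresh overwrites stale scores.
    from psycopg2.extras import execute_values

    def upsert_suggestions(connection, rows):
        # rows: iterable of (handle, name, suggestion, score) tuples
        query = """
            INSERT INTO oapen_suggestions.suggestions (handle, name, suggestion, score)
            VALUES %s
            ON CONFLICT (handle, suggestion)
            DO UPDATE SET score = excluded.score
            """
        with connection.cursor() as cursor:
            execute_values(cursor, query, rows)
        connection.commit()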


@@ -40,6 +40,11 @@ def signal_handler(signal, frame):
 signal.signal(signal.SIGINT, signal_handler)

 logger.info("Daemon up")
 conn = get_connection()
+db = OapenDB(conn)
+
 if int(os.environ["RUN_CLEAN"]) == 1 or (
+    not db.table_exists("suggestions")
+    or not db.table_exists("ngrams")


@@ -25,8 +25,6 @@ def get_ngrams_list(arr: List[NgramRow]):
 def suggestion_task(items, all_items, db_mutex, db):
     suggestions: List[SuggestionRow] = []
     for item_a in items:
-        item_suggestions = []
         handle_a = item_a[0]
         for item_b in all_items:
@@ -38,52 +36,34 @@ def suggestion_task(items, all_items, db_mutex, db):
             ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1])))
             if ngrams_shared >= config.SCORE_THRESHOLD:
-                item_suggestions.append((handle_b, ngrams_shared))
-        item_suggestions.sort(key=lambda x: x[1], reverse=True)
-        suggestions.append((handle_a, handle_a, item_suggestions))
-    count = len(suggestions)
+                suggestions.append((handle_a, handle_a, handle_b, ngrams_shared))
     db_mutex.acquire()
     db.add_many_suggestions(suggestions)
     db_mutex.release()
-    return count
+    return len(items)

+def refresh(future, counter, pbar):
+    pbar.update(future.result())
+    counter["items_updated"] += future.result()
+    pbar.refresh()

 def run():
+    db_mutex = Lock()
     connection = get_connection()
     db = OapenDB(connection)
     all_items: List[NgramRow] = db.get_all_ngrams(get_empty=False)
-    logger.info("Getting suggestions for {0} items...".format(str(len(all_items))))
-    futures = []
-    # Get only top k ngrams for all items before processing
-    for item in all_items:
-        ngrams = get_ngrams_list(item)
-        item = (item[0], ngrams)
-    time_start = time.perf_counter()
-    n = config.SUGGESTIONS_MAX_ITEMS
-    chunks = [all_items[i : i + n] for i in range(0, len(all_items), n)]
-    counter = Counter(items_updated=0)
     executor = concurrent.futures.ThreadPoolExecutor(
         max_workers=config.SUGGESTIONS_MAX_WORKERS
     )
     futures = []
-    db_mutex = Lock()
-    def refresh(future, counter, pbar):
-        pbar.update(future.result())
-        counter["items_updated"] += future.result()
-        pbar.refresh()
     counter = Counter(items_updated=0)
     pbar = tqdm(
         total=len(all_items),
@@ -94,6 +74,19 @@ def run():
         initial=0,
     )

+    logger.info("Getting suggestions for {0} items...".format(str(len(all_items))))
+    time_start = time.perf_counter()
+
+    # Get only top k ngrams for all items before processing
+    for item in all_items:
+        ngrams = get_ngrams_list(item)
+        item = (item[0], ngrams)
+
+    chunks = [
+        all_items[i : i + config.SUGGESTIONS_MAX_ITEMS]
+        for i in range(0, len(all_items), config.SUGGESTIONS_MAX_ITEMS)
+    ]
+
     for chunk in chunks:
         future = executor.submit(suggestion_task, chunk, all_items, db_mutex, db)
         future.add_done_callback(lambda x: refresh(x, counter, pbar))
@@ -103,9 +96,9 @@
         pass

     logger.info(
-        "Updated suggestions for "
+        "Updated "
         + str(counter["items_updated"])
-        + " items in "
+        + " suggestions in "
         + str(time.perf_counter() - time_start)
         + "s."
     )
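
The refresh loop above follows a simple chunk-and-submit pattern: split the items, hand each chunk to a thread pool, and tally results in a done-callback. A self-contained sketch of the same idea, with a dummy task standing in for suggestion_task and placeholder values for the chunk size and worker count, looks like this:

    # Sketch only: the chunked ThreadPoolExecutor pattern used by the engine,
    # with a dummy task in place of suggestion_task.
    import concurrent.futures
    from collections import Counter

    def work(chunk):
        return len(chunk)  # pretend each chunk was processed

    def run_chunked(items, chunk_size=50, max_workers=4):
        counter = Counter(items_updated=0)
        chunks = [items[i : i + chunk_size] for i in range(0, len(items), chunk_size)]
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(work, chunk) for chunk in chunks]
            for future in concurrent.futures.as_completed(futures):
                counter["items_updated"] += future.result()
        return counter["items_updated"]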