celinanperalta/OAP 65 (#54)

Add additional parameters to OAPEN DB and refactor engine types
Celina Peralta 2023-04-14 10:39:01 +02:00 committed by GitHub
parent 3f05e94e67
commit 7e33ac4677
15 changed files with 244 additions and 237 deletions

View File

@ -1,40 +1,36 @@
import random
import time
from datetime import datetime
from typing import List, Optional
import requests
from logger.base_logger import logger
from model.oapen_types import OapenItem, transform_item_data
from model.oapen_types import OapenItem
SERVER_PATH = "https://library.oapen.org"
GET_COMMUNITY = "/rest/communities/{id}"
GET_COLLECTION = "/rest/collections/{id}"
GET_COLLECTIONS = "/rest/collections/"
GET_ITEM_BITSTREAMS = "/rest/items/{id}/bitstreams"
GET_COLLECTION_ITEMS = "/rest/collections/{id}/items"
GET_COMMUNITY_COLLECTIONS = "/rest/communities/{id}/collections"
GET_ITEM = "/rest/search?query=handle:%22{handle}%22&expand=bitstreams,metadata"
GET_COLLECTION_BY_LABEL = (
"/rest/search?query=oapen.collection:%22{label}%22&expand=metadata"
)
GET_WEEKLY_ITEMS = "/rest/search?query=dc.date.accessioned_dt:[NOW-7DAY/DAY+TO+NOW]&expand=bitstreams,metadata"
GET_UPDATED_ITEMS = (
"/rest/search?query=lastModified%3E{date}&expand=metadata,bitsteams" # YYYY-MM-DD
)
# This is the only community we care about right now
BOOKS_COMMUNITY_ID = "3579505d-9d1b-4745-bcaf-a37329d25c69"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15",
]
GET_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36"
}
def transform_item_data(item) -> OapenItem:
thumbnail = get_bitstream_thumbnail(item)
text = get_bitstream_text(item)
return OapenItem(item["handle"], item["name"], thumbnail, text)
def transform_multiple_items_data(items) -> List[OapenItem]:
return [
transform_item_data(item, get_bitstream_text(item["bitstreams"]))
for item in items
]
return [transform_item_data(item) for item in items]
def get(endpoint, params=None):
@ -43,7 +39,7 @@ def get(endpoint, params=None):
url=SERVER_PATH + endpoint,
params=params,
timeout=(None, 120),
headers=GET_HEADERS,
headers={"User-Agent": random.choice(USER_AGENTS)},
)
ret = None
@ -54,41 +50,19 @@ def get(endpoint, params=None):
ret = res.content
else:
logger.error("ERROR - GET {}: {}".format(res.url, res.status_code))
logger.debug("GET {}: {}".format(res.url, res.status_code))
return ret
def get_all_communities():
return get(endpoint=GET_COMMUNITY)
def get_community(id):
return get(endpoint=GET_COMMUNITY.format(id=id))
def get_collection(id):
return get(endpoint=GET_COLLECTION.format(id=id))
def get_item(handle) -> OapenItem:
res = get(endpoint=GET_ITEM.format(handle=handle))
if res is not None and len(res) > 0:
return transform_item_data(res[0], get_bitstream_text(res[0]["bitstreams"]))
return transform_item_data(res[0])
return res
def get_collections_from_community(id):
res = get(endpoint=GET_COMMUNITY_COLLECTIONS.format(id=id))
return res
def get_all_collections():
res = get(endpoint=GET_COLLECTIONS)
return res
# i.e. /rest/collections/2154c0ca-7814-4cc8-a869-3de4215c4121/items?limit=25&offset=0&expand=bitstreams,metadata
# This duplicates get_collection_items_by_id and exists for endpoints that are
# constructed in tasks/clean.py and cached in the database. Not a high-priority
# issue, but it can be cleaned up in the future.
def get_collection_items_by_endpoint(endpoint) -> List[OapenItem]:
res = get(endpoint=endpoint)
@ -108,39 +82,15 @@ def get_collection_items_by_id(id, limit=None, offset=None) -> List[OapenItem]:
return res
def get_collection_items_by_label(label, limit=None) -> List[OapenItem]:
label = "+".join(label.split(" "))
res = get(
endpoint=GET_COLLECTION_BY_LABEL.format(label=label),
params={"limit": limit},
)
if res is not None and len(res) > 0:
return transform_multiple_items_data(res)
return res
def get_bitstream_text(bitstreams, limit=None) -> str:
if bitstreams is not None:
for bitstream in bitstreams:
if bitstream["mimeType"] == "text/plain":
retrieveLink = bitstream["retrieveLink"]
text = str(get(retrieveLink).decode("utf-8"))
return text if limit is None else text[:limit]
return ""
# Gets all items added in the last week
def get_weekly_items(limit=None) -> List[OapenItem]:
res = get(endpoint=GET_WEEKLY_ITEMS, params={"limit": limit})
if res is not None and len(res) > 0:
return transform_multiple_items_data(res)
return res
def get_updated_items(date: datetime, limit=None, offset=None) -> List[OapenItem]:
date = date.strftime("%Y-%m-%d")
res = get(
endpoint=GET_UPDATED_ITEMS.format(date=date),
@ -150,3 +100,27 @@ def get_updated_items(date: datetime, limit=None, offset=None) -> List[OapenItem
if res is not None and len(res) > 0:
return transform_multiple_items_data(res)
return res
# General helper to extract the retrieveLink of the bitstream whose bundle matches bundle_name
def __get_bitstream_url(bitstreams, bundle_name: str) -> Optional[str]:
if bitstreams is not None:
for bitstream in bitstreams:
if bitstream["bundleName"] == bundle_name:
return bitstream["retrieveLink"]
return None
# Returns the full plain-text bitstream content of an item, or "" if none exists
def get_bitstream_text(item) -> str:
retrieveLink = __get_bitstream_url(item["bitstreams"], "TEXT")
if retrieveLink is not None:
time.sleep(1) # Attempt to avoid rate limiting
res = get(retrieveLink)
return res.decode("utf-8") if res is not None else ""
else:
return ""
def get_bitstream_thumbnail(item):
return __get_bitstream_url(item["bitstreams"], "THUMBNAIL")
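The refactored client now looks bitstreams up by bundleName and rotates the request User-Agent. A minimal usage sketch, assuming the module is importable as data.oapen (as in this commit's test suite) and the OAPEN API is reachable:

import data.oapen as OapenAPI

# Handle borrowed from the test suite in this commit.
item = OapenAPI.get_item("20.500.12657/47586")
if item:
    print(item.handle, item.name)  # "20.500.12657/47586 Embodying Contagion"
    print(item.thumbnail)          # retrieveLink of the THUMBNAIL bundle, or None
    print(item.text[:200])         # start of the plain-text bitstream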

View File

@ -13,16 +13,19 @@ class OapenDB:
seen = set()
res = []
for i in items:
if i[0] not in seen:
if i not in seen:
res.append(i)
seen.add(i[0])
seen.add(i)
return res
def mogrify_ngrams(self, ngrams: List[NgramRow]) -> str:
ngrams = self.deduplicate(ngrams)
cursor = self.connection.cursor()
args = ",".join(
cursor.mogrify("(%s,%s::oapen_suggestions.ngram[])", x).decode("utf-8")
cursor.mogrify(
"(%s,%s,%s,%s::oapen_suggestions.ngram[])",
(x.handle, x.name, x.thumbnail, x.ngrams),
).decode("utf-8")
for x in ngrams
)
cursor.close()
@ -32,7 +35,17 @@ class OapenDB:
suggestions = self.deduplicate(suggestions)
cursor = self.connection.cursor()
args = ",".join(
cursor.mogrify("(%s,%s,%s,%s)", x).decode("utf-8") for x in suggestions
cursor.mogrify(
"(%s,%s,%s,%s,%s)",
(
x.handle,
x.suggestion,
x.suggestion_name,
x.suggestion_thumbnail,
x.score,
),
).decode("utf-8")
for x in suggestions
)
cursor.close()
return args
@ -93,11 +106,11 @@ class OapenDB:
def add_many_suggestions(self, suggestions: List[SuggestionRow]) -> None:
cursor = self.connection.cursor()
args = self.mogrify_suggestions(suggestions)
query = f"""
INSERT INTO oapen_suggestions.suggestions (handle, name, suggestion, score)
INSERT INTO oapen_suggestions.suggestions
VALUES {args}
"""
try:
cursor.execute(query)
except (Exception, psycopg2.Error) as error:
@ -127,13 +140,12 @@ class OapenDB:
cursor = self.connection.cursor()
args = self.mogrify_ngrams(ngrams)
query = f"""
INSERT INTO oapen_suggestions.ngrams (handle, ngrams)
INSERT INTO oapen_suggestions.ngrams
VALUES {args}
ON CONFLICT (handle)
DO
UPDATE SET ngrams = excluded.ngrams
"""
cursor.execute(query)
except (Exception, psycopg2.Error) as error:
logger.error(error)
@ -144,7 +156,7 @@ class OapenDB:
def get_all_ngrams(self, get_empty=True) -> List[NgramRow]:
cursor = self.connection.cursor()
query = """
SELECT handle, CAST (ngrams AS oapen_suggestions.ngram[]), created_at, updated_at
SELECT handle, name, thumbnail, CAST (ngrams AS oapen_suggestions.ngram[]), created_at, updated_at
FROM oapen_suggestions.ngrams
"""
if not get_empty:
@ -154,7 +166,7 @@ class OapenDB:
try:
cursor.execute(query)
records = cursor.fetchall()
ret = records
ret = [NgramRow(*record) for record in records]
except (Exception, psycopg2.Error) as error:
logger.error(error)
@ -172,6 +184,25 @@ class OapenDB:
cursor.execute(query)
records = cursor.fetchall()
ret = [SuggestionRow(*record) for record in records]
except (Exception, psycopg2.Error) as error:
logger.error(error)
finally:
cursor.close()
return ret
def get_suggestions_for_item(self, handle) -> List[SuggestionRow]:
cursor = self.connection.cursor()
query = """
SELECT * FROM oapen_suggestions.suggestions
WHERE handle = %s
"""
ret = None
try:
cursor.execute(query, (handle,))
records = cursor.fetchall()
ret = records
except (Exception, psycopg2.Error) as error:
@ -251,7 +282,7 @@ class OapenDB:
cursor.execute(query)
records = cursor.fetchall()
ret = records
ret = [UrlRow(*record) for record in records]
except (Exception, psycopg2.Error) as error:
logger.error(error)
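For reference, the mogrify calls above batch client-side-escaped VALUES tuples into a single INSERT statement. A standalone sketch of the same technique, assuming a reachable Postgres and an illustrative table t(a text, b int):

import psycopg2

conn = psycopg2.connect("dbname=test")  # illustrative DSN
cur = conn.cursor()
rows = [("x", 1), ("y", 2)]
# mogrify escapes each tuple client-side; the join yields "('x',1),('y',2)"
args = ",".join(cur.mogrify("(%s,%s)", r).decode("utf-8") for r in rows)
cur.execute(f"INSERT INTO t (a, b) VALUES {args}")
conn.commit()
cur.close()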

View File

@ -2,6 +2,12 @@ import re
import string
from typing import List
from .oapen_types import ( # pylint: disable=relative-beyond-top-level
NgramDict,
NgramRow,
OapenItem,
)
import nltk
import pandas as pd
from nltk import word_tokenize
@ -10,9 +16,6 @@ from .stopwords_processor import STOPWORDS
nltk.download("punkt")
from .oapen_types import NgramDict, NgramRowWithoutDate, OapenItem
def process_text(text):
p_text = "".join([c for c in text.lower() if c not in string.punctuation])
stopwords_regex = re.compile(r"\b%s\b" % r"\b|\b".join(map(re.escape, STOPWORDS)))
@ -24,19 +27,6 @@ def process_text(text):
return filtered_words
def make_df(data: List[OapenItem]):
df = pd.DataFrame(columns=["handle", "name", "lang", "text"])
for item in data:
text = process_text(item.text)
df.loc[len(df.index)] = [item.handle, item.name, item.language, text]
return df
def get_text_by_handle(df, handle):
return df.loc[df.handle == handle].text[0]
def sort_ngrams_by_count(ngrams: NgramDict):
return sorted(ngrams.items(), key=lambda item: item[1], reverse=True)
@ -51,11 +41,6 @@ def generate_ngram(text, n=3) -> NgramDict:
return dict(sort_ngrams_by_count(ngrams)) # return sorted by count
def generate_ngram_by_handle(df, handle, n=3) -> NgramDict:
text = get_text_by_handle(df, handle)
return generate_ngram(text, n)
def get_n_most_occuring(dic: NgramDict, n=100):
sorted_dict = dict(
sort_ngrams_by_count(dic)
@ -93,12 +78,16 @@ def get_similarity_score_by_dict_count(ngrams1: NgramDict, ngrams2: NgramDict) -
return repeated / total
def get_ngrams_for_items(
items: List[OapenItem], n=3, ngram_limit=30
) -> List[NgramRowWithoutDate]:
def get_ngrams_for_items(items: List[OapenItem], n=3, ngram_limit=30) -> List[NgramRow]:
rows = []
for item in items:
text = process_text(item.text)
ngrams = generate_ngram(text, n)
rows.append((item.handle, list(sort_ngrams_by_count(ngrams))[0:ngram_limit]))
row = NgramRow(
handle=item.handle,
name=item.name,
thumbnail=item.thumbnail,
ngrams=list(sort_ngrams_by_count(ngrams))[0:ngram_limit],
)
rows.append(row)
return rows
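get_ngrams_for_items now returns full NgramRow tuples rather than bare (handle, ngrams) pairs. A hedged sketch of the new return shape; the item below is fabricated:

from model.ngrams import get_ngrams_for_items
from model.oapen_types import OapenItem

item = OapenItem(
    handle="20.500.12657/0000",  # hypothetical handle
    name="Example Book",
    thumbnail=None,
    text="open access books are free and open access books are peer reviewed",
)
row = get_ngrams_for_items([item], n=3, ngram_limit=5)[0]
print(row.handle, row.name)  # carried through to the suggestions table
print(row.ngrams)            # (ngram, count) pairs, highest count first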

View File

@ -1,22 +1,12 @@
from datetime import datetime
from typing import Dict, List, Tuple, Union
from typing import Dict, List, NamedTuple
class OapenItem:
def __init__(
self, uuid, name, handle, expand, link, metadata, bitstreams, text: str
):
self.uuid = uuid
self.name = name
self.handle = handle
self.expand = expand
self.link = link
self.metadata = metadata
self.bitstreams = bitstreams
language = list(filter(lambda x: x["key"] == "dc.language", self.metadata))
self.language = None if len(language) == 0 else language[0]["value"]
self.text = text
class OapenItem(NamedTuple):
handle: str
name: str
thumbnail: str
text: str
def __eq__(self, other):
return self.handle == other.handle
@ -24,29 +14,44 @@ class OapenItem:
def __hash__(self):
return hash(self.handle)
class SuggestionRow(NamedTuple):
handle: str
suggestion: str
suggestion_name: str
suggestion_thumbnail: str
score: int
created_at: datetime = None
updated_at: datetime = None
SuggestionRowWithoutDate = Tuple[str, str, str, int]
SuggestionRowWithDate = Tuple[str, str, str, int, datetime, datetime]
SuggestionRow = Union[SuggestionRowWithDate, SuggestionRowWithoutDate]
def __eq__(self, other):
return self.handle == other.handle and self.suggestion == other.suggestion
def __hash__(self) -> int:
return hash((self.handle, self.suggestion))
class Ngram(NamedTuple):
ngram: str
count: int
class NgramRow(NamedTuple):
handle: str
name: str
thumbnail: str
ngrams: List[Ngram]
created_at: datetime = None
updated_at: datetime = None
def __eq__(self, other):
return self.handle == other.handle
def __hash__(self) -> int:
return hash(self.handle)
Ngram = Tuple[str, int]
NgramRowWithoutDate = Tuple[str, List[Ngram]]
NgramRowWithDate = Tuple[str, List[Ngram], datetime, datetime]
NgramRow = Union[NgramRowWithDate, NgramRowWithoutDate]
NgramDict = Dict[str, int]
UrlRow = Tuple[str, bool]
def transform_item_data(item, text) -> OapenItem:
return OapenItem(
item["uuid"],
item["name"],
item["handle"],
item["expand"],
item["link"],
item["metadata"],
item["bitstreams"],
text,
)
class UrlRow(NamedTuple):
url: str
completed: bool
updated_at: datetime = None
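The overridden __eq__ and __hash__ on these rows are what let OapenDB.deduplicate collapse entries that differ only in score or timestamps. An illustration with made-up values:

from model.oapen_types import SuggestionRow

a = SuggestionRow("h1", "h2", "Name", None, 5)
b = SuggestionRow("h1", "h2", "Name", None, 9)  # same (handle, suggestion) pair
print(a == b)       # True: equality ignores score and timestamps
print(len({a, b}))  # 1: a set keeps only one of the two rows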

View File

@ -25,17 +25,20 @@ def create_schema(connection) -> None:
$$ language 'plpgsql';
CREATE TABLE IF NOT EXISTS oapen_suggestions.suggestions (
handle text,
name text,
suggestion text,
score int,
created_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp,
handle text,
suggestion text,
suggestion_name text,
suggestion_thumbnail text,
score int,
created_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp,
PRIMARY KEY (handle, suggestion)
);
CREATE TABLE IF NOT EXISTS oapen_suggestions.ngrams (
handle text PRIMARY KEY,
name text,
thumbnail text,
ngrams oapen_suggestions.ngram[],
created_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp
@ -63,6 +66,7 @@ def create_schema(connection) -> None:
def drop_schema(connection) -> None:
logger.warn("WARNING: DROPPING DATABASE!")
cursor = connection.cursor()
cursor.execute(
"""

View File

@ -4,7 +4,7 @@ ITEMS_PER_IMPORT_THREAD = 25
IO_MAX_WORKERS = 5
# Delay for submitting new API call thread
HARVEST_THREAD_SPAWN_DELAY = 2
HARVEST_THREAD_SPAWN_DELAY = 5
# Size of list of items to process into ngrams per process
NGRAMS_PER_INSERT = 100

View File

@ -7,7 +7,7 @@ import time
import schedule
from clean import run as run_clean
from clean import seed_endpoints
from data.connection import get_connection
from data.connection import close_connection, get_connection
from data.oapen_db import OapenDB
from generate_suggestions import run as run_generate_suggestions
from logger.base_logger import logger
@ -16,12 +16,16 @@ from seed import run as run_seed
conn = get_connection()
db = OapenDB(conn)
logger.info("Daemon up")
def harvest():
conn = get_connection()
db = OapenDB(conn)
seed_endpoints(conn)
urls = db.get_incomplete_urls()
close_connection(conn)
if len(urls) > 0:
run_seed()
run_generate_suggestions()
@ -32,33 +36,31 @@ def refresh():
run_generate_suggestions()
def signal_handler(signal, frame):
conn.close()
logger.info("Daemon exiting.")
sys.exit(0)
def main():
def signal_handler(signal, frame):
logger.info("Daemon exiting.")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
logger.info("Daemon up")
if int(os.environ["RUN_CLEAN"]) == 1 or (
not db.table_exists("suggestions")
or not db.table_exists("ngrams")
or not db.table_exists("endpoints")
):
run_clean()
harvest()
schedule.every().day.at("20:00").do(refresh)
schedule.every().sunday.at("22:00").do(harvest)
while True:
schedule.run_pending()
time.sleep(60)
signal.signal(signal.SIGINT, signal_handler)
logger.info("Daemon up")
conn = get_connection()
db = OapenDB(conn)
if int(os.environ["RUN_CLEAN"]) == 1 or (
not db.table_exists("suggestions")
or not db.table_exists("ngrams")
or not db.table_exists("endpoints")
):
run_clean()
harvest()
schedule.every().day.at("20:00").do(refresh)
schedule.every().sunday.at("22:00").do(harvest)
while True:
schedule.run_pending()
time.sleep(60)
logger.info("Daemon down")
if __name__ == "__main__":
main()

View File

@ -8,7 +8,7 @@ import config
from data.connection import close_connection, get_connection
from data.oapen_db import OapenDB
from logger.base_logger import logger
from model.oapen_types import NgramRow, SuggestionRow
from model.oapen_types import Ngram, NgramRow, SuggestionRow
from tqdm.auto import tqdm
# initial seed -> get suggestions on everything n^2
@ -18,33 +18,41 @@ from tqdm.auto import tqdm
# optimization: only suggest once per pair
def get_ngrams_list(arr: List[NgramRow]):
return [x[0] for x in arr[1][0 : min(len(arr[1]), config.TOP_K_NGRAMS_COUNT)]]
def truncate_ngrams_list(item: NgramRow) -> NgramRow:
ngrams = [
Ngram(ngram=x[0], count=0)
for x in item.ngrams[0 : min(len(item.ngrams), config.TOP_K_NGRAMS_COUNT)]
]
return item._replace(ngrams=ngrams)
def suggestion_task(items, all_items, db_mutex, db):
def suggestion_task(items: List[NgramRow], all_items: List[NgramRow], db_mutex, db):
suggestions: List[SuggestionRow] = []
for item_a in items:
handle_a = item_a[0]
for item_b in all_items:
handle_b = item_b[0]
if handle_a == handle_b:
if item_a.handle == item_b.handle:
continue
ngrams_shared = len(list(filter(lambda x: x in item_b[1], item_a[1])))
score = len(list(filter(lambda x: x in item_b.ngrams, item_a.ngrams)))
if ngrams_shared >= config.SCORE_THRESHOLD:
suggestions.append((handle_a, handle_a, handle_b, ngrams_shared))
if score >= config.SCORE_THRESHOLD:
suggestions.append(
SuggestionRow(
handle=item_a.handle,
suggestion=item_b.handle,
suggestion_name=item_b.name,
suggestion_thumbnail=item_b.thumbnail,
score=score,
)
)
db_mutex.acquire()
db.add_many_suggestions(suggestions)
db_mutex.release()
if len(suggestions) > 0:
db_mutex.acquire()
db.add_many_suggestions(suggestions)
db_mutex.release()
return len(items)
def refresh(future, counter, pbar):
pbar.update(future.result())
counter["items_updated"] += future.result()
@ -78,9 +86,8 @@ def run():
time_start = time.perf_counter()
# Get only top k ngrams for all items before processing
for item in all_items:
ngrams = get_ngrams_list(item)
item = (item[0], ngrams)
for i in range(len(all_items)):
all_items[i] = truncate_ngrams_list(all_items[i])
chunks = [
all_items[i : i + config.SUGGESTIONS_MAX_ITEMS]
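Because NgramRow is a NamedTuple, truncate_ngrams_list cannot mutate it; it swaps the ngrams field with _replace, and the loop above reassigns by index. A small illustration with made-up values:

from model.oapen_types import Ngram, NgramRow

row = NgramRow("h1", "Example Book", None, [Ngram("a b c", 4), Ngram("b c d", 1)])
top1 = row._replace(ngrams=row.ngrams[:1])
print(top1.ngrams)  # [Ngram(ngram='a b c', count=4)]
print(row.ngrams)   # the original row is unchanged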

View File

@ -1,5 +1,6 @@
import concurrent.futures
import multiprocessing
import random
import signal
import sys
import threading
@ -89,7 +90,7 @@ def run():
future = io_executor.submit(harvest_task, url[0], item_queue)
future.add_done_callback(lambda x: refresh(x, pbar, counter))
producer_futures.append(future)
time.sleep(config.HARVEST_THREAD_SPAWN_DELAY)
time.sleep(random.randint(1, config.HARVEST_THREAD_SPAWN_DELAY))
for future in concurrent.futures.as_completed(producer_futures):
producers_done += 1

View File

@ -1,3 +1,4 @@
import itertools
import multiprocessing
from threading import Event
@ -13,22 +14,20 @@ def db_task(db: OapenDB, db_queue: multiprocessing.Queue, event: Event):
def insert_items(entries):
try:
urls = [e[0] for e in entries]
items = []
items = list(itertools.chain(*[e[1] for e in entries]))
for e in entries:
items += e[1]
logger.debug("(DB) - Inserting {0} item(s).".format(len(items)))
logger.info("(DB) - Inserting {0} item(s).".format(len(items)))
db.add_many_ngrams(items)
logger.debug("(DB) - Inserted {0} item(s).".format(len(items)))
logger.info("(DB) - Inserted {0} item(s).".format(len(items)))
for url in urls:
db.update_url(url, True)
return len(items)
return
except Exception as e:
logger.error(e)
return -1
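itertools.chain(*lists) flattens one level of nesting, replacing the removed accumulation loop. A quick illustration with placeholder entries:

import itertools

entries = [("url1", [1, 2]), ("url2", [3])]
items = list(itertools.chain(*[e[1] for e in entries]))
print(items)  # [1, 2, 3]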

View File

@ -33,7 +33,6 @@ def harvest_task(url: str, items: multiprocessing.JoinableQueue) -> int or None:
continue
except Exception as e:
logger.error("(IO) (will retry) - " + str(e))
time.sleep(RETRY_DELAY)
return ret

View File

@ -1,8 +1,10 @@
import multiprocessing
import queue
from typing import List
import model.ngrams as OapenEngine
from logger.base_logger import logger
from model.oapen_types import OapenItem
def ngrams_task(
@ -15,7 +17,8 @@ def ngrams_task(
try:
entry = item_queue.get_nowait()
url, items = entry[0], entry[1]
url: str = entry[0]
items: List[OapenItem] = entry[1]
ngrams = OapenEngine.get_ngrams_for_items(items)

View File

@ -1,28 +1,34 @@
import test_ngrams
import test_oapen
import test_stopwords
import test_ngrams
def run_test(run_msg, func):
print(run_msg, end = " ")
print(run_msg, end=" ")
func()
print("OK") # will throw on fail
def main():
print("Testing connection to OAPEN.")
try:
run_test("Attempting to get item [Embodying Contagion]:", test_oapen.test_get_item)
run_test("Attempting to get null item:", test_oapen.test_get_item_404)
run_test("Attempting to get collection limit by label [Knowledge Unlatched (KU)]:",
test_oapen.test_get_collection_limit)
run_test("Attempting to get null collection:", test_oapen.test_get_collection_404)
run_test(
"Attempting to get collection limit by id (ea93f8f0-430f-4a03-b7e2-5b06053585b0):",
test_oapen.test_get_collection_limit,
)
run_test(
"Attempting to get null collection:", test_oapen.test_get_collection_404
)
except Exception as e:
print("\nFailed:")
print(e)
print("\nTesting stopwords generation.")
try:
run_test("Testing stopwords correctly generated:",
test_stopwords.test_stopwords_contains_all)
run_test(
"Testing stopwords correctly generated:",
test_stopwords.test_stopwords_contains_all,
)
except Exception as e:
print("Failed:")
print(e)
@ -37,5 +43,6 @@ def main():
print("Failed:")
print(e)
if __name__ == "__main__":
main()

View File

@ -1,27 +1,13 @@
from typing import List
import data.oapen as OapenAPI
from model.oapen_types import OapenItem
def test_get_item():
item = OapenAPI.get_item("20.500.12657/47586")
assert isinstance(item, OapenItem)
assert item.name == "Embodying Contagion"
def test_get_item_404():
item: List[OapenItem] = OapenAPI.get_item("20.400.12657/47581")
assert len(item) == 0
def test_get_collection_limit():
collection = OapenAPI.get_collection_items_by_label(
"Knowledge Unlatched (KU)", limit=10
collection = OapenAPI.get_collection_items_by_id(
"ea93f8f0-430f-4a03-b7e2-5b06053585b0", limit=10
)
assert len(collection) <= 10
def test_get_collection_404():
collection = OapenAPI.get_collection_items_by_label("hahaha", limit=10)
assert len(collection) == 0
collection = OapenAPI.get_collection_items_by_id("hahaha", limit=10)
assert collection is None