diff --git a/.gitignore b/.gitignore index e2f3719..e90d0df 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,10 @@ /books /tests/outputs +**/empty_alt_text.txt +**/empty_alt_text_sample.txt +**/book_outputs +**/downloaded_books + **/keys.py **/vertex-key.json \ No newline at end of file diff --git a/src/alttext/alttext.py b/src/alttext/alttext.py index 9fd644d..a7d37ab 100644 --- a/src/alttext/alttext.py +++ b/src/alttext/alttext.py @@ -83,7 +83,7 @@ class AltText(ABC): # PARSING METHODS @abstractmethod - def parse(self, data: str) -> bs4.BeautifulSoup | epub.EpubBook: + def parse(self, data: str) -> typing.Union[bs4.BeautifulSoup, epub.EpubBook]: """Parses data into a BeautifulSoup or EpubBook object. Args: @@ -95,7 +95,9 @@ class AltText(ABC): pass @abstractmethod - def parseFile(self, filepath: str) -> bs4.BeautifulSoup | epub.EpubBook: + def parseFile( + self, filepath: str + ) -> typing.Union[bs4.BeautifulSoup, epub.EpubBook]: """Parses data from a file into a BeautifulSoup or EpubBook object. Args: @@ -162,7 +164,7 @@ class AltText(ABC): pass @abstractmethod - def export(self) -> str | epub.EpubBook: + def export(self) -> typing.Union[str, epub.EpubBook]: """Exports the current data. Returns: diff --git a/src/alttext/descengine/descengine.py b/src/alttext/descengine/descengine.py index 94b4f37..de157cb 100644 --- a/src/alttext/descengine/descengine.py +++ b/src/alttext/descengine/descengine.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + ### DESCENGINE CLASSES class DescEngine(ABC): @abstractmethod diff --git a/src/alttext/langengine/langengine.py b/src/alttext/langengine/langengine.py index 599fd12..c76fb09 100644 --- a/src/alttext/langengine/langengine.py +++ b/src/alttext/langengine/langengine.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class LangEngine(ABC): @abstractmethod def _completion(self, prompt: str) -> str: @@ -99,4 +100,4 @@ class LangEngine(ABC): Returns: bool: True if successful. 
""" - pass \ No newline at end of file + pass diff --git a/src/alttext/langengine/privategpt.py b/src/alttext/langengine/privategpt.py index 7f18c06..ef91784 100644 --- a/src/alttext/langengine/privategpt.py +++ b/src/alttext/langengine/privategpt.py @@ -2,6 +2,7 @@ import requests from .langengine import LangEngine + class PrivateGPT(LangEngine): def __init__(self, host) -> None: self.host = host diff --git a/src/alttext/ocrengine/ocrengine.py b/src/alttext/ocrengine/ocrengine.py index f1dcdc5..3097936 100644 --- a/src/alttext/ocrengine/ocrengine.py +++ b/src/alttext/ocrengine/ocrengine.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod + class OCREngine(ABC): @abstractmethod def genChars(self, imgData: bytes, src: str, context: str = None) -> str: @@ -13,4 +14,4 @@ class OCREngine(ABC): Returns: str: Characters found in an image. """ - pass \ No newline at end of file + pass diff --git a/tests/automate.py b/tests/automate.py new file mode 100644 index 0000000..bed06df --- /dev/null +++ b/tests/automate.py @@ -0,0 +1,132 @@ +# automate.py - tests the generation of images and benchmarks the systems +# run getbooks.py then downloadbooks.py with input (.txt file), use output for next steps + +import os +import sys +import time +import csv + +import keys + +sys.path.append("../") +from src.alttext.alttext import AltTextHTML +from src.alttext.descengine.replicateapi import ReplicateAPI +from src.alttext.ocrengine.tesseract import Tesseract +from src.alttext.langengine.openaiapi import OpenAIAPI + + +class AltTextGenerator(AltTextHTML): + # Use genAltTextV2 + # ADD benchmark time stamps + def genAltTextV2(self, src: str, book_id, image_path, book_path) -> str: + print(f"PROCESSING BOOK {book_id} | IMAGE {image_path}") + status = False + # Start total timing + total_start_time = time.time() + + imgdata = self.getImgData(src) + context = self.getContext(self.getImg(src)) + + # Description generation timing + # print("starting desc") + genDesc_start_time = time.time() 
+ desc = self.genDesc(imgdata, src, context) + genDesc_end_time = time.time() + genDesc_total_time = genDesc_end_time - genDesc_start_time + + # OCR processing timing + # print("starting ocr") + ocr_start_time = time.time() + chars = self.genChars(imgdata, src).strip() + ocr_end_time = time.time() + ocr_total_time = ocr_end_time - ocr_start_time + + # Refinement processing timing + # print("starting refinement") + refine_start_time = time.time() + refined_desc = self.langEngine.refineAlt(desc, chars, context, None) + refine_end_time = time.time() + refine_total_time = refine_end_time - refine_start_time + + # End total timing + total_end_time = time.time() + total_overall_time = total_end_time - total_start_time + + # Record dictionary to store all the timing data + record = { + "book": book_id, + "image": image_path, + "path": book_path, + "status": status, # Set false if failed, set true is worked + "beforeContext": context[0], + "afterContext": context[1], + "genDesc": desc, + "genDesc-Start": genDesc_start_time, + "genDesc-End": genDesc_end_time, + "genDesc-Time": genDesc_total_time, + "genOCR": chars, + "genOCR-Start": ocr_start_time, + "genOCR-End": ocr_end_time, + "genOCR-Time": ocr_total_time, + "refineDesc": refined_desc, + "refineDesc-Time": refine_total_time, + "totalTime": total_overall_time, + } + + print(f"FINISHED BOOK {book_id} | IMAGE {image_path}") + + return record + + +def generateCSV(csv_file_path: str, benchmark_records: list[dict]): + fieldnames = benchmark_records[0].keys() + + with open(csv_file_path, mode="w", newline="", encoding="utf-8") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for record in benchmark_records: + writer.writerow(record) + + print(f"CSV file has been generated at: {csv_file_path}") + return + + +def benchmarkBooks(booksDir: str, srcsDir: str): + generator = AltTextGenerator( + ReplicateAPI(keys.ReplicateEricKey()), + Tesseract(), + OpenAIAPI(keys.OpenAIKey(), 
"gpt-3.5-turbo"), + ) + + records = [] + for bookId in os.listdir(booksDir): + try: + bookPath = os.path.join(booksDir, bookId) + + htmlpath = None + for object in os.listdir(bookPath): + if object.endswith(".html"): + htmlpath = os.path.join(bookPath, object) + break + generator.parseFile(htmlpath) + + srcs = [] + with open(f"{srcsDir}/ebook_{bookId}.txt", "r") as file: + for line in file: + srcs.append(line.split(f"{bookId}/")[1].strip()) + + for src in srcs: + try: + record = generator.genAltTextV2(src, bookId, src, bookPath) + records.append(record) + except Exception as e: + print(f"Error processing image {src} in book {bookId}: {e}") + except Exception as e: + print(f"Error processing book {bookId}: {e}") + + generateCSV("test_benchmark.csv", records) + + +if __name__ == "__main__": + print("RUNNING AUTOMATE.PY") + benchmarkBooks("./downloaded_books", "./book_outputs") diff --git a/tests/downloadbooks.py b/tests/downloadbooks.py new file mode 100644 index 0000000..89398ba --- /dev/null +++ b/tests/downloadbooks.py @@ -0,0 +1,71 @@ +# The goal of this file is to download the books and unzip them to be used by automate.py! 
# --- tests/downloadbooks.py: download and extract Gutenberg books ---

import os
import zipfile
import re

folder_path = "book_outputs"
download_folder = "downloaded_books/download_files"
extraction_folder = "downloaded_books"


def download_and_unzip_books(folder_path, download_folder, extraction_folder):
    """Download the Gutenberg HTML zip for every book listed in folder_path
    and extract each one into its own subfolder of extraction_folder.

    Args:
        folder_path: directory of ebook_<id>.txt files produced by getbooks.py.
        download_folder: cache directory for the downloaded .zip archives.
        extraction_folder: one subdirectory per book id is created here.
    """
    # Imported lazily so this module can be imported (e.g. by tests)
    # without the third-party dependency installed; behavior at call
    # time is unchanged.
    import requests

    base_url = "https://www.gutenberg.org/cache/epub/{book_id}/pg{book_id}-h.zip"

    # Ensure the download and extraction folders exist.
    os.makedirs(download_folder, exist_ok=True)
    os.makedirs(extraction_folder, exist_ok=True)

    # Iterate through each text file in the folder.
    for filename in os.listdir(folder_path):
        if not filename.endswith(".txt"):
            continue

        # Use regex to extract only the numeric part of the book ID.
        match = re.search(r"\d+", filename)
        if not match:
            # BUGFIX: the original message had lost its placeholder.
            print(f"No book ID found in {filename}")
            continue

        book_id = match.group()
        zip_file_path = os.path.join(download_folder, f"{book_id}.zip")

        # Download the zip file only if it is not already cached.
        if not os.path.isfile(zip_file_path):
            url = base_url.format(book_id=book_id)
            try:
                response = requests.get(url)
                response.raise_for_status()  # Raise an error for bad responses
                with open(zip_file_path, "wb") as zip_file:
                    zip_file.write(response.content)
                print(f"Downloaded {book_id}.zip successfully to {download_folder}.")
            except requests.RequestException as e:
                print(f"Error downloading {book_id}.zip: {e}")
                # BUGFIX: the original fell through and tried to unzip a file
                # that was never written (uncaught FileNotFoundError).
                continue
        else:
            print(f"{book_id}.zip already exists. Skipping download.")

        # Extract unless this book's folder already exists.
        book_extraction_folder = os.path.join(extraction_folder, book_id)
        if os.path.exists(book_extraction_folder):
            print(
                f"Extraction folder for {book_id} already exists. Skipping extraction."
            )
            continue
        try:
            with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
                zip_ref.extractall(book_extraction_folder)
            print(f"Extracted {book_id}.zip to {book_extraction_folder}.")
        except zipfile.BadZipFile:
            print(
                f"Error unzipping {book_id}.zip: The file may be corrupt or not a zip file."
            )


# --- tests/getbooks.py ---
# Used to chunk the empty_alt_text.txt into multiple different more digestible .txt files
# Will potentially eventually be used to upload from the file right into a database of books
# Then will update the file paths, download & install the books with images

input_file = "./empty_alt_text_sample.txt"  # The file path of whatever initial .txt you are working with
output_folder = "./book_outputs"


def createIndividualBookFiles(input_file, output_folder):
    """Split input_file into one ebook_<id>.txt file per book.

    The first whitespace-separated token of each line is the book number;
    every line is appended to that book's output file.

    NOTE(review): output files are opened in append mode, so re-running
    against existing outputs accumulates duplicate lines -- confirm
    this is intended before relying on repeated runs.
    """
    # Ensure the output folder exists.
    os.makedirs(output_folder, exist_ok=True)

    with open(input_file, "r") as file:
        for line in file:
            # BUGFIX: skip blank lines -- line.split()[0] raised IndexError.
            parts = line.split()
            if not parts:
                continue
            book_number = parts[0]  # Extracting book number
            output_path = os.path.join(output_folder, f"ebook_{book_number}.txt")
            # Append to the file (creates a new file if it doesn't exist).
            with open(output_path, "a") as output_file:
                output_file.write(line)


if __name__ == "__main__":
    # BUGFIX: guarded so importing this module no longer triggers network
    # downloads and file writes as a side effect.
    download_and_unzip_books(folder_path, download_folder, extraction_folder)
    createIndividualBookFiles(input_file, output_folder)