Integrated current alt-text package structure into benchmarking. Completed benchmarking functionality.
commit
224b90b233
|
@ -4,5 +4,10 @@
|
|||
/books
|
||||
/tests/outputs
|
||||
|
||||
**/empty_alt_text.txt
|
||||
**/empty_alt_text_sample.txt
|
||||
**/book_outputs
|
||||
**/downloaded_books
|
||||
|
||||
**/keys.py
|
||||
**/vertex-key.json
|
|
@ -83,7 +83,7 @@ class AltText(ABC):
|
|||
|
||||
# PARSING METHODS
|
||||
@abstractmethod
|
||||
def parse(self, data: str) -> bs4.BeautifulSoup | epub.EpubBook:
|
||||
def parse(self, data: str) -> typing.Union[bs4.BeautifulSoup, epub.EpubBook]:
|
||||
"""Parses data into a BeautifulSoup or EpubBook object.
|
||||
|
||||
Args:
|
||||
|
@ -95,7 +95,9 @@ class AltText(ABC):
|
|||
pass
|
||||
|
||||
@abstractmethod
|
||||
def parseFile(self, filepath: str) -> bs4.BeautifulSoup | epub.EpubBook:
|
||||
def parseFile(
|
||||
self, filepath: str
|
||||
) -> typing.Union[bs4.BeautifulSoup, epub.EpubBook]:
|
||||
"""Parses data from a file into a BeautifulSoup or EpubBook object.
|
||||
|
||||
Args:
|
||||
|
@ -162,7 +164,7 @@ class AltText(ABC):
|
|||
pass
|
||||
|
||||
@abstractmethod
|
||||
def export(self) -> str | epub.EpubBook:
|
||||
def export(self) -> typing.Union[str, epub.EpubBook]:
|
||||
"""Exports the current data.
|
||||
|
||||
Returns:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
### DESCENGINE CLASSES
|
||||
class DescEngine(ABC):
|
||||
@abstractmethod
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class LangEngine(ABC):
|
||||
@abstractmethod
|
||||
def _completion(self, prompt: str) -> str:
|
||||
|
|
|
@ -2,6 +2,7 @@ import requests
|
|||
|
||||
from .langengine import LangEngine
|
||||
|
||||
|
||||
class PrivateGPT(LangEngine):
|
||||
def __init__(self, host) -> None:
|
||||
self.host = host
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class OCREngine(ABC):
|
||||
@abstractmethod
|
||||
def genChars(self, imgData: bytes, src: str, context: str = None) -> str:
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
# automate.py - tests the generation of images and benchmarks the systems
|
||||
# run getbooks.py then downloadbooks.py with input (.txt file), use output for next steps
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import csv
|
||||
|
||||
import keys
|
||||
|
||||
sys.path.append("../")
|
||||
from src.alttext.alttext import AltTextHTML
|
||||
from src.alttext.descengine.replicateapi import ReplicateAPI
|
||||
from src.alttext.ocrengine.tesseract import Tesseract
|
||||
from src.alttext.langengine.openaiapi import OpenAIAPI
|
||||
|
||||
|
||||
class AltTextGenerator(AltTextHTML):
    """AltTextHTML subclass that instruments alt-text generation with
    per-stage timing benchmarks (description, OCR, refinement)."""

    # Use genAltTextV2
    def genAltTextV2(self, src: str, book_id, image_path, book_path) -> dict:
        """Generate alt-text for one image and return a benchmark record.

        Args:
            src: image src attribute within the parsed HTML document.
            book_id: identifier of the book being processed (logging/record).
            image_path: image identifier recorded alongside the timings.
            book_path: filesystem path of the book, recorded in the output.

        Returns:
            dict containing the generated description, OCR text, refined
            description, and start/end/elapsed timestamps for each stage.
        """
        print(f"PROCESSING BOOK {book_id} | IMAGE {image_path}")
        status = False
        # Start total timing
        total_start_time = time.time()

        imgdata = self.getImgData(src)
        # NOTE(review): getContext appears to return a (before, after) pair —
        # context[0] / context[1] are recorded below; confirm in base class.
        context = self.getContext(self.getImg(src))

        # Description generation timing
        genDesc_start_time = time.time()
        desc = self.genDesc(imgdata, src, context)
        genDesc_end_time = time.time()
        genDesc_total_time = genDesc_end_time - genDesc_start_time

        # OCR processing timing
        ocr_start_time = time.time()
        chars = self.genChars(imgdata, src).strip()
        ocr_end_time = time.time()
        ocr_total_time = ocr_end_time - ocr_start_time

        # Refinement processing timing
        refine_start_time = time.time()
        refined_desc = self.langEngine.refineAlt(desc, chars, context, None)
        refine_end_time = time.time()
        refine_total_time = refine_end_time - refine_start_time

        # BUG FIX: all stages completed without raising, so mark success.
        # Previously `status` stayed False even on a successful run, making
        # the recorded flag useless.
        status = True

        # End total timing
        total_end_time = time.time()
        total_overall_time = total_end_time - total_start_time

        # Record dictionary to store all the timing data
        record = {
            "book": book_id,
            "image": image_path,
            "path": book_path,
            "status": status,  # Set false if failed, set true is worked
            "beforeContext": context[0],
            "afterContext": context[1],
            "genDesc": desc,
            "genDesc-Start": genDesc_start_time,
            "genDesc-End": genDesc_end_time,
            "genDesc-Time": genDesc_total_time,
            "genOCR": chars,
            "genOCR-Start": ocr_start_time,
            "genOCR-End": ocr_end_time,
            "genOCR-Time": ocr_total_time,
            "refineDesc": refined_desc,
            "refineDesc-Time": refine_total_time,
            "totalTime": total_overall_time,
        }

        print(f"FINISHED BOOK {book_id} | IMAGE {image_path}")

        return record
|
||||
|
||||
|
||||
def generateCSV(csv_file_path: str, benchmark_records: list[dict]) -> None:
    """Write benchmark records to a CSV file.

    Column headers are taken from the keys of the first record; all records
    are assumed to share the same key set.

    Args:
        csv_file_path: destination path for the CSV file.
        benchmark_records: list of per-image benchmark dicts.
    """
    # BUG FIX: an empty run previously crashed with IndexError on
    # benchmark_records[0]; skip writing instead.
    if not benchmark_records:
        print("No benchmark records to write; skipping CSV generation.")
        return

    fieldnames = benchmark_records[0].keys()

    with open(csv_file_path, mode="w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        # writerows replaces the manual per-record loop.
        writer.writerows(benchmark_records)

    print(f"CSV file has been generated at: {csv_file_path}")
    return
|
||||
|
||||
|
||||
def benchmarkBooks(booksDir: str, srcsDir: str) -> None:
    """Benchmark alt-text generation for every book under booksDir.

    For each book subdirectory: parse its .html file, read the image src
    list from srcsDir/ebook_<bookId>.txt, generate alt-text for each image,
    and finally write all timing records to test_benchmark.csv.

    Args:
        booksDir: directory containing one subdirectory per downloaded book.
        srcsDir: directory of ebook_<id>.txt files listing image srcs.
    """
    generator = AltTextGenerator(
        ReplicateAPI(keys.ReplicateEricKey()),
        Tesseract(),
        OpenAIAPI(keys.OpenAIKey(), "gpt-3.5-turbo"),
    )

    records = []
    for bookId in os.listdir(booksDir):
        try:
            bookPath = os.path.join(booksDir, bookId)

            # Locate the book's HTML file. (Loop variable renamed from
            # `object`, which shadowed the builtin.)
            htmlpath = None
            for entry in os.listdir(bookPath):
                if entry.endswith(".html"):
                    htmlpath = os.path.join(bookPath, entry)
                    break
            # BUG FIX: previously parseFile(None) was called when no HTML
            # file existed, failing with an obscure error.
            if htmlpath is None:
                print(f"No HTML file found for book {bookId}; skipping.")
                continue
            generator.parseFile(htmlpath)

            # Each line looks like "<...><bookId>/<relative src>"; keep only
            # the part after "<bookId>/".
            srcs = []
            with open(f"{srcsDir}/ebook_{bookId}.txt", "r") as file:
                for line in file:
                    srcs.append(line.split(f"{bookId}/")[1].strip())

            for src in srcs:
                try:
                    record = generator.genAltTextV2(src, bookId, src, bookPath)
                    records.append(record)
                except Exception as e:
                    # Best-effort: one bad image must not abort the book.
                    print(f"Error processing image {src} in book {bookId}: {e}")
        except Exception as e:
            # Best-effort: one bad book must not abort the benchmark run.
            print(f"Error processing book {bookId}: {e}")

    generateCSV("test_benchmark.csv", records)
|
||||
|
||||
|
||||
# Script entry point: benchmark every downloaded book against its extracted
# image-src list. Paths are relative to the directory this script runs from.
if __name__ == "__main__":
    print("RUNNING AUTOMATE.PY")
    benchmarkBooks("./downloaded_books", "./book_outputs")
|
|
@ -0,0 +1,71 @@
|
|||
# The goal of this file is to download the books and unzip them to be used by automate.py!
|
||||
|
||||
import os
|
||||
import requests
|
||||
import zipfile
|
||||
import re
|
||||
|
||||
# Directory of per-book ebook_<id>.txt listings produced by the chunking
# script; the numeric book id is parsed from each filename.
folder_path = "book_outputs"
# Where downloaded .zip archives are cached.
download_folder = "downloaded_books/download_files"
# Where each archive is extracted (one subfolder per book id).
extraction_folder = "downloaded_books"
|
||||
|
||||
|
||||
def download_and_unzip_books(folder_path, download_folder, extraction_folder):
    """Download Project Gutenberg HTML zips for the books listed in
    folder_path and extract them into extraction_folder.

    Book ids are the numeric part of each .txt filename in folder_path.
    Archives already downloaded and folders already extracted are skipped.

    Args:
        folder_path: directory of ebook_<id>.txt files naming the books.
        download_folder: cache directory for the downloaded .zip archives.
        extraction_folder: destination directory (one subfolder per book id).
    """
    base_url = "https://www.gutenberg.org/cache/epub/{book_id}/pg{book_id}-h.zip"

    # Ensure the download and extraction folders exist.
    os.makedirs(download_folder, exist_ok=True)
    os.makedirs(extraction_folder, exist_ok=True)

    # Iterate through each text file in the folder.
    for filename in os.listdir(folder_path):
        if not filename.endswith(".txt"):
            continue

        # Use regex to extract only the numeric part of the book ID.
        match = re.search(r"\d+", filename)
        if not match:
            # BUG FIX: the message had lost its placeholder and printed a
            # literal "(unknown)" instead of the offending filename.
            print(f"No book ID found in {filename}")
            continue

        book_id = match.group()
        zip_file_path = os.path.join(download_folder, f"{book_id}.zip")

        # Download the zip file unless it is already cached.
        if not os.path.isfile(zip_file_path):
            url = base_url.format(book_id=book_id)
            try:
                # Timeout added so a stalled download cannot hang the run.
                response = requests.get(url, timeout=60)
                response.raise_for_status()  # Raise an error for bad responses

                # Save the zip file to the specified download folder.
                with open(zip_file_path, "wb") as zip_file:
                    zip_file.write(response.content)
                print(
                    f"Downloaded {book_id}.zip successfully to {download_folder}."
                )
            except requests.RequestException as e:
                print(f"Error downloading {book_id}.zip: {e}")
                # BUG FIX: previously fell through and tried to extract a
                # zip that was never written.
                continue
        else:
            print(f"{book_id}.zip already exists. Skipping download.")

        # Unzip unless the book's extraction folder already exists.
        book_extraction_folder = os.path.join(extraction_folder, book_id)
        if os.path.exists(book_extraction_folder):
            print(
                f"Extraction folder for {book_id} already exists. Skipping extraction."
            )
            continue
        try:
            with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
                zip_ref.extractall(book_extraction_folder)
            print(f"Extracted {book_id}.zip to {book_extraction_folder}.")
        except zipfile.BadZipFile:
            print(
                f"Error unzipping {book_id}.zip: The file may be corrupt or not a zip file."
            )
|
||||
|
||||
|
||||
# Runs immediately on execution/import — this script has no __main__ guard.
download_and_unzip_books(folder_path, download_folder, extraction_folder)
|
|
@ -0,0 +1,34 @@
|
|||
# Used to chunk the empty_alt_text.txt into multiple different more digestable .txt files
|
||||
# Will potentially eventually be used to upload from the file right into a database of books
|
||||
# Then will update the file paths, download & install the books with images
|
||||
|
||||
import os
|
||||
|
||||
# Combined listing to split; each line begins with a book number.
input_file = "./empty_alt_text_sample.txt"  # The file path of whatever initial .txt you are working with
# Destination for the per-book ebook_<number>.txt files.
output_folder = "./book_outputs"
|
||||
|
||||
|
||||
def createIndividualBookFiles(input_file, output_folder):
    """Split a combined empty-alt-text listing into one file per book.

    Every line of input_file must begin with a book number; each line is
    appended to output_folder/ebook_<number>.txt, so consecutive lines for
    the same book land in the same file.

    Args:
        input_file: path of the combined .txt listing.
        output_folder: directory receiving the per-book files (created if
            missing).
    """
    # Ensure the output folder exists.
    os.makedirs(output_folder, exist_ok=True)

    # Track the last book number so the output path is only recomputed when
    # the book changes.
    last_book_number = None
    output_path = None

    with open(input_file, "r") as file:
        for line in file:
            parts = line.split()
            # BUG FIX: a blank line previously raised IndexError on
            # line.split()[0]; skip it instead.
            if not parts:
                continue
            book_number = parts[0]  # Extracting book number
            # Check if this line is for a new book.
            if book_number != last_book_number:
                output_file_name = f"ebook_{book_number}.txt"
                output_path = os.path.join(output_folder, output_file_name)
                last_book_number = book_number

            # Append to the file (creates a new file if it doesn't exist).
            with open(output_path, "a") as output_file:
                output_file.write(line)
|
||||
|
||||
|
||||
# Runs immediately on execution/import — this script has no __main__ guard.
createIndividualBookFiles(input_file, output_folder)
|
Loading…
Reference in New Issue