# Reconstructed from the mail-formatted patch "[PATCH] introduce pyepub"
# (commit 3f63ef336f821a71abb86cb2043a73cca92d9921, eric, Sun, 15 Sep 2013
# 21:43:01 -0400), which created pyepub/__init__.py and pyepub/tests.py.
"""Read, create and extend EPUB archives on top of ``zipfile.ZipFile``."""

import datetime
import os
import posixpath
import re
import uuid
import zipfile
from io import BytesIO

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO

try:
    import lxml.etree as ET
except ImportError:
    import xml.etree.ElementTree as ET

try:
    _STRING_TYPES = (basestring,)  # Python 2: str and unicode
except NameError:
    _STRING_TYPES = (str,)

# Legacy module-level slots. EPUB instances keep their own state; close() and
# _delete() still mirror their latest values here so that code reading
# ``pyepub.TMP`` / ``pyepub.FLO`` keeps working. Nothing in this module reads
# them back.
TMP = {"opf": None, "ncx": None}
FLO = None

NAMESPACE = {
    "dc": "{http://purl.org/dc/elements/1.1/}",
    "opf": "{http://www.idpf.org/2007/opf}",
    "ncx": "{http://www.daisy.org/z3986/2005/ncx/}"
}

ET.register_namespace('dc', "http://purl.org/dc/elements/1.1/")
ET.register_namespace('opf', "http://www.idpf.org/2007/opf")
ET.register_namespace('ncx', "http://www.daisy.org/z3986/2005/ncx/")


class InvalidEpub(Exception):
    """Raised when an archive lacks the structure required to parse it."""


class EPUB(zipfile.ZipFile):
    """
    EPUB file representation class.

    After construction the instance exposes:

    * ``opf`` / ``ncx``: parsed element trees of the package and TOC files
    * ``opf_path`` / ``ncx_path`` / ``root_folder``: their archive locations
    * ``info``: dict with "metadata", "manifest", "spine" and "guide" entries
    * ``id``, ``cover``, ``contents`` (set when an archive is parsed)
    * ``uid`` (set when an empty archive is created in "w" mode)
    """

    def __init__(self, filename, mode="r"):
        """
        Global Init Switch

        :type filename: str or file like object
        :param filename: File to be processed
        :type mode: str
        :param mode: "w" creates an empty EPUB, "a" loads a copy of an
                     existing EPUB into memory for modification, anything
                     else opens the archive read-only
        :raises AssertionError: in "w" mode when the path already exists,
                                in "a" mode when given a StringIO
        :raises InvalidEpub: when an existing archive cannot be parsed
        """
        if mode == "w":
            # Only a real path can collide with an existing file; file-like
            # objects are handed to zipfile untouched.
            if isinstance(filename, _STRING_TYPES) and os.path.exists(filename):
                raise AssertionError(
                    "Can't overwrite existing file: %s" % filename)
            self.filename = filename
            zipfile.ZipFile.__init__(self, self.filename, mode="w")
            self.__init__write()
        elif mode == "a":
            if isinstance(filename, StringIO):
                raise AssertionError(
                    "Can't append to StringIO object, use write instead: %s"
                    % filename)
            if isinstance(filename, _STRING_TYPES):
                # Binary mode: the archive is copied into memory so that the
                # input file is never overwritten.
                source = open(filename, "rb")
            else:
                # filename is already a file like object
                source = filename
                source.seek(0)
            try:
                initfile = BytesIO(source.read())
            finally:
                # The source is closed in both cases, as before.
                source.close()
            zipfile.ZipFile.__init__(self, initfile, mode="a")
            self.__init__read(initfile)
        else:  # retrocompatibility: every other mode means read-only
            zipfile.ZipFile.__init__(self, filename, mode="r")
            self.__init__read(filename)

    def __init__read(self, filename):
        """
        Parse container.xml, the OPF package and the NCX table of contents.

        :type filename: str or file like object
        :param filename: File being processed; stored as ``self.filename``
        :raises InvalidEpub: when container.xml, its full-path attribute, the
                             metadata/manifest/spine sections, the unique
                             identifier or the TOC manifest item is missing
        """
        self.filename = filename
        invalid_ocf = "The %s file is not a valid OCF." % str(filename)
        try:
            container = self.read("META-INF/container.xml")
        except KeyError:
            # By specification, there MUST be a container.xml in EPUB
            raise InvalidEpub(invalid_ocf)
        try:
            # There MUST be a full path attribute on first grandchild...
            self.opf_path = ET.fromstring(container)[0][0].get("full-path")
        except IndexError:
            # ...else the file is invalid.
            raise InvalidEpub(invalid_ocf)
        if not self.opf_path:
            raise InvalidEpub(invalid_ocf)

        # json-able info tree
        self.info = {"metadata": {},
                     "manifest": [],
                     "spine": [],
                     "guide": []}

        # Archive member names always use "/", whatever the host OS.
        self.root_folder = posixpath.dirname(self.opf_path)
        self.opf = ET.fromstring(self.read(self.opf_path))

        metadata = self._section("metadata")
        manifest = self._section("manifest")
        spine = self._section("spine")
        guide = self._section("guide")
        if metadata is None or manifest is None or spine is None:
            raise InvalidEpub(
                "The %s file has no metadata, manifest or spine section."
                % str(filename))

        strip_ns = re.compile(r'\{.*?\}')  # RE to strip {namespace} prefixes
        entries = self.info["metadata"]
        for child in metadata:
            if not isinstance(child.tag, _STRING_TYPES):
                continue  # comments / processing instructions have no name
            tag = strip_ns.sub('', child.tag)
            value = child.text or dict(child.attrib)
            if tag not in entries:
                entries[tag] = value
            elif isinstance(entries[tag], list):
                # third and later occurrences extend the flat list
                entries[tag].append(value)
            else:
                entries[tag] = [entries[tag], value]

        # Manifest id of the cover image; the meta element is optional.
        cover_meta = self.opf.find(
            './/{0}meta[@name="cover"]'.format(NAMESPACE["opf"]))
        self.cover = None if cover_meta is None else cover_meta.get("content")

        self.info["manifest"] = [{"id": x.get("id"),
                                  "href": x.get("href"),
                                  "mimetype": x.get("media-type")}
                                 for x in manifest if x.get("id")]
        self.info["spine"] = [{"idref": x.get("idref")}
                              for x in spine if x.get("idref")]
        if guide is None:
            # The guide element is optional; None is what callers have
            # always received in that case.
            self.info["guide"] = None
        else:
            self.info["guide"] = [{"href": x.get("href"),
                                   "type": x.get("type"),
                                   "title": x.get("title")}
                                  for x in guide if x.get("href")]

        # Document identifier, designated by package/@unique-identifier
        identifier = self.opf.find(
            './/{0}identifier[@id="{1}"]'.format(
                NAMESPACE["dc"], self.opf.get("unique-identifier")))
        if identifier is None:
            raise InvalidEpub(
                "The %s file has no unique identifier." % str(filename))
        self.id = identifier.text

        # Get and parse the TOC named by spine/@toc
        toc_id = spine.get("toc")
        toc_name = None
        if toc_id is not None:
            for item in manifest:
                if item.get("id") == toc_id:
                    toc_name = item.get("href")
                    break
        if not toc_name:
            raise InvalidEpub(
                "The %s file has no table of contents." % str(filename))
        self.ncx_path = posixpath.join(self.root_folder, toc_name)
        self.ncx = ET.fromstring(self.read(self.ncx_path))

        ncx_ns = NAMESPACE["ncx"]
        self.contents = []
        # iter() also walks nested navPoints
        for point in self.ncx.iter("{0}navPoint".format(ncx_ns)):
            label = point.find("{0}navLabel/{0}text".format(ncx_ns))
            content = point.find("{0}content".format(ncx_ns))
            src = None if content is None else content.get("src")
            self.contents.append({
                "name": (label.text if label is not None else None) or "None",
                "src": (posixpath.join(self.root_folder, src)
                        if src is not None else None),
                "id": point.get("id")})

    def __init__write(self):
        """
        Init an empty EPUB

        Writes the mimetype entry, container.xml and placeholder OPF/NCX
        files; the placeholders are replaced by close().
        """
        self.opf_path = "OEBPS/content.opf"  # default folder for contents
        self.ncx_path = "OEBPS/toc.ncx"
        self.root_folder = "OEBPS"
        self.uid = '%s' % uuid.uuid4()

        self.info = {"metadata": {},
                     "manifest": [],
                     "spine": [],
                     "guide": []}

        self.writestr('mimetype', "application/epub+zip")
        self.writestr('META-INF/container.xml', self._containerxml())
        self.info["metadata"]["creator"] = "py-clave server"
        self.info["metadata"]["title"] = ""
        self.info["metadata"]["language"] = ""

        # zipfile cannot overwrite a member, so contents must be added
        # BEFORE the archive is finalized by close().
        self.opf = ET.fromstring(self._init_opf())
        self.ncx = ET.fromstring(self._init_ncx())

        # temporary opf & ncx, rewritten on close()
        self.writestr(self.opf_path, ET.tostring(self.opf, encoding="UTF-8"))
        self.writestr(self.ncx_path, ET.tostring(self.ncx, encoding="UTF-8"))

    def close(self):
        """
        Finalize the archive.

        Read-only archives are simply closed. Writable ones first get their
        OPF and NCX members rewritten from ``self.opf`` / ``self.ncx``.
        Calling close() on an already closed archive is a no-op.
        """
        if self.fp is None:  # already closed
            return
        if self.mode == "r":
            zipfile.ZipFile.close(self)
            return
        # Mirror the trees into the legacy module-level slot.
        TMP["opf"] = self.opf
        TMP["ncx"] = self.ncx
        try:
            self._safeclose()
            zipfile.ZipFile.close(self)
        except RuntimeError:  # zipfile.__del__ destructor calls close(), ignore
            return

    def _safeclose(self):
        """
        Preliminary operations before closing an EPUB

        Rebuilds the archive without the stale OPF/NCX members, writes the
        current trees in their place and re-parses the result so that
        ``info`` reflects what was written.
        """
        if self.mode != "r":
            opf, ncx = self.opf, self.ncx
            # zipfile cannot overwrite a member: the archive is recreated
            # from scratch in memory, minus the two members to replace.
            self._delete(self.opf_path, self.ncx_path)
            self.opf, self.ncx = opf, ncx

            self.writestr(self.opf_path,
                          ET.tostring(self.opf, encoding="UTF-8"))
            self.writestr(self.ncx_path,
                          ET.tostring(self.ncx, encoding="UTF-8"))
            # We may still need the info dict of a closed EPUB
            self.__init__read(self._buffer)

    def _section(self, name):
        """Return the direct child *name* of the OPF package, or None."""
        return self.opf.find("{0}{1}".format(NAMESPACE["opf"], name))

    def _init_opf(self):
        """
        Build the XML source of an empty OPF package

        The string carries no XML declaration; ET.tostring() adds one when
        the tree is written into the archive.

        :rtype: str
        :return: OPF document with metadata, manifest (holding the NCX
                 item), spine and guide sections, in that order
        """
        today = datetime.date.today()
        opf_tmpl = """<package xmlns="http://www.idpf.org/2007/opf" unique-identifier="BookId" version="2.0">
          <metadata xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:opf="http://www.idpf.org/2007/opf">
            <dc:identifier id="BookId" opf:scheme="UUID">{uid}</dc:identifier>
            <dc:title>{title}</dc:title>
            <dc:language>{lang}</dc:language>
            <dc:date opf:event="modification">{date}</dc:date>
          </metadata>
          <manifest>
            <item id="ncx" href="toc.ncx" media-type="application/x-dtbncx+xml"/>
          </manifest>
          <spine toc="ncx">
          </spine>
          <guide>
          </guide>
        </package>"""

        doc = opf_tmpl.format(uid=self.uid,
                              date=today,
                              title=self.info["metadata"]["title"],
                              lang=self.info["metadata"]["language"])
        return doc

    def _init_ncx(self):
        """
        Build the XML source of an empty NCX table of contents

        :rtype: str
        :return: NCX document with head, docTitle and an empty navMap
        """
        ncx_tmpl = """<ncx xmlns="http://www.daisy.org/z3986/2005/ncx/" version="2005-1">
          <head>
            <meta name="dtb:uid" content="{uid}"/>
            <meta name="dtb:depth" content="0"/>
            <meta name="dtb:totalPageCount" content="0"/>
            <meta name="dtb:maxPageNumber" content="0"/>
          </head>
          <docTitle>
            <text>{title}</text>
          </docTitle>
          <navMap>
          </navMap>
        </ncx>"""

        ncx = ncx_tmpl.format(uid=self.uid, title="Default")
        return ncx

    def _containerxml(self):
        """
        Build META-INF/container.xml pointing at ``self.opf_path``

        :rtype: str
        """
        template = """<?xml version="1.0" encoding="UTF-8"?>
        <container xmlns="urn:oasis:names:tc:opendocument:xmlns:container" version="1.0">
          <rootfiles>
            <rootfile full-path="%s" media-type="application/oebps-package+xml"/>
          </rootfiles>
        </container>"""
        return template % self.opf_path

    def _delete(self, *paths):
        """
        Delete archive members

        Since zipfile can't natively overwrite or delete resources, a new
        archive is created from scratch in an in-memory buffer and this
        object is re-initialised on it in append mode.
        To write the new file to disk, use the writetodisk() instance method.

        :type paths: str
        :param paths: files to be deleted inside EPUB file
        """
        global FLO
        buf = BytesIO()
        new_zip = zipfile.ZipFile(buf, 'w')
        for item in self.infolist():
            if item.filename not in paths:
                try:
                    new_zip.writestr(item.filename, self.read(item.filename))
                except zipfile.BadZipfile:
                    # Deliberate best effort: an unreadable member is left
                    # out of the copy instead of aborting the rebuild.
                    pass
        # Both archives must be closed before re-opening on the new buffer.
        zipfile.ZipFile.close(self)
        new_zip.close()
        zipfile.ZipFile.__init__(self, buf, mode="a")
        self._buffer = buf
        FLO = buf  # legacy mirror, see module-level comment

    def additem(self, fileObject, href, mediatype):
        """
        Add a file to manifest only

        :type fileObject: object with getvalue(), or raw data
        :param fileObject: content to store
        :type href: str
        :param href: path inside the epub archive, relative to root_folder
        :type mediatype: str
        :param mediatype: mimetype of the content
        :rtype: str
        :return: manifest id assigned to the new item
        :raises AssertionError: when the archive is read-only
        """
        if self.mode == "r":
            raise AssertionError("%s is not writable" % self)
        manifest = self._section("manifest")

        # Five hex digits can collide; retry until the id is unused.
        taken = set(item.get("id") for item in manifest)
        item_id = "id_" + str(uuid.uuid4())[:5]
        while item_id in taken:
            item_id = "id_" + str(uuid.uuid4())[:5]

        # The element must live in the OPF namespace like its siblings.
        element = ET.Element(
            "{0}item".format(NAMESPACE["opf"]),
            attrib={"id": item_id, "href": href, "media-type": mediatype})

        try:
            payload = fileObject.getvalue()
        except AttributeError:
            payload = fileObject  # raw data was passed directly
        self.writestr(posixpath.join(self.root_folder, href), payload)
        manifest.append(element)
        return item_id

    def addpart(self, fileObject, href, mediatype, position=None, reftype="text", linear="yes"):
        """
        Add a file as part of the epub file, i.e. to manifest, spine and,
        when the package has one, guide

        :param fileObject: file to be inserted
        :param href: path inside the epub archive
        :param mediatype: mimetype of the fileObject
        :type position: int
        :param position: order in spine; None or a value past the end appends
        :param linear: linear="yes" or "no"
        :param reftype: type to assign in guide/reference
        :raises AssertionError: when the archive is read-only
        """
        if self.mode == "r":
            raise AssertionError("%s is not writable" % self)
        fileid = self.additem(fileObject, href, mediatype)
        opf_ns = NAMESPACE["opf"]
        itemref = ET.Element("{0}itemref".format(opf_ns),
                             attrib={"idref": fileid, "linear": linear})
        reference = ET.Element("{0}reference".format(opf_ns),
                               attrib={"title": href, "href": href,
                                       "type": reftype})
        spine = self._section("spine")
        guide = self._section("guide")  # optional section, may be None

        if position is None or position > len(spine):
            spine.append(itemref)
            if guide is not None:
                guide.append(reference)
            return
        spine.insert(position, itemref)
        if guide is not None:
            if position < len(guide):
                guide.insert(position, reference)
            else:
                # The guide is shorter than the spine: keep the reference
                # rather than dropping it.
                guide.append(reference)

    def writetodisk(self, filename):
        """
        Writes the in-memory archive to disk

        A writable archive is closed first so that the copy holds the final
        OPF/NCX and a complete central directory.

        :type filename: str
        :param filename: name of the file to be written
        """
        if self.mode == "r":
            # The interface should be consistent: copy member by member
            new_zip = zipfile.ZipFile(filename, 'w')
            try:
                for item in self.infolist():
                    new_zip.writestr(item.filename, self.read(item.filename))
            finally:
                new_zip.close()
            return
        if self.fp is not None:
            self.close()
        # After close() self.filename is the in-memory buffer.
        self.filename.seek(0)
        with open(filename, "wb") as target:
            target.write(self.filename.read())


if __name__ == "__main__":
    # Smoke tests from the patch's pyepub/tests.py. They download a sample
    # EPUB, so they only run when this module is executed directly.
    import unittest
    from tempfile import NamedTemporaryFile

    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen

    class EpubTests(unittest.TestCase):

        def setUp(self):
            # get a small epub test file as a file-like object
            self.epub2file = NamedTemporaryFile(delete=False)
            response = urlopen(
                'http://www.hxa.name/articles/content/EpubGuide-hxa7241.epub')
            try:
                self.epub2file.write(response.read())
            finally:
                response.close()
            self.epub2file.seek(0)

        def tearDown(self):
            # delete=False leaves the file behind unless removed explicitly
            self.epub2file.close()
            os.remove(self.epub2file.name)

        def test_instantiation(self):
            epub = EPUB(self.epub2file)
            self.assertNotEqual(epub.filename, None)
            self.assertEqual(len(epub.opf), 4)
            self.assertEqual(len(epub.opf[0]), 11)  # metadata items
            self.assertEqual(len(epub.opf[1]), 11)  # manifest items
            self.assertEqual(len(epub.opf[2]), 8)  # spine items
            self.assertEqual(len(epub.opf[3]), 3)  # guide items

        def test_addpart(self):
            epub = EPUB(self.epub2file, mode='a')
            self.assertNotEqual(epub.filename, None)
            part = StringIO('')
            epub.addpart(part, "testpart.xhtml", "application/xhtml+xml", 2)
            self.assertEqual(len(epub.opf[2]), 9)  # spine items

    unittest.main()