Merge pull request #210 from Gluejar/insert_dated_copyright
Insert dated copyright. Se tests and main tests passed on my laptop.
commit
ed1f17169d
@@ -1,4 +1,5 @@
import requests
+import random
from django.conf import settings
from urllib import quote
from functools import partial

@@ -48,8 +49,15 @@ class BooXtream(object):
        url = self.endpoint + 'booxtream.xml'
        kwargs['epub'] = '1' if epub else '0'
        kwargs['kf8mobi'] = '1' if kf8mobi else '0'

-        files= {'epubfile': epubfile} if epubfile else {}
+        if epubfile:
+            if hasattr(epubfile,'name') and str(epubfile.name).endswith('.epub'):
+                files= {'epubfile': (str(epubfile.name),epubfile)}
+            else:
+                # give it a random file name so that kindlegen doesn't choke
+                # needed for in-memory (StringIO) epubs
+                files= {'epubfile': ('%012x.epub' % random.randrange(16**12),epubfile)}
+        else:
+            files={}
        resp = self.postrequest(url, data=kwargs, files=files)
        doc = ElementTree.fromstring(resp.content)
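Not part of the commit: a minimal sketch of why the hunk above switches to the (filename, file object) tuple form, assuming a made-up upload_url and an in-memory payload. requests uses the tuple's first element as the multipart filename, which is what keeps kindlegen from choking on nameless StringIO uploads.

import random
from StringIO import StringIO

import requests

upload_url = 'https://example.com/booxtream.xml'  # placeholder, not the real endpoint
payload = StringIO('...epub bytes...')            # stands in for an in-memory epub

# A bare file object would upload without a usable filename; the
# (filename, fileobj) tuple sets the multipart filename explicitly.
files = {'epubfile': ('%012x.epub' % random.randrange(16**12), payload)}
resp = requests.post(upload_url, data={'epub': '1', 'kf8mobi': '0'}, files=files)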
@@ -1,10 +1,18 @@
import unittest
import time

# uses settings.BOOXTREAM_TEST_EPUB
from . import settings
+import urllib2
+from tempfile import NamedTemporaryFile
+from StringIO import StringIO

class TestBooXtream(unittest.TestCase):
+    def setUp(self):
+        # get a small epub test file as a file-like object
+        self.epub2file = NamedTemporaryFile(delete=False)
+        test_file_content = urllib2.urlopen('http://www.hxa.name/articles/content/EpubGuide-hxa7241.epub')
+        self.epub2file.write(test_file_content.read())
+        self.epub2file.seek(0)
+
    def _makeOne(self):
        from . import BooXtream
        manager = BooXtream()

@@ -31,8 +39,15 @@ class TestBooXtream(unittest.TestCase):
            'disclaimer':1,
            }
        params['referenceid']= 'order'+str(time.time())
-        epubfile= open(settings.BOOXTREAM_TEST_EPUB)
-        boox=inst.platform(epubfile=epubfile, **params)
+        boox=inst.platform(epubfile=self.epub2file, **params)
        self.assertRegexpMatches(boox.download_link_epub,'download.booxtream.com/')
        self.assertFalse(boox.expired)
        self.assertEqual(boox.downloads_remaining,3)
+
+        # make sure it works with an in-memory file
+        self.epub2file.seek(0)
+        in_mem_epub = StringIO()
+        in_mem_epub.write(self.epub2file.read())
+        in_mem_epub.seek(0)
+        boox2=inst.platform(epubfile=in_mem_epub, **params)
+        self.assertRegexpMatches(boox2.download_link_epub,'download.booxtream.com/')
@@ -0,0 +1,17 @@
"""
Utilities that manipulate epub files
"""

from pyepub import EPUB
from StringIO import StringIO
from django.template.loader import render_to_string

def personalize(epub_file, acq):
    output = EPUB(epub_file, "a")
    context={'acq':acq}
    part = StringIO(str(render_to_string('epub/datedcc_license.xhtml', context)))
    output.addpart(part, "datedcc_license.xhtml", "application/xhtml+xml", 1) #after title, we hope
    output.addmetadata('rights','%s after %s'%(acq.work.last_campaign().license_url,acq.work.last_campaign().cc_date))
    output.close()
    #output.writetodisk('testfile2.epub')
    return output
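Not part of the commit: a rough usage sketch of the personalize() helper above. The wrapper function and the epub path are illustrative stand-ins, and acq is assumed to be a core.models.Acq instance in a configured Django environment.

from regluit.core.epub import personalize

def watermark_source_for(acq, epub_path):
    """Return a seekable buffer holding acq's personalized epub.

    `epub_path` and this helper are illustrative; in the diff the same
    steps happen inline in Acq.get_watermarked().
    """
    personalized = personalize(epub_path, acq)
    # personalize() closes the EPUB, which rebuilds it around an in-memory
    # buffer exposed as .filename (see the pyepub module below)
    personalized.filename.seek(0)
    return personalized.filename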
@@ -31,6 +31,7 @@ regluit imports
'''
import regluit
import regluit.core.isbn
+from regluit.core.epub import personalize

from regluit.core.signals import (
    successful_campaign,

@@ -251,7 +252,6 @@
    def days_per_copy(self):
        return Decimal(float(self.price) / self.work.last_campaign().dollar_per_day )


class Acq(models.Model):
    """
    Short for Acquisition, this is a made-up word to describe the thing you acquire when you buy or borrow an ebook

@@ -265,7 +265,7 @@
        choices=CHOICES)
    watermarked = models.ForeignKey("booxtream.Boox", null=True)
    nonce = models.CharField(max_length=32, null=True)

    @property
    def expired(self):
        if self.expires is None:

@@ -284,22 +284,24 @@
        params={
            'customeremailaddress': self.user.email,
            'customername': self.user.username,
-            'languagecode':'1043',
+            'languagecode':'1033',
            'expirydays': 1,
            'downloadlimit': 7,
            'exlibris':1,
            'chapterfooter':1,
-            'disclaimer':1,
+            'disclaimer':0,
            'referenceid': '%s:%s:%s' % (self.work.id, self.user.id, self.id),
            'kf8mobi': True,
            'epub': True,
            }
-        self.watermarked = watermarker.platform(epubfile= self.work.ebookfiles()[0].file, **params)
+        personalized = personalize(self.work.ebookfiles()[0].file, self)
+        personalized.filename.seek(0)
+        self.watermarked = watermarker.platform(epubfile= personalized.filename, **params)
        self.save()
        return self.watermarked

    def _hash(self):
-        return hashlib.md5('%s:%s:%s'%(self.user.id,self.work.id,self.created)).hexdigest()
+        return hashlib.md5('1c1a56974ef08edc%s:%s:%s'%(self.user.id,self.work.id,self.created)).hexdigest()

def add_acq_nonce(sender, instance, created, **kwargs):
    if created:
@@ -38,7 +38,8 @@ from regluit.core import (
    search,
    goodreads,
    librarything,
-    tasks
+    tasks,
+    parameters,
    )
from regluit.core.models import (
    Campaign,

@@ -62,6 +63,7 @@ from regluit.frontend.views import safe_get_work
from regluit.payment.models import Transaction
from regluit.payment.parameters import PAYMENT_TYPE_AUTHORIZATION
from regluit.utils.localdatetime import now, date_today
+from regluit.pyepub import EPUB

class BookLoaderTests(TestCase):
    def setUp(self):

@@ -829,6 +831,7 @@ class EbookFileTests(TestCase):
        w = Work.objects.create(title="Work 1")
        e = Edition.objects.create(title=w.title,work=w)
+        u = User.objects.create_user('test', 'test@example.org', 'testpass')
        c = Campaign.objects.create(work=w, type = parameters.BUY2UNGLUE, cc_date_initial= datetime(2020,1,1),target=1000, deadline=datetime(2020,1,1))

        # download the test epub into a temp file
        temp = NamedTemporaryFile(delete=False)

@@ -850,12 +853,16 @@ class EbookFileTests(TestCase):
        # make sure we get rid of temp file
        os.remove(temp.name)

        test_epub= EPUB(ebf.file, mode='a')
        self.assertEqual(len(test_epub.opf) , 4)
        self.assertTrue(len(test_epub.opf[2]) < 30)

        acq=Acq.objects.create(user=u,work=w,license=TESTING)
        self.assertIsNot(acq.nonce, None)

        url= acq.get_watermarked().download_link_epub
        self.assertRegexpMatches(url,'download.booxtream.com/')
        print url
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
<title>License Information</title>
</head>
<body>
<div class="booksection">
<p>
This copyrighted work was licensed to you, {{ acq.user.username }}, on {{ acq.created }} through https://unglue.it for your personal use only. It may not be transferred to third parties without the express written permission of the rights holder. This license has been embedded into the digital file, along with your identity, that of the licensor, and terms of the license. You can use this file to prove your license status. It may be unlawful for you to remove the embedded license. Unauthorized distribution of this work is not permitted and may have serious legal consequences including the revocation of your license.
</p>
<p> </p>
<p class="copyk">Notwithstanding the above, after {{ acq.work.last_campaign.cc_date }}, this book is licensed under a {{ acq.work.last_campaign.license }} license. Details are available at <a href="{{ acq.work.last_campaign.license_url }}">{{ acq.work.last_campaign.license_url }}</a></p>
<p class="copy"></p>

</div>
</body>
</html>
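Not part of the commit: a sketch of how this template's variables get filled, mirroring the context built inside personalize() above. The helper name is hypothetical, and a configured Django template loader that can find 'epub/datedcc_license.xhtml' is assumed.

from StringIO import StringIO
from django.template.loader import render_to_string

def render_license_part(acq):
    """Render the dated-license page for one acquisition (illustrative helper).

    The template dereferences acq.user, acq.created and acq.work.last_campaign,
    so any Acq instance with those attributes renders the same way.
    """
    xhtml = render_to_string('epub/datedcc_license.xhtml', {'acq': acq})
    # pyepub's addpart() wants a file-like object, hence the StringIO wrapper
    return StringIO(str(xhtml))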
@@ -0,0 +1,404 @@
import zipfile
import os
import re
import uuid
from StringIO import StringIO
import datetime

try:
    import lxml.etree as ET
except ImportError:
    import xml.etree.ElementTree as ET

TMP = {"opf": None, "ncx": None}
FLO = None

NAMESPACE = {
    "dc": "{http://purl.org/dc/elements/1.1/}",
    "opf": "{http://www.idpf.org/2007/opf}",
    "ncx": "{http://www.daisy.org/z3986/2005/ncx/}"
}

ET.register_namespace('dc', "http://purl.org/dc/elements/1.1/")
ET.register_namespace('opf', "http://www.idpf.org/2007/opf")
ET.register_namespace('ncx', "http://www.daisy.org/z3986/2005/ncx/")


class InvalidEpub(Exception):
    pass


class EPUB(zipfile.ZipFile):
    """
    EPUB file representation class.

    """
    def __init__(self, filename, mode="r"):
        """
        Global Init Switch

        :type filename: str or StringIO() or file like object for read or add
        :param filename: File to be processed
        :type mode: str
        :param mode: "w" or "r", mode to init the zipfile
        """
        if mode == "w":
            if not isinstance(filename, StringIO):
                assert not os.path.exists(filename), \
                    "Can't overwrite existing file: %s" % filename
            self.filename = filename
            zipfile.ZipFile.__init__(self, self.filename, mode="w")
            self.__init__write()
        elif mode == "a":
            assert not isinstance(filename, StringIO), \
                "Can't append to StringIO object, use write instead: %s" % filename
            if isinstance(filename, str):
                tmp = open(filename, "r")  # ensure that the input file is never-ever overwritten
            else:
                # filename is already a file like object
                tmp = filename
            tmp.seek(0)
            initfile = StringIO()
            initfile.write(tmp.read())
            tmp.close()
            zipfile.ZipFile.__init__(self, initfile, mode="a")
            self.__init__read(initfile)
        else:  # retrocompatibility?
            zipfile.ZipFile.__init__(self, filename, mode="r")
            self.__init__read(filename)
    def __init__read(self, filename):
        """
        Constructor to initialize the zipfile in read-only mode

        :type filename: str or StringIO()
        :param filename: File to be processed
        """
        self.filename = filename
        try:
            # Read the container
            f = self.read("META-INF/container.xml")
        except KeyError:
            # By specification, there MUST be a container.xml in EPUB
            print "The %s file is not a valid OCF." % str(filename)
            raise InvalidEpub
        try:
            # There MUST be a full path attribute on first grandchild...
            self.opf_path = ET.fromstring(f)[0][0].get("full-path")
        except IndexError:
            # ...else the file is invalid.
            print "The %s file is not a valid OCF." % str(filename)
            raise InvalidEpub

        # NEW: json-able info tree
        self.info = {"metadata": {},
                     "manifest": [],
                     "spine": [],
                     "guide": []}

        self.root_folder = os.path.dirname(self.opf_path)   # Used to compose absolute paths for reading in zip archive
        self.opf = ET.fromstring(self.read(self.opf_path))  # OPF tree

        ns = re.compile(r'\{.*?\}')  # RE to strip {namespace} mess

        # Iterate over <metadata> section, fill EPUB.info["metadata"] dictionary
        for i in self.opf.find("{0}metadata".format(NAMESPACE["opf"])):
            tag = ns.sub('', i.tag)
            if tag not in self.info["metadata"]:
                self.info["metadata"][tag] = i.text or i.attrib
            else:
                self.info["metadata"][tag] = [self.info["metadata"][tag], i.text or i.attrib]

        # Get id of the cover in <meta name="cover" />
        try:
            coverid = self.opf.find('.//{0}meta[@name="cover"]'.format(NAMESPACE["opf"])).get("content")
        except AttributeError:
            # It's a facultative field, after all
            coverid = None
        self.cover = coverid  # This is the manifest ID of the cover

        self.info["manifest"] = [{"id": x.get("id"),  # Build a list of manifest items
                                  "href": x.get("href"),
                                  "mimetype": x.get("media-type")}
                                 for x in self.opf.find("{0}manifest".format(NAMESPACE["opf"])) if x.get("id")]

        self.info["spine"] = [{"idref": x.get("idref")}  # Build a list of spine items
                              for x in self.opf.find("{0}spine".format(NAMESPACE["opf"])) if x.get("idref")]
        try:
            self.info["guide"] = [{"href": x.get("href"),  # Build a list of guide items
                                   "type": x.get("type"),
                                   "title": x.get("title")}
                                  for x in self.opf.find("{0}guide".format(NAMESPACE["opf"])) if x.get("href")]
        except TypeError:  # The guide element is optional
            self.info["guide"] = None

        # Document identifier
        try:
            self.id = self.opf.find('.//{0}identifier[@id="{1}"]'.format(NAMESPACE["dc"],
                                                                         self.opf.get("unique-identifier"))).text
        except AttributeError:
            raise InvalidEpub  # Cannot process an EPUB without unique-identifier
                               # attribute of the package element

        # Get and parse the TOC
        toc_id = self.opf[2].get("toc")
        expr = ".//{0}item[@id='{1:s}']".format(NAMESPACE["opf"], toc_id)
        toc_name = self.opf.find(expr).get("href")
        self.ncx_path = os.path.join(self.root_folder, toc_name)
        self.ncx = ET.fromstring(self.read(self.ncx_path))
        self.contents = [{"name": i[0][0].text or "None",  # Build a list of toc elements
                          "src": os.path.join(self.root_folder, i[1].get("src")),
                          "id": i.get("id")}
                         for i in self.ncx.iter("{0}navPoint".format(NAMESPACE["ncx"]))]  # The iter method
                                                                                          # loops over nested
                                                                                          # navPoints
    def __init__write(self):
        """
        Init an empty EPUB
        """
        self.opf_path = "OEBPS/content.opf"  # Define a default folder for contents
        self.ncx_path = "OEBPS/toc.ncx"
        self.root_folder = "OEBPS"
        self.uid = '%s' % uuid.uuid4()

        self.info = {"metadata": {},
                     "manifest": [],
                     "spine": [],
                     "guide": []}

        self.writestr('mimetype', "application/epub+zip")
        self.writestr('META-INF/container.xml', self._containerxml())
        self.info["metadata"]["creator"] = "py-clave server"
        self.info["metadata"]["title"] = ""
        self.info["metadata"]["language"] = ""

        # Problem is: you can't overwrite file contents with python ZipFile,
        # so you must add contents BEFORE finalizing the file
        # by calling the close() method.

        self.opf = ET.fromstring(self._init_opf())  # opf property is always an ElementTree
        self.ncx = ET.fromstring(self._init_ncx())  # so is ncx. Consistent with self.(opf|ncx) built by __init__read()

        self.writestr(self.opf_path, ET.tostring(self.opf, encoding="UTF-8"))  # temporary opf & ncx
        self.writestr(self.ncx_path, ET.tostring(self.ncx, encoding="UTF-8"))  # will be re-init on close()

    def close(self):
        if self.fp is None:  # Check file status
            return
        if self.mode == "r":  # Check file mode
            zipfile.ZipFile.close(self)
            return
        else:
            try:
                global TMP  # in-memory copy of existing opf-ncx. When the epub gets re-init,
                            # it loses track of modifications
                TMP["opf"] = self.opf
                TMP["ncx"] = self.ncx
                self._safeclose()
                zipfile.ZipFile.close(self)  # give back control to superclass close method
            except RuntimeError:  # zipfile.__del__ destructor calls close(), ignore
                return

    def _safeclose(self):
        """
        Preliminary operations before closing an EPUB
        Writes the empty or modified opf-ncx files before closing the zipfile
        """
        if self.mode != "r":
            self._delete(self.opf_path, self.ncx_path)  # see the following horrible hack:
                                                        # zipfile cannot manage overwriting on the archive,
                                                        # so this basically RECREATES the epub from scratch
                                                        # and is sure slow as hell
                                                        # ... and a recipe for disaster.
            self.opf = TMP["opf"]
            self.ncx = TMP["ncx"]  # get back the temporary copies

        self.writestr(self.opf_path, ET.tostring(self.opf, encoding="UTF-8"))
        self.writestr(self.ncx_path, ET.tostring(self.ncx, encoding="UTF-8"))
        self.__init__read(FLO)  # We may still need the info dict of a closed EPUB
    def _init_opf(self):
        """
        Constructor for an empty OPF
        :type return: xml.minidom.Document
        :return: xml.minidom.Document
        """
        today = datetime.date.today()
        opf_tmpl = """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
                    <package xmlns="http://www.idpf.org/2007/opf" unique-identifier="BookId" version="2.0">
                        <metadata xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:opf="http://www.idpf.org/2007/opf">
                            <dc:identifier id="BookId" opf:scheme="UUID">{uid}</dc:identifier>
                            <dc:title>{title}</dc:title>
                            <dc:language>{lang}</dc:language>
                            <dc:date opf:event="modification">{date}</dc:date>
                        </metadata>
                        <manifest>
                            <item href="toc.ncx" id="ncx" media-type="application/x-dtbncx+xml" />
                        </manifest>
                        <spine toc="ncx">
                        </spine>
                        <guide>
                        </guide>
                    </package>"""

        doc = opf_tmpl.format(uid=self.uid,
                              date=today,
                              title=self.info["metadata"]["title"],
                              lang=self.info["metadata"]["language"])
        return doc

    def _init_ncx(self):
        """
        Constructor for an empty NCX
        :type return: xml.minidom.Document
        :return: xml.minidom.Document
        """
        ncx_tmpl = """<?xml version="1.0" encoding="utf-8"?>
                    <!DOCTYPE ncx PUBLIC "-//NISO//DTD ncx 2005-1//EN"
                                         "http://www.daisy.org/z3986/2005/ncx-2005-1.dtd">
                    <ncx xmlns="http://www.daisy.org/z3986/2005/ncx/" version="2005-1">
                        <head>
                            <meta name="dtb:uid" content="{uid}" />
                            <meta name="dtb:depth" content="0" />
                            <meta name="dtb:totalPageCount" content="0" />
                            <meta name="dtb:maxPageNumber" content="0" />
                        </head>
                        <docTitle>
                            <text>{title}</text>
                        </docTitle>
                        <navMap>
                        </navMap>
                    </ncx>"""

        ncx = ncx_tmpl.format(uid=self.uid, title="Default")
        return ncx

    def _containerxml(self):
        template = """<?xml version="1.0" encoding="UTF-8"?>
                    <container version="1.0"
                               xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
                        <rootfiles>
                            <rootfile full-path="%s"
                                      media-type="application/oebps-package+xml"/>
                        </rootfiles>
                    </container>"""
        return template % self.opf_path

    def _delete(self, *paths):
        """
        Delete archive members.
        Basically a hack: since zipfile can't natively overwrite or delete resources,
        a new archive is created from scratch to a StringIO file object.
        The starting file is *never* overwritten.
        To write the new file to disk, use the writetodisk() instance method.

        :type paths: str
        :param paths: files to be deleted inside the EPUB file
        """
        global FLO  # File-Like-Object: this is obviously wrong: any better idea?
                    # Also, the variable name is questionable
        FLO = StringIO()
        new_zip = zipfile.ZipFile(FLO, 'w')
        for item in self.infolist():
            if item.filename not in paths:
                try:
                    new_zip.writestr(item.filename, self.read(item.filename))
                except zipfile.BadZipfile:
                    pass
        zipfile.ZipFile.close(self)  # Don't know why
        new_zip.close()              # but it works, don't ever touch
        zipfile.ZipFile.__init__(self, FLO, mode="a")
    def addmetadata(self, term, value, namespace='dc'):
        """
        Add a metadata entry

        :type term: str
        :param term: element name/tag for the metadata item
        :type value: str
        :param value: a value
        :type namespace: str
        :param namespace: either a '{URI}' or a registered prefix; 'dc', 'opf', 'ncx' are currently built in
        """
        assert self.mode != "r", "%s is not writable" % self
        namespace = NAMESPACE.get(namespace,namespace)
        element = ET.Element(namespace+term, attrib={})
        element.text = value
        self.opf[0].append(element)
        # note that info is ignoring namespace entirely
        if self.info["metadata"].has_key(term):
            self.info["metadata"][term] = [self.info["metadata"][term], value]
        else:
            self.info["metadata"][term] = value

    def additem(self, fileObject, href, mediatype):
        """
        Add a file to the manifest only

        :type fileObject: StringIO
        :param fileObject: file-like object holding the content to add
        :type href: str
        :param href: path of the item inside the epub archive
        :type mediatype: str
        :param mediatype: mimetype of the item
        """
        assert self.mode != "r", "%s is not writable" % self
        element = ET.Element("item",
                             attrib={"id": "id_"+str(uuid.uuid4())[:5], "href": href, "media-type": mediatype})

        try:
            self.writestr(os.path.join(self.root_folder, element.attrib["href"]), fileObject.getvalue())
        except AttributeError:
            self.writestr(os.path.join(self.root_folder, element.attrib["href"]), fileObject)
        self.opf[1].append(element)
        return element.attrib["id"]

    def addpart(self, fileObject, href, mediatype, position=None, reftype="text", linear="yes"):
        """
        Add a file as part of the epub file, i.e. to the manifest and spine (and guide?)

        :param fileObject: file to be inserted
        :param href: path inside the epub archive
        :param mediatype: mimetype of the fileObject
        :type position: int
        :param position: order in spine, from 0 to len(opf/manifest)
        :param linear: linear="yes" or "no"
        :param reftype: type to assign in guide/reference
        """
        assert self.mode != "r", "%s is not writable" % self
        fileid = self.additem(fileObject, href, mediatype)
        itemref = ET.Element("itemref", attrib={"idref": fileid, "linear": linear})
        reference = ET.Element("reference", attrib={"title": href, "href": href, "type": reftype})
        if position is None or position>len(self.opf[2]):
            self.opf[2].append(itemref)
            self.opf[3].append(reference)
        else:
            self.opf[2].insert(position, itemref)
            if len(self.opf[3]) >= position+1:
                self.opf[3].insert(position, reference)

    def writetodisk(self, filename):
        """
        Writes the in-memory archive to disk

        :type filename: str
        :param filename: name of the file to be written
        """
        if self.mode == "r":
            # The interface should be consistent
            new_zip = zipfile.ZipFile(filename, 'w')
            for item in self.infolist():
                new_zip.writestr(item.filename, self.read(item.filename))
            new_zip.close()
            return
        # this is a bad habit
        f = open(filename, "w")
        try:
            self.filename.seek(0)
        except AttributeError:  # file must be closed first
            self.close()
            self.filename.seek(0)
        f.write(self.filename.read())
        f.close()
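Not part of the commit: a short usage sketch of the EPUB class added above, following the append-mode flow the tests below exercise. The file paths are placeholders, and the rights string and license page are illustrative content.

from StringIO import StringIO
from regluit.pyepub import EPUB  # import path as used in core/tests.py above

epub = EPUB('/tmp/sample.epub', mode='a')  # placeholder path; append mode copies the zip into memory
epub.addmetadata('rights', 'CC BY after 2020-01-01')
part = StringIO('<html xmlns="http://www.w3.org/1999/xhtml"><body><p>license page</p></body></html>')
epub.addpart(part, 'license.xhtml', 'application/xhtml+xml', 1)
epub.close()  # rebuilds the archive around an in-memory buffer
epub.writetodisk('/tmp/sample-with-license.epub')  # or seek/read epub.filename directly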
@@ -0,0 +1,44 @@
import unittest
import urllib2
from tempfile import NamedTemporaryFile
from StringIO import StringIO
from . import EPUB
try:
    import lxml.etree as ET
except ImportError:
    import xml.etree.ElementTree as ET


class EpubTests(unittest.TestCase):

    def setUp(self):
        # get a small epub test file as a file-like object
        self.epub2file = NamedTemporaryFile(delete=False)
        test_file_content = urllib2.urlopen('http://www.hxa.name/articles/content/EpubGuide-hxa7241.epub')
        self.epub2file.write(test_file_content.read())
        self.epub2file.seek(0)

    def test_instantiation(self):
        epub=EPUB(self.epub2file)
        self.assertNotEqual(epub.filename, None)
        self.assertEqual(len(epub.opf),4)
        self.assertEqual(len(epub.opf[0]),11) #metadata items
        self.assertEqual(len(epub.opf[1]),11) #manifest items
        self.assertEqual(len(epub.opf[2]),8)  #spine items
        self.assertEqual(len(epub.opf[3]),3)  #guide items

    def test_addpart(self):
        epub=EPUB(self.epub2file,mode='a')
        self.assertNotEqual(epub.filename, None)
        part = StringIO('<?xml version="1.0" encoding="utf-8" standalone="yes"?>')
        epub.addpart(part, "testpart.xhtml", "application/xhtml+xml", 2)
        self.assertEqual(len(epub.opf[2]),9) #spine items

    def test_addmetadata(self):
        epub=EPUB(self.epub2file,mode='a')
        epub.addmetadata('test', 'GOOD')
        self.assertIn('<dc:test>GOOD<',ET.tostring(epub.opf, encoding="UTF-8"))
        self.assertTrue(epub.opf.find('.//{http://purl.org/dc/elements/1.1/}test') is not None)
        self.assertEqual(epub.info['metadata']['test'], 'GOOD')