Merge remote-tracking branch 'Gluejar/master' into production

pull/91/head
eric 2018-04-16 20:10:26 -04:00
commit 89b4221dcd
19 changed files with 1067 additions and 54882 deletions

View File

@ -25,7 +25,7 @@ def onix_feed(facet, max=None):
editions = facet.facet_object.filter_model("Edition",editions).distinct()
for edition in editions:
edition_prod = product(edition, facet.facet_object)
if edition_prod:
if edition_prod is not None:
feed.append(edition_prod)
return etree.tostring(feed, pretty_print=True)
@ -34,7 +34,7 @@ def onix_feed_for_work(work):
feed.append(header(work))
for edition in models.Edition.objects.filter(work=work,ebooks__isnull=False).distinct():
edition_prod = product(edition)
if edition_prod:
if edition_prod is not None:
feed.append(product(edition))
return etree.tostring(feed, pretty_print=True)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -49,7 +49,7 @@ def add_by_oclc(isbn, work=None):
def add_by_oclc_from_google(oclc):
if oclc:
logger.info("adding book by oclc %s", oclc)
logger.info(u"adding book by oclc %s", oclc)
else:
return None
try:
@ -59,10 +59,10 @@ def add_by_oclc_from_google(oclc):
try:
results = _get_json(url, {"q": '"OCLC%s"' % oclc})
except LookupFailure, e:
logger.exception("lookup failure for %s", oclc)
logger.exception(u"lookup failure for %s", oclc)
return None
if not results.has_key('items') or not results['items']:
logger.warn("no google hits for %s", oclc)
logger.warn(u"no google hits for %s", oclc)
return None
try:
@ -70,16 +70,16 @@ def add_by_oclc_from_google(oclc):
models.Identifier(type='oclc', value=oclc, edition=e, work=e.work).save()
return e
except LookupFailure, e:
logger.exception("failed to add edition for %s", oclc)
logger.exception(u"failed to add edition for %s", oclc)
except IntegrityError, e:
logger.exception("google books data for %s didn't fit our db", oclc)
logger.exception(u"google books data for %s didn't fit our db", oclc)
return None
def valid_isbn(isbn):
try:
return identifier_cleaner('isbn')(isbn)
except:
logger.exception("invalid isbn: %s", isbn)
logger.exception(u"invalid isbn: %s", isbn)
return None
def add_by_isbn(isbn, work=None, language='xx', title=''):
@ -88,13 +88,13 @@ def add_by_isbn(isbn, work=None, language='xx', title=''):
try:
e = add_by_isbn_from_google(isbn, work=work)
except LookupFailure:
logger.exception("failed google lookup for %s", isbn)
logger.exception(u"failed google lookup for %s", isbn)
# try again some other time
return None
if e:
return e
logger.info("null came back from add_by_isbn_from_google: %s", isbn)
logger.info(u"null came back from add_by_isbn_from_google: %s", isbn)
# if there's a a title, we want to create stub editions and
# works, even if google doesn't know about it # but if it's not valid,
@ -129,10 +129,10 @@ def get_google_isbn_results(isbn):
try:
results = _get_json(url, {"q": "isbn:%s" % isbn})
except LookupFailure:
logger.exception("lookup failure for %s", isbn)
logger.exception(u"lookup failure for %s", isbn)
return None
if not results.has_key('items') or not results['items']:
logger.warn("no google hits for %s", isbn)
logger.warn(u"no google hits for %s", isbn)
return None
return results
@ -201,7 +201,7 @@ def update_edition(edition):
# if the language of the edition no longer matches that of the parent work,
# attach edition to the
if edition.work.language != language:
logger.info("reconnecting %s since it is %s instead of %s",
logger.info(u"reconnecting %s since it is %s instead of %s",
googlebooks_id, language, edition.work.language)
old_work = edition.work
@ -210,7 +210,7 @@ def update_edition(edition):
edition.work = new_work
edition.save()
for identifier in edition.identifiers.all():
logger.info("moving identifier %s", identifier.value)
logger.info(u"moving identifier %s", identifier.value)
identifier.work = new_work
identifier.save()
if old_work and old_work.editions.count() == 0:
@ -256,7 +256,7 @@ def add_by_isbn_from_google(isbn, work=None):
edition.new = False
return edition
logger.info("adding new book by isbn %s", isbn)
logger.info(u"adding new book by isbn %s", isbn)
results = get_google_isbn_results(isbn)
if results:
try:
@ -267,9 +267,9 @@ def add_by_isbn_from_google(isbn, work=None):
isbn=isbn
)
except LookupFailure, e:
logger.exception("failed to add edition for %s", isbn)
logger.exception(u"failed to add edition for %s", isbn)
except IntegrityError, e:
logger.exception("google books data for %s didn't fit our db", isbn)
logger.exception(u"google books data for %s didn't fit our db", isbn)
return None
return None
@ -320,7 +320,7 @@ def add_by_googlebooks_id(googlebooks_id, work=None, results=None, isbn=None):
if results:
item = results
else:
logger.info("loading metadata from google for %s", googlebooks_id)
logger.info(u"loading metadata from google for %s", googlebooks_id)
url = "https://www.googleapis.com/books/v1/volumes/%s" % googlebooks_id
item = _get_json(url)
d = item['volumeInfo']
@ -343,7 +343,7 @@ def add_by_googlebooks_id(googlebooks_id, work=None, results=None, isbn=None):
if len(language) > 5:
language = language[0:5]
if work and work.language != language:
logger.info("not connecting %s since it is %s instead of %s",
logger.info(u"not connecting %s since it is %s instead of %s",
googlebooks_id, language, work.language)
work = None
# isbn = None
@ -371,7 +371,7 @@ def add_by_googlebooks_id(googlebooks_id, work=None, results=None, isbn=None):
try:
e = models.Identifier.objects.get(type='goog', value=googlebooks_id).edition
e.new = False
logger.warning(" whoa nellie, somebody else created an edition while we were working.")
logger.warning(u" whoa nellie, somebody else created an edition while we were working.")
if work.new:
work.delete()
return e
@ -404,19 +404,19 @@ def relate_isbn(isbn, cluster_size=1):
"""add a book by isbn and then see if there's an existing work to add it to so as to make a
cluster bigger than cluster_size.
"""
logger.info("finding a related work for %s", isbn)
logger.info(u"finding a related work for %s", isbn)
edition = add_by_isbn(isbn)
if edition is None:
return None
if edition.work is None:
logger.info("didn't add related to null work")
logger.info(u"didn't add related to null work")
return None
if edition.work.editions.count() > cluster_size:
return edition.work
for other_isbn in thingisbn(isbn):
# 979's come back as 13
logger.debug("other_isbn: %s", other_isbn)
logger.debug(u"other_isbn: %s", other_isbn)
if len(other_isbn) == 10:
other_isbn = regluit.core.isbn.convert_10_to_13(other_isbn)
related_edition = add_by_isbn(other_isbn, work=edition.work)
@ -427,7 +427,7 @@ def relate_isbn(isbn, cluster_size=1):
related_edition.work = edition.work
related_edition.save()
elif related_edition.work_id != edition.work_id:
logger.debug("merge_works path 1 %s %s", edition.work_id, related_edition.work_id)
logger.debug(u"merge_works path 1 %s %s", edition.work_id, related_edition.work_id)
merge_works(related_edition.work, edition.work)
if related_edition.work.editions.count() > cluster_size:
return related_edition.work
@ -438,7 +438,7 @@ def add_related(isbn):
The initial seed ISBN will be added if it's not already there.
"""
# make sure the seed edition is there
logger.info("adding related editions for %s", isbn)
logger.info(u"adding related editions for %s", isbn)
new_editions = []
@ -446,14 +446,14 @@ def add_related(isbn):
if edition is None:
return new_editions
if edition.work is None:
logger.warning("didn't add related to null work")
logger.warning(u"didn't add related to null work")
return new_editions
# this is the work everything will hang off
work = edition.work
other_editions = {}
for other_isbn in thingisbn(isbn):
# 979's come back as 13
logger.debug("other_isbn: %s", other_isbn)
logger.debug(u"other_isbn: %s", other_isbn)
if len(other_isbn) == 10:
other_isbn = regluit.core.isbn.convert_10_to_13(other_isbn)
related_edition = add_by_isbn(other_isbn, work=work)
@ -466,7 +466,7 @@ def add_related(isbn):
related_edition.work = work
related_edition.save()
elif related_edition.work_id != work.id:
logger.debug("merge_works path 1 %s %s", work.id, related_edition.work_id)
logger.debug(u"merge_works path 1 %s %s", work.id, related_edition.work_id)
work = merge_works(work, related_edition.work)
else:
if other_editions.has_key(related_language):
@ -476,14 +476,14 @@ def add_related(isbn):
# group the other language editions together
for lang_group in other_editions.itervalues():
logger.debug("lang_group (ed, work): %s", [(ed.id, ed.work_id) for ed in lang_group])
logger.debug(u"lang_group (ed, work): %s", [(ed.id, ed.work_id) for ed in lang_group])
if len(lang_group) > 1:
lang_edition = lang_group[0]
logger.debug("lang_edition.id: %s", lang_edition.id)
logger.debug(u"lang_edition.id: %s", lang_edition.id)
# compute the distinct set of works to merge into lang_edition.work
works_to_merge = set([ed.work for ed in lang_group[1:]]) - set([lang_edition.work])
for w in works_to_merge:
logger.debug("merge_works path 2 %s %s", lang_edition.work_id, w.id)
logger.debug(u"merge_works path 2 %s %s", lang_edition.work_id, w.id)
merged_work = merge_works(lang_edition.work, w)
models.WorkRelation.objects.get_or_create(
to_work=lang_group[0].work,
@ -498,17 +498,21 @@ def thingisbn(isbn):
Library Thing. (takes isbn_10 or isbn_13, returns isbn_10, except for 979 isbns,
which come back as isbn_13')
"""
logger.info("looking up %s at ThingISBN", isbn)
logger.info(u"looking up %s at ThingISBN", isbn)
url = "https://www.librarything.com/api/thingISBN/%s" % isbn
xml = requests.get(url, headers={"User-Agent": settings.USER_AGENT}).content
doc = ElementTree.fromstring(xml)
return [e.text for e in doc.findall('isbn')]
try:
doc = ElementTree.fromstring(xml)
return [e.text for e in doc.findall('isbn')]
except SyntaxError:
# LibraryThing down
return []
def merge_works(w1, w2, user=None):
"""will merge the second work (w2) into the first (w1)
"""
logger.info("merging work %s into %s", w2.id, w1.id)
logger.info(u"merging work %s into %s", w2.id, w1.id)
# don't merge if the works are the same or at least one of the works has no id
#(for example, when w2 has already been deleted)
if w1 is None or w2 is None or w1.id == w2.id or w1.id is None or w2.id is None:
@ -583,7 +587,7 @@ def detach_edition(e):
will detach edition from its work, creating a new stub work. if remerge=true, will see if
there's another work to attach to
"""
logger.info("splitting edition %s from %s", e, e.work)
logger.info(u"splitting edition %s from %s", e, e.work)
w = models.Work(title=e.title, language=e.work.language)
w.save()
@ -618,7 +622,7 @@ def add_openlibrary(work, hard_refresh=False):
work.save()
# find the first ISBN match in OpenLibrary
logger.info("looking up openlibrary data for work %s", work.id)
logger.info(u"looking up openlibrary data for work %s", work.id)
e = None # openlibrary edition json
w = None # openlibrary work json
@ -633,7 +637,7 @@ def add_openlibrary(work, hard_refresh=False):
try:
e = _get_json(url, params, type='ol')
except LookupFailure:
logger.exception("OL lookup failed for %s", isbn_key)
logger.exception(u"OL lookup failed for %s", isbn_key)
e = {}
if e.has_key(isbn_key):
if e[isbn_key].has_key('details'):
@ -673,7 +677,7 @@ def add_openlibrary(work, hard_refresh=False):
)
if e[isbn_key]['details'].has_key('works'):
work_key = e[isbn_key]['details']['works'].pop(0)['key']
logger.info("got openlibrary work %s for isbn %s", work_key, isbn_key)
logger.info(u"got openlibrary work %s for isbn %s", work_key, isbn_key)
models.Identifier.get_or_add(type='olwk', value=work_key, work=work)
try:
w = _get_json("https://openlibrary.org" + work_key, type='ol')
@ -691,14 +695,14 @@ def add_openlibrary(work, hard_refresh=False):
if w.has_key('subjects') and len(w['subjects']) > len(subjects):
subjects = w['subjects']
except LookupFailure:
logger.exception("OL lookup failed for %s", work_key)
logger.exception(u"OL lookup failed for %s", work_key)
if not subjects:
logger.warn("unable to find work %s at openlibrary", work.id)
logger.warn(u"unable to find work %s at openlibrary", work.id)
return
# add the subjects to the Work
for s in subjects:
logger.info("adding subject %s to work %s", s, work.id)
logger.info(u"adding subject %s to work %s", s, work.id)
subject = models.Subject.set_by_name(s, work=work)
work.save()
@ -716,9 +720,9 @@ def _get_json(url, params={}, type='gb'):
if response.status_code == 200:
return json.loads(response.content)
else:
logger.error("unexpected HTTP response: %s", response)
logger.error(u"unexpected HTTP response: %s", response)
if response.content:
logger.error("response content: %s", response.content)
logger.error(u"response content: %s", response.content)
raise LookupFailure("GET failed: url=%s and params=%s" % (url, params))
@ -766,7 +770,7 @@ def load_gutenberg_edition(title, gutenberg_etext_id, ol_work_id, seed_isbn, url
ebook = models.Ebook()
if len(ebooks) > 1:
logger.warning("There is more than one Ebook matching url {0}".format(url))
logger.warning(u"There is more than one Ebook matching url {0}".format(url))
ebook.format = format
@ -826,8 +830,6 @@ def edition_for_etype(etype, metadata, default=None):
for key in metadata.edition_identifiers.keys():
return edition_for_ident(key, metadata.identifiers[key])
MATCH_LICENSE = re.compile(r'creativecommons.org/licenses/([^/]+)/')
def load_ebookfile(url, etype):
'''
return a ContentFile if a new ebook has been loaded
@ -960,8 +962,7 @@ class BasePandataLoader(object):
if contentfile:
contentfile_name = '/loaded/ebook_{}.{}'.format(edition.id, key)
path = default_storage.save(contentfile_name, contentfile)
lic = MATCH_LICENSE.search(metadata.rights_url)
license = 'CC {}'.format(lic.group(1).upper()) if lic else ''
license = cc.license_from_cc_url(metadata.rights_url)
ebf = models.EbookFile.objects.create(
format=key,
edition=edition,

View File

@ -1,8 +1,11 @@
# coding=utf-8
# mostly constants related to Creative Commons
''' mostly constants related to Creative Commons
# let's be DRY with these parameters
## need to add versioned CC entries
'''
import re
INFO_CC = (
('CC BY-NC-ND', 'by-nc-nd', 'Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported (CC BY-NC-ND 3.0)', 'https://creativecommons.org/licenses/by-nc-nd/3.0/', 'Creative Commons Attribution-NonCommercial-NoDerivs'),
@ -162,3 +165,15 @@ def match_license(license_string):
except ValueError:
pass
return RIGHTS_ALIAS.get(license_string, None)
MATCH_LICENSE = re.compile(r'creativecommons.org/licenses/([^/]+)/')
def license_from_cc_url(rights_url):
if not rights_url:
return None
lic = MATCH_LICENSE.search(rights_url)
if lic:
return 'CC {}'.format(lic.group(1).upper())
if rights_url.find('openedition.org') >= 0:
return 'OPENEDITION'
return ''

View File

@ -52,3 +52,9 @@ def add_by_webpage(url, work=None, user=None):
def add_by_sitemap(url, maxnum=None):
return add_from_bookdatas(scrape_sitemap(url, maxnum=maxnum))
def scrape_language(url):
scraper = get_scraper(url)
return scraper.metadata.get('language')

View File

@ -1,36 +1,48 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import datetime
import json
import logging
import re
from itertools import islice
import requests
from django.db.models import (Q, F)
from django.db.models import Q
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
import regluit
from oaipmh.client import Client
from oaipmh.error import IdDoesNotExistError
from oaipmh.metadata import MetadataRegistry, oai_dc_reader
from regluit.core import bookloader, cc
from regluit.core import models, tasks
from regluit.core import bookloader
from regluit.core.bookloader import add_by_isbn, merge_works
from regluit.core.bookloader import merge_works
from regluit.core.isbn import ISBN
from regluit.core.loaders.utils import type_for_url
from regluit.core.validation import valid_subject
from . import scrape_language
from .doab_utils import doab_lang_to_iso_639_1, online_to_download, url_to_provider
logger = logging.getLogger(__name__)
springercover = re.compile(r'ftp.+springer\.de.+(\d{13}\.jpg)$', flags=re.U)
def unlist(alist):
if not alist:
return None
return alist[0]
SPRINGER_COVER = re.compile(r'ftp.+springer\.de.+(\d{13}\.jpg)$', flags=re.U)
SPRINGER_IMAGE = u'https://images.springer.com/sgw/books/medium/{}.jpg'
def store_doab_cover(doab_id, redo=False):
"""
returns tuple: 1) cover URL, 2) whether newly created (boolean)
"""
cover_file_name= '/doab/%s/cover' % (doab_id)
cover_file_name = '/doab/%s/cover' % (doab_id)
# if we don't want to redo and the cover exists, return the URL of the cover
@ -44,16 +56,16 @@ def store_doab_cover(doab_id, redo=False):
if r.status_code == 302:
redirurl = r.headers['Location']
if redirurl.startswith(u'ftp'):
springerftp = springercover.match(redirurl)
springerftp = SPRINGER_COVER.match(redirurl)
if springerftp:
redirurl = u'https://images.springer.com/sgw/books/medium/{}.jpg'.format(springerftp.groups(1))
redirurl = SPRINGER_IMAGE.format(springerftp.groups(1))
r = requests.get(redirurl)
else:
r = requests.get(url)
cover_file = ContentFile(r.content)
cover_file.content_type = r.headers.get('content-type', '')
path = default_storage.save(cover_file_name, cover_file)
default_storage.save(cover_file_name, cover_file)
return (default_storage.url(cover_file_name), True)
except Exception, e:
# if there is a problem, return None for cover URL
@ -74,8 +86,7 @@ def update_cover_doab(doab_id, edition, store_cover=True):
edition.cover_image = cover_url
edition.save()
return cover_url
else:
return None
return None
def attach_more_doab_metadata(edition, description, subjects,
publication_date, publisher_name=None, language=None, authors=u''):
@ -108,7 +119,7 @@ def attach_more_doab_metadata(edition, description, subjects,
if not work.age_level:
work.age_level = '18-'
if language:
if language and language != 'xx':
work.language = language
work.save()
@ -117,7 +128,7 @@ def attach_more_doab_metadata(edition, description, subjects,
if edition.authors.all().count() < len(authlist):
edition.authors.clear()
if authlist is not None:
for [rel,auth] in authlist:
for [rel, auth] in authlist:
edition.add_author(auth, rel)
return edition
@ -145,9 +156,11 @@ def load_doab_edition(title, doab_id, url, format, rights,
"""
load a record from doabooks.org represented by input parameters and return an ebook
"""
logger.info('load doab {} {} {} {} {}'.format(doab_id, format, rights, language, provider))
if language and isinstance(language, list):
language = language[0]
if language == 'xx' and format == 'online':
language = scrape_language(url)
# check to see whether the Edition hasn't already been loaded first
# search by url
ebooks = models.Ebook.objects.filter(url=url)
@ -168,25 +181,27 @@ def load_doab_edition(title, doab_id, url, format, rights,
raise Exception("There is more than one Ebook matching url {0}".format(url))
elif len(ebooks) == 1:
ebook = ebooks[0]
doab_identifer = models.Identifier.get_or_add(type='doab',value=doab_id,
work=ebook.edition.work)
doab_identifer = models.Identifier.get_or_add(type='doab', value=doab_id,
work=ebook.edition.work)
# update the cover id
cover_url = update_cover_doab(doab_id, ebook.edition)
# attach more metadata
attach_more_doab_metadata(ebook.edition,
description=kwargs.get('description'),
subjects=kwargs.get('subject'),
publication_date=kwargs.get('date'),
publisher_name=kwargs.get('publisher'),
language=language,
authors=kwargs.get('authors'),)
attach_more_doab_metadata(
ebook.edition,
description=unlist(kwargs.get('description')),
subjects=kwargs.get('subject'),
publication_date=unlist(kwargs.get('date')),
publisher_name=unlist(kwargs.get('publisher')),
language=language,
authors=kwargs.get('creator'),
)
# make sure all isbns are added
add_all_isbns(isbns, None, language=language, title=title)
return ebook
add_all_isbns(isbns, ebook.edition.work, language=language, title=title)
return ebook.edition
# remaining case --> no ebook, load record, create ebook if there is one.
assert len(ebooks) == 0
assert not ebooks
# we need to find the right Edition/Work to tie Ebook to...
@ -209,10 +224,10 @@ def load_doab_edition(title, doab_id, url, format, rights,
if edition is not None:
# if this is a new edition, then add related editions asynchronously
if getattr(edition,'new', False):
if getattr(edition, 'new', False):
tasks.populate_edition.delay(edition.isbn_13)
doab_identifer = models.Identifier.get_or_add(type='doab', value=doab_id,
work=edition.work)
work=edition.work)
# we need to create Edition(s) de novo
else:
@ -226,7 +241,7 @@ def load_doab_edition(title, doab_id, url, format, rights,
work = models.Work(language='xx', title=title, age_level='18-')
work.save()
doab_identifer = models.Identifier.get_or_add(type='doab', value=doab_id,
work=work)
work=work)
# if work has any ebooks already, attach the ebook to the corresponding edition
# otherwise pick the first one
@ -245,67 +260,35 @@ def load_doab_edition(title, doab_id, url, format, rights,
work.selected_edition = edition
work.save()
if format in ('pdf', 'epub', 'mobi'):
if format in ('pdf', 'epub', 'mobi', 'html', 'online'):
ebook = models.Ebook()
ebook.format = format
ebook.provider = provider
ebook.url = url
ebook.url = url
ebook.rights = rights
# tie the edition to ebook
ebook.edition = edition
if format == "online":
ebook.active = False
ebook.save()
# update the cover id (could be done separately)
cover_url = update_cover_doab(doab_id, edition)
# attach more metadata
attach_more_doab_metadata(edition,
description=kwargs.get('description'),
subjects=kwargs.get('subject'),
publication_date=kwargs.get('date'),
publisher_name=kwargs.get('publisher'),
authors=kwargs.get('authors'),)
return ebook
attach_more_doab_metadata(
edition,
description=unlist(kwargs.get('description')),
subjects=kwargs.get('subject'),
publication_date=unlist(kwargs.get('date')),
publisher_name=unlist(kwargs.get('publisher')),
authors=kwargs.get('creator'),
)
return edition
def load_doab_records(fname, limit=None):
success_count = 0
ebook_count = 0
records = json.load(open(fname))
for (i, book) in enumerate(islice(records,limit)):
d = dict(book)
d['isbns'] = split_isbns(d['isbns_raw']) # use stricter isbn string parsing.
try:
ebook = load_doab_edition(**d)
success_count += 1
if ebook:
ebook_count +=1
except Exception, e:
logger.error(e)
logger.error(book)
logger.info("Number of records processed: " + str(success_count))
logger.info("Number of ebooks processed: " + str(ebook_count))
"""
#
#tools to parse the author lists in doab.csv
from pandas import DataFrame
url = "http://www.doabooks.org/doab?func=csv"
df_csv = DataFrame.from_csv(url)
out=[]
for val in df_csv.values:
isbn = split_isbns(val[0])
if isbn:
auths = []
if val[2] == val[2] and val[-2] == val[-2]: # test for NaN auths and licenses
auths = creator_list(val[2])
out.append(( isbn[0], auths))
open("/Users/eric/doab_auths.json","w+").write(json.dumps(out,indent=2, separators=(',', ': ')))
"""
#
au = re.compile(r'\(Authors?\)', flags=re.U)
ed = re.compile(r'\([^\)]*(dir.|[Eeé]ds?.|org.|coord.|Editor|a cura di|archivist)[^\)]*\)', flags=re.U)
@ -326,13 +309,13 @@ def fnf(auth):
if len(parts) == 1:
return parts[0].strip()
elif len(parts) == 2:
return u'{} {}'.format(parts[1].strip(),parts[0].strip())
return u'{} {}'.format(parts[1].strip(), parts[0].strip())
else:
if parts[1].strip() in ('der','van', 'von', 'de', 'ter'):
return u'{} {} {}'.format(parts[2].strip(),parts[1].strip(),parts[0].strip())
if parts[1].strip() in ('der', 'van', 'von', 'de', 'ter'):
return u'{} {} {}'.format(parts[2].strip(), parts[1].strip(), parts[0].strip())
#print auth
#print re.search(namelist,auth).group(0)
return u'{} {}, {}'.format(parts[2].strip(),parts[0].strip(),parts[1].strip())
return u'{} {}, {}'.format(parts[2].strip(), parts[0].strip(), parts[1].strip())
def creator(auth, editor=False):
@ -353,64 +336,84 @@ def creator(auth, editor=False):
auth = au.sub('', auth)
return ['aut', fnf(auth)]
def split_auths(auths):
if ';' in auths or '/' in auths:
return namesep2.split(auths)
else:
nl = namelist.match(auths.strip())
if nl:
if nl.group(3).endswith(' de') \
or ' de ' in nl.group(3) \
or nl.group(3).endswith(' da') \
or nl.group(1).endswith(' Jr.') \
or ' e ' in nl.group(1):
return [auths]
else:
return namesep.split(auths)
else :
return [auths]
def split_isbns(isbns):
result = []
for isbn in isbnsep.split(isbns):
isbn = ISBN(isbn)
if isbn.valid:
result.append(isbn.to_string())
return result
def creator_list(creators):
auths = []
if re.search(edlist, creators):
for auth in split_auths(edlist.sub(u'', creators)):
if auth:
auths.append(creator(auth, editor=True))
else:
for auth in split_auths(unicode(creators)):
if auth:
auths.append(creator(auth))
for auth in creators:
auths.append(creator(auth))
return auths
def load_doab_auths(fname, limit=None):
doab_auths = json.load(open(fname))
recnum = 0
failed = 0
for [isbnraw, authlist] in doab_auths:
isbn = ISBN(isbnraw).to_string()
try:
work = models.Identifier.objects.get(type='isbn',value=isbn).work
except models.Identifier.DoesNotExist:
print 'isbn = {} not found'.format(isbnraw)
failed += 1
if work.preferred_edition.authors.all().count() < len(authlist):
work.preferred_edition.authors.clear()
if authlist is None:
print "null authlist; isbn={}".format(isbn)
continue
for [rel,auth] in authlist:
work.preferred_edition.add_author(auth, rel)
recnum +=1
if limit and recnum > limit:
break
logger.info("Number of records processed: " + str(recnum))
logger.info("Number of missing isbns: " + str(failed))
DOAB_OAIURL = 'https://www.doabooks.org/oai'
DOAB_PATT = re.compile(r'[\./]doabooks\.org/doab\?.*rid:(\d{1,8}).*')
mdregistry = MetadataRegistry()
mdregistry.registerReader('oai_dc', oai_dc_reader)
doab_client = Client(DOAB_OAIURL, mdregistry)
def add_by_doab(doab_id, record=None):
try:
record = record if record else doab_client.getRecord(
metadataPrefix='oai_dc',
identifier='oai:doab-books:{}'.format(doab_id)
)
metadata = record[1].getMap()
isbns = []
url = None
for ident in metadata.pop('identifier', []):
if ident.startswith('ISBN: '):
isbn = ISBN(ident[6:])
if isbn.error:
continue
isbn.validate()
isbns.append(isbn.to_string())
elif ident.find('doabooks.org') >= 0:
# should already know the doab_id
continue
else:
url = ident
language = doab_lang_to_iso_639_1(unlist(metadata.pop('language', None)))
urls = online_to_download(url)
edition = None
for dl_url in urls:
format = type_for_url(dl_url)
if 'format' in metadata:
del metadata['format']
edition = load_doab_edition(
unlist(metadata.pop('title', None)),
doab_id,
dl_url,
format,
cc.license_from_cc_url(unlist(metadata.pop('rights', None))),
language,
isbns,
url_to_provider(dl_url) if dl_url else None,
**metadata
)
return edition
except IdDoesNotExistError:
return None
def getdoab(url):
id_match = DOAB_PATT.search(url)
if id_match:
return id_match.group(1)
return False
def load_doab_oai(from_year=2000, limit=100000):
'''
use oai feed to get oai updates
'''
from_ = datetime.datetime(year=from_year, month=1, day=1)
doab_ids = []
for record in doab_client.listRecords(metadataPrefix='oai_dc', from_=from_):
if not record[1]:
continue
idents = record[1].getMap()['identifier']
if idents:
for ident in idents:
doab = getdoab(ident)
if doab:
doab_ids.append(doab)
e = add_by_doab(doab, record=record)
logger.info(u'updated:\t{}\t{}'.format(doab, e.title))
if len(doab_ids) > limit:
break

126
core/loaders/doab_utils.py Normal file
View File

@ -0,0 +1,126 @@
"""
doab_utils.py
"""
import re
import urlparse
import requests
from regluit.utils.lang import get_language_code
from .utils import get_soup
# utility functions for converting lists of individual items into individual items
# let's do a mapping of the DOAB languages into the language codes used
# mostly, we just handle mispellings
# also null -> xx
EXTRA_LANG_MAP = dict([
(u'chinese', 'de'),
(u'deutsch', 'de'),
(u'eng', 'en'),
(u'englilsh', 'en'),
(u'englilsh', 'en'),
(u'englisch', 'en'),
(u'espanol', 'es'),
(u'ger', 'de'),
(u'fra', 'fr'),
(u'fre', 'fr'),
(u'francese', 'fr'),
(u'ita', 'it'),
(u'italiano', 'it'),
(u'norwegian', 'no'),
(u'por', 'pt'),
(u'portugese', 'pt'),
(u'slovene', 'sl'),
(u'spa', 'es'),
(u'spagnolo', 'es'),
])
sep = re.compile(r'[ \-;^,/]+')
def doab_lang_to_iso_639_1(lang):
if lang is None or not lang:
return "xx"
else:
lang = sep.split(lang)[0]
code = get_language_code(lang)
if code:
return code
else:
return EXTRA_LANG_MAP.get(lang.lower(), 'xx')
DOMAIN_TO_PROVIDER = dict([
[u'www.doabooks.org', u'Directory of Open Access Books'],
[u'www.oapen.org', u'OAPEN Library'],
[u'books.openedition.org', u'OpenEdition Books'],
[u'digitalcommons.usu.edu', u'DigitalCommons, Utah State University'],
[u'www.aupress.ca', u'Athabasca University Press'],
[u'dspace.ucalgary.ca', u'Institutional Repository at the University of Calgary'],
[u'www.degruyter.com', u'De Gruyter Online'],
[u'dx.doi.org', u'DOI Resolver'],
[u'www.openbookpublishers.com', u'Open Book Publishers'],
[u'www.adelaide.edu.au', u'University of Adelaide'],
[u'hdl.handle.net', u'Handle Proxy'],
[u'link.springer.com', u'Springer'],
[u'www.bloomsburyacademic.com', u'Bloomsbury Academic'],
[u'www.ledizioni.it', u'Ledizioni'],
[u'ccdigitalpress.org', u'Computers and Composition Digital Press'],
[u'leo.cilea.it', u'LEO '],
[u'www.springerlink.com', u'Springer'],
[u'www.palgraveconnect.com', u'Palgrave Connect'],
[u'www.ubiquitypress.com', u'Ubiquity Press'],
[u'ebooks.iospress.nl', u'IOS Press Ebooks'],
[u'antropologie.zcu.cz', u'AntropoWeb'],
[u'www.unito.it', u"University of Turin"],
[u'leo.cineca.it', u'Letteratura Elettronica Online'],
[u'hw.oeaw.ac.at', u'Austrian Academy of Sciences'],
[u'www.co-action.net', u'Co-Action Publishing'],
[u'www.aliprandi.org', u'Simone Aliprandi'],
[u'www.maestrantonella.it', u'maestrantonella.it'],
[u'www.antilia.to.it', u'antilia.to.it'],
[u'www.scribd.com', u'Scribd'],
[u'ledibooks.com', u'LediBooks'],
[u'press.openedition.org', u'OpenEdition Press'],
[u'oapen.org', u'OAPEN Library'],
[u'www.ebooks.iospress.nl', u'IOS Press Ebooks'],
[u'windsor.scholarsportal.info', u'Scholars Portal'],
[u'www.unimib.it', u'University of Milano-Bicocca'],
[u'books.mdpi.com', u'MDPI Books'],
[u'www.dropbox.com', u'Dropbox'],
[u'dl.dropboxusercontent.com', u'Dropbox'],
])
def url_to_provider(url):
netloc = urlparse.urlparse(url).netloc
return DOMAIN_TO_PROVIDER.get(netloc, netloc)
FRONTIERSIN = re.compile(r'frontiersin.org/books/[^/]+/(\d+)')
def online_to_download(url):
urls = []
if url.find(u'mdpi.com/books/pdfview/book/') >= 0:
doc = get_soup(url)
if doc:
obj = doc.find('object', type='application/pdf')
if obj:
urls.append(obj['data'].split('#')[0])
elif url.find(u'books.scielo.org/') >= 0:
doc = get_soup(url)
if doc:
obj = doc.find('a', class_='pdf_file')
if obj:
urls.append(urlparse.urljoin(url, obj['href']))
obj = doc.find('a', class_='epub_file')
if obj:
urls.append(urlparse.urljoin(url, obj['href']))
elif FRONTIERSIN.search(url):
booknum = FRONTIERSIN.search(url).group(1)
urls.append(u'https://www.frontiersin.org/GetFile.aspx?ebook={}&fileformat=EPUB'.format(booknum))
urls.append(u'https://www.frontiersin.org/GetFile.aspx?ebook={}&fileformat=PDF'.format(booknum))
else:
urls.append(url)
return urls

28
core/loaders/tests.py Normal file
View File

@ -0,0 +1,28 @@
from django.conf import settings
from django.test import TestCase
from regluit.core.models import Ebook, Edition, Work
from .utils import dl_online
class LoaderTests(TestCase):
def setUp(self):
pass
def test_downloads(self):
if not (settings.TEST_INTEGRATION):
return
work = Work(title="online work")
work.save()
edition = Edition(work=work)
edition.save()
dropbox_url = 'https://www.dropbox.com/s/h5jzpb4vknk8n7w/Jakobsson_The_Troll_Inside_You_EBook.pdf?dl=0'
dropbox_ebook = Ebook.objects.create(format='online', url=dropbox_url, edition=edition)
dropbox_ebf = dl_online(dropbox_ebook)
self.assertTrue(dropbox_ebf.ebook.filesize)
jbe_url = 'http://www.jbe-platform.com/content/books/9789027295958'
jbe_ebook = Ebook.objects.create(format='online', url=jbe_url, edition=edition)
jbe_ebf = dl_online(jbe_ebook)
self.assertTrue(jbe_ebf.ebook.filesize)

View File

@ -1,15 +1,24 @@
import csv
import re
import requests
import logging
import re
import sys
import time
import unicodedata
import urlparse
from bs4 import BeautifulSoup
import requests
from django.conf import settings
from django.core.files.base import ContentFile
from regluit.core.models import Work, Edition, Author, PublisherName, Identifier, Subject
from regluit.core.isbn import ISBN
from regluit.core.bookloader import add_by_isbn_from_google, merge_works
from regluit.api.crosswalks import inv_relator_contrib
from regluit.bisac.models import BisacHeading
from regluit.core.bookloader import add_by_isbn_from_google, merge_works
from regluit.core.isbn import ISBN
from regluit.core.models import (
Author, Ebook, EbookFile, Edition, Identifier, path_for_file, PublisherName, Subject, Work,
)
logger = logging.getLogger(__name__)
@ -34,6 +43,12 @@ def utf8_general_ci_norm(s):
s1 = unicodedata.normalize('NFD', s)
return ''.join(c for c in s1 if not unicodedata.combining(c)).upper()
def get_soup(url):
response = requests.get(url, headers={"User-Agent": settings.USER_AGENT})
if response.status_code == 200:
return BeautifulSoup(response.content, 'lxml')
return None
def get_authors(book):
authors=[]
if book.get('AuthorsList',''):
@ -331,14 +346,15 @@ def loaded_book_ok(book, work, edition):
return True
ID_URLPATTERNS = {
'goog': re.compile(r'[\./]google\.com/books\?.*id=([a-zA-Z0-9\-_]{12})'),
'olwk': re.compile(r'[\./]openlibrary\.org(/works/OL\d{1,8}W)'),
'gdrd': re.compile(r'[\./]goodreads\.com/book/show/(\d{1,8})'),
'ltwk': re.compile(r'[\./]librarything\.com/work/(\d{1,8})'),
'oclc': re.compile(r'\.worldcat\.org/.*oclc/(\d{8,12})'),
'doi': re.compile(r'[\./]doi\.org/(10\.\d+/\S+)'),
'gtbg': re.compile(r'[\./]gutenberg\.org/ebooks/(\d{1,6})'),
'glue': re.compile(r'[\./]unglue\.it/work/(\d{1,7})'),
'goog': re.compile(r'[\./]google\.com/books\?.*id=(?P<id>[a-zA-Z0-9\-_]{12})'),
'olwk': re.compile(r'[\./]openlibrary\.org(?P<id>/works/OL\d{1,8}W)'),
'doab': re.compile(r'([\./]doabooks\.org/doab\?.*rid:|=oai:doab-books:)(?P<id>\d{1,8})'),
'gdrd': re.compile(r'[\./]goodreads\.com/book/show/(?P<id>\d{1,8})'),
'ltwk': re.compile(r'[\./]librarything\.com/work/(?P<id>\d{1,8})'),
'oclc': re.compile(r'\.worldcat\.org/.*oclc/(?P<id>\d{8,12})'),
'doi': re.compile(r'[\./]doi\.org/(?P<id>10\.\d+/\S+)'),
'gtbg': re.compile(r'[\./]gutenberg\.org/ebooks/(?P<id>\d{1,6})'),
'glue': re.compile(r'[\./]unglue\.it/work/(?P<id>\d{1,7})'),
}
def ids_from_urls(url):
@ -346,7 +362,111 @@ def ids_from_urls(url):
for ident in ID_URLPATTERNS.keys():
id_match = ID_URLPATTERNS[ident].search(url)
if id_match:
ids[ident] = id_match.group(1)
ids[ident] = id_match.group('id')
return ids
DROPBOX_DL = re.compile(r'"(https://dl.dropboxusercontent.com/content_link/[^"]+)"')
def dl_online(ebook):
if ebook.format != 'online':
return
if ebook.url.find(u'dropbox.com/s/') >= 0:
response = requests.get(ebook.url, headers={"User-Agent": settings.USER_AGENT})
if response.status_code == 200:
match_dl = DROPBOX_DL.search(response.content)
if match_dl:
return make_dl_ebook(match_dl.group(1), ebook)
elif ebook.url.find(u'jbe-platform.com/content/books/') >= 0:
doc = get_soup(ebook.url)
if doc:
obj = doc.select_one('div.fulltexticoncontainer-PDF a')
if obj:
dl_url = urlparse.urljoin(ebook.url, obj['href'])
return make_dl_ebook(dl_url, ebook)
def make_dl_ebook(url, ebook):
if EbookFile.objects.filter(source=ebook.url):
return EbookFile.objects.filter(source=ebook.url)[0]
response = requests.get(url, headers={"User-Agent": settings.USER_AGENT})
if response.status_code == 200:
filesize = int(response.headers.get("Content-Length", 0))
filesize = filesize if filesize else None
format = type_for_url(url, content_type=response.headers.get('content-type'))
if format != 'online':
new_ebf = EbookFile.objects.create(
edition=ebook.edition,
format=format,
source=ebook.url,
)
new_ebf.file.save(path_for_file(new_ebf, None), ContentFile(response.content))
new_ebf.save()
new_ebook = Ebook.objects.create(
edition=ebook.edition,
format=format,
provider='Unglue.it',
url=new_ebf.file.url,
rights=ebook.rights,
filesize=filesize,
version_label=ebook.version_label,
version_iter=ebook.version_iter,
)
new_ebf.ebook = new_ebook
new_ebf.save()
return new_ebf
def type_for_url(url, content_type=None):
if not url:
return ''
if url.find('books.openedition.org') >= 0:
return ('online')
ct = content_type if content_type else contenttyper.calc_type(url)
if re.search("pdf", ct):
return "pdf"
elif re.search("octet-stream", ct) and re.search("pdf", url, flags=re.I):
return "pdf"
elif re.search("octet-stream", ct) and re.search("epub", url, flags=re.I):
return "epub"
elif re.search("text/plain", ct):
return "text"
elif re.search("text/html", ct):
if url.find('oapen.org/view') >= 0:
return "html"
return "online"
elif re.search("epub", ct):
return "epub"
elif re.search("mobi", ct):
return "mobi"
return "other"
class ContentTyper(object):
""" """
def __init__(self):
self.last_call = dict()
def content_type(self, url):
try:
r = requests.head(url)
return r.headers.get('content-type')
except:
return None
def calc_type(self, url):
delay = 1
# is there a delay associated with the url
netloc = urlparse.urlparse(url).netloc
# wait if necessary
last_call = self.last_call.get(netloc)
if last_call is not None:
now = time.time()
min_time_next_call = last_call + delay
if min_time_next_call > now:
time.sleep(min_time_next_call-now)
self.last_call[netloc] = time.time()
# compute the content-type
return self.content_type(url)
contenttyper = ContentTyper()

View File

@ -1,17 +0,0 @@
import os
from django.conf import settings
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
from regluit.core.loaders import doab
class Command(BaseCommand):
help = "load doab books"
args = "<limit> <file_name>"
def handle(self, limit=None, file_name="../../../bookdata/doab.json", **options):
command_dir = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(command_dir, file_name)
doab.load_doab_records(file_path, limit=int(limit))

View File

@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand
from regluit.core.loaders.utils import dl_online
from regluit.core.models import Ebook
class Command(BaseCommand):
help = "harvest downloadable ebooks from 'online' ebooks"
args = "<limit>"
def handle(self, limit=0, **options):
limit = int(limit) if limit else 0
onlines = Ebook.objects.filter(format='online')
done = 0
for online in onlines:
new_ebf = dl_online(online)
if new_ebf:
done += 1
if done > limit:
break
print 'harvested {} ebooks'.format(done)

View File

@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand
from regluit.core.loaders import doab
class Command(BaseCommand):
help = "load doab books by doab_id via oai"
args = "<doab_id>"
def handle(self, doab_id, **options):
doab.add_by_doab(doab_id)

View File

@ -0,0 +1,18 @@
from django.core.management.base import BaseCommand
from regluit.core.loaders import doab
class Command(BaseCommand):
help = "load doab books via oai"
args = "<from_year> <limit>"
def handle(self, from_year= None, limit=None, **options):
from_year = int(from_year) if from_year else None
limit = int(limit) if limit else None
if limit:
doab.load_doab_oai(from_year=from_year, limit=limit)
else:
if from_year:
doab.load_doab_oai(from_year=from_year)
else:
doab.load_doab_oai()

View File

@ -1083,7 +1083,7 @@ class EbookFile(models.Model):
source=self.file.url
)
new_mobi_ebf.file.save(path_for_file('ebf', None), mobi_cf)
new_mobi_ebf.file.save(path_for_file(new_mobi_ebf, None), mobi_cf)
new_mobi_ebf.save()
if self.ebook:
new_ebook = Ebook.objects.create(

View File

@ -1,25 +1,21 @@
# encoding: utf-8
"""
external library imports
"""
#external library imports
import os
from datetime import datetime, timedelta
from decimal import Decimal as D
from math import factorial
from time import sleep, mktime
from urlparse import parse_qs, urlparse
from tempfile import NamedTemporaryFile
from celery.task import chord
from celery.task.sets import TaskSet
import requests
import requests_mock
import os
"""
django imports
"""
#django imports
from django.conf import settings
from django.contrib.auth.models import User
from django_comments.models import Comment
from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.core.files import File as DjangoFile
@ -31,9 +27,10 @@ from django.test.client import Client
from django.test.utils import override_settings
from django.utils import unittest
"""
regluit imports
"""
from django_comments.models import Comment
#regluit imports
from regluit.core import (
isbn,
bookloader,
@ -56,7 +53,6 @@ from regluit.core.models import (
Premium,
Subject,
Publisher,
PublisherName,
Offer,
EbookFile,
Acq,
@ -72,14 +68,14 @@ from regluit.payment.parameters import PAYMENT_TYPE_AUTHORIZATION
from regluit.utils.localdatetime import now, date_today
from regluit.pyepub import EPUB
from .epub import test_epub
from .pdf import ask_pdf, test_pdf
from .pdf import test_pdf
TESTDIR = os.path.join(os.path.dirname(__file__), '../test/')
YAML_VERSIONFILE = os.path.join(TESTDIR, 'versiontest.yaml')
YAML_HUCKFILE = os.path.join(TESTDIR, 'raw/master/metadata.yaml')
class BookLoaderTests(TestCase):
fixtures = ['initial_data.json','bookloader.json']
fixtures = ['initial_data.json', 'bookloader.json']
def setUp(self):
self.user = User.objects.create_user('core_test', 'test@example.org', 'core_test')
@ -90,23 +86,27 @@ class BookLoaderTests(TestCase):
noebook_id = bookloader.load_from_yaml(YAML_VERSIONFILE)
noebook = models.Work.objects.get(id=noebook_id)
self.assertEqual( noebook.first_ebook(), None)
self.assertEqual(noebook.first_ebook(), None)
huck_id = bookloader.load_from_yaml(YAML_HUCKFILE, test_mode=True)
huck = models.Work.objects.get(id=huck_id)
self.assertTrue( huck.ebooks().count()>1)
self.assertTrue(huck.ebooks().count() > 1)
def test_add_by_yaml(self):
space_id = bookloader.load_from_yaml('https://github.com/gitenberg-dev/metadata/raw/master/samples/pandata.yaml')
huck_id = bookloader.load_from_yaml('https://github.com/GITenberg/Adventures-of-Huckleberry-Finn_76/raw/master/metadata.yaml')
space_id = bookloader.load_from_yaml(
'https://github.com/gitenberg-dev/metadata/raw/master/samples/pandata.yaml'
)
huck_id = bookloader.load_from_yaml(
'https://github.com/GITenberg/Adventures-of-Huckleberry-Finn_76/raw/master/metadata.yaml'
)
space = models.Work.objects.get(id=space_id)
huck = models.Work.objects.get(id=huck_id)
#test ebook archiving
num_ebf= EbookFile.objects.all().count()
num_ebf = EbookFile.objects.all().count()
for ebook in huck.ebooks().all():
f = ebook.get_archive()
self.assertTrue(EbookFile.objects.all().count()>num_ebf)
self.assertTrue(EbookFile.objects.all().count() > num_ebf)
def test_add_by_isbn_mock(self):
with requests_mock.Mocker(real_http=True) as m:
@ -175,7 +175,7 @@ class BookLoaderTests(TestCase):
return
w = models.Work(title='silly title', language='xx')
w.save()
e = models.Edition(title=w.title,work=w)
e = models.Edition(title=w.title, work=w)
e.save()
models.Identifier(type='isbn', value='9781449319793', work=w, edition=e).save()
bookloader.update_edition(e)
@ -211,10 +211,11 @@ class BookLoaderTests(TestCase):
def test_add_related(self):
# add one edition
edition = bookloader.add_by_isbn('0441007465') #Neuromancer; editions in fixture but not joined
#Neuromancer; editions in fixture not joined
edition = bookloader.add_by_isbn('0441007465')
edbefore = models.Edition.objects.count()
before = models.Work.objects.count()
lang=edition.work.language
lang = edition.work.language
langbefore = models.Work.objects.filter(language=lang).count()
# ask for related editions to be added using the work we just created
with requests_mock.Mocker(real_http=True) as m:
@ -252,12 +253,13 @@ class BookLoaderTests(TestCase):
def test_merge_works_mechanics(self):
"""Make sure then merge_works is still okay when we try to merge works with themselves and with deleted works"""
"""Make sure then merge_works is still okay when we try to merge
works with themselves and with deleted works"""
before = models.Work.objects.count()
wasbefore = models.WasWork.objects.count()
sub1= Subject(name='test1')
sub1 = Subject(name='test1')
sub1.save()
sub2= Subject(name='test2')
sub2 = Subject(name='test2')
sub2.save()
w1 = Work(title="Work 1")
w1.save()
@ -265,7 +267,7 @@ class BookLoaderTests(TestCase):
w2 = Work(title="Work 2")
w2.save()
w2.subjects.add(sub1,sub2)
w2.subjects.add(sub1, sub2)
e1 = Edition(work=w1)
e1.save()
@ -273,7 +275,7 @@ class BookLoaderTests(TestCase):
e2 = Edition(work=w2)
e2.save()
eb1 = Ebook(edition = e2)
eb1 = Ebook(edition=e2)
eb1.save()
e2a = Edition(work=w2)
@ -293,7 +295,7 @@ class BookLoaderTests(TestCase):
w2_id = w2.id
# first try to merge work 1 into itself -- should not do anything
bookloader.merge_works(w1,w1)
bookloader.merge_works(w1, w1)
self.assertEqual(models.Work.objects.count(), before + 2)
# merge the second work into the first
@ -319,11 +321,11 @@ class BookLoaderTests(TestCase):
self.assertEqual(r.status_code, 200)
# if the work has a selected edition, then don't touch the work.
w3= Work(title='work 3')
e_pref= Edition(work=w1)
w1.selected_edition=e_pref
w3 = Work(title='work 3')
e_pref = Edition(work=w1)
w1.selected_edition = e_pref
bookloader.merge_works(w3, w1)
self.assertTrue(w1.title=='Work 1')
self.assertTrue(w1.title == 'Work 1')
def test_merge_works(self):
before = models.Work.objects.count()
@ -398,8 +400,8 @@ class BookLoaderTests(TestCase):
w3 = models.Edition.get_by_isbn(isbn1).work
# and that relevant Campaigns and Wishlists are updated
c1=Campaign.objects.get(pk=c1.pk)
c2=Campaign.objects.get(pk=c2.pk)
c1 = Campaign.objects.get(pk=c1.pk)
c2 = Campaign.objects.get(pk=c2.pk)
self.assertEqual(c1.work, c2.work)
self.assertEqual(user.wishlist.works.all().count(), 1)
@ -417,20 +419,19 @@ class BookLoaderTests(TestCase):
with open(os.path.join(TESTDIR, 'gb_latinlanguage.json')) as gb:
m.get('https://www.googleapis.com/books/v1/volumes', content=gb.read())
edition = bookloader.add_by_oclc('1246014')
# we've seen the public domain status of this book fluctuate -- and the OCLC number can disappear. So if the ebook count is 2 then test
# we've seen the public domain status of this book fluctuate -- and the OCLC
# number can disappear. So if the ebook count is 2 then test
#if edition is not None and edition.ebooks.count() == 2:
self.assertEqual(edition.ebooks.count(), 2)
#ebook_epub = edition.ebooks.all()[0]
ebook_epub = edition.ebooks.filter(format='epub')[0]
self.assertEqual(ebook_epub.format, 'epub')
#self.assertEqual(ebook_epub.url, 'http://books.google.com/books/download/The_Latin_language.epub?id=N1RfAAAAMAAJ&ie=ISO-8859-1&output=epub&source=gbs_api')
self.assertEqual(parse_qs(urlparse(ebook_epub.url).query).get("id"), ['N1RfAAAAMAAJ'])
self.assertEqual(parse_qs(urlparse(ebook_epub.url).query).get("output"), ['epub'])
self.assertEqual(ebook_epub.provider, 'Google Books')
self.assertEqual(ebook_epub.set_provider(), 'Google Books')
ebook_pdf = edition.ebooks.filter(format='pdf')[0]
self.assertEqual(ebook_pdf.format, 'pdf')
#self.assertEqual(ebook_pdf.url, 'http://books.google.com/books/download/The_Latin_language.pdf?id=N1RfAAAAMAAJ&ie=ISO-8859-1&output=pdf&sig=ACfU3U2yLt3nmTncB8ozxOWUc4iHKUznCA&source=gbs_api')
self.assertEqual(parse_qs(urlparse(ebook_pdf.url).query).get("id"), ['N1RfAAAAMAAJ'])
self.assertEqual(parse_qs(urlparse(ebook_pdf.url).query).get("output"), ['pdf'])
self.assertEqual(ebook_pdf.provider, 'Google Books')
@ -441,12 +442,12 @@ class BookLoaderTests(TestCase):
self.assertEqual(w.first_epub_url(), ebook_epub.url)
self.assertEqual(w.first_pdf_url(), ebook_pdf.url)
ebook_pdf.url='https://en.wikisource.org/wiki/Frankenstein'
ebook_pdf.url = 'https://en.wikisource.org/wiki/Frankenstein'
self.assertEqual(ebook_pdf.set_provider(), 'Wikisource')
self.user.wishlist.add_work(w, 'test')
tasks.report_new_ebooks(date_today())
r = self.client.get("/notification/" )
r = self.client.get("/notification/")
self.assertEqual(r.status_code, 200)
ebook_pdf.increment()
@ -471,9 +472,9 @@ class BookLoaderTests(TestCase):
subjects = [s.name for s in work.subjects.all()]
self.assertTrue(len(subjects) > 10)
self.assertTrue('Science fiction' in subjects)
self.assertTrue('/works/OL27258W' in work.identifiers.filter(type='olwk').values_list('value',flat=True) )
self.assertTrue('888628' in work.identifiers.filter(type='gdrd').values_list('value',flat=True))
self.assertTrue('609' in work.identifiers.filter(type='ltwk').values_list('value',flat=True))
self.assertTrue('/works/OL27258W' in work.identifiers.filter(type='olwk').values_list('value', flat=True))
self.assertTrue('888628' in work.identifiers.filter(type='gdrd').values_list('value', flat=True))
self.assertTrue('609' in work.identifiers.filter(type='ltwk').values_list('value', flat=True))
def test_unicode_openlibrary(self):
with requests_mock.Mocker(real_http=True) as m:
@ -493,10 +494,21 @@ class BookLoaderTests(TestCase):
license = 'https://www.gutenberg.org/license'
lang = 'en'
format = 'epub'
publication_date = datetime(2001,7,1)
seed_isbn = '9780142000083' # https://www.amazon.com/Moby-Dick-Whale-Penguin-Classics-Deluxe/dp/0142000086
publication_date = datetime(2001, 7, 1)
# https://www.amazon.com/Moby-Dick-Whale-Penguin-Classics-Deluxe/dp/0142000086
seed_isbn = '9780142000083'
ebook = bookloader.load_gutenberg_edition(title, gutenberg_etext_id, ol_work_id, seed_isbn, epub_url, format, license, lang, publication_date)
ebook = bookloader.load_gutenberg_edition(
title,
gutenberg_etext_id,
ol_work_id,
seed_isbn,
epub_url,
format,
license,
lang,
publication_date
)
self.assertEqual(ebook.url, epub_url)
def tearDown(self):
@ -506,8 +518,13 @@ class BookLoaderTests(TestCase):
class SearchTests(TestCase):
def test_search_mock(self):
with requests_mock.Mocker(real_http=True) as m:
with open(os.path.join(TESTDIR, 'gb_melville.json')) as gb, open(os.path.join(TESTDIR, 'gb_melville2.json')) as gb2:
m.get('https://www.googleapis.com/books/v1/volumes', [{'content':gb2.read()}, {'content':gb.read()}])
with open(
os.path.join(TESTDIR, 'gb_melville.json')
) as gb, open(os.path.join(TESTDIR, 'gb_melville2.json')) as gb2:
m.get(
'https://www.googleapis.com/books/v1/volumes',
[{'content':gb2.read()}, {'content':gb.read()}]
)
self.test_pagination(mocking=True)
self.test_basic_search(mocking=True)
self.test_googlebooks_search(mocking=True)
@ -523,7 +540,10 @@ class SearchTests(TestCase):
self.assertTrue(r.has_key('author'))
self.assertTrue(r.has_key('description'))
self.assertTrue(r.has_key('cover_image_thumbnail'))
self.assertTrue(r['cover_image_thumbnail'].startswith('https') or r['cover_image_thumbnail'].startswith('http'))
self.assertTrue(
r['cover_image_thumbnail'].startswith('https')
or r['cover_image_thumbnail'].startswith('http')
)
self.assertTrue(r.has_key('publisher'))
self.assertTrue(r.has_key('isbn_13'))
self.assertTrue(r.has_key('googlebooks_id'))
@ -556,19 +576,19 @@ class CampaignTests(TestCase):
work=w, type=2,
cc_date_initial=datetime(this_year + 100, 1, 1),
)
self.assertTrue(c.set_dollar_per_day()<0.34)
self.assertTrue(c.dollar_per_day>0.31)
t = Transaction(type=1, campaign=c, approved=True, amount= D(6000.1), status="Complete")
self.assertTrue(c.set_dollar_per_day() < 0.34)
self.assertTrue(c.dollar_per_day > 0.31)
t = Transaction(type=1, campaign=c, approved=True, amount=D(6000.1), status="Complete")
t.save()
c.status = 'ACTIVE'
c.save()
c.update_left()
#print(w.percent_of_goal())
self.assertEqual(w.percent_unglued(),3)
self.assertTrue(w.percent_of_goal()>49)
ofr = Offer.objects.create(work=w,price=D(10),active=True)
self.assertTrue(c.days_per_copy <D(32.26))
self.assertTrue(c.days_per_copy >D(29.41))
self.assertEqual(w.percent_unglued(), 3)
self.assertTrue(w.percent_of_goal() > 49)
ofr = Offer.objects.create(work=w, price=D(10), active=True)
self.assertTrue(c.days_per_copy < D(32.26))
self.assertTrue(c.days_per_copy > D(29.41))
def test_required_fields(self):
# a campaign must have a target, deadline and a work
@ -601,20 +621,25 @@ class CampaignTests(TestCase):
w2 = Work()
w2.save()
# INITIALIZED
c1 = Campaign(target=D('1000.00'),deadline=Campaign.latest_ending(),work=w)
c1 = Campaign(target=D('1000.00'), deadline=Campaign.latest_ending(), work=w)
c1.save()
self.assertEqual(c1.status, 'INITIALIZED')
# ACTIVATED
c2 = Campaign(target=D('1000.00'),deadline=datetime(2013,1,1),work=w,description='dummy description')
c2 = Campaign(
target=D('1000.00'),
deadline=datetime(2013, 1, 1),
work=w,
description='dummy description'
)
c2.save()
self.assertEqual(c2.status, 'INITIALIZED')
u = User.objects.create_user('claimer', 'claimer@example.org', 'claimer')
u.save()
rh = RightsHolder(owner = u, rights_holder_name = 'rights holder name')
rh = RightsHolder(owner=u, rights_holder_name='rights holder name')
rh.save()
cl = Claim(rights_holder = rh, work = w, user = u, status = 'active')
cl = Claim(rights_holder=rh, work=w, user=u, status='active')
cl.save()
cl2 = Claim(rights_holder = rh, work = w2, user = u, status = 'active')
cl2 = Claim(rights_holder=rh, work=w2, user=u, status='active')
cl2.save()
c2.activate()
self.assertEqual(c2.status, 'ACTIVE')
@ -624,31 +649,42 @@ class CampaignTests(TestCase):
# RESUMING
c2.resume(reason="for testing")
#self.assertEqual(c2.suspended, None)
self.assertEqual(c2.status,'ACTIVE')
self.assertEqual(c2.status, 'ACTIVE')
# should not let me suspend a campaign that hasn't been initialized
self.assertRaises(UnglueitError, c1.suspend, "for testing")
# UNSUCCESSFUL
c3 = Campaign(target=D('1000.00'),deadline=now() - timedelta(days=1),work=w2,description='dummy description')
c3 = Campaign(
target=D('1000.00'),
deadline=now() - timedelta(days=1),
work=w2,
description='dummy description'
)
c3.save()
c3.activate()
self.assertEqual(c3.status, 'ACTIVE')
# at this point, since the deadline has passed, the status should change and be UNSUCCESSFUL
# at this point, since the deadline has passed,
# the status should change and be UNSUCCESSFUL
self.assertTrue(c3.update_status())
self.assertEqual(c3.status, 'UNSUCCESSFUL')
# premiums
pr1= Premium(type='CU', campaign=c3, amount=10, description='botsnack', limit=1)
pr1 = Premium(type='CU', campaign=c3, amount=10, description='botsnack', limit=1)
pr1.save()
self.assertEqual(pr1.premium_remaining,1)
self.assertEqual(pr1.premium_remaining, 1)
#cloning (note we changed c3 to w2 to make it clonable)
c7= c3.clone()
c7 = c3.clone()
self.assertEqual(c7.status, 'INITIALIZED')
self.assertEqual(c7.premiums.all()[0].description , 'botsnack')
self.assertEqual(c7.premiums.all()[0].description, 'botsnack')
# SUCCESSFUL
c4 = Campaign(target=D('1000.00'),deadline=now() - timedelta(days=1),work=w,description='dummy description')
c4 = Campaign(
target=D('1000.00'),
deadline=now() - timedelta(days=1),
work=w,
description='dummy description'
)
c4.save()
c4.activate()
t = Transaction()
@ -663,7 +699,12 @@ class CampaignTests(TestCase):
self.assertEqual(c4.status, 'SUCCESSFUL')
# WITHDRAWN
c5 = Campaign(target=D('1000.00'),deadline=datetime(2013,1,1),work=w,description='dummy description')
c5 = Campaign(
target=D('1000.00'),
deadline=datetime(2013, 1, 1),
work=w,
description='dummy description'
)
c5.save()
c5.activate().withdraw('testing')
self.assertEqual(c5.status, 'WITHDRAWN')
@ -671,9 +712,14 @@ class CampaignTests(TestCase):
# testing percent-of-goal
w2 = Work()
w2.save()
c6 = Campaign(target=D('1000.00'),deadline=now() + timedelta(days=1),work=w2,description='dummy description')
c6 = Campaign(
target=D('1000.00'),
deadline=now() + timedelta(days=1),
work=w2,
description='dummy description'
)
c6.save()
cl = Claim(rights_holder = rh, work = w2, user = u, status = 'active')
cl = Claim(rights_holder=rh, work=w2, user=u, status='active')
cl.save()
c6.activate()
t = Transaction()
@ -687,7 +733,7 @@ class CampaignTests(TestCase):
self.assertEqual(w2.percent_of_goal(), 23)
self.assertEqual(c1.launchable, False)
c1.description="description"
c1.description = "description"
self.assertEqual(c1.launchable, True)
c1.work.create_offers()
self.assertEqual(c1.work.offers.count(), 2)
@ -695,14 +741,14 @@ class CampaignTests(TestCase):
c1.type = 2
c1.save()
self.assertEqual(c1.launchable, False)
of1=c1.work.offers.get(license=2)
of1.price=D(2)
of1.active=True
of1 = c1.work.offers.get(license=2)
of1.price = D(2)
of1.active = True
of1.save()
self.assertEqual(c1.launchable, False)
e1= models.Edition(title="title",work=c1.work)
e1 = models.Edition(title="title", work=c1.work)
e1.save()
ebf1= models.EbookFile(edition=e1, format=1)
ebf1 = models.EbookFile(edition=e1, format=1)
ebf1.save()
c1.set_cc_date_initial()
self.assertEqual(c1.cc_date, settings.MAX_CC_DATE)
@ -717,7 +763,7 @@ class WishlistTest(TestCase):
user = User.objects.create_user('test', 'test@example.org', 'testpass')
edition = bookloader.add_by_isbn('0441007465')
work = edition.work
num_wishes=work.num_wishes
num_wishes = work.num_wishes
user.wishlist.add_work(work, 'test')
self.assertEqual(user.wishlist.works.count(), 1)
self.assertEqual(work.num_wishes, num_wishes+1)
@ -732,7 +778,7 @@ class CeleryTaskTest(TestCase):
n = 10
task = tasks.fac.delay(n)
result = task.get(timeout=10)
self.assertEqual(result,factorial(n))
self.assertEqual(result, factorial(n))
def test_subtask(self):
n = 30
@ -741,7 +787,7 @@ class CeleryTaskTest(TestCase):
result = job.apply_async()
while not result.ready():
sleep(0.2)
self.assertEqual(result.join(),[factorial(x) for x in range(n)])
self.assertEqual(result.join(), [factorial(x) for x in range(n)])
class GoodreadsTest(TestCase):
@ -751,7 +797,10 @@ class GoodreadsTest(TestCase):
return
# test to see whether the core undeletable shelves are on the list
gr_uid = "767708" # for Raymond Yee
gc = goodreads.GoodreadsClient(key=settings.GOODREADS_API_KEY, secret=settings.GOODREADS_API_SECRET)
gc = goodreads.GoodreadsClient(
key=settings.GOODREADS_API_KEY,
secret=settings.GOODREADS_API_SECRET
)
shelves = gc.shelves_list(gr_uid)
shelf_names = [s['name'] for s in shelves['user_shelves']]
self.assertTrue('currently-reading' in shelf_names)
@ -763,7 +812,10 @@ class GoodreadsTest(TestCase):
if not settings.GOODREADS_API_SECRET:
return
gr_uid = "767708" # for Raymond Yee
gc = goodreads.GoodreadsClient(key=settings.GOODREADS_API_KEY, secret=settings.GOODREADS_API_SECRET)
gc = goodreads.GoodreadsClient(
key=settings.GOODREADS_API_KEY,
secret=settings.GOODREADS_API_SECRET
)
reviews = gc.review_list_unauth(user_id=gr_uid, shelf='read')
# test to see whether there is a book field in each of the review
# url for test is https://www.goodreads.com/review/list.xml?id=767708&shelf=read&page=1&per_page=20&order=a&v=2&key=[key]
@ -776,7 +828,7 @@ class LibraryThingTest(TestCase):
lt_username = 'yujx'
lt = librarything.LibraryThing(username=lt_username)
books = list(lt.parse_user_catalog(view_style=5))
self.assertEqual(len(books),1)
self.assertEqual(len(books), 1)
self.assertEqual(books[0]['isbn'], '0471925675')
self.assertEqual(books[0]['work_id'], '80826')
self.assertEqual(books[0]['book_id'], '79883733')
@ -812,19 +864,19 @@ class ISBNTest(TestCase):
self.assertEqual(isbn_python_13.to_string(), bookloader.valid_isbn(python_10_wrong))
# do conversion -- first the outside methods
self.assertEqual(isbn.convert_10_to_13(isbn.strip(python_10)),isbn.strip(python_13))
self.assertEqual(isbn.convert_10_to_13(isbn.strip(python_10)),isbn.strip(python_13))
self.assertEqual(isbn.convert_13_to_10('xxxxxxxxxxxxx'),None)
self.assertEqual(isbn.convert_10_to_13('xxxxxxxxxx'),None)
self.assertEqual(isbn.convert_10_to_13(isbn.strip(python_10)), isbn.strip(python_13))
self.assertEqual(isbn.convert_10_to_13(isbn.strip(python_10)), isbn.strip(python_13))
self.assertEqual(isbn.convert_13_to_10('xxxxxxxxxxxxx'), None)
self.assertEqual(isbn.convert_10_to_13('xxxxxxxxxx'), None)
self.assertEqual(None, bookloader.valid_isbn('xxxxxxxxxxxxx'))
self.assertEqual(None, bookloader.valid_isbn('xxxxxxxxxx'))
# check formatting
self.assertEqual(isbn.ISBN(python_13).to_string(type='13'), '9780672329784')
self.assertEqual(isbn.ISBN(python_13).to_string('13',True), '978-0-672-32978-4')
self.assertEqual(isbn.ISBN(python_13).to_string('13', True), '978-0-672-32978-4')
self.assertEqual(isbn.ISBN(python_13).to_string(type='10'), '0672329786')
self.assertEqual(isbn.ISBN(python_10).to_string(type='13'), '9780672329784')
self.assertEqual(isbn.ISBN(python_10).to_string(10,True), '0-672-32978-6')
self.assertEqual(isbn.ISBN(python_10).to_string(10, True), '0-672-32978-6')
# complain if one tries to get ISBN-10 for a 979 ISBN 13
# making up a 979 ISBN
@ -844,9 +896,12 @@ class ISBNTest(TestCase):
self.assertEqual(isbn.ISBN(python_13).validate(), python_10)
# curious about set membership
self.assertEqual(len(set([isbn.ISBN(milosz_10), isbn.ISBN(milosz_13)])),2)
self.assertEqual(len(set([str(isbn.ISBN(milosz_10)), str(isbn.ISBN(milosz_13))])),2)
self.assertEqual(len(set([isbn.ISBN(milosz_10).to_string(), isbn.ISBN(milosz_13).to_string()])),1)
self.assertEqual(len(set([isbn.ISBN(milosz_10), isbn.ISBN(milosz_13)])), 2)
self.assertEqual(len(set([str(isbn.ISBN(milosz_10)), str(isbn.ISBN(milosz_13))])), 2)
self.assertEqual(
len(set([isbn.ISBN(milosz_10).to_string(), isbn.ISBN(milosz_13).to_string()])),
1
)
class EncryptedKeyTest(TestCase):
def test_create_read_key(self):
@ -880,11 +935,11 @@ class WorkTests(TestCase):
self.w2 = models.Work.objects.create()
def test_preferred_edition(self):
ww = models.WasWork.objects.create(work=self.w1, was= self.w2.id)
ww = models.WasWork.objects.create(work=self.w1, was=self.w2.id)
e1 = models.Edition.objects.create(work=self.w1)
self.assertEqual(e1, self.w1.preferred_edition)
e2 = models.Edition.objects.create(work=self.w1)
self.w1.selected_edition=e2
self.w1.selected_edition = e2
self.w1.save()
self.assertEqual(e2, self.w1.preferred_edition)
self.assertEqual(e2, self.w2.preferred_edition)
@ -956,9 +1011,13 @@ class LocaldatetimeTest(TestCase):
else:
reload(localdatetime)
self.assertAlmostEqual(mktime(datetime.now().timetuple()), mktime(localdatetime.now().timetuple()), 1.0)
self.assertAlmostEqual(
mktime(datetime.now().timetuple()),
mktime(localdatetime.now().timetuple()),
1.0
)
@override_settings(LOCALDATETIME_NOW=lambda : datetime.now() + timedelta(365))
@override_settings(LOCALDATETIME_NOW=lambda: datetime.now() + timedelta(365))
def test_LOCALDATETIME_NOW_year_ahead(self):
try:
@ -968,12 +1027,20 @@ class LocaldatetimeTest(TestCase):
else:
reload(localdatetime)
self.assertAlmostEqual(mktime((datetime.now() + timedelta(365)).timetuple()), mktime(localdatetime.now().timetuple()), 1.0)
self.assertAlmostEqual(
mktime((datetime.now() + timedelta(365)).timetuple()),
mktime(localdatetime.now().timetuple()),
1.0
)
def test_no_time_override(self):
from regluit.utils import localdatetime
self.assertAlmostEqual(mktime(datetime.now().timetuple()), mktime(localdatetime.now().timetuple()), 1.0)
self.assertAlmostEqual(
mktime(datetime.now().timetuple()),
mktime(localdatetime.now().timetuple()),
1.0
)
def tearDown(self):
# restore localdatetime.now() to what's in the settings file
@ -991,7 +1058,7 @@ class MailingListTests(TestCase):
from postmonkey import PostMonkey
pm = PostMonkey(settings.MAILCHIMP_API_KEY)
if settings.TEST_INTEGRATION:
self.assertEqual(pm.ping(),"Everything's Chimpy!" )
self.assertEqual(pm.ping(), "Everything's Chimpy!")
self.user = User.objects.create_user('chimp_test', 'eric@gluejar.com', 'chimp_test')
self.assertTrue(self.user.profile.on_ml)
@ -1009,18 +1076,19 @@ class EbookFileTests(TestCase):
Read the test epub file
"""
w = Work.objects.create(title="Work 1")
e = Edition.objects.create(title=w.title,work=w)
e = Edition.objects.create(title=w.title, work=w)
u = User.objects.create_user('test', 'test@example.org', 'testpass')
rh = RightsHolder.objects.create(owner = u, rights_holder_name = 'rights holder name')
cl = Claim.objects.create(rights_holder = rh, work = w, user = u, status = 'active')
c = Campaign.objects.create(work = w,
type = parameters.BUY2UNGLUE,
cc_date_initial = datetime(2020,1,1),
target = 1000,
deadline = datetime(2020,1,1),
license = 'CC BY',
description = "dummy description",
)
rh = RightsHolder.objects.create(owner=u, rights_holder_name='rights holder name')
cl = Claim.objects.create(rights_holder=rh, work=w, user=u, status='active')
c = Campaign.objects.create(
work=w,
type=parameters.BUY2UNGLUE,
cc_date_initial=datetime(2020, 1, 1),
target=1000,
deadline=datetime(2020, 1, 1),
license='CC BY',
description="dummy description",
)
# download the test epub into a temp file
temp = NamedTemporaryFile(delete=False)
test_file_content = requests.get(settings.BOOXTREAM_TEST_EPUB_URL).content
@ -1033,7 +1101,7 @@ class EbookFileTests(TestCase):
temp_file = open(temp.name)
dj_file = DjangoFile(temp_file)
ebf = EbookFile( format='epub', edition=e, file=dj_file)
ebf = EbookFile(format='epub', edition=e, file=dj_file)
ebf.save()
temp_file.close()
@ -1041,16 +1109,16 @@ class EbookFileTests(TestCase):
# make sure we get rid of temp file
os.remove(temp.name)
test_epub= EPUB(ebf.file, mode='a')
self.assertEqual(len(test_epub.opf) , 4)
test_epub = EPUB(ebf.file, mode='a')
self.assertEqual(len(test_epub.opf), 4)
self.assertTrue(len(test_epub.opf[2]) < 30)
acq=Acq.objects.create(user=u,work=w,license=TESTING)
acq = Acq.objects.create(user=u,work=w,license=TESTING)
self.assertIsNot(acq.nonce, None)
url= acq.get_watermarked().download_link_epub
self.assertRegexpMatches(url,'github.com/eshellman/42_ebook/blob/master/download/42')
#self.assertRegexpMatches(url,'booxtream.com/')
url = acq.get_watermarked().download_link_epub
self.assertRegexpMatches(url, 'github.com/eshellman/42_ebook/blob/master/download/42')
#self.assertRegexpMatches(url, 'booxtream.com/')
with self.assertRaises(UnglueitError) as cm:
c.activate()
@ -1058,24 +1126,24 @@ class EbookFileTests(TestCase):
off.save()
c.activate()
#flip the campaign to success
c.cc_date_initial= datetime(2012,1,1)
c.cc_date_initial = datetime(2012, 1, 1)
c.update_status()
self.assertEqual( c.work.ebooks().count(),2 )
c.do_watermark=False
self.assertEqual(c.work.ebooks().count(), 2)
c.do_watermark = False
c.save()
url= acq.get_watermarked().download_link_epub
url = acq.get_watermarked().download_link_epub
def test_ebookfile_thanks(self):
w = Work.objects.create(title="Work 2")
e = Edition.objects.create(title=w.title,work=w)
e = Edition.objects.create(title=w.title, work=w)
u = User.objects.create_user('test2', 'test@example.org', 'testpass')
rh = RightsHolder.objects.create(owner = u, rights_holder_name = 'rights holder name 2')
cl = Claim.objects.create(rights_holder = rh, work = w, user = u, status = 'active')
c = Campaign.objects.create(work = w,
type = parameters.THANKS,
license = 'CC BY-NC',
description = "Please send me money",
)
rh = RightsHolder.objects.create(owner=u, rights_holder_name='rights holder name 2')
cl = Claim.objects.create(rights_holder=rh, work=w, user=u, status='active')
c = Campaign.objects.create(work=w,
type=parameters.THANKS,
license='CC BY-NC',
description="Please send me money",
)
# download the test epub into a temp file
temp = NamedTemporaryFile(delete=False)
test_file_content = requests.get(settings.TEST_PDF_URL).content
@ -1087,9 +1155,9 @@ class EbookFileTests(TestCase):
temp_file = open(temp.name)
dj_file = DjangoFile(temp_file)
ebf = EbookFile( format='pdf', edition=e, file=dj_file)
ebf = EbookFile(format='pdf', edition=e, file=dj_file)
ebf.save()
eb = Ebook( format='pdf', edition=e, url=ebf.file.url, provider='Unglue.it')
eb = Ebook(format='pdf', edition=e, url=ebf.file.url, provider='Unglue.it')
eb.save()
ebf.ebook = eb
ebf.save()
@ -1117,9 +1185,9 @@ class EbookFileTests(TestCase):
temp_file = open(temp.name)
dj_file = DjangoFile(temp_file)
ebf = EbookFile( format='epub', edition=e, file=dj_file)
ebf = EbookFile(format='epub', edition=e, file=dj_file)
ebf.save()
eb = Ebook( format='epub', edition=e, url=ebf.file.url, provider='Unglue.it')
eb = Ebook(format='epub', edition=e, url=ebf.file.url, provider='Unglue.it')
eb.save()
ebf.ebook = eb
ebf.save()
@ -1130,15 +1198,15 @@ class EbookFileTests(TestCase):
os.remove(temp.name)
#test the ask-appender
c.add_ask_to_ebfs()
self.assertTrue( c.work.ebookfiles().filter(asking = True, format='epub').count() > 0)
self.assertTrue(c.work.ebookfiles().filter(asking=True, format='epub').count() > 0)
if settings.MOBIGEN_URL:
self.assertTrue( c.work.ebookfiles().filter(asking = True, format='mobi').count() > 0)
self.assertTrue( c.work.ebookfiles().filter(asking = True, ebook__active=True).count() > 0)
self.assertTrue( c.work.ebookfiles().filter(asking = False, ebook__active=True).count() == 0)
self.assertTrue(c.work.ebookfiles().filter(asking=True, format='mobi').count() > 0)
self.assertTrue(c.work.ebookfiles().filter(asking=True, ebook__active=True).count() > 0)
self.assertTrue(c.work.ebookfiles().filter(asking=False, ebook__active=True).count() == 0)
#test the unasker
c.revert_asks()
self.assertTrue( c.work.ebookfiles().filter(asking = True, ebook__active=True).count() == 0)
self.assertTrue( c.work.ebookfiles().filter(asking = False, ebook__active=True).count() > 0)
self.assertTrue(c.work.ebookfiles().filter(asking=True, ebook__active=True).count() == 0)
self.assertTrue(c.work.ebookfiles().filter(asking=False, ebook__active=True).count() > 0)
class MobigenTests(TestCase):
def test_convert_to_mobi(self):
@ -1147,10 +1215,11 @@ class MobigenTests(TestCase):
"""
from regluit.core.mobigen import convert_to_mobi
if settings.TEST_INTEGRATION:
output = convert_to_mobi("https://github.com/GITenberg/Moby-Dick--Or-The-Whale_2701/releases/download/0.2.0/Moby-Dick-Or-The-Whale.epub")
self.assertTrue(len(output)>2207877)
output = convert_to_mobi(
"https://github.com/GITenberg/Moby-Dick--Or-The-Whale_2701/releases/download/0.2.0/Moby-Dick-Or-The-Whale.epub"
)
self.assertTrue(len(output) > 2207877)
from .signals import handle_transaction_charged
@override_settings(LOCAL_TEST=True)
class LibTests(TestCase):
fixtures = ['initial_data.json']
@ -1159,33 +1228,47 @@ class LibTests(TestCase):
def test_purchase(self):
w = Work.objects.create(title="Work 1")
e = Edition.objects.create(title=w.title,work=w)
e = Edition.objects.create(title=w.title, work=w)
u = User.objects.create_user('test', 'test@example.org', 'testpass')
lu = User.objects.create_user('library', 'testu@example.org', 'testpass')
lib = Library.objects.create(user=lu,owner=u)
c = Campaign.objects.create(work=w, type = parameters.BUY2UNGLUE, cc_date_initial= datetime(2020,1,1),target=1000, deadline=datetime(2020,1,1))
lib = Library.objects.create(user=lu, owner=u)
c = Campaign.objects.create(
work=w,
type=parameters.BUY2UNGLUE,
cc_date_initial=datetime(2020, 1, 1),
target=1000,
deadline=datetime(2020, 1, 1)
)
new_acq = Acq.objects.create(user=lib.user,work=c.work,license= LIBRARY)
new_acq = Acq.objects.create(user=lib.user, work=c.work, license=LIBRARY)
self.assertTrue(new_acq.borrowable)
reserve_acq = Acq.objects.create(user=u,work=c.work,license= RESERVE, lib_acq = new_acq)
reserve_acq = Acq.objects.create(user=u, work=c.work, license=RESERVE, lib_acq=new_acq)
self.assertTrue(reserve_acq.borrowable)
self.assertFalse(new_acq.borrowable)
self.assertTrue(reserve_acq.expires< now()+timedelta(hours=25))
self.assertTrue(reserve_acq.expires < now() + timedelta(hours=25))
reserve_acq.borrow()
self.assertTrue(reserve_acq.expires> now()+timedelta(hours=25))
self.assertTrue(reserve_acq.expires > now() + timedelta(hours=25))
u2 = User.objects.create_user('user2', 'test2@example.org', 'testpass')
Hold.objects.get_or_create(library=lib,work=w,user=u2)
Hold.objects.get_or_create(library=lib, work=w, user=u2)
reserve_acq.expire_in(timedelta(seconds=0))
tasks.refresh_acqs()
self.assertEqual(reserve_acq.holds.count(),0)
self.assertEqual(reserve_acq.holds.count(), 0)
class GitHubTests(TestCase):
def test_ebooks_in_github_release(self):
(repo_owner, repo_name, repo_tag) = ('GITenberg', 'Adventures-of-Huckleberry-Finn_76', '0.0.50')
ebooks = bookloader.ebooks_in_github_release(repo_owner, repo_name,
tag=repo_tag, token=settings.GITHUB_PUBLIC_TOKEN)
(repo_owner, repo_name, repo_tag) = (
'GITenberg',
'Adventures-of-Huckleberry-Finn_76',
'0.0.50'
)
ebooks = bookloader.ebooks_in_github_release(
repo_owner,
repo_name,
tag=repo_tag,
token=settings.GITHUB_PUBLIC_TOKEN
)
expected_set = set([
('epub', u'Adventures-of-Huckleberry-Finn.epub'),
('mobi', u'Adventures-of-Huckleberry-Finn.mobi'),
@ -1197,43 +1280,45 @@ class GitHubTests(TestCase):
class OnixLoaderTests(TestCase):
fixtures = ['initial_data.json']
def test_load(self):
TEST_BOOKS = [{'': u'',
'Author1First': u'Joseph',
'Author1Last': u'Necvatal',
'Author1Role': u'',
'Author2First': u'',
'Author2Last': u'',
'Author2Role': u'',
'Author3First': u'',
'Author3Last': u'',
'Author3Role': u'',
'AuthorBio': u'',
'AuthorsList': u'Joseph Nechvatal',
'BISACCode1': u'',
'BISACCode2': u'',
'BISACCode3': u'',
'Book-level DOI': u'10.3998/ohp.9618970.0001.001',
'ClothISBN': u'N/A',
'CopyrightYear': u'2011',
'DescriptionBrief': u'',
'DescriptionLong': u'',
'Excerpt': u'',
'FullTitle': u'Immersion into Noise',
'License': u'CC BY-SA',
'List Price in USD (paper ISBN)': u'23.99',
'ListPriceCurrencyType': u'',
'PaperISBN': u'9781607852414',
'Publisher': u'Open Humanities Press',
'SubjectListMARC': u'',
'Subtitle': u'',
'TableOfContents': u'',
'Title': u'Immersion into Noise',
'URL': u'https://doi.org/10.3998/ohp.9618970.0001.001',
'eISBN': u'N/A',
'eListPrice': u'N/A',
'ePublicationDate': u'',
'eTerritoryRights': u''},
{'': u'',
TEST_BOOKS = [{
'': u'',
'Author1First': u'Joseph',
'Author1Last': u'Necvatal',
'Author1Role': u'',
'Author2First': u'',
'Author2Last': u'',
'Author2Role': u'',
'Author3First': u'',
'Author3Last': u'',
'Author3Role': u'',
'AuthorBio': u'',
'AuthorsList': u'Joseph Nechvatal',
'BISACCode1': u'',
'BISACCode2': u'',
'BISACCode3': u'',
'Book-level DOI': u'10.3998/ohp.9618970.0001.001',
'ClothISBN': u'N/A',
'CopyrightYear': u'2011',
'DescriptionBrief': u'',
'DescriptionLong': u'',
'Excerpt': u'',
'FullTitle': u'Immersion into Noise',
'License': u'CC BY-SA',
'List Price in USD (paper ISBN)': u'23.99',
'ListPriceCurrencyType': u'',
'PaperISBN': u'9781607852414',
'Publisher': u'Open Humanities Press',
'SubjectListMARC': u'',
'Subtitle': u'',
'TableOfContents': u'',
'Title': u'Immersion into Noise',
'URL': u'https://doi.org/10.3998/ohp.9618970.0001.001',
'eISBN': u'N/A',
'eListPrice': u'N/A',
'ePublicationDate': u'',
'eTerritoryRights': u''
}, {
'': u'',
'CAD price eub': u'9.95',
'Title': u'That Greece Might Still Be Free',
'USD price epub': u'9.95',
@ -1289,12 +1374,9 @@ class OnixLoaderTests(TestCase):
'GBP price mobi': u'5.95', 'Format 1': u'Paperback ', 'EUR price PDF': u'7.95', 'Format 3': u'pdf',
'Format 2': u'Hardback', 'Format 5': u'mobi', 'Format 4': u'epub', 'MARC Code1': u'aut',
'MARC Code2': u'aui', 'MARC Code3': u'', 'MARC Code4': u'', 'MARC Code5': u'',
'MARC Code6': u'', 'ISO Language Code': u'en'}
]
'MARC Code6': u'', 'ISO Language Code': u'en'
}]
results = load_from_books(TEST_BOOKS)
for (book, work, edition) in results:
assert (loaded_book_ok(book, work, edition))
assert loaded_book_ok(book, work, edition)

View File

@ -21,6 +21,7 @@ from regluit.core.bookloader import (
from regluit.core.parameters import WORK_IDENTIFIERS
from regluit.core.loaders import add_by_webpage
from regluit.core.loaders.doab import add_by_doab
from regluit.core.loaders.utils import ids_from_urls
from regluit.frontend.forms import EditionForm, IdentifierForm
@ -106,6 +107,11 @@ def get_edition_for_id(id_type, id_value, user=None):
if edition:
return user_edition(edition, user)
if identifiers.has_key('doab'):
edition = add_by_doab(identifiers['doab'])
if edition:
return user_edition(edition, user)
if identifiers.has_key('oclc'):
edition = add_by_oclc(identifiers['oclc'])
if edition:

View File

@ -6,16 +6,11 @@ PyJWT==1.4.1
PyPDF2==1.23
PyGithub==1.15.0
PyYAML==3.11
git+git://github.com/urschrei/pyzotero.git@v0.9.51
SPARQLWrapper==1.6.4
WebOb==1.2.3
WebTest==1.4.0
amqp==1.4.9
anyjson==0.3.3
billiard==3.3.0.23
awscli==1.10.26
boto==2.42.0
#git+ssh://git@github.com/Gluejar/boto.git@2.3.0
celery==3.1.23
certifi==2016.2.28
# pip installing pillow seems to delete distribute
@ -33,7 +28,6 @@ django-jsonfield==1.0.0
#django-kombu==0.9.4
django-maintenancemode==0.11.2
django-mptt==0.8.5
#django-nose-selenium==0.7.3
#django-notification==0.2
git+git://github.com/eshellman/django-notification.git@412c7a03a327195a1017c2be92c8e2caabc880b6
django-registration==2.1.2
@ -42,9 +36,7 @@ django-smtp-ssl==1.0
django-storages==1.4.1
django-tastypie==0.13.3
django-transmeta==0.7.3
feedparser==5.1.2
fef-questionnaire==4.0.1
freebase==1.0.8
#gitenberg.metadata==0.1.6
git+https://github.com/gitenberg-dev/gitberg-build
#git+ssh://git@github.com/gitenberg-dev/metadata.git@0.1.11
@ -53,7 +45,7 @@ html5lib==1.0b3
httplib2==0.7.5
isodate==0.5.1
kombu==3.0.35
lxml==2.3.5
lxml==4.2.1
defusedxml==0.4.1
mechanize==0.2.5
mimeparse==0.1.3
@ -66,6 +58,7 @@ paramiko==1.14.1
postmonkey==1.0b
pycrypto==2.6
pymarc==3.0.2
pyoai==2.5.0
pyparsing==2.0.3
python-dateutil==2.5.3
python-mimeparse==0.1.4
@ -80,7 +73,7 @@ requests==2.10.0
requests-mock==1.2.0
requests-oauthlib==0.6.2
selenium==2.53.1
six==1.9.0
six==1.11.0
sorl-thumbnail==12.3
ssh==1.7.14
stevedore==1.12.0

View File

@ -1,6 +1,10 @@
from django.conf.global_settings import LANGUAGES
lang2code = dict([ (lang[1].lower(), lang[0]) for lang in LANGUAGES ])
code2lang = dict(LANGUAGES)
def get_language_code(language):
return lang2code.get(language.lower().strip(), '')
language = language.lower().strip()
if language in code2lang:
return language
return lang2code.get(language, '')