regluit/core/loaders/utils.py

464 lines
16 KiB
Python
Raw Normal View History

2016-05-19 13:17:23 +00:00
import csv
import logging
import re
import sys
import time
import unicodedata
import urlparse

import requests

from django.conf import settings
from django.core.files.base import ContentFile

from regluit.api.crosswalks import inv_relator_contrib
from regluit.bisac.models import BisacHeading
from regluit.core.bookloader import add_by_isbn_from_google, merge_works
from regluit.core.isbn import ISBN
from regluit.core.models import (
    Author, Ebook, EbookFile, Edition, Identifier, path_for_file, PublisherName, Subject, Work,
)
2016-05-19 13:17:23 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def UnicodeDictReader(utf8_data, **kwargs):
    """Iterate over CSV rows as dicts whose byte-string values are decoded.

    Args:
        utf8_data: an iterable of UTF-8 encoded CSV lines (e.g. an open file).
        **kwargs: passed through unchanged to ``csv.DictReader``.

    Yields:
        One dict per CSV row. Byte-string values are decoded from UTF-8;
        anything else is passed through untouched. That covers the ``None``
        that ``csv.DictReader`` fills in for cells missing from a short row,
        which previously made the decode raise.
    """
    for row in csv.DictReader(utf8_data, **kwargs):
        yield {
            key: value.decode('utf-8') if isinstance(value, bytes) else value
            for key, value in row.items()
        }
def utf8_general_ci_norm(s):
    """
    Normalize a la MySQL utf8_general_ci collation
    (As of 2016.05.24, we're using the utf8_general_ci collation for author names)

    https://stackoverflow.com/questions/1036454/what-are-the-diffrences-between-utf8-general-ci-and-utf8-unicode-ci/1036459#1036459

    Steps:
    * decompose to Unicode normalization form D (canonical decomposition)
    * drop every combining character
    * upper-case what is left
    """
    decomposed = unicodedata.normalize('NFD', s)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return ''.join(base_chars).upper()
2016-05-19 13:17:23 +00:00
def get_authors(book):
    """Extract the contributors of *book* as ``(name, role code)`` tuples.

    Two spreadsheet dialects are understood:

    * UMich (detected by a non-empty ``AuthorsList``): the first three
      contributors come from the ``Author<N>First`` / ``Author<N>Last`` /
      ``Author<N>Role`` columns; any further names are taken from the
      free-text ``AuthorsList`` and given the role ``'A01'``.
    * OBP: up to five contributors from the ``Contributor <N> first name`` /
      ``Contributor <N> surname`` / ``ONIX Role Code (List 17)<N>`` columns.

    A blank role falls back to ``'A01'``. Reading the structured columns stops
    at the first contributor whose first and last names are both empty.

    Args:
        book: dict mapping column names to string values.

    Returns:
        list of ``(author name, role code)`` tuples, in column order.
    """
    authors = []
    if book.get('AuthorsList', ''):
        # UMich
        # Author1..Author3 have structured columns. The loop used to stop at
        # Author2 while the list below is read from index 3 onward, which
        # silently dropped the third author.
        for i in range(1, 4):
            first = book.get(u'Author{}First'.format(i), '')
            last = book.get(u'Author{}Last'.format(i), '')
            authname = u'{} {}'.format(first, last)
            if authname == u' ':
                break
            role = book.get(u'Author{}Role'.format(i), '')
            authors.append((authname, role if role.strip() else 'A01'))
        # contributors beyond the third only appear in the free-text list
        authlist = book["AuthorsList"].replace(' and ', ', ').split(', ')
        authors.extend((authname, 'A01') for authname in authlist[3:])
    else:
        # OBP
        for i in range(1, 6):
            first = book.get(u'Contributor {} first name'.format(i), '')
            last = book.get(u'Contributor {} surname'.format(i), '')
            authname = u'{} {}'.format(first, last)
            if authname == u' ':
                break
            role = book.get(u'ONIX Role Code (List 17){}'.format(i), '')
            authors.append((authname, role if role.strip() else 'A01'))
    return authors
def get_subjects(book):
    """Look up the BisacHeading for each BISAC code listed in *book*.

    Code slots 1 through 4 are checked. For each slot the UMich column
    (``BISACCode<N>``) is preferred and the OBP column
    (``BISAC subject code <N>``) is the fallback. Codes with no matching
    BisacHeading are logged as a warning and skipped.

    Returns:
        list of BisacHeading objects, in slot order.
    """
    headings = []
    for slot in range(1, 5):
        umich_key = u'BISACCode{}'.format(slot)  # UMich dialect
        obp_key = u'BISAC subject code {}'.format(slot)  # OBP dialect
        code = book.get(umich_key, '') or book.get(obp_key, '')
        if code == '':
            continue
        try:
            heading = BisacHeading.objects.get(notation=code)
        except BisacHeading.DoesNotExist:
            logger.warning( "Please add BISAC {}".format(code))
        else:
            headings.append(heading)
    return headings
def add_subject(subject_name, work, authority=''):
    """Attach the Subject named *subject_name* to *work*, creating it if needed.

    Args:
        subject_name: value of the Subject's ``name`` field.
        work: the Work added to the subject's ``works`` relation.
        authority: stored on the Subject only when it is newly created; an
            existing Subject keeps whatever authority it already has.
    """
    # get_or_create replaces the hand-rolled get / except DoesNotExist / create.
    subject, _created = Subject.objects.get_or_create(
        name=subject_name,
        defaults={'authority': authority},
    )
    subject.works.add(work)
2016-06-10 21:57:53 +00:00
def get_title(book):
    """Return the display title of *book*.

    UMich rows carry the complete title in ``FullTitle``. OBP rows provide
    ``Title`` and an optional ``Subtitle``, joined here as
    ``"Title: Subtitle"``.
    """
    full_title = book.get('FullTitle', '')  # UMICH
    if full_title:
        return full_title
    main_title = book.get('Title', '')  # OBP
    subtitle = book.get('Subtitle', '')
    return u'{}: {}'.format(main_title, subtitle) if subtitle else main_title
2016-05-19 13:17:23 +00:00
def get_cover(book, timeout=30):
    """Return a cover image URL for *book*, or None when none can be found.

    OBP rows supply the URL directly in ``Cover URL``. For UMich rows (a
    ``URL`` containing ``10.3998``) the cover location is derived from the
    collection and id embedded in the URL, then probed with an HTTP HEAD
    request; it is returned only if the server answers with a status below 400.

    Args:
        book: dict mapping column names to string values.
        timeout: seconds to wait for the HEAD request.

    Returns:
        the cover URL, or None if the row has no usable URL, the URL does not
        have the expected shape, or the probe fails.
    """
    cover_url = book.get('Cover URL', '')  # OBP
    if cover_url:
        return cover_url

    # A row without a URL column used to raise KeyError here.
    url = book.get('URL') or ''
    if "10.3998" not in url:
        return None

    # code for umich books; can generalize, of course!
    idmatch = re.search(r'([^/]+)\.(\d+\.\d+\.\d+)', url)
    if not idmatch:
        return None
    collection, book_id = idmatch.groups()
    if collection == 'ohp':
        cover_url = "http://quod.lib.umich.edu/o/ohp/images/{}.jpg".format(book_id)
    elif collection == 'ump':
        cover_url = "http://quod.lib.umich.edu/u/ump/images/{}.jpg".format(book_id)
    else:
        cover_url = "http://quod.lib.umich.edu/d/dculture/images/{}.jpg".format(book_id)

    try:
        cover = requests.head(cover_url, timeout=timeout)
    except requests.RequestException as e:
        # A network failure means "no cover"; it should not abort the load.
        logger.warning("cover check failed: {} for: {} ({})".format(cover_url, url, e))
        return None
    if cover.status_code < 400:
        return cover_url
    logger.warning( "bad cover: {} for: {}".format(cover_url, url))
    return None
def get_isbns(book):
    """Collect the ISBNs of *book* and the first Edition matching one of them.

    Each ISBN column may hold several comma-separated values. Every value is
    passed through ``ISBN(...).to_string()`` and kept when the result is
    truthy. Cells that are empty or ``'N/A'`` are ignored.

    Returns:
        ``(isbns, edition)``: the list of ISBN strings in column order, and
        the result of the first ``Edition.get_by_isbn`` lookup that returned
        something truthy (None when there are no ISBNs).
    """
    # 'ISBN 1' is OBP, others are UMICH
    columns = ['eISBN', 'ISBN 3','PaperISBN', 'ISBN 2', 'ClothISBN', 'ISBN 1', 'ISBN 4', 'ISBN 5']
    found = []
    for column in columns:
        cell = book.get(column, '')
        if cell in ('', 'N/A'):
            continue
        for piece in cell.split(','):
            normalized = ISBN(piece).to_string()
            if normalized:
                found.append(normalized)

    edition = None
    for candidate in found:
        edition = Edition.get_by_isbn(candidate)
        if edition:
            break
    return (found, edition)
2016-06-10 21:57:53 +00:00
def get_pubdate(book):
    """Return the publication date of *book* as a string.

    UMich rows provide ``CopyrightYear``, returned as is. OBP rows provide
    ``publication year`` / ``publication month`` / ``publication day``, joined
    with ``-`` to the finest granularity available: ``YYYY``, ``YYYY-MM`` or
    ``YYYY-MM-DD``.

    A day is used only when a month is present and a month only when a year is
    present, so malformed values such as ``2016--10`` are never produced.
    """
    value = book.get('CopyrightYear', '')  # UMICH
    if value:
        return value
    year = book.get('publication year', '')  # OBP
    month = book.get('publication month', '')
    day = book.get('publication day', '')
    if not year or not month:
        return year
    if day:
        return u'{}-{}-{}'.format(year, month, day)
    return u'{}-{}'.format(year, month)
def get_publisher(book):
    """Return the publisher name of *book*, or None if it cannot be determined.

    The ``Publisher`` column wins when it is non-empty. Otherwise a
    ``DOI prefix`` of ``10.11647`` is taken to mean Open Book Publishers.
    """
    named = book.get('Publisher', '')
    if named:
        return named
    is_obp = book.get('DOI prefix', '') == '10.11647'
    return "Open Book Publishers" if is_obp else None
def get_url(book):
    """Return the landing-page URL of *book*, or an empty string if unknown.

    The ``URL`` column is used when present. Otherwise a ``https://doi.org/``
    link is built from ``DOI prefix`` and ``DOI suffix``.

    Both DOI parts must be non-empty. Previously a row with neither a URL nor
    a DOI produced the truthy placeholder ``https://doi.org//``, which callers
    testing ``if url:`` would then treat as a real identifier.
    """
    url = book.get('URL', '')
    if url:
        return url
    prefix = book.get('DOI prefix', '')
    suffix = book.get('DOI suffix', '')
    if prefix and suffix:
        return u'https://doi.org/{}/{}'.format(prefix, suffix)
    return u''
def get_description(book):
    """Return the book's blurb.

    ``DescriptionBrief`` is preferred; ``Plain Text Blurb`` is the fallback
    when the former is missing or empty.
    """
    return book.get('DescriptionBrief', '') or book.get('Plain Text Blurb', '')
def get_language(book):
    """Return the value of the ``ISO Language Code`` column ('' when absent)."""
    return book.get('ISO Language Code', '')
2016-05-19 13:17:23 +00:00
def load_from_books(books):
    ''' Get or create an Edition and a Work for each book in *books*.

    books is an iterator of book dicts.
    each book must have attributes

    (umich dialect)
    eISBN, ClothISBN, PaperISBN, Publisher, FullTitle, Title, Subtitle, AuthorsList,
    Author1Last, Author1First, Author1Role, Author2Last, Author2First, Author2Role, Author3Last,
    Author3First, Author3Role, AuthorBio, TableOfContents, Excerpt, DescriptionLong,
    DescriptionBrief, BISACCode1, BISACCode2, BISACCode3, CopyrightYear, ePublicationDate,
    eListPrice, ListPriceCurrencyType, List Price in USD (paper ISBN), eTerritoryRights,
    SubjectListMARC, Book-level DOI, URL, License

    Books for which no ISBN can be extracted are skipped.

    Returns a list of (book, work, edition) tuples, one per loaded book, where
    edition is the Edition handled for the book's last ISBN.
    '''
    results = []

    for (i, book) in enumerate(books):
        # try first to get an Edition already in DB with by one of the ISBNs in book
        (isbns, edition) = get_isbns(book)
        if not isbns:
            continue
        title = get_title(book)
        authors = get_authors(book)

        # if matching by ISBN doesn't work, then create a Work and Edition
        # with a title and the first ISBN
        if not edition:
            work = Work(title=title)
            work.save()
            edition = Edition(title=title, work=work)
            edition.save()
            Identifier.set(type='isbn', value=isbns[0], edition=edition, work=work)
        work = edition.work

        # at this point, work and edition exist
        url = get_url(book)
        if url:
            Identifier.set(type='http', value=url, edition=edition, work=work)

        # These depend only on the row, so compute them once per book rather
        # than once per ISBN (get_cover may issue an HTTP request).
        pubdate = get_pubdate(book)
        cover = get_cover(book)
        publisher = get_publisher(book)

        # make sure each isbn is represented by an Edition
        # also associate authors, publication date, cover, publisher
        for isbn in isbns:
            edition = add_by_isbn_from_google(isbn, work=work)
            if edition and edition.work != work:
                work = merge_works(work, edition.work)
            if not edition:
                edition = Edition(title=title, work=work)
                edition.save()
                Identifier.set(type='isbn', value=isbn, edition=edition, work=work)

            edition.authors.clear()
            for (author, role) in authors:
                edition.add_author(author, inv_relator_contrib.get(role, 'aut'))
            edition.publication_date = pubdate
            edition.cover_image = cover
            edition.save()
            edition.set_publisher(publisher)

        # possibly replace work.description (only with a longer one);
        # `or u''` guards against a missing description on either side
        description = get_description(book) or u''
        if len(description) > len(work.description or u''):
            work.description = description
            work.save()

        # set language
        lang = get_language(book)
        if lang:
            work.language = lang
            work.save()

        # add a bisac subject (and ancestors) to work
        for bisacsh in get_subjects(book):
            while bisacsh:
                add_subject(bisacsh.full_label, work, authority="bisacsh")
                bisacsh = bisacsh.parent

        # use the module logger, not the root logger
        logger.info(u'loaded work {}'.format(work.title))
        loading_ok = loaded_book_ok(book, work, edition)

        results.append((book, work, edition))

        try:
            logger.info(u"{} {} {}\n".format(i, title, loading_ok))
        except Exception as e:
            # Formatting the title failed. Log its repr instead, and include
            # the error, which the old two-placeholder format string dropped.
            logger.info(u"{} {!r} {}\n".format(i, title, str(e)))

    return results
def loaded_book_ok(book, work, edition):
    """Check that *book* appears to have been loaded consistently.

    Checks, in order:
    * work and edition are not None
    * exactly one 'http' Identifier exists for the book's URL
    * every ISBN of the book has exactly one 'isbn' Identifier, and the
      authors of that identifier's edition match the book's authors (compared
      after utf8_general_ci normalisation)
    * the work description equals the book's description or is longer than it
    * every BISAC heading of the book, and each of its ancestors, exists as a
      Subject attached to the work

    Returns:
        True if every check passes, False otherwise.
    """
    # Cheap guard first: no point querying the database without a work/edition.
    if work is None or edition is None:
        return False

    isbns = get_isbns(book)[0]
    authors = get_authors(book)
    subjects = get_subjects(book)

    # Identifier.objects.get raises when there is no match (or several), so the
    # old "result is None" branch could never run.
    try:
        Identifier.objects.get(type='http', value=get_url(book))
    except Exception as e:
        logger.info(str(e))
        return False

    # isbns
    expected_authors = set(utf8_general_ci_norm(author[0]) for author in authors)
    for isbn in isbns:
        if Identifier.objects.filter(type='isbn', value=isbn).count() != 1:
            return False
        try:
            edition_for_isbn = Identifier.objects.get(type='isbn', value=isbn).edition
        except Exception as e:
            logger.info(str(e))
            return False

        # authors
        actual_authors = set(
            utf8_general_ci_norm(ed.name) for ed in edition_for_isbn.authors.all()
        )
        if expected_authors != actual_authors:
            logger.info("problem with authors")
            return False

        # NOTE(review): this assigns to the edition instead of comparing values;
        # presumably it only verifies the assignments do not raise. Confirm the
        # intent.
        try:
            edition_for_isbn.publication_date = get_pubdate(book)
            edition_for_isbn.cover_image = get_cover(book)
            edition_for_isbn.set_publisher(get_publisher(book))
        except Exception:
            return False

    # work description: must be identical to, or longer than, the book's
    description = get_description(book) or u''
    current_description = work.description or u''
    if current_description != description and len(description) >= len(current_description):
        return False

    # bisac
    work_subjects = list(work.subjects.all())  # fetch once, not once per heading
    for bisacsh in subjects:
        while bisacsh:
            try:
                subject = Subject.objects.get(name=bisacsh.full_label)
            except Exception:
                return False
            if subject not in work_subjects:
                return False
            bisacsh = bisacsh.parent

    return True
# Maps an identifier type to a regex whose first group captures that
# identifier from a URL. Each pattern is anchored on the host name preceded by
# '.' or '/', so both "www.example.org" and bare "example.org" hosts match.
ID_URLPATTERNS = {
    'goog': re.compile(r'[\./]google\.com/books\?.*id=([a-zA-Z0-9\-_]{12})'),
    'olwk': re.compile(r'[\./]openlibrary\.org(/works/OL\d{1,8}W)'),
    'doab': re.compile(r'[\./]doabooks\.org/doab\?.*rid:(\d{1,8}).*'),
    'gdrd': re.compile(r'[\./]goodreads\.com/book/show/(\d{1,8})'),
    'ltwk': re.compile(r'[\./]librarything\.com/work/(\d{1,8})'),
    # was r'\.worldcat...', which missed URLs without a subdomain
    'oclc': re.compile(r'[\./]worldcat\.org/.*oclc/(\d{8,12})'),
    'doi': re.compile(r'[\./]doi\.org/(10\.\d+/\S+)'),
    'gtbg': re.compile(r'[\./]gutenberg\.org/ebooks/(\d{1,6})'),
    'glue': re.compile(r'[\./]unglue\.it/work/(\d{1,7})'),
}


def ids_from_urls(url):
    """Extract every identifier that ID_URLPATTERNS recognises in *url*.

    Args:
        url: the URL string to scan.

    Returns:
        dict mapping identifier type (a key of ID_URLPATTERNS) to the captured
        identifier; empty when nothing matches.
    """
    ids = {}
    for ident, pattern in ID_URLPATTERNS.items():
        id_match = pattern.search(url)
        if id_match:
            ids[ident] = id_match.group(1)
    return ids
# Captures the quoted direct-download link embedded in a Dropbox share page.
DROPBOX_DL = re.compile(r'"(https://dl.dropboxusercontent.com/content_link/[^"]+)"')


def dl_online(ebook):
    """Try to turn an 'online' ebook into a downloadable one.

    Nothing happens unless ``ebook.format`` is ``'online'``. Two kinds of
    landing page are handled:

    * Dropbox share links: the page is fetched and the direct-download link
      is pulled out of it with DROPBOX_DL.
    * jbe-platform.com book pages: the PDF link is located in the parsed page.

    When a download link is found it is handed to ``make_dl_ebook``. Always
    returns None.
    """
    if ebook.format != 'online':
        return

    page_url = ebook.url
    if u'dropbox.com/s/' in page_url:
        response = requests.get(page_url, headers={"User-Agent": settings.USER_AGENT})
        if response.status_code != 200:
            return
        link = DROPBOX_DL.search(response.content)
        if link:
            make_dl_ebook(link.group(1), ebook)
        return

    if u'jbe-platform.com/content/books/' in page_url:
        # NOTE(review): get_soup is neither defined nor imported in the visible
        # part of this module; confirm where it is expected to come from.
        doc = get_soup(page_url)
        if not doc:
            return
        anchor = doc.select_one('div.fulltexticoncontainer-PDF a')
        if anchor:
            make_dl_ebook(urlparse.urljoin(page_url, anchor['href']), ebook)
def make_dl_ebook(url, ebook):
    """Download *url* and store it as an EbookFile plus a hosted Ebook.

    If an EbookFile whose source is ``ebook.url`` already exists, it is
    returned and nothing is downloaded.

    Args:
        url: direct download location of the file.
        ebook: the 'online' Ebook the file belongs to; its edition, rights and
            version fields are copied to the new Ebook.

    Returns:
        the existing or newly created EbookFile, or None when the download
        does not answer 200 or its type resolves to 'online'.
    """
    # Evaluate the queryset once; indexing then reuses the cached result.
    existing = EbookFile.objects.filter(source=ebook.url)
    if existing:
        return existing[0]

    response = requests.get(url, headers={"User-Agent": settings.USER_AGENT})
    if response.status_code != 200:
        return None

    # NOTE(review): the original parsed Content-Length into an unused local
    # `filesize`; presumably it was meant to be stored on the new Ebook.
    # Confirm against the model.
    ebook_format = type_for_url(url, content_type=response.headers.get('content-type'))
    if ebook_format == 'online':
        return None

    new_ebf = EbookFile.objects.create(
        edition=ebook.edition,
        format=ebook_format,
        source=ebook.url
    )
    new_ebf.file.save(path_for_file(new_ebf, None), ContentFile(response.content))
    new_ebf.save()
    new_ebook = Ebook.objects.create(
        edition=ebook.edition,
        format=ebook_format,
        provider='Unglue.it',
        url=new_ebf.file.url,
        rights=ebook.rights,
        version_label=ebook.version_label,
        version_iter=ebook.version_iter,
    )
    new_ebf.ebook = new_ebook
    new_ebf.save()
    return new_ebf
def type_for_url(url, content_type=None):
    """Classify the resource at *url* as an ebook format.

    Args:
        url: location of the resource.
        content_type: its Content-Type header, if already known. When omitted
            it is looked up through ``contenttyper.calc_type(url)``.

    Returns:
        '' for an empty url; otherwise one of 'pdf', 'epub', 'mobi', 'text',
        'html', 'online' or 'other'. An unknown or unavailable content type
        yields 'other'.
    """
    if not url:
        return ''
    if 'books.openedition.org' in url:
        return 'online'

    ct = content_type if content_type else contenttyper.calc_type(url)
    # calc_type may return None (failed request or missing header), and media
    # types are case-insensitive, so normalise before matching.
    ct = (ct or '').lower()

    if re.search("pdf", ct):
        return "pdf"
    if re.search("octet-stream", ct):
        # generic binary: fall back on hints in the URL itself
        if re.search("pdf", url, flags=re.I):
            return "pdf"
        if re.search("epub", url, flags=re.I):
            return "epub"
    if re.search("text/plain", ct):
        return "text"
    if re.search("text/html", ct):
        if 'oapen.org/view' in url:
            return "html"
        return "online"
    if re.search("epub", ct):
        return "epub"
    if re.search("mobi", ct):
        return "mobi"
    return "other"
class ContentTyper(object):
    """Look up the Content-Type of URLs, spacing out requests to the same host.

    Attributes:
        delay: minimum number of seconds between two lookups on the same host.
        last_call: maps a host (netloc) to the time.time() of its last lookup.
    """

    def __init__(self, delay=1):
        self.delay = delay
        self.last_call = dict()

    def content_type(self, url, timeout=30):
        """Return the Content-Type header of a HEAD request to *url*.

        Returns None if the request fails for any reason or the header is
        missing.
        """
        try:
            r = requests.head(url, timeout=timeout)
            return r.headers.get('content-type')
        except Exception:
            # Best effort: callers treat None as "unknown". Unlike the former
            # bare except, this lets KeyboardInterrupt and SystemExit through.
            return None

    def calc_type(self, url):
        """Return the content type of *url*, waiting first if its host was
        queried less than ``self.delay`` seconds ago."""
        netloc = urlparse.urlparse(url).netloc

        # wait if necessary
        last_call = self.last_call.get(netloc)
        if last_call is not None:
            remaining = last_call + self.delay - time.time()
            if remaining > 0:
                time.sleep(remaining)

        self.last_call[netloc] = time.time()

        # compute the content-type
        return self.content_type(url)


contenttyper = ContentTyper()