regluit/experimental/bookdata.py

565 lines
22 KiB
Python

import requests
import urllib
import httplib
import json
from pprint import pprint
from itertools import islice, izip, repeat
import logging
import sys, os
# a kludge to allow for isbn.py to be imported
# and not just in the context of the regluit Django app
try:
from regluit.core import isbn
except:
import isbn
try:
import unittest
from unittest import TestCase
except:
from django.test import TestCase
from django.utils import unittest
import re
import freebase
import logging
logger = logging.getLogger(__name__)
MASHUPBOOK_ISBN_13 = '9781590598580'
MASHUPBOOK_ISBN_10 = '159059858X'
MASHUPBOOK_OLID = 'OL13439114M'
RY_OLID = 'OL4264806A'
SURFACING_WORK_OLID = 'OL675829W'
SURFACING_EDITION_OLID = 'OL8075248M'
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
page= []
for item in iterable:
page.append( item )
if len(page) == page_size:
yield page
page= []
yield page
class FreebaseException(Exception):
pass
class OpenLibraryException(Exception):
pass
class HathiException(Exception):
pass
def filter_none(d):
d2 = {}
for (k,v) in d.iteritems():
if v is not None:
d2[k] = v
return d2
def to_list(s):
"""if input is not a list return a list with s"""
if not isinstance(s,list):
return [s]
else:
return s
def hathi_bib(id, id_type='isbn', detail_level='brief'):
url = "http://catalog.hathitrust.org/api/volumes/brief/%s/%s.json" % (id_type, id)
r = requests.get(url)
if r.status_code == httplib.OK:
return r.content
else:
raise Exception("Hathi Bib API response: %s " % (httplib.responses[r.status_code]) )
# http://openlibrary.org/developers/api
class OpenLibrary(object):
@classmethod
def books(cls, id, id_type="isbn", format="json", callback=None, jscmd=None):
# http://openlibrary.org/dev/docs/api/books
# bibkeys: one of isbn, oclc, lccn, olid
# format: one of json, javascript
# jscmd: one of viewapi, data, details (deprecated in favor of data)
base_url = "http://openlibrary.org/api/books"
params = filter_none({'bibkeys':'%s:%s' % (id_type,id),
'format':format,
'callback':callback,
'jscmd':jscmd})
r = requests.get(base_url,params=params)
if r.status_code == httplib.OK:
return json.loads(r.content)
else:
raise OpenLibraryException("OpenLibrary API response: %s " % (httplib.responses[r.status_code]) )
@classmethod
def covers(cls, id, id_type='isbn', size='S'):
# http://openlibrary.org/dev/docs/api/covers
# id_type: one of 'id' (internal cover ID), olid (Open Library ID), isbn, oclc, lccn, goodreads, librarything
# size: one of s, m, l
# http://covers.openlibrary.org/b/$key/$value-$size.jpg
if id_type.lower() not in ['id','isbn','oclc','lccn','goodreads','librarything']:
raise OpenLibraryException("%s is an incorrect id_type for covers" % (id_type))
if size.upper() not in ['S','M','L']:
raise OpenLibraryException("%s is an incorrect size for covers" % (size))
return "http://covers.openlibrary.org/b/%s/%s-%s.jpg" % (id_type.lower(),id,size.upper())
@classmethod
def author_photos(cls, olid, size='S'):
#http://covers.openlibrary.org/a/$key/$value-$size.jpg
if size.upper() not in ['S','M','L']:
raise OpenLibraryException("%s is an incorrect size for author" % (size))
return "http://covers.openlibrary.org/a/olid/%s-%s.jpg" % (olid,size.upper())
@classmethod
def read(cls, queries):
# http://openlibrary.org/dev/docs/api/read -- most of its value to be used in browser JS?
# can do single or multiple requests
# example of a single request:
# http://openlibrary.org/api/volumes/brief/isbn/0596156715.json
# multiple
# http://openlibrary.org/api/volumes/brief/json/id:1;lccn:50006784|olid:OL6179000M;lccn:55011330
# the multiple format works, I think, for single requests
#url = "http://openlibrary.org/api/volumes/brief/%s/%s.json" % (id_type, id)
query_string = "|".join([ ";".join([ "%s:%s" % (id_type,id) for (id, id_type) in to_list(query)])
for query in to_list(queries)])
if query_string:
url = "http://openlibrary.org/api/volumes/brief/json/%s" % (query_string)
r = requests.get(url)
if r.status_code == httplib.OK:
return json.loads(r.content)
else:
raise OpenLibraryException("OpenLibrary API response: %s " % (httplib.responses[r.status_code]) )
else:
return None
@classmethod
def lists(cls):
# http://openlibrary.org/dev/docs/api/lists
raise NotImplementedError
@classmethod
def query_iter(cls,**kwargs):
# use limit for page size and offset as the starting point
kwargs2 = kwargs.copy()
kwargs2.setdefault('offset', 0)
more_items = True
while more_items:
items = cls.query(**kwargs2)
for item in items:
yield item
if len(items):
kwargs2['offset'] += len(items)
else:
more_items = False
@classmethod
def query(cls, **kwargs):
# limit and offset are special
base_url = "http://openlibrary.org/query.json"
r = requests.get(base_url, params=kwargs)
if r.status_code == httplib.OK:
return json.loads(r.content)
else:
raise OpenLibraryException("OpenLibrary API response: %s " % (httplib.responses[r.status_code]) )
@classmethod
def editions(cls, work_olid):
# http://openlibrary.org/query.json?type=/type/edition&works=/works/OL675829W&limit=5000
for item in cls.query_iter(type='/type/edition',works='/works/%s'%(work_olid), limit=10):
try:
yield re.match(r'^/books/(.*)$',item['key']).group(1)
except Exception, e:
raise OpenLibraryException("problem in editions: %s " % e)
@classmethod
def works0(cls, id, id_type='isbn'):
# http://openlibrary.org/api/volumes/brief/olid/OL8075248M.json
# can there be more than 1 work for a given edition?
# will return a list of works
# there is a bug in OL in which we have workless editions
response = cls.read((id,id_type))
# resp['olid:OL8075248M']['records']['/books/OL8075248M']["details"]["details"]["works"][0]["key"]
# resp.values()[0]['records'].values()[0]["details"]["details"]["works"][0]["key"]
try:
works_key = response.values()[0]['records'].values()[0]["details"]["details"]["works"]
if (len(response.values()) == 1 and len(response.values()[0]['records'].values()) == 1):
return [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key]
else:
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works0")
except Exception, e:
return []
@classmethod
def works(cls, ids, page_size=10):
"""generalize to handle more than one set of id at a time -- ids is an iterable"""
for (i, page) in enumerate(grouper(ids, page_size=page_size)):
response = cls.read(page)
for (id, id_type) in page:
key = "{1}:{0}".format(id, id_type)
val = response.get(key)
if val is not None:
if (len(val['records'].values()) == 1):
try:
works_key = val['records'].values()[0]["details"]["details"]["works"]
yield [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key]
except Exception, e:
pass
else:
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works")
else:
yield []
@classmethod
def json_for_olid(cls, olid, follow_redirect=True):
ol_types = {'M': 'books', 'W':'works', 'A':'authors'}
id_type = ol_types.get(str(olid)[-1].upper(), None)
if id_type is not None:
url = "http://openlibrary.org/{0}/{1}.json".format(id_type, olid.upper())
r = requests.get(url)
if r.status_code == httplib.OK:
# check to see whether type is redirect, retrieve that item it we are following redirects
resp = json.loads(r.content)
if resp["type"]["key"] == "/type/redirect" and follow_redirect:
redir_olid = resp["location"].split("/")[-1]
return OpenLibrary.json_for_olid(redir_olid)
else:
return resp
else:
raise OpenLibraryException("OpenLibrary API response: %s " % (httplib.responses[r.status_code]) )
else:
return None
class FreebaseBooks(object):
def __init__(self, username=None, password=None, main_or_sandbox='main'):
if main_or_sandbox == 'main':
self.freebase = freebase
else:
self.freebase = freebase.sandbox
if username is not None and password is not None:
self.freebase.login(username,password)
def books(self):
MQL = u"""[{
"type": "/book/book",
"id": null,
"key": [{
"namespace": "/wikipedia/en",
"value": null,
"type": "/type/key"
}]
}]
""".replace("\n"," ")
query = json.loads(MQL)
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def book_editions(self):
MQL = u"""[{
"type": "/book/book_edition",
"id": null,
"isbn": [{}],
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
}
}]""".replace("\n"," ")
query = json.loads(MQL)
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def editions_for_book(self, book_id):
MQL = u"""[{
"type": "/book/book_edition",
"id": null,
"isbn": [{}],
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["id"] = book_id
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def book_edition_by_id(self,id,id_type):
MQL = u"""[{
"type": "/book/book_edition",
"id": null,
"isbn": [{}],
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
}
}]""".replace("\n"," ")
query = json.loads(MQL)
if id_type == 'isbn':
query[0][id_type][0].setdefault('name', id)
elif id_type in ['LCCN', 'OCLC_number', 'openlibrary_id']:
query[0][id_type][0].setdefault('value', id)
if id_type in ['isbn', 'LCCN', 'OCLC_number', 'openlibrary_id']:
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
else:
raise FreebaseException('id_type must be one of ISBN, LCCN, OCLC_number or openlibrary_id, not %s' % (id_type))
def xisbn(self, isbn_val=None, book_id=None):
""" pass in either isbn_val or book_id and xisbn returns related ISBNs in Freebase. Handles case in which
either isbn or book_id is not None but not both
"""
if isbn_val is None and book_id is None:
raise Exception("One of isbn or book_id must be specified")
elif isbn_val is not None and book_id is not None:
raise Exception("Only only of isbn or book_id can be specified")
elif isbn_val is not None:
isbn_val = isbn.ISBN(isbn_val).to_string('13')
MQL = """[{
"type": "/book/book_edition",
"isbn": {
"name": null
},
"book": {
"editions": [{
"isbn": {
"name": null
}
}]
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["editions"][0]["isbn"]["name"] = isbn_val
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r["isbn"]["name"]
elif book_id is not None:
for edition in self.editions_for_book(book_id=book_id):
for i in edition["isbn"]:
yield i["name"]
class WorkMapper(object):
@classmethod
def freebase_book_to_openlibrary_work(cls, fb_id, complete_search=False):
""" Try to map a Freebase ID by taking the ISBNs of associated editions and asking OpenLibrary for the work id"""
print "fb_id: ", fb_id
fb = FreebaseBooks()
work_ids = set()
# grab all ISBNs correponding to Freebase fb_id and comput the OpenLibrary work ID
# if complete_search is False, stop at first work id
for work_id_list in OpenLibrary.works(izip(fb.xisbn(book_id=fb_id), repeat('isbn'))):
for work_id in work_id_list:
if work_id not in work_ids:
work_ids.add(work_id)
yield work_id
if not complete_search:
raise StopIteration()
def look_up_my_zotero_books_in_hathi():
from regluit.experimental.zotero_books import MyZotero
zot = MyZotero()
for (i,b) in enumerate(zot.get_books(20)):
try:
print b, hathi_bib(b['isbn'])
except Exception, e:
print e
def ol_practice():
print OpenLibrary.books(MASHUPBOOK_ISBN_10)
pprint (OpenLibrary.books(MASHUPBOOK_ISBN_13, jscmd='data'))
print OpenLibrary.books('0385472579', jscmd='data')
print (OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M'))
print (OpenLibrary.author_photos(RY_OLID,'S'))
# can we status of a pd book oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
# http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy -- not working?
print OpenLibrary.books('OL7173600M' 'olid', jscmd='data')
# http://openlibrary.org/books/OL6542070M/The_Montessori_method works
print OpenLibrary.books('1181252','oclc',jscmd='data')
print OpenLibrary.read([(MASHUPBOOK_ISBN_10,'isbn'),('1181252','oclc')])
# let's bring up the editions for Surfacing
for (i,ed) in enumerate(islice(OpenLibrary.editions(SURFACING_WORK_OLID),100)):
print i, ed
# let's get the Work ID for one of the editions
pprint(OpenLibrary.works(SURFACING_EDITION_OLID,id_type='olid'))
class FreebaseBooksTest(TestCase):
def test_books_iter(self):
fb = FreebaseBooks()
books = list(islice(fb.books(),4))
self.assertEqual(len(books),4)
for book in books[0:1]:
self.assertTrue(book["type"], "/book/book")
def test_book_editions_iter(self):
fb = FreebaseBooks()
editions = list(islice(fb.book_editions(),4))
self.assertEqual(len(editions),4)
for edition in editions[0:1]:
self.assertTrue(edition["type"], "/book/book_edition")
def test_book_edition_by_id(self):
fb = FreebaseBooks()
# http://www.amazon.com/New-Collected-Poems-Czeslaw-Milosz/dp/006019667X
edition = list(fb.book_edition_by_id('9780060196677','isbn'))
self.assertEqual(edition[0]['type'],'/book/book_edition')
self.assertEqual(edition[0]['book']['id'],'/m/0c1t1yk')
self.assertEqual(edition[0]['book']['name'],'New and collected poems 1931-2001')
edition = list(fb.book_edition_by_id('76074298', 'OCLC_number'))
self.assertEqual(edition[0]['type'],'/book/book_edition')
self.assertEqual(edition[0]['book']['id'],'/m/021yncj')
self.assertEqual(edition[0]['book']['name'],'Brave New Words: The Oxford Dictionary of Science Fiction')
# test openlibary_id Moby Dick
edition = list(fb.book_edition_by_id('9780486432151', 'isbn'))[0]
self.assertEqual(edition['openlibrary_id'][0]['value'], 'OL3685847M')
def test_editions_for_book(self):
fb = FreebaseBooks()
book_id = '/en/moby-dick'
editions = fb.editions_for_book(book_id)
for i, edition in enumerate(editions):
pass
def test_xisbn(self):
isbn_val = '9780486432151'
book_id = '/en/moby-dick'
fb = FreebaseBooks()
isbns = set(fb.xisbn(isbn_val))
isbns2 = set(fb.xisbn(book_id=book_id))
self.assertEqual(isbns, isbns2)
class OpenLibraryTest(TestCase):
def test_books(self):
book = OpenLibrary.books(MASHUPBOOK_ISBN_10)
self.assertEqual(book.values()[0]['info_url'], 'http://openlibrary.org/books/OL13439114M/Pro_Web_2.0_Mashups')
book_data = OpenLibrary.books('0385472579', jscmd='data')
self.assertEqual(book_data.values()[0]['title'], 'Zen Speaks')
self.assertEqual(book_data.values()[0]['identifiers']['openlibrary'][0], 'OL7440033M')
def test_books_olid(self):
# can we status of a pd book oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
# http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy
book = OpenLibrary.books('OL7173600M', 'olid', jscmd='data')
self.assertEqual(book.values()[0]['title'], 'The log of a cowboy')
def test_books_oclc(self):
# http://openlibrary.org/books/OL6542070M/The_Montessori_method works
book = OpenLibrary.books('1181252','oclc',jscmd='data')
self.assertEqual(book.values()[0]['title'], 'The Montessori method')
def test_read(self):
results = OpenLibrary.read([(MASHUPBOOK_ISBN_10,'isbn'),('1181252','oclc')])
self.assertEqual(results['oclc:1181252']['records'].values()[0]['data']['ebooks'][0]['formats']['epub']['url'],
'http://www.archive.org/download/cu31924032538500/cu31924032538500.epub')
def test_covers(self):
self.assertEqual(OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M'),
'http://covers.openlibrary.org/b/isbn/159059858X-M.jpg')
def test_author_photos(self):
self.assertEqual(OpenLibrary.author_photos(RY_OLID,'S'), 'http://covers.openlibrary.org/a/olid/OL4264806A-S.jpg')
def test_editions(self):
# let's bring up the editions for Surfacing
for (i,ed) in enumerate(islice(OpenLibrary.editions(SURFACING_WORK_OLID),100)):
self.assertTrue(re.match(r'^OL(\d+)M$',ed))
def test_works0(self):
self.assertEqual(OpenLibrary.works0(SURFACING_EDITION_OLID,id_type='olid')[0], 'OL675829W')
def test_works(self):
ids = [(MASHUPBOOK_ISBN_10, 'isbn'), (SURFACING_EDITION_OLID,'olid'), ('233434','isbn')]
resp = list(OpenLibrary.works(ids))
self.assertEqual(resp, [['OL10306321W'], ['OL675829W'], []])
def test_json_for_olid(self):
# manifestation
# http://openlibrary.org/books/OL13439114M.json
id = "OL13439114M"
edition = OpenLibrary.json_for_olid(id)
self.assertEqual(edition["title"], "Pro Web 2.0 Mashups")
self.assertEqual(edition["identifiers"]["librarything"], ['2771144'])
self.assertEqual(edition["subjects"], ['Mashups (World Wide Web)'])
# work
# http://openlibrary.org/works/OL10306321W.json
id = "OL10306321W"
work = OpenLibrary.json_for_olid(id)
self.assertEqual(work["title"], "Pro Web 2.0 Mashups")
self.assertEqual(work["type"]["key"], "/type/work")
self.assertEqual(work["authors"][0]["type"]["key"], "/type/author_role")
self.assertEqual(work["authors"][0]["author"]["key"], "/authors/OL4264806A")
# author
# http://openlibrary.org/authors/OL4264806A.json
id = "OL4264806A"
author = OpenLibrary.json_for_olid(id)
self.assertEqual(author["name"], "Raymond Yee")
# redirect ok?
# "OL14917149W" -> "OL362684W"
id = "OL14917149W"
work = OpenLibrary.json_for_olid(id,follow_redirect=True)
self.assertEqual(work["title"], "King Richard III")
self.assertEqual(work["key"], "/works/OL362684W")
work = OpenLibrary.json_for_olid(id,follow_redirect=False)
self.assertEqual(work["type"]["key"], "/type/redirect")
class WorkMapperTest(TestCase):
def test_freebase_book_to_openlibrary_work(self):
id = '/en/moby-dick'
id = '/en/wuthering_heights'
work_ids = list(WorkMapper.freebase_book_to_openlibrary_work(id, complete_search=True))
print work_ids
def test_work_info_from_openlibrary(self):
editions = list(OpenLibrary.editions(SURFACING_WORK_OLID))
print editions, len(editions)
def suite():
#testcases = [WorkMapperTest]
testcases = []
suites = unittest.TestSuite([unittest.TestLoader().loadTestsFromTestCase(testcase) for testcase in testcases])
suites.addTest(OpenLibraryTest('test_json_for_olid'))
#suites.addTest(SettingsTest('test_dev_me_alignment')) # give option to test this alignment
return suites
if __name__ == '__main__':
#look_up_my_zotero_books_in_hathi()
#ol_practice()
#print len(list(islice(parse_project_gutenberg_catalog(),100000)))
#unittest.main()
suites = suite()
#suites = unittest.defaultTestLoader.loadTestsFromModule(__import__('__main__'))
unittest.TextTestRunner().run(suites)