regluit/experimental/bookdata.py

565 lines
22 KiB
Python
Raw Normal View History

import requests
import urllib
import httplib
import json
from pprint import pprint
from itertools import islice, izip, repeat
import logging
import sys, os
# a kludge to allow for isbn.py to be imported
# and not just in the context of the regluit Django app
try:
from regluit.core import isbn
except:
import isbn
2011-11-09 17:09:58 +00:00
try:
import unittest
from unittest import TestCase
except:
from django.test import TestCase
from django.utils import unittest
import re
import freebase
import logging
logger = logging.getLogger(__name__)
# Sample identifiers used by ol_practice() and the test cases in this module.
# ISBN-10 and Open Library edition id of "Pro Web 2.0 Mashups" (see
# OpenLibraryTest); the ISBN-13 is presumably the same book -- TODO confirm.
MASHUPBOOK_ISBN_13 = '9781590598580'
MASHUPBOOK_ISBN_10 = '159059858X'
MASHUPBOOK_OLID = 'OL13439114M'
# Open Library author id; test_json_for_olid expects the name "Raymond Yee".
RY_OLID = 'OL4264806A'
# Open Library work id and one of its edition ids, used for "Surfacing";
# test_works0 expects the edition to map back to this work.
SURFACING_WORK_OLID = 'OL675829W'
SURFACING_EDITION_OLID = 'OL8075248M'
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
    """Yield the items of iterable in consecutive lists ("pages").

    Every page holds page_size items except possibly the last one, which
    holds whatever is left over. No empty page is ever produced: an empty
    iterable yields nothing, and an iterable whose length is an exact
    multiple of page_size does not get an extra trailing [].

    iterable may be any iterable, including a one-shot generator; it is
    consumed lazily, one page at a time.
    """
    page = []
    for item in iterable:
        page.append(item)
        if len(page) == page_size:
            yield page
            # start a fresh list so the page already handed out is not mutated
            page = []
    # emit the leftover partial page, but never an empty one
    if page:
        yield page
2011-11-09 17:09:58 +00:00
class FreebaseException(Exception):
    """Error raised by the Freebase helpers in this module."""
class OpenLibraryException(Exception):
    """Error raised by the Open Library helpers in this module."""
class HathiException(Exception):
    """Error type reserved for the HathiTrust helpers in this module."""
def filter_none(d):
    """Return a new dict holding the items of d whose value is not None.

    Only None is dropped; other falsy values (0, '', [], False) are kept.
    d itself is not modified.
    """
    # items() rather than iteritems() so the helper runs on Python 2 and 3
    return dict((key, value) for (key, value) in d.items() if value is not None)
def to_list(s):
    """Return s unchanged when it is a list, otherwise wrap it as [s]."""
    return s if isinstance(s, list) else [s]
def hathi_bib(id, id_type='isbn', detail_level='brief'):
    """Fetch a record from the HathiTrust Bibliographic API.

    id           -- identifier value to look up
    id_type      -- identifier kind placed in the URL path (default 'isbn')
    detail_level -- level-of-detail path segment of the API URL; the default
                    'brief' reproduces the URL this function always used

    Returns the raw response body (r.content) when the status is 200 OK.
    Raises HathiException (a subclass of Exception) for any other status.
    """
    url = "http://catalog.hathitrust.org/api/volumes/%s/%s/%s.json" % (detail_level, id_type, id)
    r = requests.get(url)
    if r.status_code != httplib.OK:
        # .get() so an unregistered status code is reported instead of
        # failing with a KeyError while building the message
        reason = httplib.responses.get(r.status_code, r.status_code)
        raise HathiException("Hathi Bib API response: %s " % (reason))
    return r.content
# http://openlibrary.org/developers/api
class OpenLibrary(object):
    """Helpers for the Open Library web APIs (http://openlibrary.org/developers/api).

    Every method is a classmethod, so the class serves as a namespace and
    does not need to be instantiated. Methods that contact Open Library
    raise OpenLibraryException when the HTTP status is not 200 OK.
    """

    # id types accepted by covers(); see http://openlibrary.org/dev/docs/api/covers
    _COVER_ID_TYPES = ('id', 'olid', 'isbn', 'oclc', 'lccn', 'goodreads', 'librarything')
    # image sizes accepted by covers() and author_photos()
    _IMAGE_SIZES = ('S', 'M', 'L')

    @classmethod
    def _get_json(cls, url, params=None):
        """GET url and return the decoded JSON body.

        Raises OpenLibraryException when the status code is not 200 OK.
        """
        r = requests.get(url, params=params)
        if r.status_code != httplib.OK:
            # .get() so an unregistered status code cannot raise KeyError here
            reason = httplib.responses.get(r.status_code, r.status_code)
            raise OpenLibraryException("OpenLibrary API response: %s " % (reason))
        return json.loads(r.content)

    @classmethod
    def books(cls, id, id_type="isbn", format="json", callback=None, jscmd=None):
        """Query the Books API (http://openlibrary.org/dev/docs/api/books).

        id_type  -- bibkey kind: one of isbn, oclc, lccn, olid
        format   -- one of json, javascript
        callback -- forwarded as the 'callback' parameter when not None
        jscmd    -- one of viewapi, data, details (deprecated in favor of data)

        Parameters left as None are not sent. Returns the decoded JSON
        response. NOTE(review): the body is always parsed with json.loads,
        so format='javascript' presumably fails to parse -- confirm.
        """
        params = filter_none({'bibkeys': '%s:%s' % (id_type, id),
                              'format': format,
                              'callback': callback,
                              'jscmd': jscmd})
        return cls._get_json("http://openlibrary.org/api/books", params=params)

    @classmethod
    def covers(cls, id, id_type='isbn', size='S'):
        """Build a cover image URL (http://openlibrary.org/dev/docs/api/covers).

        id_type -- one of 'id' (internal cover ID), olid (Open Library ID),
                   isbn, oclc, lccn, goodreads, librarything; case-insensitive
        size    -- one of s, m, l; case-insensitive

        No request is made; only the URL
        http://covers.openlibrary.org/b/$key/$value-$size.jpg is returned.
        Raises OpenLibraryException for an unknown id_type or size.
        """
        if id_type.lower() not in cls._COVER_ID_TYPES:
            raise OpenLibraryException("%s is an incorrect id_type for covers" % (id_type))
        if size.upper() not in cls._IMAGE_SIZES:
            raise OpenLibraryException("%s is an incorrect size for covers" % (size))
        return "http://covers.openlibrary.org/b/%s/%s-%s.jpg" % (id_type.lower(), id, size.upper())

    @classmethod
    def author_photos(cls, olid, size='S'):
        """Build an author photo URL: http://covers.openlibrary.org/a/olid/$value-$size.jpg

        size is one of s, m, l (case-insensitive). No request is made.
        Raises OpenLibraryException for an unknown size.
        """
        if size.upper() not in cls._IMAGE_SIZES:
            raise OpenLibraryException("%s is an incorrect size for author" % (size))
        return "http://covers.openlibrary.org/a/olid/%s-%s.jpg" % (olid, size.upper())

    @classmethod
    def read(cls, queries):
        """Query the Read API (http://openlibrary.org/dev/docs/api/read).

        queries is a single (id, id_type) tuple or a list of requests. Each
        request is either one (id, id_type) tuple or a list of such tuples.
        Tuples inside one request are joined with ';' and requests are
        joined with '|', e.g.
        http://openlibrary.org/api/volumes/brief/json/id:1;lccn:50006784|olid:OL6179000M;lccn:55011330

        Returns the decoded JSON response, or None when there is nothing to
        ask for (the query string is empty).
        """
        query_string = "|".join(
            ";".join("%s:%s" % (id_type, id) for (id, id_type) in to_list(query))
            for query in to_list(queries))
        if not query_string:
            return None
        return cls._get_json("http://openlibrary.org/api/volumes/brief/json/%s" % (query_string))

    @classmethod
    def lists(cls):
        """Lists API (http://openlibrary.org/dev/docs/api/lists) -- not implemented."""
        raise NotImplementedError

    @classmethod
    def query_iter(cls, **kwargs):
        """Iterate over every result of query(), fetching page after page.

        kwargs are passed to query(); 'limit' acts as the page size and
        'offset' (default 0) as the starting point. Iteration ends when a
        page comes back empty.
        """
        kwargs2 = kwargs.copy()
        kwargs2.setdefault('offset', 0)
        while True:
            items = cls.query(**kwargs2)
            if not items:
                return
            for item in items:
                yield item
            kwargs2['offset'] += len(items)

    @classmethod
    def query(cls, **kwargs):
        """GET http://openlibrary.org/query.json with kwargs as the query parameters.

        'limit' and 'offset' are the paging parameters used by query_iter().
        Returns the decoded JSON response.
        """
        return cls._get_json("http://openlibrary.org/query.json", params=kwargs)

    @classmethod
    def editions(cls, work_olid):
        """Yield the edition OLIDs (e.g. 'OL8075248M') of the work work_olid.

        Equivalent request:
        http://openlibrary.org/query.json?type=/type/edition&works=/works/OL675829W
        fetched 10 results at a time. Raises OpenLibraryException when a
        result has no key of the form /books/<olid>.
        """
        for item in cls.query_iter(type='/type/edition', works='/works/%s' % (work_olid), limit=10):
            try:
                olid = re.match(r'^/books/(.*)$', item['key']).group(1)
            except (AttributeError, KeyError, TypeError) as e:
                raise OpenLibraryException("problem in editions: %s " % e)
            yield olid

    @classmethod
    def works0(cls, id, id_type='isbn'):
        """Return the list of work OLIDs for one edition identifier.

        Looks the edition up through read(), e.g.
        http://openlibrary.org/api/volumes/brief/olid/OL8075248M.json, and
        reads resp[<key>]['records'][<record>]["details"]["details"]["works"].

        Returns [] when that path is missing from the response (Open Library
        has editions without works). Raises OpenLibraryException when the
        response does not hold exactly one key with exactly one record,
        matching the check made by works().
        """
        response = cls.read((id, id_type))
        try:
            entries = list(response.values())
            records = list(entries[0]['records'].values())
            works_key = records[0]["details"]["details"]["works"]
        except (AttributeError, IndexError, KeyError, TypeError):
            return []
        # checked outside the try block so this error is not swallowed
        if len(entries) != 1 or len(records) != 1:
            raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works0")
        try:
            return [re.match(r'^/works/(.*)$', work_key["key"]).group(1) for work_key in works_key]
        except (AttributeError, KeyError, TypeError):
            return []

    @classmethod
    def works(cls, ids, page_size=10):
        """Yield, for every (id, id_type) pair in ids, a list of work OLIDs.

        ids is any iterable of (id, id_type) tuples; it is sent to read() in
        pages of page_size. Exactly one list is yielded per input pair, in
        input order. The list is empty when the response has no entry for
        the pair or when the record carries no works. Raises
        OpenLibraryException when an entry does not hold exactly one record.
        """
        for page in grouper(ids, page_size=page_size):
            if not page:
                continue
            response = cls.read(page)
            for (id, id_type) in page:
                key = "{1}:{0}".format(id, id_type)
                val = response.get(key)
                if val is None:
                    yield []
                    continue
                records = list(val['records'].values())
                if len(records) != 1:
                    raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works")
                try:
                    works_key = records[0]["details"]["details"]["works"]
                    work_ids = [re.match(r'^/works/(.*)$', work_key["key"]).group(1)
                                for work_key in works_key]
                except (AttributeError, KeyError, TypeError):
                    # workless edition: still yield, so output stays aligned with ids
                    work_ids = []
                yield work_ids

    @classmethod
    def json_for_olid(cls, olid, follow_redirect=True):
        """Return the JSON document of an Open Library id.

        The last character of olid selects the resource: M -> books,
        W -> works, A -> authors (case-insensitive). Any other ending,
        including an empty olid, returns None without making a request.

        When the document is of type /type/redirect and follow_redirect is
        true, the document of the redirect target is fetched and returned
        instead.
        """
        ol_types = {'M': 'books', 'W': 'works', 'A': 'authors'}
        # [-1:] instead of [-1] so an empty olid yields None rather than IndexError
        id_type = ol_types.get(str(olid)[-1:].upper(), None)
        if id_type is None:
            return None
        resp = cls._get_json("http://openlibrary.org/{0}/{1}.json".format(id_type, olid.upper()))
        if resp["type"]["key"] == "/type/redirect" and follow_redirect:
            redir_olid = resp["location"].split("/")[-1]
            return cls.json_for_olid(redir_olid)
        return resp
class FreebaseBooks(object):
    """Book and edition lookups against Freebase using MQL read queries.

    All query methods are generators that pass a query to
    self.freebase.mqlreaditer() and yield its results one by one.
    """

    # MQL for every /book/book that has an English Wikipedia key
    _BOOK_MQL = u"""[{
      "type": "/book/book",
      "id": null,
      "key": [{
        "namespace": "/wikipedia/en",
        "value": null,
        "type": "/type/key"
      }]
    }]"""

    # MQL template shared by all edition queries; callers fill in constraints
    _EDITION_MQL = u"""[{
      "type": "/book/book_edition",
      "id": null,
      "isbn": [{}],
      "ISBN": [{}],
      "LCCN": [{}],
      "OCLC_number": [{}],
      "openlibrary_id": [{}],
      "book": {
        "id": null,
        "name": null
      }
    }]"""

    # MQL template for xisbn(): editions whose book has an edition with a given isbn
    _XISBN_MQL = """[{
      "type": "/book/book_edition",
      "isbn": {
        "name": null
      },
      "book": {
        "editions": [{
          "isbn": {
            "name": null
          }
        }]
      }
    }]"""

    # id types of book_edition_by_id() that are matched on their 'value' field
    _VALUE_ID_TYPES = ('LCCN', 'OCLC_number', 'openlibrary_id')

    def __init__(self, username=None, password=None, main_or_sandbox='main'):
        """Select the Freebase endpoint and optionally log in.

        main_or_sandbox -- 'main' uses the freebase module itself; any other
                           value uses freebase.sandbox
        username, password -- when both are given, login() is called with them
        """
        if main_or_sandbox == 'main':
            self.freebase = freebase
        else:
            self.freebase = freebase.sandbox
        if username is not None and password is not None:
            self.freebase.login(username, password)

    @classmethod
    def _edition_query(cls):
        """Return a fresh, mutable copy of the parsed edition query template."""
        return json.loads(cls._EDITION_MQL)

    def books(self):
        """Yield the results of the /book/book query (_BOOK_MQL)."""
        query = json.loads(self._BOOK_MQL)
        for r in self.freebase.mqlreaditer(query):
            yield r

    def book_editions(self):
        """Yield the results of the unconstrained /book/book_edition query."""
        for r in self.freebase.mqlreaditer(self._edition_query()):
            yield r

    def editions_for_book(self, book_id):
        """Yield the /book/book_edition results whose book id is book_id."""
        query = self._edition_query()
        query[0]["book"]["id"] = book_id
        for r in self.freebase.mqlreaditer(query):
            yield r

    def book_edition_by_id(self, id, id_type):
        """Yield the /book/book_edition results matching one identifier.

        id_type -- 'isbn' (matched on the 'name' field) or one of 'LCCN',
                   'OCLC_number', 'openlibrary_id' (matched on 'value')

        Raises FreebaseException for any other id_type. Because this is a
        generator, the error surfaces when iteration starts.
        """
        query = self._edition_query()
        if id_type == 'isbn':
            query[0][id_type][0]['name'] = id
        elif id_type in self._VALUE_ID_TYPES:
            query[0][id_type][0]['value'] = id
        else:
            raise FreebaseException('id_type must be one of isbn, LCCN, OCLC_number or openlibrary_id, not %s' % (id_type))
        for r in self.freebase.mqlreaditer(query):
            yield r

    def xisbn(self, isbn_val=None, book_id=None):
        """Yield ISBNs that Freebase relates to an ISBN or to a book.

        Exactly one of isbn_val and book_id must be given:

        isbn_val -- passed through isbn.ISBN(...).to_string('13'); the ISBN
                    names of all editions of the book that has an edition
                    with that ISBN are yielded
        book_id  -- the ISBN names of every edition returned by
                    editions_for_book(book_id) are yielded

        Raises Exception when neither or both arguments are given. Because
        this is a generator, the error surfaces when iteration starts.
        """
        if isbn_val is None and book_id is None:
            raise Exception("One of isbn or book_id must be specified")
        if isbn_val is not None and book_id is not None:
            raise Exception("Only one of isbn or book_id can be specified")
        if isbn_val is not None:
            isbn_val = isbn.ISBN(isbn_val).to_string('13')
            query = json.loads(self._XISBN_MQL)
            query[0]["book"]["editions"][0]["isbn"]["name"] = isbn_val
            for r in self.freebase.mqlreaditer(query):
                yield r["isbn"]["name"]
        else:
            for edition in self.editions_for_book(book_id=book_id):
                for i in edition["isbn"]:
                    yield i["name"]
class WorkMapper(object):
    """Maps identifiers between Freebase and Open Library."""

    @classmethod
    def freebase_book_to_openlibrary_work(cls, fb_id, complete_search=False):
        """Yield the Open Library work ids found for a Freebase book id.

        The ISBNs of the editions Freebase associates with fb_id are looked
        up in Open Library, and each distinct work id is yielded once, in
        the order found.

        complete_search -- when False (the default) the generator stops
                           right after the first work id; when True every
                           ISBN is examined
        """
        logger.debug("fb_id: %s", fb_id)
        fb = FreebaseBooks()
        work_ids = set()
        # grab all ISBNs corresponding to Freebase fb_id and compute the OpenLibrary work ID
        isbn_ids = izip(fb.xisbn(book_id=fb_id), repeat('isbn'))
        for work_id_list in OpenLibrary.works(isbn_ids):
            for work_id in work_id_list:
                if work_id in work_ids:
                    continue
                work_ids.add(work_id)
                yield work_id
                if not complete_search:
                    # plain return ends a generator; raising StopIteration
                    # here becomes a RuntimeError under PEP 479
                    return
2011-11-09 17:09:58 +00:00
def look_up_my_zotero_books_in_hathi():
    """Print each book from MyZotero().get_books(20) with its hathi_bib() result.

    A failure for one book (for example a missing 'isbn' key or an HTTP
    error) is printed and the loop moves on to the next book.
    """
    # imported here so the module can be loaded without the zotero helper
    from regluit.experimental.zotero_books import MyZotero
    zot = MyZotero()
    for b in zot.get_books(20):
        try:
            # build the whole line first so a failed lookup prints only the error
            print("%s %s" % (b, hathi_bib(b['isbn'])))
        except Exception as e:
            print(e)
def ol_practice():
    """Exercise the OpenLibrary helpers against the live service and print the results."""
    print(OpenLibrary.books(MASHUPBOOK_ISBN_10))
    pprint(OpenLibrary.books(MASHUPBOOK_ISBN_13, jscmd='data'))
    print(OpenLibrary.books('0385472579', jscmd='data'))
    print(OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M'))
    print(OpenLibrary.author_photos(RY_OLID, 'S'))
    # can we get the status of a pd book oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
    # http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy
    # (id and id_type are separate arguments; without the comma the two
    # string literals were concatenated into a single bogus id)
    print(OpenLibrary.books('OL7173600M', 'olid', jscmd='data'))
    # http://openlibrary.org/books/OL6542070M/The_Montessori_method works
    print(OpenLibrary.books('1181252', 'oclc', jscmd='data'))
    print(OpenLibrary.read([(MASHUPBOOK_ISBN_10, 'isbn'), ('1181252', 'oclc')]))
    # let's bring up the editions for Surfacing
    for (i, ed) in enumerate(islice(OpenLibrary.editions(SURFACING_WORK_OLID), 100)):
        print("%s %s" % (i, ed))
    # let's get the Work ID for one of the editions; works0() is the
    # single-id lookup that accepts id_type (works() takes an iterable of pairs)
    pprint(OpenLibrary.works0(SURFACING_EDITION_OLID, id_type='olid'))
2011-11-09 17:09:58 +00:00
class FreebaseBooksTest(TestCase):
    """Tests for FreebaseBooks; each test constructs FreebaseBooks() and runs real queries."""

    def test_books_iter(self):
        """The first four results of books() are returned and typed /book/book."""
        fb = FreebaseBooks()
        books = list(islice(fb.books(), 4))
        self.assertEqual(len(books), 4)
        for book in books[0:1]:
            # assertEqual, not assertTrue: assertTrue(x, msg) only checks truthiness
            self.assertEqual(book["type"], "/book/book")

    def test_book_editions_iter(self):
        """The first four results of book_editions() are typed /book/book_edition."""
        fb = FreebaseBooks()
        editions = list(islice(fb.book_editions(), 4))
        self.assertEqual(len(editions), 4)
        for edition in editions[0:1]:
            self.assertEqual(edition["type"], "/book/book_edition")

    def test_book_edition_by_id(self):
        """Editions can be looked up by isbn and by OCLC_number."""
        fb = FreebaseBooks()
        # http://www.amazon.com/New-Collected-Poems-Czeslaw-Milosz/dp/006019667X
        edition = list(fb.book_edition_by_id('9780060196677', 'isbn'))
        self.assertEqual(edition[0]['type'], '/book/book_edition')
        self.assertEqual(edition[0]['book']['id'], '/m/0c1t1yk')
        self.assertEqual(edition[0]['book']['name'], 'New and collected poems 1931-2001')
        edition = list(fb.book_edition_by_id('76074298', 'OCLC_number'))
        self.assertEqual(edition[0]['type'], '/book/book_edition')
        self.assertEqual(edition[0]['book']['id'], '/m/021yncj')
        self.assertEqual(edition[0]['book']['name'], 'Brave New Words: The Oxford Dictionary of Science Fiction')
        # test openlibrary_id Moby Dick
        edition = list(fb.book_edition_by_id('9780486432151', 'isbn'))[0]
        self.assertEqual(edition['openlibrary_id'][0]['value'], 'OL3685847M')

    def test_editions_for_book(self):
        """Smoke test: iterating editions_for_book() completes without error."""
        fb = FreebaseBooks()
        book_id = '/en/moby-dick'
        for edition in fb.editions_for_book(book_id):
            pass

    def test_xisbn(self):
        """xisbn() gives the same ISBN set whether keyed by ISBN or by book id."""
        isbn_val = '9780486432151'
        book_id = '/en/moby-dick'
        fb = FreebaseBooks()
        isbns = set(fb.xisbn(isbn_val))
        isbns2 = set(fb.xisbn(book_id=book_id))
        self.assertEqual(isbns, isbns2)
2011-11-09 17:09:58 +00:00
class OpenLibraryTest(TestCase):
    """Tests for the OpenLibrary helpers; most of them call the live Open Library service."""

    @staticmethod
    def _first(mapping):
        """Return the first value of mapping."""
        return list(mapping.values())[0]

    def test_books(self):
        """books() by ISBN, with and without jscmd='data'."""
        by_isbn = OpenLibrary.books(MASHUPBOOK_ISBN_10)
        self.assertEqual(self._first(by_isbn)['info_url'], 'http://openlibrary.org/books/OL13439114M/Pro_Web_2.0_Mashups')
        details = self._first(OpenLibrary.books('0385472579', jscmd='data'))
        self.assertEqual(details['title'], 'Zen Speaks')
        self.assertEqual(details['identifiers']['openlibrary'][0], 'OL7440033M')

    def test_books_olid(self):
        """books() by Open Library id."""
        # a public-domain book, oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
        # http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy
        found = OpenLibrary.books('OL7173600M', 'olid', jscmd='data')
        self.assertEqual(self._first(found)['title'], 'The log of a cowboy')

    def test_books_oclc(self):
        """books() by OCLC number."""
        # http://openlibrary.org/books/OL6542070M/The_Montessori_method works
        found = OpenLibrary.books('1181252', 'oclc', jscmd='data')
        self.assertEqual(self._first(found)['title'], 'The Montessori method')

    def test_read(self):
        """read() with two requests exposes the epub URL of the oclc record."""
        results = OpenLibrary.read([(MASHUPBOOK_ISBN_10, 'isbn'), ('1181252', 'oclc')])
        record = self._first(results['oclc:1181252']['records'])
        self.assertEqual(record['data']['ebooks'][0]['formats']['epub']['url'],
                         'http://www.archive.org/download/cu31924032538500/cu31924032538500.epub')

    def test_covers(self):
        """covers() builds the expected URL."""
        expected = 'http://covers.openlibrary.org/b/isbn/159059858X-M.jpg'
        self.assertEqual(OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M'), expected)

    def test_author_photos(self):
        """author_photos() builds the expected URL."""
        expected = 'http://covers.openlibrary.org/a/olid/OL4264806A-S.jpg'
        self.assertEqual(OpenLibrary.author_photos(RY_OLID, 'S'), expected)

    def test_editions(self):
        """Up to 100 editions of Surfacing all look like edition OLIDs."""
        for ed in islice(OpenLibrary.editions(SURFACING_WORK_OLID), 100):
            self.assertTrue(re.match(r'^OL(\d+)M$', ed))

    def test_works0(self):
        """works0() maps the Surfacing edition to its work."""
        found = OpenLibrary.works0(SURFACING_EDITION_OLID, id_type='olid')
        self.assertEqual(found[0], 'OL675829W')

    def test_works(self):
        """works() yields one list per id, empty for an id with no entry."""
        ids = [(MASHUPBOOK_ISBN_10, 'isbn'), (SURFACING_EDITION_OLID, 'olid'), ('233434', 'isbn')]
        self.assertEqual(list(OpenLibrary.works(ids)), [['OL10306321W'], ['OL675829W'], []])

    def test_json_for_olid(self):
        """json_for_olid() for an edition, a work, an author and a redirect."""
        # manifestation
        # http://openlibrary.org/books/OL13439114M.json
        edition = OpenLibrary.json_for_olid("OL13439114M")
        self.assertEqual(edition["title"], "Pro Web 2.0 Mashups")
        self.assertEqual(edition["identifiers"]["librarything"], ['2771144'])
        self.assertEqual(edition["subjects"], ['Mashups (World Wide Web)'])
        # work
        # http://openlibrary.org/works/OL10306321W.json
        work = OpenLibrary.json_for_olid("OL10306321W")
        self.assertEqual(work["title"], "Pro Web 2.0 Mashups")
        self.assertEqual(work["type"]["key"], "/type/work")
        first_author = work["authors"][0]
        self.assertEqual(first_author["type"]["key"], "/type/author_role")
        self.assertEqual(first_author["author"]["key"], "/authors/OL4264806A")
        # author
        # http://openlibrary.org/authors/OL4264806A.json
        author = OpenLibrary.json_for_olid("OL4264806A")
        self.assertEqual(author["name"], "Raymond Yee")
        # redirect ok?
        # "OL14917149W" -> "OL362684W"
        redirected_olid = "OL14917149W"
        followed = OpenLibrary.json_for_olid(redirected_olid, follow_redirect=True)
        self.assertEqual(followed["title"], "King Richard III")
        self.assertEqual(followed["key"], "/works/OL362684W")
        unfollowed = OpenLibrary.json_for_olid(redirected_olid, follow_redirect=False)
        self.assertEqual(unfollowed["type"]["key"], "/type/redirect")
class WorkMapperTest(TestCase):
    """Exploratory tests for WorkMapper; they print results and assert nothing."""

    def test_freebase_book_to_openlibrary_work(self):
        """Print every Open Library work id mapped from one Freebase book id."""
        # '/en/moby-dick' was assigned here too, but was immediately overwritten
        fb_id = '/en/wuthering_heights'
        work_ids = list(WorkMapper.freebase_book_to_openlibrary_work(fb_id, complete_search=True))
        print(work_ids)

    def test_work_info_from_openlibrary(self):
        """Print the editions of the Surfacing work and how many there are."""
        editions = list(OpenLibrary.editions(SURFACING_WORK_OLID))
        print("%s %s" % (editions, len(editions)))
def suite():
    """Build the TestSuite run when this module is executed as a script.

    Currently it holds only OpenLibraryTest.test_json_for_olid.
    """
    # test case classes to load in full, e.g. [WorkMapperTest]
    whole_cases = []
    selected = unittest.TestSuite()
    for case in whole_cases:
        selected.addTest(unittest.TestLoader().loadTestsFromTestCase(case))
    selected.addTest(OpenLibraryTest('test_json_for_olid'))
    #selected.addTest(SettingsTest('test_dev_me_alignment')) # give option to test this alignment
    return selected
if __name__ == '__main__':
    # Other entry points that have been used from here:
    #look_up_my_zotero_books_in_hathi()
    #ol_practice()
    #print len(list(islice(parse_project_gutenberg_catalog(),100000)))
    #unittest.main()
    # Run only the hand-picked tests returned by suite(); to run the whole module use
    #suites = unittest.defaultTestLoader.loadTestsFromModule(__import__('__main__'))
    suites = suite()
    runner = unittest.TextTestRunner()
    runner.run(suites)