Added xisbn-like method based on Freebase data; Added a Freebase /book/book id to OpenLibrary work id mapper

pull/1/head
Raymond Yee 2011-12-05 09:19:07 -08:00
parent ec09c4754b
commit e121e07e72
1 changed files with 202 additions and 10 deletions

View File

@ -1,10 +1,22 @@
import requests import requests
import urllib import urllib
import httplib import httplib
from regluit.experimental.zotero_books import Zotero2, MyZotero
import json import json
from pprint import pprint from pprint import pprint
from itertools import islice from itertools import islice, izip, repeat
import logging
import sys, os
# a kludge to allow for isbn.py to be imported
# and not just in the context of the regluit Django app
try:
from regluit.core import isbn
except:
import isbn
try: try:
import unittest import unittest
@ -23,11 +35,22 @@ logger = logging.getLogger(__name__)
MASHUPBOOK_ISBN_13 = '9781590598580' MASHUPBOOK_ISBN_13 = '9781590598580'
MASHUPBOOK_ISBN_10 = '159059858X' MASHUPBOOK_ISBN_10 = '159059858X'
MASHUPBOOK_OLID = 'OL13439114M'
RY_OLID = 'OL4264806A' RY_OLID = 'OL4264806A'
SURFACING_WORK_OLID = 'OL675829W' SURFACING_WORK_OLID = 'OL675829W'
SURFACING_EDITION_OLID = 'OL8075248M' SURFACING_EDITION_OLID = 'OL8075248M'
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
    """Yield successive lists ("pages") of up to page_size items from iterable.

    Every page except possibly the last holds exactly page_size items.
    No empty page is ever emitted: an empty iterable yields nothing, and an
    iterable whose length is an exact multiple of page_size does not get a
    trailing [] page.
    """
    page = []
    for item in iterable:
        page.append(item)
        if len(page) == page_size:
            yield page
            page = []
    # Flush only a partial page.  Yielding unconditionally here produced a
    # spurious empty page, which OpenLibrary.works turned into a lookup
    # request carrying no ids.
    if page:
        yield page
class FreebaseException(Exception): class FreebaseException(Exception):
pass pass
@ -166,7 +189,7 @@ class OpenLibrary(object):
raise OpenLibraryException("problem in editions: %s " % e) raise OpenLibraryException("problem in editions: %s " % e)
@classmethod @classmethod
def works(cls, id, id_type='isbn'): def works0(cls, id, id_type='isbn'):
# http://openlibrary.org/api/volumes/brief/olid/OL8075248M.json # http://openlibrary.org/api/volumes/brief/olid/OL8075248M.json
# can there be more than 1 work for a given edition? # can there be more than 1 work for a given edition?
# will return a list of works # will return a list of works
@ -179,9 +202,30 @@ class OpenLibrary(object):
if (len(response.values()) == 1 and len(response.values()[0]['records'].values()) == 1): if (len(response.values()) == 1 and len(response.values()[0]['records'].values()) == 1):
return [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key] return [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key]
else: else:
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works") raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works0")
except Exception, e: except Exception, e:
raise OpenLibraryException("problem in works: %s " % e) return []
@classmethod
def works(cls, ids, page_size=10):
    """Look up OpenLibrary work ids for several edition identifiers at once.

    ids is an iterable of (id, id_type) pairs, e.g. ('159059858X', 'isbn').
    The pairs are passed to cls.read in pages of at most page_size, and
    exactly one list of work ids is yielded per input pair, in input order.
    A pair that is absent from the response, or whose record carries no
    parsable works, yields [].

    Raises OpenLibraryException when a response entry does not hold exactly
    one record.
    """
    for page in grouper(ids, page_size=page_size):
        if not page:
            # nothing to look up; avoid issuing a request with no ids
            continue
        response = cls.read(page)
        for (item_id, id_type) in page:
            # response keys have the form "<id_type>:<id>"
            val = response.get("{1}:{0}".format(item_id, id_type))
            if val is None:
                yield []
                continue
            records = list(val['records'].values())
            if len(records) != 1:
                raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works")
            try:
                works_key = records[0]["details"]["details"]["works"]
                work_ids = [re.match(r'^/works/(.*)$', work_key["key"]).group(1) for work_key in works_key]
            except (KeyError, IndexError, TypeError, AttributeError):
                # Record without usable work data.  Silently yielding nothing
                # here would shift every later result one slot relative to
                # ids, so report "no works" explicitly instead.
                work_ids = []
            # yield outside the try so exceptions thrown into the generator
            # by the consumer are never swallowed
            yield work_ids
class FreebaseBooks(object): class FreebaseBooks(object):
def __init__(self, username=None, password=None, main_or_sandbox='main'): def __init__(self, username=None, password=None, main_or_sandbox='main'):
@ -215,6 +259,7 @@ class FreebaseBooks(object):
"ISBN": [{}], "ISBN": [{}],
"LCCN": [{}], "LCCN": [{}],
"OCLC_number": [{}], "OCLC_number": [{}],
"openlibrary_id": [{}],
"book": { "book": {
"id": null, "id": null,
"name": null "name": null
@ -224,6 +269,26 @@ class FreebaseBooks(object):
resp = self.freebase.mqlreaditer(query) resp = self.freebase.mqlreaditer(query)
for r in resp: for r in resp:
yield r yield r
def editions_for_book(self, book_id):
MQL = u"""[{
"type": "/book/book_edition",
"id": null,
"isbn": [{}],
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["id"] = book_id
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def book_edition_by_id(self,id,id_type): def book_edition_by_id(self,id,id_type):
MQL = u"""[{ MQL = u"""[{
@ -233,6 +298,7 @@ class FreebaseBooks(object):
"ISBN": [{}], "ISBN": [{}],
"LCCN": [{}], "LCCN": [{}],
"OCLC_number": [{}], "OCLC_number": [{}],
"openlibrary_id": [{}],
"book": { "book": {
"id": null, "id": null,
"name": null "name": null
@ -241,18 +307,69 @@ class FreebaseBooks(object):
query = json.loads(MQL) query = json.loads(MQL)
if id_type == 'isbn': if id_type == 'isbn':
query[0][id_type][0].setdefault('name', id) query[0][id_type][0].setdefault('name', id)
elif id_type in ['LCCN', 'OCLC_number']: elif id_type in ['LCCN', 'OCLC_number', 'openlibrary_id']:
query[0][id_type][0].setdefault('value', id) query[0][id_type][0].setdefault('value', id)
if id_type in ['isbn', 'LCCN', 'OCLC_number']: if id_type in ['isbn', 'LCCN', 'OCLC_number', 'openlibrary_id']:
resp = self.freebase.mqlreaditer(query) resp = self.freebase.mqlreaditer(query)
for r in resp: for r in resp:
yield r yield r
else: else:
raise FreebaseException('id_type must be one of ISBN, LCCN, or OCLC_number, not %s' % (id_type)) raise FreebaseException('id_type must be one of ISBN, LCCN, OCLC_number or openlibrary_id, not %s' % (id_type))
def xisbn(self, isbn_val=None, book_id=None):
""" pass in either isbn_val or book_id and xisbn returns related ISBNs in Freebase. Handles case in which
either isbn or book_id is not None but not both
"""
if isbn_val is None and book_id is None:
raise Exception("One of isbn or book_id must be specified")
elif isbn_val is not None and book_id is not None:
raise Exception("Only only of isbn or book_id can be specified")
elif isbn_val is not None:
isbn_val = isbn.ISBN(isbn_val).to_string('13')
MQL = """[{
"type": "/book/book_edition",
"isbn": {
"name": null
},
"book": {
"editions": [{
"isbn": {
"name": null
}
}]
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["editions"][0]["isbn"]["name"] = isbn_val
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r["isbn"]["name"]
elif book_id is not None:
for edition in self.editions_for_book(book_id=book_id):
for i in edition["isbn"]:
yield i["name"]
class WorkMapper(object):
    """Maps book identifiers from Freebase onto OpenLibrary work ids."""

    @classmethod
    def freebase_book_to_openlibrary_work(cls, fb_id, complete_search=False):
        """Yield OpenLibrary work ids for the Freebase book fb_id.

        The ISBNs of the editions Freebase associates with fb_id are looked
        up in OpenLibrary, and each distinct work id found is yielded once.

        fb_id           -- Freebase book id, e.g. '/en/moby-dick'
        complete_search -- when False (the default) stop after the first
                           work id; when True go through every ISBN.
        """
        logger.debug("fb_id: %s", fb_id)
        fb = FreebaseBooks()
        work_ids = set()
        # grab all ISBNs corresponding to Freebase fb_id and compute the
        # OpenLibrary work ID; pairing is lazy so ISBNs are fetched on demand
        isbn_pairs = izip(fb.xisbn(book_id=fb_id), repeat('isbn'))
        for work_id_list in OpenLibrary.works(isbn_pairs):
            for work_id in work_id_list:
                if work_id in work_ids:
                    continue
                work_ids.add(work_id)
                yield work_id
                if not complete_search:
                    # A plain return ends the generator; raising
                    # StopIteration here becomes a RuntimeError under
                    # PEP 479.
                    return
def look_up_my_zotero_books_in_hathi(): def look_up_my_zotero_books_in_hathi():
from regluit.experimental.zotero_books import MyZotero
zot = MyZotero() zot = MyZotero()
for (i,b) in enumerate(zot.get_books(20)): for (i,b) in enumerate(zot.get_books(20)):
try: try:
@ -305,15 +422,90 @@ class FreebaseBooksTest(TestCase):
edition = list(fb.book_edition_by_id('76074298', 'OCLC_number')) edition = list(fb.book_edition_by_id('76074298', 'OCLC_number'))
self.assertEqual(edition[0]['type'],'/book/book_edition') self.assertEqual(edition[0]['type'],'/book/book_edition')
self.assertEqual(edition[0]['book']['id'],'/m/021yncj') self.assertEqual(edition[0]['book']['id'],'/m/021yncj')
self.assertEqual(edition[0]['book']['name'],'Brave New Words: The Oxford Dictionary of Science Fiction') self.assertEqual(edition[0]['book']['name'],'Brave New Words: The Oxford Dictionary of Science Fiction')
# test openlibrary_id for Moby Dick
edition = list(fb.book_edition_by_id('9780486432151', 'isbn'))[0]
self.assertEqual(edition['openlibrary_id'][0]['value'], 'OL3685847M')
def test_editions_for_book(self):
    """editions_for_book returns at least one /book/book_edition for Moby Dick."""
    fb = FreebaseBooks()
    book_id = '/en/moby-dick'
    count = 0
    for edition in fb.editions_for_book(book_id):
        # the query constrains the type, so every result must carry it
        self.assertEqual(edition['type'], '/book/book_edition')
        count += 1
    # merely iterating would pass even when Freebase returned nothing
    self.assertTrue(count > 0)
def test_xisbn(self):
    """xisbn gives the same ISBN set whether seeded by an ISBN or by the book id."""
    fb = FreebaseBooks()
    from_isbn = set(fb.xisbn('9780486432151'))
    from_book = set(fb.xisbn(book_id='/en/moby-dick'))
    self.assertEqual(from_isbn, from_book)
class OpenLibraryTest(TestCase):
    """Tests of the OpenLibrary wrapper; expected values are literal OpenLibrary data."""

    def test_books(self):
        """books() by ISBN: default response has info_url, jscmd='data' has title and OLID."""
        brief = OpenLibrary.books(MASHUPBOOK_ISBN_10).values()[0]
        self.assertEqual(brief['info_url'], 'http://openlibrary.org/books/OL13439114M/Pro_Web_2.0_Mashups')
        data = OpenLibrary.books('0385472579', jscmd='data').values()[0]
        self.assertEqual(data['title'], 'Zen Speaks')
        self.assertEqual(data['identifiers']['openlibrary'][0], 'OL7440033M')

    def test_books_olid(self):
        """books() accepts an OpenLibrary edition id."""
        # look up a pd (presumably public-domain) book by OLID:
        # oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
        # http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy
        records = OpenLibrary.books('OL7173600M', 'olid', jscmd='data')
        self.assertEqual(records.values()[0]['title'], 'The log of a cowboy')

    def test_books_oclc(self):
        """books() accepts an OCLC number."""
        # http://openlibrary.org/books/OL6542070M/The_Montessori_method works
        records = OpenLibrary.books('1181252','oclc',jscmd='data')
        self.assertEqual(records.values()[0]['title'], 'The Montessori method')

    def test_read(self):
        """read() takes mixed id types and keys its response by '<id_type>:<id>'."""
        lookups = [(MASHUPBOOK_ISBN_10,'isbn'),('1181252','oclc')]
        results = OpenLibrary.read(lookups)
        first_record = results['oclc:1181252']['records'].values()[0]
        self.assertEqual(first_record['data']['ebooks'][0]['formats']['epub']['url'],
                         'http://www.archive.org/download/cu31924032538500/cu31924032538500.epub')

    def test_covers(self):
        """covers() returns the cover URL for an ISBN at the requested size."""
        cover_url = OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M')
        self.assertEqual(cover_url, 'http://covers.openlibrary.org/b/isbn/159059858X-M.jpg')

    def test_author_photos(self):
        """author_photos() returns the photo URL for an author OLID and size."""
        photo_url = OpenLibrary.author_photos(RY_OLID,'S')
        self.assertEqual(photo_url, 'http://covers.openlibrary.org/a/olid/OL4264806A-S.jpg')

    def test_editions(self):
        """The first 100 editions of Surfacing all look like edition OLIDs."""
        # let's bring up the editions for Surfacing
        for ed in islice(OpenLibrary.editions(SURFACING_WORK_OLID),100):
            self.assertTrue(re.match(r'^OL(\d+)M$',ed))

    def test_works0(self):
        """works0() maps one edition OLID to its work id."""
        found = OpenLibrary.works0(SURFACING_EDITION_OLID,id_type='olid')
        self.assertEqual(found[0], 'OL675829W')

    def test_works(self):
        """works() yields one result list per input pair, [] for an unknown id."""
        ids = [(MASHUPBOOK_ISBN_10, 'isbn'), (SURFACING_EDITION_OLID,'olid'), ('233434','isbn')]
        self.assertEqual(list(OpenLibrary.works(ids)), [['OL10306321W'], ['OL675829W'], []])
class WorkMapperTest(TestCase):
    """Exploratory checks of the Freebase -> OpenLibrary mapping; they print results for inspection."""

    def test_freebase_book_to_openlibrary_work(self):
        """Run the complete Freebase-book-to-OpenLibrary-work search for one book."""
        # '/en/moby-dick' is another id worth trying here; it used to be
        # assigned first and then immediately overwritten (a dead store)
        fb_id = '/en/wuthering_heights'
        work_ids = list(WorkMapper.freebase_book_to_openlibrary_work(fb_id, complete_search=True))
        print(work_ids)

    def test_work_info_from_openlibrary(self):
        """Fetch every edition of the Surfacing work and show them with their count."""
        editions = list(OpenLibrary.editions(SURFACING_WORK_OLID))
        # one formatted string keeps the output identical to
        # "print editions, len(editions)" on Python 2
        print("%s %s" % (editions, len(editions)))
def suite():
    """Build the test suite that is run when this module is executed as a script.

    Every TestCase class listed in testcases is loaded in full; on top of
    that, WorkMapperTest.test_work_info_from_openlibrary is added by name.
    """
    # list whole TestCase classes here to run them, e.g. [WorkMapperTest]
    testcases = []
    loader = unittest.TestLoader()
    suites = unittest.TestSuite()
    for testcase in testcases:
        suites.addTest(loader.loadTestsFromTestCase(testcase))
    suites.addTest(WorkMapperTest('test_work_info_from_openlibrary'))
    #suites.addTest(SettingsTest('test_dev_me_alignment')) # give option to test this alignment
    return suites
if __name__ == '__main__': if __name__ == '__main__':
#look_up_my_zotero_books_in_hathi() #look_up_my_zotero_books_in_hathi()
#ol_practice() #ol_practice()
#print len(list(islice(parse_project_gutenberg_catalog(),100000))) #print len(list(islice(parse_project_gutenberg_catalog(),100000)))
unittest.main() #unittest.main()
suites = suite()
#suites = unittest.defaultTestLoader.loadTestsFromModule(__import__('__main__'))
unittest.TextTestRunner().run(suites)