Merge branch 'master' of github.com:Gluejar/regluit

pull/1/head
Andromeda Yelton 2011-12-05 16:17:02 -05:00
commit a94c04b890
3 changed files with 21520 additions and 10 deletions

View File

@ -1,10 +1,22 @@
import requests
import urllib
import httplib
from regluit.experimental.zotero_books import Zotero2, MyZotero
import json
from pprint import pprint
from itertools import islice
from itertools import islice, izip, repeat
import logging
import sys, os
# a kludge to allow for isbn.py to be imported
# and not just in the context of the regluit Django app
try:
from regluit.core import isbn
except:
import isbn
try:
import unittest
@ -23,11 +35,22 @@ logger = logging.getLogger(__name__)
MASHUPBOOK_ISBN_13 = '9781590598580'
MASHUPBOOK_ISBN_10 = '159059858X'
MASHUPBOOK_OLID = 'OL13439114M'
RY_OLID = 'OL4264806A'
SURFACING_WORK_OLID = 'OL675829W'
SURFACING_EDITION_OLID = 'OL8075248M'
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
page= []
for item in iterable:
page.append( item )
if len(page) == page_size:
yield page
page= []
yield page
class FreebaseException(Exception):
pass
@ -166,7 +189,7 @@ class OpenLibrary(object):
raise OpenLibraryException("problem in editions: %s " % e)
@classmethod
def works(cls, id, id_type='isbn'):
def works0(cls, id, id_type='isbn'):
# http://openlibrary.org/api/volumes/brief/olid/OL8075248M.json
# can there be more than 1 work for a given edition?
# will return a list of works
@ -179,9 +202,30 @@ class OpenLibrary(object):
if (len(response.values()) == 1 and len(response.values()[0]['records'].values()) == 1):
return [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key]
else:
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works")
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works0")
except Exception, e:
raise OpenLibraryException("problem in works: %s " % e)
return []
@classmethod
def works(cls, ids, page_size=10):
"""generalize to handle more than one set of id at a time -- ids is an iterable"""
for (i, page) in enumerate(grouper(ids, page_size=page_size)):
response = cls.read(page)
for (id, id_type) in page:
key = "{1}:{0}".format(id, id_type)
val = response.get(key)
if val is not None:
if (len(val['records'].values()) == 1):
try:
works_key = val['records'].values()[0]["details"]["details"]["works"]
yield [re.match(r'^/works/(.*)$',work_key["key"]).group(1) for work_key in works_key]
except Exception, e:
pass
else:
raise OpenLibraryException("Assumption of 1 key in response invalid in OpenLibrary.works")
else:
yield []
class FreebaseBooks(object):
def __init__(self, username=None, password=None, main_or_sandbox='main'):
@ -215,6 +259,7 @@ class FreebaseBooks(object):
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
@ -224,6 +269,26 @@ class FreebaseBooks(object):
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def editions_for_book(self, book_id):
MQL = u"""[{
"type": "/book/book_edition",
"id": null,
"isbn": [{}],
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["id"] = book_id
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
def book_edition_by_id(self,id,id_type):
MQL = u"""[{
@ -233,6 +298,7 @@ class FreebaseBooks(object):
"ISBN": [{}],
"LCCN": [{}],
"OCLC_number": [{}],
"openlibrary_id": [{}],
"book": {
"id": null,
"name": null
@ -241,18 +307,69 @@ class FreebaseBooks(object):
query = json.loads(MQL)
if id_type == 'isbn':
query[0][id_type][0].setdefault('name', id)
elif id_type in ['LCCN', 'OCLC_number']:
elif id_type in ['LCCN', 'OCLC_number', 'openlibrary_id']:
query[0][id_type][0].setdefault('value', id)
if id_type in ['isbn', 'LCCN', 'OCLC_number']:
if id_type in ['isbn', 'LCCN', 'OCLC_number', 'openlibrary_id']:
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r
else:
raise FreebaseException('id_type must be one of ISBN, LCCN, or OCLC_number, not %s' % (id_type))
raise FreebaseException('id_type must be one of ISBN, LCCN, OCLC_number or openlibrary_id, not %s' % (id_type))
def xisbn(self, isbn_val=None, book_id=None):
""" pass in either isbn_val or book_id and xisbn returns related ISBNs in Freebase. Handles case in which
either isbn or book_id is not None but not both
"""
if isbn_val is None and book_id is None:
raise Exception("One of isbn or book_id must be specified")
elif isbn_val is not None and book_id is not None:
raise Exception("Only only of isbn or book_id can be specified")
elif isbn_val is not None:
isbn_val = isbn.ISBN(isbn_val).to_string('13')
MQL = """[{
"type": "/book/book_edition",
"isbn": {
"name": null
},
"book": {
"editions": [{
"isbn": {
"name": null
}
}]
}
}]""".replace("\n"," ")
query = json.loads(MQL)
query[0]["book"]["editions"][0]["isbn"]["name"] = isbn_val
resp = self.freebase.mqlreaditer(query)
for r in resp:
yield r["isbn"]["name"]
elif book_id is not None:
for edition in self.editions_for_book(book_id=book_id):
for i in edition["isbn"]:
yield i["name"]
class WorkMapper(object):
@classmethod
def freebase_book_to_openlibrary_work(cls, fb_id, complete_search=False):
""" Try to map a Freebase ID by taking the ISBNs of associated editions and asking OpenLibrary for the work id"""
print "fb_id: ", fb_id
fb = FreebaseBooks()
work_ids = set()
# grab all ISBNs correponding to Freebase fb_id and comput the OpenLibrary work ID
# if complete_search is False, stop at first work id
for work_id_list in OpenLibrary.works(izip(fb.xisbn(book_id=fb_id), repeat('isbn'))):
for work_id in work_id_list:
if work_id not in work_ids:
work_ids.add(work_id)
yield work_id
if not complete_search:
raise StopIteration()
def look_up_my_zotero_books_in_hathi():
from regluit.experimental.zotero_books import MyZotero
zot = MyZotero()
for (i,b) in enumerate(zot.get_books(20)):
try:
@ -305,15 +422,90 @@ class FreebaseBooksTest(TestCase):
edition = list(fb.book_edition_by_id('76074298', 'OCLC_number'))
self.assertEqual(edition[0]['type'],'/book/book_edition')
self.assertEqual(edition[0]['book']['id'],'/m/021yncj')
self.assertEqual(edition[0]['book']['name'],'Brave New Words: The Oxford Dictionary of Science Fiction')
self.assertEqual(edition[0]['book']['name'],'Brave New Words: The Oxford Dictionary of Science Fiction')
# test openlibary_id Moby Dick
edition = list(fb.book_edition_by_id('9780486432151', 'isbn'))[0]
self.assertEqual(edition['openlibrary_id'][0]['value'], 'OL3685847M')
def test_editions_for_book(self):
fb = FreebaseBooks()
book_id = '/en/moby-dick'
editions = fb.editions_for_book(book_id)
for i, edition in enumerate(editions):
pass
def test_xisbn(self):
isbn_val = '9780486432151'
book_id = '/en/moby-dick'
fb = FreebaseBooks()
isbns = set(fb.xisbn(isbn_val))
isbns2 = set(fb.xisbn(book_id=book_id))
self.assertEqual(isbns, isbns2)
class OpenLibraryTest(TestCase):
def test_books(self):
book = OpenLibrary.books(MASHUPBOOK_ISBN_10)
self.assertEqual(book.values()[0]['info_url'], 'http://openlibrary.org/books/OL13439114M/Pro_Web_2.0_Mashups')
book_data = OpenLibrary.books('0385472579', jscmd='data')
self.assertEqual(book_data.values()[0]['title'], 'Zen Speaks')
self.assertEqual(book_data.values()[0]['identifiers']['openlibrary'][0], 'OL7440033M')
def test_books_olid(self):
# can we status of a pd book oclc:03544699 The Log of a Cowboy - Andy Adams, 1903
# http://openlibrary.org/books/OL7173600M/The_log_of_a_cowboy
book = OpenLibrary.books('OL7173600M', 'olid', jscmd='data')
self.assertEqual(book.values()[0]['title'], 'The log of a cowboy')
def test_books_oclc(self):
# http://openlibrary.org/books/OL6542070M/The_Montessori_method works
book = OpenLibrary.books('1181252','oclc',jscmd='data')
self.assertEqual(book.values()[0]['title'], 'The Montessori method')
def test_read(self):
results = OpenLibrary.read([(MASHUPBOOK_ISBN_10,'isbn'),('1181252','oclc')])
self.assertEqual(results['oclc:1181252']['records'].values()[0]['data']['ebooks'][0]['formats']['epub']['url'],
'http://www.archive.org/download/cu31924032538500/cu31924032538500.epub')
def test_covers(self):
self.assertEqual(OpenLibrary.covers(MASHUPBOOK_ISBN_10, size='M'),
'http://covers.openlibrary.org/b/isbn/159059858X-M.jpg')
def test_author_photos(self):
self.assertEqual(OpenLibrary.author_photos(RY_OLID,'S'), 'http://covers.openlibrary.org/a/olid/OL4264806A-S.jpg')
def test_editions(self):
# let's bring up the editions for Surfacing
for (i,ed) in enumerate(islice(OpenLibrary.editions(SURFACING_WORK_OLID),100)):
self.assertTrue(re.match(r'^OL(\d+)M$',ed))
def test_works0(self):
self.assertEqual(OpenLibrary.works0(SURFACING_EDITION_OLID,id_type='olid')[0], 'OL675829W')
def test_works(self):
ids = [(MASHUPBOOK_ISBN_10, 'isbn'), (SURFACING_EDITION_OLID,'olid'), ('233434','isbn')]
resp = list(OpenLibrary.works(ids))
self.assertEqual(resp, [['OL10306321W'], ['OL675829W'], []])
class WorkMapperTest(TestCase):
def test_freebase_book_to_openlibrary_work(self):
id = '/en/moby-dick'
id = '/en/wuthering_heights'
work_ids = list(WorkMapper.freebase_book_to_openlibrary_work(id, complete_search=True))
print work_ids
def test_work_info_from_openlibrary(self):
editions = list(OpenLibrary.editions(SURFACING_WORK_OLID))
print editions, len(editions)
def suite():
#testcases = [WorkMapperTest]
testcases = []
suites = unittest.TestSuite([unittest.TestLoader().loadTestsFromTestCase(testcase) for testcase in testcases])
suites.addTest(WorkMapperTest('test_work_info_from_openlibrary'))
#suites.addTest(SettingsTest('test_dev_me_alignment')) # give option to test this alignment
return suites
if __name__ == '__main__':
#look_up_my_zotero_books_in_hathi()
#ol_practice()
#print len(list(islice(parse_project_gutenberg_catalog(),100000)))
unittest.main()
#unittest.main()
suites = suite()
#suites = unittest.defaultTestLoader.loadTestsFromModule(__import__('__main__'))
unittest.TextTestRunner().run(suites)

View File

@ -0,0 +1,622 @@
"""
Module to parse the Project Gutenberg Catalog and map to various work IDs
"""
import unittest
import os
import json
from copy import deepcopy
from freebase.api.mqlkey import quotekey, unquotekey
import freebase
import requests
from lxml import html
import httplib
from urlparse import urljoin
from urllib import urlencode
from pprint import pprint
from itertools import islice, chain
import time
import re
from itertools import islice
import logging
from google.refine import refine
from datetime import datetime
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Text, Sequence, Boolean, not_, and_
from sqlalchemy.orm import mapper, sessionmaker
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import UniqueConstraint
from bookdata import WorkMapper
logging.basicConfig(filename='gutenberg.log')
logger = logging.getLogger(__name__)
def filter_none(d):
d2 = {}
for (k,v) in d.iteritems():
if v is not None:
d2[k] = v
return d2
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
page= []
for item in iterable:
page.append( item )
if len(page) == page_size:
yield page
page= []
yield page
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance
Base = declarative_base()
class GutenbergText(object):
pass
class GutenbergFile(object):
pass
class WikipediaLink(Base):
__tablename__ = 'WikipediaLink'
__table_args__ = (
UniqueConstraint('gutenberg_etext_id', 'wikipedia_href', name='wikipedia_etext_id'),
{'mysql_engine':'MyISAM'}
)
id = Column(Integer, primary_key=True)
gutenberg_etext_id = Column('gutenberg_etext_id', Integer(11))
wikipedia_href = Column('wikipedia_href', String(255))
wikipedia_title = Column('wikipedia_title', String(255))
class FreebaseEntity(Base):
__tablename__ = 'FreebaseEntity'
__table_args__ = (
{'mysql_engine':'MyISAM'}
)
id = Column('id', String(255), primary_key=True)
wikipedia_href = Column('wikipedia_href', String(255))
is_book_book = Column('is_book_book', Boolean)
@singleton
class GluejarDB(object):
def __init__(self, user="gluejar", pw="gluejar", db="Gluejar", host="127.0.0.1", port=3306):
mysql_connect_path = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8" % (user,pw,host,port,db)
engine = create_engine(mysql_connect_path, echo=False)
metadata = MetaData(engine)
Base.metadata.create_all(engine)
gutenbergtext = Table('GutenbergText', metadata, autoload=True)
mapper(GutenbergText, gutenbergtext)
gutenbergfile = Table('GutenbergFile', metadata, autoload=True)
mapper(GutenbergFile, gutenbergfile)
Session = sessionmaker(bind=engine)
session = Session()
self.session = session
def commit_db(self):
self.session.commit()
def rollback(self):
self.session.rollback()
def gutenberg_texts(self):
items = self.session.query(GutenbergText).all()
for item in items:
yield item
def filtered_wikipedia_links(self):
"""generate wikipedia links that are in the main Wikipedia namespace"""
TO_FILTER = ['File:%', 'Portal:%', 'Portal talk:%', "Talk:%",
'Template:%', 'Template talk:%', 'User:%','User talk:%',
'Wikipedia:%', 'Wikipedia talk:%']
total_filter = and_(*[not_(WikipediaLink.wikipedia_title.like(f)) for f in TO_FILTER])
items = self.session.query(WikipediaLink).filter(total_filter)
for item in items:
yield item
def parse_project_gutenberg_catalog(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog.rdf'):
#URL = http://www.gutenberg.org/feeds/catalog.rdf.zip
import re
def text(node):
node.normalize()
return node.childNodes[0].data
RDF_NS = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'
DC_NS = 'http://purl.org/dc/elements/1.1/'
DCTERMS_NS = 'http://purl.org/dc/terms/'
PGTERMS_NS = 'http://www.gutenberg.org/rdfterms/'
from xml.dom.pulldom import START_ELEMENT, parse
doc = parse(fname)
for event, node in doc:
if event == START_ELEMENT and node.localName == "etext":
doc.expandNode(node)
# etext_id
id = node.getAttributeNS(RDF_NS,'ID')
try:
etext_id = int(re.match(r'^etext(\d+)$', id).group(1))
except:
etext_id = None
# title
try:
title = text(node.getElementsByTagNameNS(DC_NS,'title')[0])
title = title.replace("\n"," ").replace("\r"," ")
except:
title = None
# friendly_title
try:
friendly_title = text(node.getElementsByTagNameNS(PGTERMS_NS,'friendlytitle')[0])
friendly_title = friendly_title.replace("\n"," ").replace("\r"," ")
except:
friendly_title = None
# lang
try:
lang = text(node.getElementsByTagNameNS(DC_NS,'language')[0].getElementsByTagNameNS(DCTERMS_NS,'ISO639-2')[0].getElementsByTagNameNS(RDF_NS,'value')[0])
except Exception, e:
logger.debug(e)
lang = None
# rights
try:
rights_node = node.getElementsByTagNameNS(DC_NS,'rights')[0]
rights = rights_node.getAttributeNS(RDF_NS, 'resource')
if rights == '':
rights = text(rights_node)
except Exception, e:
logger.debug(e)
right = None
# created
# <dc:created><dcterms:W3CDTF><rdf:value>2011-11-02</rdf:value></dcterms:W3CDTF></dc:created>
try:
created_str = text(node.getElementsByTagNameNS(DC_NS,'created')[0].getElementsByTagNameNS(DCTERMS_NS,'W3CDTF')[0].getElementsByTagNameNS(RDF_NS,'value')[0])
created = datetime.date(datetime.strptime(created_str, "%Y-%m-%d"))
except Exception, e:
logger.debug(e)
created = None
# creator
try:
creator = text(node.getElementsByTagNameNS(DC_NS,'creator')[0])
except Exception, e:
logger.debug(e)
creator = None
yield {'type':'text', 'etext_id':etext_id, 'title':title, 'friendly_title':friendly_title,
'lang':lang, 'rights':rights, 'created':created, 'creator':creator}
if event == START_ELEMENT and node.localName == "file":
doc.expandNode(node)
# about
try:
about = node.getAttributeNS(RDF_NS,'about')
except Exception, e:
logger.debug(e)
about = None
# isFormatOf
try:
is_format_of_raw = node.getElementsByTagNameNS(DCTERMS_NS,'isFormatOf')[0].getAttributeNS(RDF_NS,'resource')
is_format_of = int(re.match(r'#etext(\d+)$',is_format_of_raw).group(1))
except Exception, e:
logger.debug(e)
is_format_of = None
# format: grab the first one
try:
format = text(node.getElementsByTagNameNS(DC_NS,'format')[0].getElementsByTagNameNS(DCTERMS_NS,'IMT')[0].getElementsByTagNameNS(RDF_NS,'value')[0])
except Exception, e:
logger.debug(e)
format = None
# modified
try:
modified_str = text(node.getElementsByTagNameNS(DCTERMS_NS,'modified')[0].getElementsByTagNameNS(DCTERMS_NS,'W3CDTF')[0].getElementsByTagNameNS(RDF_NS,'value')[0])
modified = datetime.date(datetime.strptime(modified_str, "%Y-%m-%d"))
except Exception, e:
logger.info(e)
modified = None
# extent
try:
extent = int(text(node.getElementsByTagNameNS(DCTERMS_NS,'extent')[0]))
except Exception, e:
raise e
logger.info(e)
extent = None
yield {'type':'file', 'about':about, 'is_format_of':is_format_of, 'format':format, 'modified':modified,
'extent':extent}
def walk_through_catalog(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog.rdf',max=100000):
for i, item in enumerate(islice(parse_project_gutenberg_catalog(fname),max)):
print i, item
def load_texts_to_db(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog_texts.rdf', max=None):
gluejar_db = GluejarDB()
for (i, item) in enumerate(islice(parse_project_gutenberg_catalog(fname),max)):
print i, item
if item['type'] == 'text':
try:
book = gluejar_db.session.query(GutenbergText).filter(GutenbergText.etext_id == item['etext_id']).one()
except:
book = GutenbergText()
book.etext_id = item['etext_id']
gluejar_db.session.add(book)
book.title = item['title']
book.friendly_title = item['friendly_title']
book.lang = item['lang']
book.rights = item['rights']
book.created = item['created']
book.creator = item['creator']
gluejar_db.commit_db()
def load_files_to_db(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog_files.rdf', max=100000):
gluejar_db = GluejarDB()
for (i, item) in enumerate(islice(parse_project_gutenberg_catalog(fname),max)):
print i, item
if item['type'] == 'file':
# try to write if it's a problem do a query to update -- about is unique
try:
file = GutenbergFile()
file.about = item['about']
gluejar_db.session.add(file)
gluejar_db.commit_db()
except IntegrityError, e:
gluejar_db.session.rollback()
file = gluejar_db.session.query(GutenbergFile).filter(GutenbergFile.about== item['about']).one()
file.is_format_of = item['is_format_of']
file.format = item['format']
file.modified = item['modified']
file.extent = item['extent']
gluejar_db.commit_db()
gluejar_db.commit_db()
def external_links_in_wikipedia(target, limit=500, offset=0):
# e.g., http://en.wikipedia.org/w/index.php?title=Special:LinkSearch&target=http%3A%2F%2Fwww.gutenberg.org%2Fetext%2F&limit=500&offset=0
base_url = "http://en.wikipedia.org/w/index.php"
params = filter_none({"title":"Special:LinkSearch", "target":target,
"limit":limit, offset:offset})
url = "%s?%s" % (base_url, urlencode(params))
# page through all the results
more_pages = True
while more_pages:
r = requests.get(url)
if r.status_code != httplib.OK:
raise Exception("Problem with request on %s %s: %s %s", base_url, params, r.status_code, r.content)
etree = html.fromstring(r.content)
links = etree.xpath("//ol")[0].xpath("li")
for link in links:
(target_a, source_a) = link.xpath('a')
yield {"target":target_a.attrib["href"], "source_href": source_a.attrib["href"], "source_title":source_a.text}
# is there another page
following_page = etree.xpath("//a[@class='mw-nextlink']")
if len(following_page) > 0:
url = urljoin(url, following_page[0].attrib["href"])
else:
more_pages = False
def load_wikipedia_external_links_into_db(max=None):
targets = ["http://www.gutenberg.org/etext", "http://www.gutenberg.org/ebook"]
links = chain(*[external_links_in_wikipedia(target) for target in targets])
gluejar_db = GluejarDB()
for (i, link) in enumerate(islice(links,max)):
link_target = link["target"]
try:
etext_id = re.search(r'\/(\d+)$', link_target).group(1)
except:
etext_id = None
print i, link["source_href"], link["source_title"], link_target, etext_id
if etext_id is not None:
wl = WikipediaLink()
wl.gutenberg_etext_id = etext_id
wl.wikipedia_href = link["source_href"]
wl.wikipedia_title = link["source_title"]
gluejar_db.session.add(wl)
try:
gluejar_db.commit_db()
except Exception, e:
print e
gluejar_db.rollback()
def map_wikipedia_links_to_freebase_ids(max=None ,page_size=5):
fb = FreebaseClient('rdhyee', 'fbkule!')
db = GluejarDB()
wikipedia_ids = list( (wl.wikipedia_href for wl in islice(db.filtered_wikipedia_links(), max)) )
for id in wikipedia_ids:
print id
resp = fb.wikipedia_href_to_freebase_id(wikipedia_ids,page_size=page_size)
for (i,r) in enumerate(resp):
print i, r
if len(r): # an actual result
print r[0]['id'], r[0]['type'], r[0]['key'][0]['value']
fb_entity = FreebaseEntity()
fb_entity.id = r[0]['id']
try:
db.session.add(fb_entity)
db.commit_db()
except IntegrityError, e:
db.rollback()
fb_entity = db.session.query(FreebaseEntity).filter(FreebaseEntity.id==r[0]['id']).one()
fb_entity.wikipedia_href = '/wiki/%s' % (unquotekey(r[0]['key'][0]['value']))
fb_entity.is_book_book = '/book/book' in r[0]['type']
db.commit_db()
def map_refine_fb_links_to_openlibrary_work_ids(max=None):
refine_proj_id = "1884515736058"
refine_obj = refine.Refine(refine.RefineServer())
proj = refine_obj.open_project(refine_proj_id)
cols_to_extract = ['etext_id', 'title', 'name', 'fb_id', 'fb_id_judgement', 'wikipedia_title']
limit = max if max is not None else 1000000
response = proj.get_rows(limit=limit)
print "response.total: ", response.total
for i, row in enumerate(islice(response.rows,10)):
print i, row.index, row['etext_id'], row['title'], row['name'], row['fb_id'], row['fb_id_judgement'],
work_ids = list(WorkMapper.freebase_book_to_openlibrary_work(row['fb_id'], complete_search=True))
print work_ids
class FreebaseClient(object):
def __init__(self, username=None, password=None, main_or_sandbox='main'):
if main_or_sandbox == 'main':
self.freebase = freebase
else:
self.freebase = freebase.sandbox
if username is not None and password is not None:
self.freebase.login(username,password)
def wikipedia_href_to_freebase_id (self, hrefs, page_size = 10, chop_wiki=True):
MQL = u"""[{
"type": [],
"id": null,
"key": [{
"namespace": "/wikipedia/en",
"type": "/type/key",
"value": null
}]
}]
""".replace("\n"," ")
for (page_num, page) in enumerate(grouper(hrefs, page_size)):
queries = []
for (href_num, href) in enumerate(page):
query = json.loads(MQL)
if chop_wiki:
href = href[6:] if href.startswith('/wiki/') else href
query[0]['key'][0]['value'] = quotekey(href)
print "%d, %d %s " % (page_num, href_num, href)
queries.append(query)
if len(queries):
try:
resp = self.freebase.mqlreadmulti(queries)
#print "fb resp, len(resp): %s %d" % (resp, len(resp))
for r in resp:
yield r
except Exception, e:
# for now, write out the stuff in the queries and then move on -- better to try on smaller pieces
print "Metaweb Error: %s for page %s" % (e, page)
class WikipediaLinksTest(unittest.TestCase):
def test_external_links(self):
target = "http://www.gutenberg.org/etext"
max = 10
links = []
for (i, link) in enumerate(islice(external_links_in_wikipedia(target), max)):
print i, link
links.append((link["source_href"],link["target"]))
self.assertEqual(len(links), max)
class DatabaseTest(unittest.TestCase):
def test_insert_1_wikipedia_link(self):
gluejar_db = GluejarDB()
wl = WikipediaLink()
wl.gutenberg_etext_id = 13920
wl.wikipedia_href = "/wiki/stuffffdsfsf"
wl.wikipedia_title = "stuffffdsfsf"
# add one, read it back, and then delete it
gluejar_db.session.add(wl)
gluejar_db.commit_db()
query = gluejar_db.session.query(WikipediaLink).filter(WikipediaLink.wikipedia_href == "/wiki/stuffffdsfsf")
obj = query.first()
self.assertEqual(obj.wikipedia_href, "/wiki/stuffffdsfsf")
gluejar_db.session.delete(obj)
gluejar_db.commit_db()
def test_integrity_constraint_wikipedia_link(self):
gluejar_db = GluejarDB()
wl = WikipediaLink()
wl.gutenberg_etext_id = 13920
wl.wikipedia_href = "/wiki/stuffffdsfsf"
wl.wikipedia_title = "stuffffdsfsf"
wl2 = WikipediaLink()
wl2.gutenberg_etext_id = 13920
wl2.wikipedia_href = "/wiki/stuffffdsfsf"
wl2.wikipedia_title = "stuffffdsfsf2"
# try to add links with the same value twice
gluejar_db.session.add(wl)
gluejar_db.session.add(wl2)
self.assertRaises(Exception, gluejar_db.commit_db)
gluejar_db.rollback()
# delete the first item
query = gluejar_db.session.query(WikipediaLink).filter(WikipediaLink.wikipedia_href == "/wiki/stuffffdsfsf")
obj = query.first()
self.assertEqual(obj.wikipedia_href, "/wiki/stuffffdsfsf")
gluejar_db.session.delete(obj)
gluejar_db.commit_db()
def test_filtered_wikipedia_links(self):
db = GluejarDB()
for item in islice(db.filtered_wikipedia_links(),100):
print item.wikipedia_title, item.wikipedia_href
self.assertTrue(True)
class ChainTest(unittest.TestCase):
def test_chain(self):
self.assertTrue(True)
max = None
sizes = [5, 8, 9]
numbers = chain(*(xrange(size) for size in sizes))
for (i, num) in enumerate(islice(numbers,max)):
pass
self.assertEqual(i+1,sum(sizes))
class FreebaseTest(unittest.TestCase):
def test_query(self):
fb = FreebaseClient()
resp = list(fb.wikipedia_href_to_freebase_id(['Peter_and_Wendy', 'King_Lear']))
for r in resp:
#print r
#print r[0]['id'], r[0]['type']
self.assertTrue('/book/book' in r[0]['type'])
def test_query_and_db_insert(self):
fb = FreebaseClient()
db = GluejarDB()
resp = list(fb.wikipedia_href_to_freebase_id(['Peter_and_Wendy', 'King_Lear', 'Hamlet']))
for r in resp:
print r
print r[0]['id'], r[0]['type'], r[0]['key'][0]['value']
self.assertTrue('/book/book' in r[0]['type'])
fb_entity = FreebaseEntity()
fb_entity.id = r[0]['id']
try:
db.session.add(fb_entity)
db.commit_db()
except IntegrityError, e:
db.rollback()
fb_entity = db.session.query(FreebaseEntity).filter(FreebaseEntity.id==r[0]['id']).one()
fb_entity.wikipedia_href = '/wiki/%s' % (r[0]['key'][0]['value'])
fb_entity.is_book_book = '/book/book' in r[0]['type']
db.commit_db()
# return True if no crashing
self.assertTrue(True)
class RefineTest(unittest.TestCase):
def setUp(self):
self.refine_obj = refine.Refine(refine.RefineServer())
def test_project_listing(self):
# https://raw.github.com/PaulMakepeace/refine-client-py/master/refine.py
projects = self.refine_obj.list_projects().items()
def date_to_epoch(json_dt):
"Convert a JSON date time into seconds-since-epoch."
return time.mktime(time.strptime(json_dt, '%Y-%m-%dT%H:%M:%SZ'))
projects.sort(key=lambda v: date_to_epoch(v[1]['modified']), reverse=True)
for project_id, project_info in projects:
print('{0:>14}: {1}'.format(project_id, project_info['name']))
id = int(project_id) # check to see whether there will be a non-int
def test_project_name(self):
id = "1884515736058"
print self.refine_obj.get_project_name(id)
def test_columns(self):
id = "1884515736058"
proj = self.refine_obj.open_project(id)
models = proj.get_models()
cols = proj.columns
pprint(models)
print models.keys()
print cols
def test_iterate_rows(self):
id = "1884515736058"
proj = self.refine_obj.open_project(id)
cols_to_extract = ['etext_id', 'title', 'name', 'fb_id', 'fb_id_judgement', 'wikipedia_title']
response = proj.get_rows(limit=10)
print "response.total: ", response.total
for i, row in enumerate(islice(response.rows,10)):
print i, row.flagged, row.starred, row.index,
print i, [row[c] for c in cols_to_extract]
def suite():
testcases = []
suites = unittest.TestSuite([unittest.TestLoader().loadTestsFromTestCase(testcase) for testcase in testcases])
suites.addTest(RefineTest('test_iterate_rows'))
#suites.addTest(SettingsTest('test_dev_me_alignment')) # give option to test this alignment
return suites
if __name__ == '__main__':
#walk_through_catalog(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog_texts.rdf',max=100)
#walk_through_catalog(fname='/Users/raymondyee/D/Document/Gluejar/gutenberg/catalog_files.rdf',max=1000)
#load_texts_to_db(max=10)
#load_files_to_db(max=None)
#load_wikipedia_external_links_into_db(None)
#map_wikipedia_links_to_freebase_ids(None, page_size=10)
map_refine_fb_links_to_openlibrary_work_ids(max=10)
#unittest.main()
suites = suite()
#suites = unittest.defaultTestLoader.loadTestsFromModule(__import__('__main__'))
#unittest.TextTestRunner().run(suites)

File diff suppressed because it is too large Load Diff