bardo-copyright-db/esIndexer.py

156 lines
5.0 KiB
Python

import os
from datetime import datetime
from elasticsearch.helpers import bulk, BulkIndexError, streaming_bulk
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import (
ConnectionError,
TransportError,
ConflictError
)
from elasticsearch_dsl import connections
from elasticsearch_dsl.wrappers import Range
from sqlalchemy import or_
from sqlalchemy.orm import configure_mappers, raiseload
from sqlalchemy.dialects import postgresql
from model.cce import CCE as dbCCE
from model.renewal import Renewal as dbRenewal
from model.registration import Registration as dbRegistration
from model.elastic import (
CCE,
Registration,
Renewal,
Claimant
)
class ESIndexer():
def __init__(self, manager, loadFromTime):
self.cce_index = os.environ['ES_CCE_INDEX']
self.ccr_index = os.environ['ES_CCR_INDEX']
self.client = None
self.session = manager.session
self.loadFromTime = loadFromTime if loadFromTime else datetime.strptime('1970-01-01', '%Y-%m-%d')
self.createElasticConnection()
self.createIndex()
configure_mappers()
def createElasticConnection(self):
host = os.environ['ES_HOST']
port = os.environ['ES_PORT']
timeout = int(os.environ['ES_TIMEOUT'])
try:
self.client = Elasticsearch(
hosts=[{'host': host, 'port': port}],
timeout=timeout
)
except ConnectionError as err:
print('Failed to connect to ElasticSearch instance')
raise err
connections.connections._conns['default'] = self.client
def createIndex(self):
if self.client.indices.exists(index=self.cce_index) is False:
CCE.init()
if self.client.indices.exists(index=self.ccr_index) is False:
Renewal.init()
def indexRecords(self, recType='cce'):
"""Process the current batch of updating records. This utilizes the
elasticsearch-py bulk helper to import records in chunks of the
provided size. If a record in the batch errors that is reported and
logged but it does not prevent the other records in the batch from
being imported.
"""
success, failure = 0, 0
errors = []
try:
for status, work in streaming_bulk(self.client, self.process(recType)):
print(status, work)
if not status:
errors.append(work)
failure += 1
else:
success += 1
print('Success {} | Failure: {}'.format(success, failure))
except BulkIndexError as err:
print('One or more records in the chunk failed to import')
raise err
def process(self, recType):
if recType == 'cce':
for cce in self.retrieveEntries():
esEntry = ESDoc(cce)
esEntry.indexEntry()
yield esEntry.entry.to_dict(True)
elif recType == 'ccr':
for ccr in self.retrieveRenewals():
esRen = ESRen(ccr)
esRen.indexRen()
if esRen.renewal.rennum == '': continue
yield esRen.renewal.to_dict(True)
def retrieveEntries(self):
retQuery = self.session.query(dbCCE)\
.filter(dbCCE.date_modified > self.loadFromTime)
for cce in retQuery.all():
yield cce
def retrieveRenewals(self):
renQuery = self.session.query(dbRenewal)\
.filter(dbRenewal.date_modified > self.loadFromTime)
for ccr in renQuery.all():
yield ccr
class ESDoc():
def __init__(self, cce):
self.dbRec = cce
self.entry = None
self.initEntry()
def initEntry(self):
print('Creating ES record for {}'.format(self.dbRec))
self.entry = CCE(meta={'id': self.dbRec.uuid})
def indexEntry(self):
self.entry.uuid = self.dbRec.uuid
self.entry.title = self.dbRec.title
self.entry.authors = [ a.name for a in self.dbRec.authors ]
self.entry.publishers = [ p.name for p in self.dbRec.publishers ]
self.entry.lccns = [ l.lccn for l in self.dbRec.lccns ]
self.entry.registrations = [
Registration(regnum=r.regnum, regdate=r.reg_date)
for r in self.dbRec.registrations
]
class ESRen():
def __init__(self, ccr):
self.dbRen = ccr
self.renewal = None
self.initRenewal()
def initRenewal(self):
print('Creating ES record for {}'.format(self.dbRen))
self.renewal = Renewal(meta={'id': self.dbRen.renewal_num})
def indexRen(self):
self.renewal.uuid = self.dbRen.uuid
self.renewal.rennum = self.dbRen.renewal_num
self.renewal.rendate = self.dbRen.renewal_date
self.renewal.title = self.dbRen.title
self.renewal.authors = self.dbRen.author
self.renewal.claimants = [
Claimant(name=c.name, claim_type=c.claimant_type)
for c in self.dbRen.claimants
]