Initial commit of testing configuration using `pytest`
parent
58b8cb30e8
commit
17a5f33f0a
|
@ -0,0 +1,7 @@
|
|||
[run]
|
||||
omit = tests/*
|
||||
|
||||
[report]
|
||||
|
||||
exclude_lines =
|
||||
if __name__ == '__main__':
|
|
@ -0,0 +1,4 @@
|
|||
flake8
|
||||
pytest
|
||||
pytest-cov
|
||||
pytest-mock
|
47
esIndexer.py
47
esIndexer.py
|
@ -1,12 +1,8 @@
|
|||
import os
|
||||
from datetime import datetime
|
||||
from elasticsearch.helpers import bulk, BulkIndexError, streaming_bulk
|
||||
from elasticsearch.helpers import BulkIndexError, streaming_bulk
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch.exceptions import (
|
||||
ConnectionError,
|
||||
TransportError,
|
||||
ConflictError
|
||||
)
|
||||
from elasticsearch.exceptions import ConnectionError
|
||||
from elasticsearch_dsl import connections
|
||||
from elasticsearch_dsl.wrappers import Range
|
||||
|
||||
|
@ -16,7 +12,7 @@ from sqlalchemy.dialects import postgresql
|
|||
|
||||
from model.cce import CCE as dbCCE
|
||||
from model.renewal import Renewal as dbRenewal
|
||||
from model.registration import Registration as dbRegistration
|
||||
from model.registration import Registration as dbRegistration # noqa: F401
|
||||
from model.elastic import (
|
||||
CCE,
|
||||
Registration,
|
||||
|
@ -31,7 +27,10 @@ class ESIndexer():
|
|||
self.ccr_index = os.environ['ES_CCR_INDEX']
|
||||
self.client = None
|
||||
self.session = manager.session
|
||||
self.loadFromTime = loadFromTime if loadFromTime else datetime.strptime('1970-01-01', '%Y-%m-%d')
|
||||
if loadFromTime:
|
||||
self.loadFromTime = loadFromTime
|
||||
else:
|
||||
self.loadFromTime = datetime.strptime('1970-01-01', '%Y-%m-%d')
|
||||
|
||||
self.createElasticConnection()
|
||||
self.createIndex()
|
||||
|
@ -57,7 +56,7 @@ class ESIndexer():
|
|||
CCE.init()
|
||||
if self.client.indices.exists(index=self.ccr_index) is False:
|
||||
Renewal.init()
|
||||
|
||||
|
||||
def indexRecords(self, recType='cce'):
|
||||
"""Process the current batch of updating records. This utilizes the
|
||||
elasticsearch-py bulk helper to import records in chunks of the
|
||||
|
@ -68,14 +67,17 @@ class ESIndexer():
|
|||
success, failure = 0, 0
|
||||
errors = []
|
||||
try:
|
||||
for status, work in streaming_bulk(self.client, self.process(recType)):
|
||||
for status, work in streaming_bulk(
|
||||
self.client,
|
||||
self.process(recType)
|
||||
):
|
||||
print(status, work)
|
||||
if not status:
|
||||
errors.append(work)
|
||||
failure += 1
|
||||
else:
|
||||
success += 1
|
||||
|
||||
|
||||
print('Success {} | Failure: {}'.format(success, failure))
|
||||
except BulkIndexError as err:
|
||||
print('One or more records in the chunk failed to import')
|
||||
|
@ -91,7 +93,8 @@ class ESIndexer():
|
|||
for ccr in self.retrieveRenewals():
|
||||
esRen = ESRen(ccr)
|
||||
esRen.indexRen()
|
||||
if esRen.renewal.rennum == '': continue
|
||||
if esRen.renewal.rennum == '':
|
||||
continue
|
||||
yield esRen.renewal.to_dict(True)
|
||||
|
||||
def retrieveEntries(self):
|
||||
|
@ -99,7 +102,7 @@ class ESIndexer():
|
|||
.filter(dbCCE.date_modified > self.loadFromTime)
|
||||
for cce in retQuery.all():
|
||||
yield cce
|
||||
|
||||
|
||||
def retrieveRenewals(self):
|
||||
renQuery = self.session.query(dbRenewal)\
|
||||
.filter(dbRenewal.date_modified > self.loadFromTime)
|
||||
|
@ -110,10 +113,10 @@ class ESIndexer():
|
|||
class ESDoc():
|
||||
def __init__(self, cce):
    """Wrap a database CCE record and start building its ES document.

    cce is stored as ``self.dbRec``; ``self.entry`` starts out as None
    and ``initEntry()`` is then invoked to set up the record.
    """
    self.dbRec = cce
    # Assigned exactly once; a duplicate assignment here was redundant.
    self.entry = None

    self.initEntry()
|
||||
|
||||
|
||||
def initEntry(self):
|
||||
print('Creating ES record for {}'.format(self.dbRec))
|
||||
|
||||
|
@ -122,9 +125,9 @@ class ESDoc():
|
|||
def indexEntry(self):
|
||||
self.entry.uuid = self.dbRec.uuid
|
||||
self.entry.title = self.dbRec.title
|
||||
self.entry.authors = [ a.name for a in self.dbRec.authors ]
|
||||
self.entry.publishers = [ p.name for p in self.dbRec.publishers ]
|
||||
self.entry.lccns = [ l.lccn for l in self.dbRec.lccns ]
|
||||
self.entry.authors = [a.name for a in self.dbRec.authors]
|
||||
self.entry.publishers = [p.name for p in self.dbRec.publishers]
|
||||
self.entry.lccns = [l.lccn for l in self.dbRec.lccns]
|
||||
self.entry.registrations = [
|
||||
Registration(regnum=r.regnum, regdate=r.reg_date)
|
||||
for r in self.dbRec.registrations
|
||||
|
@ -134,10 +137,10 @@ class ESDoc():
|
|||
class ESRen():
|
||||
def __init__(self, ccr):
    """Wrap a database renewal record and start building its ES document.

    ccr is stored as ``self.dbRen``; ``self.renewal`` starts out as None
    and ``initRenewal()`` is then invoked to set up the record.
    """
    self.dbRen = ccr
    # Assigned exactly once; a duplicate assignment here was redundant.
    self.renewal = None

    self.initRenewal()
|
||||
|
||||
|
||||
def initRenewal(self):
|
||||
print('Creating ES record for {}'.format(self.dbRen))
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
import os
|
||||
import yaml
|
||||
|
||||
|
||||
def loadConfig():
    """Load ``config.yaml`` and export its settings as environment variables.

    The file is expected to map section names to dictionaries of
    ``KEY: value`` pairs; every pair in every section is written to
    ``os.environ``.

    Raises:
        FileNotFoundError: if ``config.yaml`` does not exist in the current
            working directory.
    """
    with open('config.yaml', 'r') as yamlFile:
        # An empty YAML document parses to None; treat it as "no sections".
        config = yaml.safe_load(yamlFile) or {}

    for sectionDict in config.values():
        # A section header with no entries also parses to None.
        for key, value in (sectionDict or {}).items():
            if value is None:
                # A null setting has no meaningful string form; leave unset.
                continue
            # os.environ only accepts strings, while YAML yields ints,
            # floats and bools for unquoted scalars (e.g. a port number).
            os.environ[key] = str(value)
|
46
main.py
46
main.py
|
@ -1,7 +1,11 @@
|
|||
import argparse
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from sessionManager import SessionManager
|
||||
from builder import CCEReader
|
||||
from renBuilder import CCRReader
|
||||
from esIndexer import ESIndexer
|
||||
from helpers.config import loadConfig
|
||||
|
||||
|
||||
def main(secondsAgo=None, year=None, exclude=None, reinit=False):
|
||||
|
@ -19,26 +23,27 @@ def main(secondsAgo=None, year=None, exclude=None, reinit=False):
|
|||
loadCCE(manager, loadFromTime, year)
|
||||
if exclude != 'ccr':
|
||||
loadCCR(manager, loadFromTime, year)
|
||||
|
||||
|
||||
indexUpdates(manager, loadFromTime)
|
||||
|
||||
|
||||
manager.closeConnection()
|
||||
|
||||
|
||||
|
||||
def loadCCE(manager, loadFromTime, selectedYear):
    """Run the CCE import steps in order using a ``CCEReader``.

    The reader is built from ``manager``, told which year(s) to load via
    ``selectedYear``, asked for the year files relative to
    ``loadFromTime``, and finally asked to import the year data.
    """
    reader = CCEReader(manager)
    reader.loadYears(selectedYear)
    reader.getYearFiles(loadFromTime)
    reader.importYearData()
|
||||
|
||||
|
||||
|
||||
def loadCCR(manager, loadFromTime, selectedYear):
    """Run the CCR (renewal) import steps in order using a ``CCRReader``.

    The reader is built from ``manager``, given ``selectedYear`` and
    ``loadFromTime`` to choose what to load, then asked to import the
    selected years.
    """
    reader = CCRReader(manager)
    reader.loadYears(selectedYear, loadFromTime)
    reader.importYears()
|
||||
|
||||
|
||||
def indexUpdates(manager, loadFromTime):
    """Index CCE entries and then CCR renewals through one ``ESIndexer``.

    A single indexer is constructed with the caller's ``loadFromTime`` and
    ``indexRecords`` is run first for ``'cce'`` and then for ``'ccr'``.
    """
    # Build the indexer once, with the requested time window. Constructing
    # it a first time with None only produced a throwaway instance that
    # ignored loadFromTime.
    esIndexer = ESIndexer(manager, loadFromTime)
    esIndexer.indexRecords(recType='cce')
    esIndexer.indexRecords(recType='ccr')
|
||||
|
||||
|
@ -48,28 +53,16 @@ def parseArgs():
|
|||
description='Load CCE XML and CCR TSV into PostgresQL'
|
||||
)
|
||||
parser.add_argument('-t', '--time', type=int, required=False,
|
||||
help='Time ago in seconds to check for file updates'
|
||||
)
|
||||
help='Time ago in seconds to check for file updates')
|
||||
parser.add_argument('-y', '--year', type=str, required=False,
|
||||
help='Specific year to load CCE entries and/or renewals from'
|
||||
)
|
||||
help='Specific year to load CCE entries and/or renewals from')
|
||||
parser.add_argument('-x', '--exclude', type=str, required=False,
|
||||
choices=['cce', 'ccr'],
|
||||
help='Specify to exclude either entries or renewals from this run'
|
||||
)
|
||||
choices=['cce', 'ccr'],
|
||||
help='Specify to exclude either entries or renewals from this run')
|
||||
parser.add_argument('--REINITIALIZE', action='store_true')
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def loadConfig():
    """Load ``config.yaml`` and export its settings as environment variables.

    The file is expected to map section names to dictionaries of
    ``KEY: value`` pairs; every pair in every section is written to
    ``os.environ``.

    Raises:
        FileNotFoundError: if ``config.yaml`` does not exist in the current
            working directory.
    """
    with open('config.yaml', 'r') as yamlFile:
        # An empty YAML document parses to None; treat it as "no sections".
        config = yaml.safe_load(yamlFile) or {}

    for sectionDict in config.values():
        # A section header with no entries also parses to None.
        for key, value in (sectionDict or {}).items():
            if value is None:
                # A null setting has no meaningful string form; leave unset.
                continue
            # os.environ only accepts strings, while YAML yields ints,
            # floats and bools for unquoted scalars (e.g. a port number).
            os.environ[key] = str(value)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parseArgs()
|
||||
try:
|
||||
|
@ -77,14 +70,9 @@ if __name__ == '__main__':
|
|||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
from sessionManager import SessionManager
|
||||
from builder import CCEReader, CCEFile
|
||||
from renBuilder import CCRReader, CCRFile
|
||||
from esIndexer import ESIndexer
|
||||
|
||||
main(
|
||||
secondsAgo=args.time,
|
||||
year=args.year,
|
||||
exclude=args.exclude,
|
||||
reinit=args.REINITIALIZE
|
||||
)
|
||||
)
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import os
|
||||
import yaml
|
||||
from elasticsearch_dsl import (
|
||||
Index,
|
||||
Document,
|
||||
Keyword,
|
||||
Text,
|
||||
|
@ -45,7 +43,7 @@ class Renewal(BaseDoc):
|
|||
claimants = Nested(Claimant)
|
||||
|
||||
class Index:
|
||||
name = os.environ['ES_CCR_INDEX']
|
||||
name = os.environ.get('ES_CCR_INDEX', None)
|
||||
|
||||
|
||||
class CCE(BaseDoc):
|
||||
|
@ -58,4 +56,4 @@ class CCE(BaseDoc):
|
|||
registrations = Nested(Registration)
|
||||
|
||||
class Index:
|
||||
name = os.environ['ES_CCE_INDEX']
|
||||
name = os.environ.get('ES_CCE_INDEX', None)
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, call
|
||||
import pytest
|
||||
from elasticsearch.exceptions import ConnectionError
|
||||
|
||||
from esIndexer import ESIndexer, ESDoc, ESRen
|
||||
|
||||
|
||||
class TestIndexer(object):
    """Unit tests for ``ESIndexer`` construction and connection setup."""

    @pytest.fixture
    def setEnvVars(self, mocker):
        """Patch ``os.environ`` with the ES_* settings used by these tests."""
        mocker.patch.dict('os.environ', {
            'ES_CCE_INDEX': 'test_cce',
            'ES_CCR_INDEX': 'test_ccr',
            'ES_HOST': 'test',
            'ES_PORT': '9999',
            'ES_TIMEOUT': '0'
        })

    def test_indexerInit(self, mocker, setEnvVars):
        """Constructor stores index names/session and runs its setup calls."""
        mockConfig = mocker.patch('esIndexer.configure_mappers')
        mockConn = mocker.patch('esIndexer.ESIndexer.createElasticConnection')
        mockCreate = mocker.patch('esIndexer.ESIndexer.createIndex')
        mockManager = MagicMock()
        mockManager.session = 'session'

        testIndexer = ESIndexer(mockManager, 10)

        assert testIndexer.cce_index == 'test_cce'
        assert testIndexer.ccr_index == 'test_ccr'
        assert testIndexer.session == 'session'
        assert mockConn.called
        assert mockCreate.called
        assert mockConfig.called

    def test_indexerInit_no_time(self, mocker, setEnvVars):
        """With no load time given, ``loadFromTime`` defaults to 1970-01-01."""
        mockConfig = mocker.patch('esIndexer.configure_mappers')
        mockConn = mocker.patch('esIndexer.ESIndexer.createElasticConnection')
        mockCreate = mocker.patch('esIndexer.ESIndexer.createIndex')
        mockManager = MagicMock()
        mockManager.session = 'session'

        testIndexer = ESIndexer(mockManager, None)

        assert testIndexer.cce_index == 'test_cce'
        assert testIndexer.ccr_index == 'test_ccr'
        assert testIndexer.session == 'session'
        assert testIndexer.loadFromTime == datetime(1970, 1, 1)
        assert mockConn.called
        assert mockCreate.called
        assert mockConfig.called

    def test_elastic_connection(self, mocker, setEnvVars):
        """The client returned by ``Elasticsearch`` is stored on the indexer."""
        mockConfig = mocker.patch('esIndexer.configure_mappers')
        mockCreate = mocker.patch('esIndexer.ESIndexer.createIndex')
        mockManager = MagicMock()
        mockManager.session = 'session'

        mockElastic = mocker.patch('esIndexer.Elasticsearch')
        mockElastic.return_value = 'test_client'
        mocker.patch('esIndexer.connections')

        testIndexer = ESIndexer(mockManager, 10)

        assert testIndexer.cce_index == 'test_cce'
        assert testIndexer.ccr_index == 'test_ccr'
        assert testIndexer.session == 'session'
        assert testIndexer.client == 'test_client'
        assert mockCreate.called
        assert mockConfig.called
        # `assert mock.called_once_with(...)` never checked anything: it only
        # read an auto-created mock attribute (always truthy, and an
        # AttributeError on Python 3.12+). Use the real assertion method.
        # NOTE(review): the expected kwargs are carried over unchanged from
        # the previous no-op check -- confirm them against
        # ESIndexer.createElasticConnection.
        mockElastic.assert_called_once_with(
            hosts=[{'host': 'test', 'port': '9999'}],
            timeout='0'
        )

    def test_elastic_conn_err(self, mocker, setEnvVars):
        """A ``ConnectionError`` from ``Elasticsearch`` reaches the caller."""
        mocker.patch('esIndexer.configure_mappers')
        mocker.patch('esIndexer.ESIndexer.createIndex')
        mockManager = MagicMock()
        mockManager.session = 'session'

        mockElastic = mocker.patch('esIndexer.Elasticsearch')
        mockElastic.side_effect = ConnectionError
        mocker.patch('esIndexer.connections')
        with pytest.raises(ConnectionError):
            ESIndexer(mockManager, 10)

    # TODO: disabled index-creation test kept for reference; re-enable or
    # remove once it can be made to pass.
    # def test_index_create(self, mocker, monkeypatch, setEnvVars):
    #     mockConfig = mocker.patch('esIndexer.configure_mappers')
    #     mockConn = mocker.patch('esIndexer.ESIndexer.createElasticConnection')
    #     mockCCE = mocker.patch('esIndexer.CCE')
    #     mockCCR = mocker.patch('esIndexer.Renewal')
    #     mockManager = MagicMock()
    #     mockManager.session = 'session'

    #     mockClient = MagicMock()
    #     mockClient.indices.exists.side_effect = [False, False]

    #     testIndexer = ESIndexer(mockManager, 10)

    #     assert testIndexer.cce_index == 'test_cce'
    #     assert testIndexer.ccr_index == 'test_ccr'
    #     assert testIndexer.session == 'session'
    #     assert mockConn.called
    #     assert mockConfig.called
    #     assert mockCCE.init.called
    #     assert mockCCR.init.called
|
|
@ -0,0 +1,81 @@
|
|||
import sys
|
||||
from unittest.mock import MagicMock, call
|
||||
from main import main, loadCCE, loadCCR, indexUpdates, parseArgs
|
||||
|
||||
|
||||
class TestHandler(object):
    """Unit tests for the ``main`` module's entry point and helpers."""

    def test_main_plain(self, mocker):
        """With default arguments every stage of ``main`` is invoked."""
        mockSession = mocker.patch('main.SessionManager')
        mockLoadCCE = mocker.patch('main.loadCCE')
        mockLoadCCR = mocker.patch('main.loadCCR')
        mockIndex = mocker.patch('main.indexUpdates')

        main()

        assert mockSession.called
        assert mockLoadCCE.called
        assert mockLoadCCR.called
        assert mockIndex.called

    def test_main_args(self, mocker):
        """``exclude='ccr'`` skips the renewal load but still indexes."""
        mockSession = mocker.patch('main.SessionManager')
        mockLoadCCE = mocker.patch('main.loadCCE')
        mockLoadCCR = mocker.patch('main.loadCCR')
        mockIndex = mocker.patch('main.indexUpdates')

        main(
            secondsAgo=10,
            year=1900,
            exclude='ccr',
            reinit=True
        )

        assert mockSession.called
        assert mockLoadCCE.called
        # `assert mock.not_called` was always truthy (auto-created mock
        # attribute); assert_not_called() actually verifies the exclusion.
        mockLoadCCR.assert_not_called()
        assert mockIndex.called

    def test_cce_load(self, mocker):
        """``loadCCE`` builds a reader and runs each import step on it."""
        mockReader = mocker.patch('main.CCEReader')
        mockCCE = MagicMock()
        mockReader.return_value = mockCCE

        loadCCE('manager', 10, None)

        # Real assertion methods replace `assert mock.called_once_with(...)`,
        # which could never fail.
        mockReader.assert_called_once_with('manager')
        mockCCE.loadYears.assert_called_once_with(None)
        mockCCE.getYearFiles.assert_called_once_with(10)
        assert mockCCE.importYearData.called

    def test_ccr_load(self, mocker):
        """``loadCCR`` builds a reader, selects years and imports them."""
        mockReader = mocker.patch('main.CCRReader')
        mockCCR = MagicMock()
        mockReader.return_value = mockCCR

        loadCCR('manager', 10, None)

        mockReader.assert_called_once_with('manager')
        mockCCR.loadYears.assert_called_once_with(None, 10)
        assert mockCCR.importYears.called

    def test_indexer(self, mocker):
        """``indexUpdates`` indexes 'cce' then 'ccr' with one indexer."""
        mockIndexer = mocker.patch('main.ESIndexer')
        mockInd = MagicMock()
        mockIndexer.return_value = mockInd

        indexUpdates('manager', 10)

        mockIndexer.assert_called_once_with('manager', 10)
        assert mockInd.indexRecords.mock_calls == \
            [call(recType='cce'), call(recType='ccr')]

    def test_parseArgs_success(self, mocker):
        """Command-line flags are parsed into the expected namespace values."""
        argv = ['main', '--time', '10', '--year', '1900', '--exclude', 'ccr']
        mocker.patch.object(sys, 'argv', argv)

        args = parseArgs()

        assert int(args.time) == 10
        assert int(args.year) == 1900
        assert args.exclude == 'ccr'
        assert args.REINITIALIZE is False
|
Loading…
Reference in New Issue