Merge pull request #4368 from safwanrahman/comman

[Fix #4333] Implement asynchronous search reindex functionality using celery
eric-search-upgrade
Eric Holscher 2018-07-31 14:38:35 +02:00 committed by GitHub
commit 463f9e23d2
20 changed files with 382 additions and 93 deletions

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import pytest
+from django.conf import settings
from rest_framework.test import APIClient

try:
@@ -46,7 +47,6 @@ def pytest_configure(config):
def settings_modification(settings):
    settings.CELERY_ALWAYS_EAGER = True


@pytest.fixture
def api_client():
    return APIClient()

View File

@@ -1,58 +0,0 @@
"""Reindex Elastic Search indexes"""

from __future__ import absolute_import
import logging
from optparse import make_option

from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django.conf import settings

from readthedocs.builds.constants import LATEST
from readthedocs.builds.models import Version
from readthedocs.projects.tasks import update_search

log = logging.getLogger(__name__)


class Command(BaseCommand):

    help = __doc__

    def add_arguments(self, parser):
        parser.add_argument(
            '-p',
            dest='project',
            default='',
            help='Project to index'
        )

    def handle(self, *args, **options):
        """Build/index all versions or a single project's version"""
        project = options['project']
        queryset = Version.objects.all()

        if project:
            queryset = queryset.filter(project__slug=project)
            if not queryset.exists():
                raise CommandError(
                    'No project with slug: {slug}'.format(slug=project))
            log.info("Building all versions for %s", project)
        elif getattr(settings, 'INDEX_ONLY_LATEST', True):
            queryset = queryset.filter(slug=LATEST)

        for version in queryset:
            log.info("Reindexing %s", version)
            try:
                commit = version.project.vcs_repo(version.slug).commit
            except:  # noqa
                # An exception can be thrown here in production, but it's not
                # documented what the exception here is
                commit = None

            try:
                update_search(version.pk, commit,
                              delete_non_commit_files=False)
            except Exception as e:
                log.exception('Reindex failed for %s, %s', version, e)

View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2018-07-27 09:54
from __future__ import unicode_literals

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

    dependencies = [
        ('projects', '0027_add_htmlfile_model'),
    ]

    operations = [
        migrations.AddField(
            model_name='importedfile',
            name='modified_date',
            field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'),
            preserve_default=False,
        ),
    ]

View File

@@ -13,6 +13,7 @@ from django.conf import settings
from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch, reverse
from django.db import models
+from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
@@ -911,6 +912,7 @@ class ImportedFile(models.Model):
    path = models.CharField(_('Path'), max_length=255)
    md5 = models.CharField(_('MD5 checksum'), max_length=255)
    commit = models.CharField(_('Commit'), max_length=255)
+    modified_date = models.DateTimeField(_('Modified date'), auto_now=True)

    def get_absolute_url(self):
        return resolve(project=self.project, version_slug=self.version.slug, filename=self.path)

View File

@@ -14,3 +14,7 @@ after_build = django.dispatch.Signal(providing_args=["version"])
project_import = django.dispatch.Signal(providing_args=["project"])

files_changed = django.dispatch.Signal(providing_args=["project", "files"])
+
+bulk_post_create = django.dispatch.Signal(providing_args=["instance_list"])
+bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"])
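
The two new signals carry a plain list of instances rather than a queryset. A minimal receiver sketch (not part of this diff; the handler name is hypothetical):

from django.dispatch import receiver

from readthedocs.projects.models import HTMLFile
from readthedocs.projects.signals import bulk_post_create


@receiver(bulk_post_create, sender=HTMLFile)
def log_created_files(sender, instance_list, **kwargs):
    # instance_list is a plain Python list, so it stays valid even after
    # the sender has deleted the underlying rows (the bulk_post_delete case).
    for html_file in instance_list:
        print('created:', html_file.pk)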

View File

@@ -32,7 +32,8 @@ from slumber.exceptions import HttpClientError
from .constants import LOG_TEMPLATE
from .exceptions import RepositoryError
from .models import ImportedFile, Project, Domain, Feature, HTMLFile
-from .signals import before_vcs, after_vcs, before_build, after_build, files_changed
+from .signals import before_vcs, after_vcs, before_build, after_build, files_changed, \
+    bulk_post_create, bulk_post_delete
from readthedocs.builds.constants import (
    BUILD_STATE_BUILDING, BUILD_STATE_CLONING, BUILD_STATE_FINISHED,
    BUILD_STATE_INSTALLING, LATEST, LATEST_VERBOSE_NAME, STABLE_VERBOSE_NAME)
@@ -986,6 +987,7 @@ def _manage_imported_files(version, path, commit):
    :param commit: Commit that updated path
    """
    changed_files = set()
+    created_html_files = []
    for root, __, filenames in os.walk(path):
        for filename in filenames:
            if fnmatch.fnmatch(filename, '*.html'):
@@ -1015,15 +1017,27 @@ def _manage_imported_files(version, path, commit):
            obj.commit = commit
            obj.save()
+
+            if model_class == HTMLFile:
+                # the `obj` is an HTMLFile, so add it to the list
+                created_html_files.append(obj)
+
+    # Send the bulk_post_create signal for bulk indexing into Elasticsearch
+    bulk_post_create.send(sender=HTMLFile, instance_list=created_html_files)
+
-    # Delete the HTMLFile first from previous versions
-    HTMLFile.objects.filter(project=version.project,
-                            version=version
-                            ).exclude(commit=commit).delete()
+    # Delete the HTMLFiles from previous commits first, and
+    # send the bulk_post_delete signal for bulk removal from Elasticsearch
+    delete_queryset = (HTMLFile.objects.filter(project=version.project, version=version)
+                       .exclude(commit=commit))
+    # Keep the objects in memory to send them to the signal
+    instance_list = list(delete_queryset)
+    # Safely delete from the database
+    delete_queryset.delete()
+    # Always pass a list of instances, not a queryset.
+    bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list)

    # Delete ImportedFiles from previous versions
-    ImportedFile.objects.filter(project=version.project,
-                                version=version
-                                ).exclude(commit=commit).delete()
+    (ImportedFile.objects.filter(project=version.project, version=version)
+     .exclude(commit=commit).delete())

    changed_files = [
        resolve_path(
            version.project, filename=file, version_slug=version.slug,

View File

@@ -0,0 +1 @@
default_app_config = 'readthedocs.search.apps.SearchConfig'

View File

@@ -0,0 +1,10 @@
"""Search app config"""

from django.apps import AppConfig


class SearchConfig(AppConfig):
    name = 'readthedocs.search'

    def ready(self):
        # Imported for side effect only: connects the signal receivers.
        from .signals import index_html_file, remove_html_file  # noqa

View File

@@ -3,9 +3,9 @@ from django_elasticsearch_dsl import DocType, Index, fields
from elasticsearch_dsl.query import SimpleQueryString, Bool

from readthedocs.projects.models import Project, HTMLFile
-from .conf import SEARCH_EXCLUDED_FILE
from readthedocs.search.faceted_search import ProjectSearch, FileSearch
+from .conf import SEARCH_EXCLUDED_FILE
+from .mixins import RTDDocTypeMixin

project_conf = settings.ES_INDEXES['project']
project_index = Index(project_conf['name'])
@@ -17,7 +17,7 @@ page_index.settings(**page_conf['settings'])

@project_index.doc_type
-class ProjectDocument(DocType):
+class ProjectDocument(RTDDocTypeMixin, DocType):

    class Meta(object):
        model = Project
@@ -47,11 +47,12 @@ class ProjectDocument(DocType):

@page_index.doc_type
-class PageDocument(DocType):
+class PageDocument(RTDDocTypeMixin, DocType):

    class Meta(object):
        model = HTMLFile
        fields = ('commit',)
+        ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS

    project = fields.KeywordField(attr='project.slug')
    version = fields.KeywordField(attr='version.slug')
@@ -121,21 +122,3 @@ class PageDocument(DocType):
        queryset = (queryset.filter(project__documentation_type='sphinx')
                    .exclude(name__in=SEARCH_EXCLUDED_FILE))
        return queryset
-
-    def update(self, thing, refresh=None, action='index', **kwargs):
-        """Overwrite in order to index only certain files"""
-        # Object not exist in the provided queryset should not be indexed
-        # TODO: remove this overwrite when the issue has been fixed
-        # See below link for more information
-        # https://github.com/sabricot/django-elasticsearch-dsl/issues/111
-        # Moreover, do not need to check if its a delete action
-        # Because while delete action, the object is already remove from database
-        if isinstance(thing, HTMLFile) and action != 'delete':
-            # Its a model instance.
-            queryset = self.get_queryset()
-            obj = queryset.filter(pk=thing.pk)
-            if not obj.exists():
-                return None
-        return super(PageDocument, self).update(thing=thing, refresh=refresh,
-                                                action=action, **kwargs)

View File

@@ -0,0 +1,101 @@
import datetime
import logging

from celery import chord, chain
from django.apps import apps
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from django_elasticsearch_dsl.registries import registry

from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index,
                      index_missing_objects)
from ...utils import chunk_queryset

log = logging.getLogger(__name__)


class Command(BaseCommand):

    @staticmethod
    def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name):
        queryset = queryset.values_list('id', flat=True)
        chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE)

        for chunk in chunked_queryset:
            data = {
                'app_label': app_label,
                'model_name': model_name,
                'document_class': document_class,
                'index_name': index_name,
                'objects_id': list(chunk)
            }
            yield index_objects_to_es.si(**data)

    def _run_reindex_tasks(self, models):
        for doc in registry.get_documents(models):
            queryset = doc().get_queryset()
            # Record the time before indexing starts, so objects that change
            # during the reindex can be picked up afterwards
            index_time = timezone.now()

            app_label = queryset.model._meta.app_label
            model_name = queryset.model.__name__

            index_name = doc._doc_type.index
            timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
            new_index_name = "{}_{}".format(index_name, timestamp)

            pre_index_task = create_new_es_index.si(app_label=app_label,
                                                    model_name=model_name,
                                                    index_name=index_name,
                                                    new_index_name=new_index_name)

            indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name,
                                                      queryset=queryset,
                                                      document_class=str(doc),
                                                      index_name=new_index_name)

            post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name,
                                                 index_name=index_name,
                                                 new_index_name=new_index_name)

            # Task to run in order to add the objects
            # that have been inserted into the database while indexing_tasks was running.
            # We pass the start time of indexing, so it's possible to index later items
            missed_index_task = index_missing_objects.si(app_label=app_label,
                                                         model_name=model_name,
                                                         document_class=str(doc),
                                                         index_generation_time=index_time)

            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
            chord_tasks = chord(header=indexing_tasks, body=post_index_task)
            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
            chain(pre_index_task, chord_tasks, missed_index_task).apply_async()

            message = ("Successfully issued tasks for {}.{}, total {} items"
                       .format(app_label, model_name, queryset.count()))
            log.info(message)

    def add_arguments(self, parser):
        parser.add_argument(
            '--models',
            dest='models',
            type=str,
            nargs='*',
            help=("Specify the model to be updated in Elasticsearch. "
                  "The format is <app_label>.<model_name>")
        )

    def handle(self, *args, **options):
        """
        Index models into the Elasticsearch index asynchronously using celery.

        You can specify the models to be indexed by passing
        `--models <app_label>.<model_name>` parameters.
        Otherwise, it will reindex all the models.
        """
        models = None
        if options['models']:
            models = [apps.get_model(model_name) for model_name in options['models']]

        self._run_reindex_tasks(models=models)
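
The command can also be driven programmatically; a hedged sketch using Django's management API, assuming the command module is named reindex_elasticsearch (the file name is not visible in this view):

from django.core.management import call_command

# Reindex every model registered with django-elasticsearch-dsl
call_command('reindex_elasticsearch')
# Reindex a single model; 'projects.HTMLFile' follows the documented format
call_command('reindex_elasticsearch', models=['projects.HTMLFile'])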

View File

@@ -0,0 +1,68 @@
from django.db import models
from django.core.paginator import Paginator


class RTDDocTypeMixin(object):

    """
    Override some methods of the DED DocType.

    Changes are as follows:

    - Do not index objects that do not exist in the provided queryset
    - Take an additional `index_name` argument in the `update` method to update a specific index

    Issues:
    - https://github.com/sabricot/django-elasticsearch-dsl/issues/111
    """

    def _prepare_action(self, object_instance, action, index_name=None):
        """Overwrite to take `index_name` from parameters for setting the index dynamically"""
        return {
            '_op_type': action,
            '_index': index_name or str(self._doc_type.index),
            '_type': self._doc_type.mapping.doc_type,
            '_id': object_instance.pk,
            '_source': (
                self.prepare(object_instance) if action != 'delete' else None
            ),
        }

    def _get_actions(self, object_list, action, index_name=None):
        """Overwrite to take `index_name` from parameters for setting the index dynamically"""
        if self._doc_type.queryset_pagination is not None:
            paginator = Paginator(
                object_list, self._doc_type.queryset_pagination
            )
            for page in paginator.page_range:
                for object_instance in paginator.page(page).object_list:
                    yield self._prepare_action(object_instance, action, index_name)
        else:
            for object_instance in object_list:
                yield self._prepare_action(object_instance, action, index_name)

    def update(self, thing, refresh=None, action='index', index_name=None, **kwargs):
        """Update each document in ES for a model, iterable of models or queryset"""
        if refresh is True or (
            refresh is None and self._doc_type.auto_refresh
        ):
            kwargs['refresh'] = True

        # TODO: remove this overwrite when the issue has been fixed
        # https://github.com/sabricot/django-elasticsearch-dsl/issues/111
        if isinstance(thing, models.Model):
            # It's a model instance. No need to check for a delete action,
            # because on delete the object has already been removed from the database.
            if action != 'delete':
                queryset = self.get_queryset()
                obj = queryset.filter(pk=thing.pk)
                if not obj.exists():
                    return None

            object_list = [thing]
        else:
            object_list = thing

        return self.bulk(
            self._get_actions(object_list, action, index_name=index_name), **kwargs
        )
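
A sketch of what the new `index_name` hook enables: bulk-writing documents into a freshly created timestamped index while the live alias stays untouched until switch_es_index runs. The index name below is illustrative:

from readthedocs.search.documents import PageDocument

queryset = PageDocument().get_queryset()
# Documents land in the timestamped index, not the one behind the alias
PageDocument().update(queryset.iterator(), index_name='page_20180731143835')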

View File

@@ -1,7 +1,37 @@
"""We define custom Django signals to trigger before executing searches."""
from __future__ import absolute_import

import django.dispatch
+from django.dispatch import receiver
+from django_elasticsearch_dsl.apps import DEDConfig
+from django_elasticsearch_dsl.registries import registry
+
+from readthedocs.projects.models import HTMLFile
+from readthedocs.projects.signals import bulk_post_create, bulk_post_delete
+from readthedocs.search.documents import PageDocument
+from readthedocs.search.tasks import index_objects_to_es

before_project_search = django.dispatch.Signal(providing_args=["body"])
before_file_search = django.dispatch.Signal(providing_args=["body"])
before_section_search = django.dispatch.Signal(providing_args=["body"])
+
+
+@receiver(bulk_post_create, sender=HTMLFile)
+def index_html_file(instance_list, **_):
+    kwargs = {
+        'app_label': HTMLFile._meta.app_label,
+        'model_name': HTMLFile.__name__,
+        'document_class': str(PageDocument),
+        'index_name': None,  # No need to change the index name
+        'objects_id': [obj.id for obj in instance_list],
+    }
+
+    # Do not index if autosync is disabled globally
+    if DEDConfig.autosync_enabled():
+        index_objects_to_es(**kwargs)
+
+
+@receiver(bulk_post_delete, sender=HTMLFile)
+def remove_html_file(instance_list, **_):
+    # Do not delete if autosync is disabled globally
+    if DEDConfig.autosync_enabled():
+        registry.delete(instance_list)
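
For contrast with the receivers above, the sending side in _manage_imported_files reduces to this sketch (the commit hash is made up):

from readthedocs.projects.models import HTMLFile
from readthedocs.projects.signals import bulk_post_delete

# Materialize the instances before deleting the rows; the receiver needs
# real objects to compute the document ids to remove from Elasticsearch.
delete_queryset = HTMLFile.objects.exclude(commit='abc123')
instance_list = list(delete_queryset)
delete_queryset.delete()
bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list)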

View File

@@ -0,0 +1,99 @@
import logging

from django.apps import apps
from django_elasticsearch_dsl.registries import registry

from readthedocs.worker import app

log = logging.getLogger(__name__)


def _get_index(indices, index_name):
    """
    Get the Index from all the indices.

    :param indices: DED indices list
    :param index_name: Name of the index
    :return: DED Index
    """
    for index in indices:
        if str(index) == index_name:
            return index


def _get_document(model, document_class):
    """
    Get the DED document class object from the model and the name of the document class.

    :param model: The model class to find the document for
    :param document_class: the name of the document class.
    :return: DED DocType object
    """
    documents = registry.get_documents(models=[model])

    for document in documents:
        if str(document) == document_class:
            return document


@app.task(queue='web')
def create_new_es_index(app_label, model_name, index_name, new_index_name):
    model = apps.get_model(app_label, model_name)
    indices = registry.get_indices(models=[model])
    old_index = _get_index(indices=indices, index_name=index_name)
    new_index = old_index.clone(name=new_index_name)
    new_index.create()


@app.task(queue='web')
def switch_es_index(app_label, model_name, index_name, new_index_name):
    model = apps.get_model(app_label, model_name)
    indices = registry.get_indices(models=[model])
    old_index = _get_index(indices=indices, index_name=index_name)
    new_index = old_index.clone(name=new_index_name)
    old_index_actual_name = None

    if old_index.exists():
        # An alias can not be used to delete an index.
        # https://www.elastic.co/guide/en/elasticsearch/reference/6.0/indices-delete-index.html
        # So get the index's actual name to delete it
        old_index_info = old_index.get()
        # The info is a dictionary and the key is the actual name of the index
        old_index_actual_name = list(old_index_info.keys())[0]

    # Put the alias onto the new index name, and delete the old index if it exists
    new_index.put_alias(name=index_name)
    if old_index_actual_name:
        old_index.connection.indices.delete(index=old_index_actual_name)


@app.task(queue='web')
def index_objects_to_es(app_label, model_name, document_class, index_name, objects_id):
    model = apps.get_model(app_label, model_name)
    document = _get_document(model=model, document_class=document_class)

    # Use a queryset from the model as the ids are specific
    queryset = model.objects.all().filter(id__in=objects_id).iterator()
    log.info("Indexing model: {}, ids: {}".format(model.__name__, objects_id))
    document().update(queryset, index_name=index_name)


@app.task(queue='web')
def index_missing_objects(app_label, model_name, document_class, index_generation_time):
    """
    Task to ensure that no objects are missed from indexing.

    The object ids are sent to the `index_objects_to_es` task for indexing.
    While that task is running, new objects can be created/deleted in the database
    and they will not be in the tasks already queued for indexing into ES.
    This task indexes all the objects modified after the `index_generation_time`
    timestamp to ensure that everything is in the ES index.
    """
    model = apps.get_model(app_label, model_name)
    document = _get_document(model=model, document_class=document_class)
    queryset = document().get_queryset().exclude(modified_date__lte=index_generation_time)
    document().update(queryset.iterator())

    log.info("Indexed {} missing objects from model: {}".format(queryset.count(), model.__name__))

    # TODO: Figure out how to remove objects from the ES index that have been deleted
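
A hedged example of dispatching the chunked indexing task by hand; the ids are made up, and str(PageDocument) matches the lookup done in _get_document:

from readthedocs.search.documents import PageDocument
from readthedocs.search.tasks import index_objects_to_es

index_objects_to_es.delay(
    app_label='projects',
    model_name='HTMLFile',
    document_class=str(PageDocument),
    index_name=None,       # None falls back to the document's default index
    objects_id=[1, 2, 3],  # illustrative primary keys
)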

View File

@@ -58,9 +58,5 @@ def get_dummy_processed_json(instance):

@pytest.fixture(autouse=True)
def mock_processed_json(mocker):
-    # patch the function from `projects.tasks` because it has been point to there
-    # http://www.voidspace.org.uk/python/mock/patch.html#where-to-patch
    mocked_function = mocker.patch.object(HTMLFile, 'get_processed_json', autospec=True)
    mocked_function.side_effect = get_dummy_processed_json

View File

@@ -321,3 +321,12 @@ def get_project_list_or_404(project_slug, user):
    project_list = list(subprojects) + [project]
    return project_list
+
+
+def chunk_queryset(queryset, chunk_size):
+    """Yield successive `chunk_size` chunks of queryset."""
+    # Based on https://stackoverflow.com/a/312464
+    # licensed under cc by-sa 3.0
+    total = queryset.count()
+    for i in range(0, total, chunk_size):
+        yield queryset[i:i + chunk_size]
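
Usage sketch for the new helper, splitting an id queryset into task-sized batches (ES_TASK_CHUNK_SIZE defaults to 100, per the settings change below); the import path follows the relative import used by the management command:

from readthedocs.projects.models import HTMLFile
from readthedocs.search.utils import chunk_queryset

ids = HTMLFile.objects.values_list('id', flat=True)
for chunk in chunk_queryset(ids, chunk_size=100):
    print(list(chunk))  # at most 100 ids per batch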

View File

@@ -324,6 +324,9 @@ class CommunityBaseSettings(Settings):
            'hosts': '127.0.0.1:9200'
        },
    }

+    # Chunk size for elasticsearch reindex celery tasks
+    ES_TASK_CHUNK_SIZE = 100
+    ES_PAGE_IGNORE_SIGNALS = True

    # ANALYZER = 'analysis': {
    #     'analyzer': {

View File

@@ -48,6 +48,9 @@ class CommunityDevSettings(CommunityBaseSettings):
        'test:8000',
    )

+    # Disable auto syncing elasticsearch documents in development
+    ELASTICSEARCH_DSL_AUTOSYNC = False

    @property
    def LOGGING(self):  # noqa - avoid pep8 N802
        logging = super(CommunityDevSettings, self).LOGGING

View File

@@ -16,6 +16,8 @@ class CommunityTestSettings(CommunityDevSettings):
    DEBUG = False
    TEMPLATE_DEBUG = False

+    ES_PAGE_IGNORE_SIGNALS = False
+    ELASTICSEARCH_DSL_AUTOSYNC = True

    @property
    def ES_INDEXES(self):  # noqa - avoid pep8 N802