Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: no downtime reindex, second attempt #37

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ at [https://django-opensearch-dsl.readthedocs.io](https://django-opensearch-dsl.
- Opensearch auto mapping from Django models fields.
- Complex field type support (`ObjectField`, `NestedField`).
- Index fast using `parallel` indexing.
- Zero-downtime index migration for when the document mapping changes.

## Requirements

Expand Down
92 changes: 84 additions & 8 deletions django_opensearch_dsl/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
import sys
import time
from collections import deque
from datetime import datetime
from functools import partial
from typing import Optional, Iterable

from django.db import models
from django.db.models import QuerySet, Q
from opensearch_dsl import Document as DSLDocument
from opensearch_dsl.document import Document as DSLDocument, IndexMeta as DSLIndexMeta
from opensearchpy.helpers import bulk, parallel_bulk

from . import fields
from .apps import DODConfig
from .exceptions import ModelFieldNotMappedError
from .indices import Index
from .management.enums import OpensearchAction
from .search import Search
from .signals import post_index
Expand Down Expand Up @@ -44,9 +46,21 @@
}


class Document(DSLDocument):
class IndexMeta(DSLIndexMeta):
    """Metaclass that swaps a document's `_index` over to this package's `Index`."""

    def __new__(mcs, *args, **kwargs):
        """Build the class, then re-type its named `_index` as `Index`.

        The class is created by the parent metaclass. When the resulting
        class carries a truthy `_index` with a truthy `_name`, that index
        object's `__class__` is reassigned in place to `Index`; otherwise it
        is left untouched.
        """
        document_cls = super().__new__(mcs, *args, **kwargs)
        index = document_cls._index  # noqa
        if index and index._name:  # noqa
            # In-place class swap: the existing index object is kept, only
            # its type changes.
            index.__class__ = Index  # noqa
        return document_cls


class Document(DSLDocument, metaclass=IndexMeta):
"""Allow the definition of Opensearch' index using Django `Model`."""

VERSION_NAME_SEPARATOR = "--"
_prepared_fields = []

def __init__(self, related_instance_to_ignore=None, **kwargs):
Expand All @@ -56,6 +70,64 @@ def __init__(self, related_instance_to_ignore=None, **kwargs):
self._related_instance_to_ignore = related_instance_to_ignore
self._prepared_fields = self.init_prepare()

@classmethod
def get_index_name(cls, suffix=None):
    """Return the index name for this document, optionally suffixed.

    With a falsy `suffix` the base name (`cls._index._name`) is returned
    as-is. Otherwise the result is the base name, followed by
    `cls.VERSION_NAME_SEPARATOR`, followed by `suffix`.
    """
    base_name = cls._index._name  # noqa
    if not suffix:
        return base_name
    return base_name + f"{cls.VERSION_NAME_SEPARATOR}{suffix}"

@classmethod
def get_all_indices(cls, using=None):
    """Return every suffixed index of this document as `Index` objects.

    The connection obtained from `cls._get_connection(using=using)` is asked
    for all indices matching `<base name><VERSION_NAME_SEPARATOR>*`; the
    names found are sorted and each one is wrapped in an `Index`.
    """
    pattern = f"{cls._index._name}{cls.VERSION_NAME_SEPARATOR}*"  # noqa
    connection = cls._get_connection(using=using)
    matching = connection.indices.get(pattern)
    index_names = sorted(matching.keys())
    return [Index(index_name) for index_name in index_names]

@classmethod
def get_active_index(cls, using=None):
    """Return the index currently aliased to this document's base name.

    Walks the indices returned by `cls.get_all_indices(using=using)` in
    order and returns the first one for which an alias named
    `cls._index._name` exists.

    :param using: connection alias, forwarded both to the index listing and
        to each alias check so they query the same connection.
    :return: the matching index object, or `None` when no index carries the
        alias.
    """
    alias = cls._index._name  # noqa
    for index in cls.get_all_indices(using=using):
        # Forward `using`: without it the alias lookup would run on the
        # index's own default connection instead of the requested one.
        if index.exists_alias(using=using, name=alias):
            return index
    return None

@classmethod
def migrate(cls, suffix, using=None):
    """Point the document's base-name alias at the index for `suffix`.

    The alias named `cls._index._name` is added to the index returned by
    `cls.get_index_name(suffix)`. If another index currently holds the alias
    (see `get_active_index`), the removal from that index is sent in the
    same `update_aliases` request, ahead of the addition.

    :param suffix: version suffix identifying the target index.
    :param using: connection alias; forwarded to every call that talks to
        the cluster so the lookup, the optional delete and the alias update
        all run against the same connection.
    """
    alias = cls._index._name  # noqa
    target_index_name = cls.get_index_name(suffix)

    actions_on_aliases = [
        {"add": {"index": target_index_name, "alias": alias}},
    ]

    active_index = cls.get_active_index(using=using)
    if active_index:
        # Removal goes first so it precedes the addition in the request.
        actions_on_aliases.insert(
            0,
            {"remove": {"index": active_index._name, "alias": alias}},  # noqa
        )
    elif cls._index.exists(using=using):
        # No suffixed index holds the alias, yet something already exists
        # under the base name. Per the PR discussion this is meant to handle
        # a pre-existing non-versioned index: it is deleted so that the
        # alias can take over its name.
        # NOTE(review): this branch is not unit-tested, and the reviewer
        # reported seeing it not work — confirm before relying on it.
        cls._index.delete(using=using)

    cls._get_connection(using=using).indices.update_aliases(body={"actions": actions_on_aliases})

@classmethod
def init(cls, suffix=None, using=None):
    """Init the Index with a named suffix to handle multiple versions.

    The parent `init` is called for the index named by
    `cls.get_index_name(suffix)`. Afterwards, if nothing exists under the
    document's base index name on that connection, `migrate` is called so the
    base name becomes an alias of the index just initialised.

    :param suffix: version suffix; when falsy, a timestamp of the current
        local time formatted as `%Y%m%d%H%M%S%f` is used instead.
    :param using: connection alias, forwarded to the parent `init`, to the
        existence check and to `migrate`.
    """
    suffix = suffix or datetime.now().strftime("%Y%m%d%H%M%S%f")
    index_name = cls.get_index_name(suffix)
    super().init(index=index_name, using=using)
    # Check on the requested connection: checking the default one could
    # skip (or wrongly trigger) the alias creation on the target cluster.
    if not cls._index.exists(using=using):
        cls.migrate(suffix, using=using)

@classmethod
def search(cls, using=None, index=None):
"""Return a `Search` object parametrized with the index' information."""
Expand Down Expand Up @@ -196,18 +268,18 @@ def generate_id(cls, object_instance):
"""
return object_instance.pk

def _prepare_action(self, object_instance, action):
def _prepare_action(self, object_instance, action, index_name=None):
return {
"_op_type": action,
"_index": self._index._name, # noqa
"_index": index_name or self._index._name, # noqa
"_id": self.generate_id(object_instance),
"_source" if action != "update" else "doc": (self.prepare(object_instance) if action != "delete" else None),
}

def _get_actions(self, object_list, action):
def _get_actions(self, object_list, action, **kwargs):
for object_instance in object_list:
if action == "delete" or self.should_index_object(object_instance):
yield self._prepare_action(object_instance, action)
yield self._prepare_action(object_instance, action, **kwargs)

def _bulk(self, *args, parallel=False, using=None, **kwargs):
"""Helper for switching between normal and parallel bulk operation."""
Expand All @@ -223,14 +295,18 @@ def should_index_object(self, obj):
"""
return True

def update(self, thing, action, *args, refresh=None, using=None, **kwargs): # noqa
def update(self, thing, action, *args, index_suffix=None, refresh=None, using=None, **kwargs): # noqa
"""Update document in OS for a model, iterable of models or queryset."""
if refresh is None:
refresh = getattr(self.Index, "auto_refresh", DODConfig.auto_refresh_enabled())

index_name = self.__class__.get_index_name(index_suffix) if index_suffix else None

if isinstance(thing, models.Model):
object_list = [thing]
else:
object_list = thing

return self._bulk(self._get_actions(object_list, action), *args, refresh=refresh, using=using, **kwargs)
return self._bulk(
self._get_actions(object_list, action, index_name=index_name), *args, refresh=refresh, using=using, **kwargs
)
Loading