Skip to content

Commit

Permalink
feat: Add JoinField and update indexation command to select on which …
Browse files Browse the repository at this point in the history
…models to start the indexation
  • Loading branch information
Virgin Bitton authored and qcoumes committed Mar 25, 2024
1 parent 4d4f437 commit 4f04a28
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 42 deletions.
9 changes: 6 additions & 3 deletions django_opensearch_dsl/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ def search(cls, using=None, index=None):
using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls], model=cls.django.model
)

def get_queryset(self, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None) -> QuerySet:
def get_queryset(
self, db_alias: str = None, filter_: Optional[Q] = None, exclude: Optional[Q] = None, count: int = None
) -> QuerySet:
"""Return the queryset that should be indexed by this doc type."""
qs = self.django.model.objects.all()
qs = self.django.model.objects.using(db_alias).all()

if filter_:
qs = qs.filter(filter_)
Expand All @@ -88,6 +90,7 @@ def _eta(self, start, done, total): # pragma: no cover

def get_indexing_queryset(
self,
db_alias: str = None,
verbose: bool = False,
filter_: Optional[Q] = None,
exclude: Optional[Q] = None,
Expand All @@ -97,7 +100,7 @@ def get_indexing_queryset(
) -> Iterable:
"""Divide the queryset into chunks."""
chunk_size = self.django.queryset_pagination
qs = self.get_queryset(filter_=filter_, exclude=exclude, count=count)
qs = self.get_queryset(db_alias=db_alias, filter_=filter_, exclude=exclude, count=count)
qs = qs.order_by("pk") if not qs.query.is_sliced else qs
count = qs.count()
model = self.django.model.__name__
Expand Down
4 changes: 4 additions & 0 deletions django_opensearch_dsl/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class IpField(DODField, fields.Ip):
"""Allow indexing of IPv4 and IPv6 addresses."""


class JoinField(DODField, fields.Join):
"""Allow indexing of Join fields (with parent/child relation)."""


class LongField(DODField, fields.Long):
"""Allow indexing of long.
Expand Down
170 changes: 131 additions & 39 deletions django_opensearch_dsl/management/commands/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from django.core.exceptions import FieldError
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from django.db import DEFAULT_DB_ALIAS
from django.db.models import Q

from django_opensearch_dsl.registries import registry
Expand Down Expand Up @@ -109,6 +110,13 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio
) # noqa
self.stdout.flush()
try:
# If current index depends on many different models, add them to
# index._doc_types before indexing to make sure all mappings of different models
# are taken into account.
index_models = registry.get_indices_raw().get(index, None)
for model in list(index_models):
index._doc_types.append(model)

if action == OpensearchAction.CREATE:
index.create()
elif action == OpensearchAction.DELETE:
Expand All @@ -133,14 +141,38 @@ def _manage_index(self, action, indices, force, verbosity, ignore_error, **optio
self.stdout.write(f"{pp} index '{index._name}'... {self.style.SUCCESS('OK')}") # noqa

def _manage_document(
self, action, indices, force, filters, excludes, verbosity, parallel, count, refresh, missing, **options
self,
action,
indices,
objects,
force,
filters,
excludes,
verbosity,
parallel,
count,
refresh,
missing,
database,
**options,
): # noqa
"""Manage the creation and deletion of indices."""
action = OpensearchAction(action)
known = registry.get_indices()
filter_ = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in filters)) if filters else None
exclude = functools.reduce(operator.and_, (Q(**{k: v}) for k, v in excludes)) if excludes else None

# Filter existing objects
valid_models = []
registered_models = [m.__name__.lower() for m in registry.get_models()]
if objects:
for model in objects:
if model.lower() in registered_models:
valid_models.append(model)
else:
self.stderr.write(f"Unknown object '{model}', choices are: '{registered_models}'")
exit(1)

# Filter indices
if indices:
# Ensure every given indices exists
Expand All @@ -165,23 +197,50 @@ def _manage_document(
# Check field, preparing to display expected actions
s = f"The following documents will be {action.past}:"
kwargs_list = []
for index in indices:

if objects:
django_models = [m for m in registry.get_models() if m.__name__.lower() in valid_models]
all_os_models = []
selected_os_models = []
indices_raw = registry.get_indices_raw()

for k, v in indices_raw.items():
for model in list(v):
all_os_models.append(model)

for os_model in all_os_models:
if os_model.django.model in django_models and os_model.Index.name in list(i._name for i in indices):
selected_os_models.append(os_model)

# Handle --missing
exclude_ = exclude
if missing and action == OpensearchAction.INDEX:
q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()])
exclude_ = exclude_ & q if exclude_ is not None else q

document = index._doc_types[0]() # noqa
try:
kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count})
qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
except FieldError as e:
model = index._doc_types[0].django.model.__name__ # noqa
self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa
exit(1)
else:
s += f"\n\t- {qs} {document.django.model.__name__}."
for model in selected_os_models:
try:
kwargs_list.append({"filter_": filter_, "exclude": exclude_, "count": count})
qs = model().get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
except FieldError as e:
self.stderr.write(f"Error while filtering on '{model.django.model.__name__}':\n{e}'") # noqa
exit(1)
else:
s += f"\n\t- {qs} {model.django.model.__name__}."
else:
for index in indices:
# Handle --missing
exclude_ = exclude
if missing and action == OpensearchAction.INDEX:
q = Q(pk__in=[h.meta.id for h in index.search().extra(stored_fields=[]).scan()])
exclude_ = exclude_ & q if exclude_ is not None else q

document = index._doc_types[0]() # noqa
try:
kwargs_list.append({"db_alias": database, "filter_": filter_, "exclude": exclude_, "count": count})
qs = document.get_queryset(filter_=filter_, exclude=exclude_, count=count).count()
except FieldError as e:
model = index._doc_types[0].django.model.__name__ # noqa
self.stderr.write(f"Error while filtering on '{model}' (from index '{index._name}'):\n{e}'") # noqa
exit(1)
else:
s += f"\n\t- {qs} {document.django.model.__name__}."

# Display expected actions
if verbosity or not force:
Expand All @@ -198,28 +257,53 @@ def _manage_document(
exit(1)

result = "\n"
for index, kwargs in zip(indices, kwargs_list):
document = index._doc_types[0]() # noqa
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
success, errors = document.update(
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
)

success_str = self.style.SUCCESS(success) if success else success
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
model = document.django.model.__name__

if verbosity == 1:
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
reasons = defaultdict(int)
for e in errors: # Count occurrence of each error
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
reasons[error] += 1
for reasons, total in reasons.items():
result += f" - {reasons} : {total}\n"

if verbosity > 1:
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"
if objects:
for model, kwargs in zip(selected_os_models, kwargs_list):
document = model() # noqa
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
success, errors = document.update(
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
)

success_str = self.style.SUCCESS(success) if success else success
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
model = document.django.model.__name__

if verbosity == 1:
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
reasons = defaultdict(int)
for e in errors: # Count occurrence of each error
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
reasons[error] += 1
for reasons, total in reasons.items():
result += f" - {reasons} : {total}\n"

if verbosity > 1:
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"

else:
for index, kwargs in zip(indices, kwargs_list):
document = index._doc_types[0]() # noqa
qs = document.get_indexing_queryset(stdout=self.stdout._out, verbose=verbosity, action=action, **kwargs)
success, errors = document.update(
qs, parallel=parallel, refresh=refresh, action=action, raise_on_error=False
)

success_str = self.style.SUCCESS(success) if success else success
errors_str = self.style.ERROR(len(errors)) if errors else len(errors)
model = document.django.model.__name__

if verbosity == 1:
result += f"{success_str} {model} successfully {action.past}, {errors_str} errors:\n"
reasons = defaultdict(int)
for e in errors: # Count occurrence of each error
error = e.get(action, {"result": "unknown error"}).get("result", "unknown error")
reasons[error] += 1
for reasons, total in reasons.items():
result += f" - {reasons} : {total}\n"

if verbosity > 1:
result += f"{success_str} {model} successfully {action}d, {errors_str} errors:\n {errors}\n"

if verbosity:
self.stdout.write(result + "\n")
Expand All @@ -237,7 +321,7 @@ def add_arguments(self, parser):
)
subparser.set_defaults(func=self.__list_index)

# 'manage' subcommand
# 'index' subcommand
subparser = subparsers.add_parser(
"index",
help="Manage the creation an deletion of indices.",
Expand Down Expand Up @@ -288,6 +372,13 @@ def add_arguments(self, parser):
OpensearchAction.UPDATE.value,
],
)
subparser.add_argument(
"-d",
"--database",
type=str,
default=None,
help="Nominates a database to use as source.",
)
subparser.add_argument(
"-f",
"--filters",
Expand Down Expand Up @@ -321,6 +412,7 @@ def add_arguments(self, parser):
subparser.add_argument(
"-i", "--indices", type=str, nargs="*", help="Only update documents on the given indices."
)
subparser.add_argument("-o", "--objects", type=str, nargs="*", help="Only update selected objects.")
subparser.add_argument(
"-c", "--count", type=int, default=None, help="Update at most COUNT objects (0 to index everything)."
)
Expand Down
4 changes: 4 additions & 0 deletions django_opensearch_dsl/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,9 @@ def __contains__(self, obj):
f"'in <{type(self).__name__}>' requires a Model subclass as left operand, not {type(dict).__name__}"
)

def get_indices_raw(self):
"""Get all indices as they are store in the registry or the indices for a list of models."""
return self._indices


registry = DocumentRegistry()

0 comments on commit 4f04a28

Please sign in to comment.