[Feature Request] Scale to Zero with Reader/Writer Separation. #16720

Open
prudhvigodithi opened this issue Nov 25, 2024 · 9 comments
Labels: enhancement, Roadmap:Search, Search:Performance, v2.19.0

@prudhvigodithi
Member

prudhvigodithi commented Nov 25, 2024

Is your feature request related to a problem? Please describe

This comes from the META issue #15306: achieve scale to zero with reader/writer separation. With scale to zero, we should be able to scale down the primary and regular replicas, keep only the search replicas to serve search traffic, and later bring back the primary and regular replicas for write (index) traffic.

Describe the solution you'd like

Handle the scale-to-zero behavior and perform the actions based on an index setting. At a high level:

Update Cluster State (Initial)

  • Set remove_indexing_shards (or scale_down_indexing or index.read_only_mode) flag in index settings.
  • Update IndexMetadata to reflect scaled state.
  • Trigger cluster state update.

Store Original Configuration

  • Save number of original primary shards and replicas.
  • Preserve all allocation details.
  • This information will be used during scale-up.

Prepare for Scale Down

  • Flush all shards to ensure data persistence.
  • Upload latest state to remote store.
  • Perform final sync for search replicas.
  • Ensure all data is safely stored remotely.

Update Routing Table

  • Remove primary and replica assignments (close the shards).
  • Update routing table to reflect scaled state.
  • Maintain cluster state consistency.

Close Shards

  • Clean up local resources.
  • Maintain metadata in cluster state.

Handle Cluster Health

  • Implement custom health status (at cluster level, index level and shard level) for scaled-down indices that have only active search-only replicas.
  • Update cluster health APIs.
  • Provide clear status in cluster state.

Scale Up Process (when flag is removed)

  • Detect remove_indexing_shards flag removal.
  • Retrieve original configuration.
  • Recover from remote store.
  • Restore original routing.
  • Re-establish primary and replicas.

Related component

Search:Performance

@prudhvigodithi
Member Author

prudhvigodithi commented Nov 25, 2024

[Triage]
I posted some POC results (main...prudhvigodithi:OpenSearch:searchonly) on the meta issue.

Moving forward, I will use this issue to discuss the scale-to-zero topic further.

Thank you
@mch2 @getsaurabh02 @vinaykpud

@prudhvigodithi
Member Author

prudhvigodithi commented Nov 29, 2024

In the POC (main...prudhvigodithi:OpenSearch:searchonly), when the scale-to-zero setting (remove_indexing_shards) is enabled, only the search replicas remain to serve search traffic.

index           shard prirep state   unassigned.reason node
my_search_index 0     s      STARTED                   runTask-0

Once the index-level scale-to-zero setting (remove_indexing_shards) is disabled, the primary and regular replicas are restored and write operations are enabled again:

index           shard prirep state   unassigned.reason node
my_search_index 0     p      STARTED                   runTask-1
my_search_index 0     r      STARTED                   runTask-0
my_search_index 0     s      STARTED                   runTask-4

The logic ensures that both the translog sync and the remote store sync are completed before the shard is closed. To validate this, I’ve added additional log output during the process:

[2024-11-29T11:49:15,801][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] Translog has 3 uncommitted operations before closing shard [[my_search_index][0]]
[2024-11-29T11:49:15,802][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] The 1st indexShard.isSyncNeeded() is true
[2024-11-29T11:49:15,802][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] Performing final flush before closing shard
[2024-11-29T11:49:15,888][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] The indexShard.isSyncNeeded() is false
[2024-11-29T11:49:15,889][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] [my_search_index][0] Primary shard starting final sync. Current segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,_6.cfe,_6.cfs,_6.si,segments_4,write.lock
[2024-11-29T11:49:15,889][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] Performing final sync before closing shard, `remove_indexing_shards` is enabled
[2024-11-29T11:49:15,889][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] Translog is empty before closing shard [[my_search_index][0]]
[2024-11-29T11:49:15,889][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] [my_search_index][0] Primary shard sync completed, awaiting remote store sync
[2024-11-29T11:49:15,889][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] Awaiting final sync completion
[2024-11-29T11:49:15,890][INFO ][o.o.i.IndexService       ] [runTask-3] [my_search_index] [my_search_index][0] Primary shard final sync completed. Final segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,_6.cfe,_6.cfs,_6.si,segments_4,write.lock
[2024-11-29T11:49:15,897][DEBUG][o.o.c.c.C.CoordinatorPublication] [runTask-2] Publication ended successfully: Publication{term=4, version=16}

Before closing the shard, when the scale to zero setting (remove_indexing_shards) is enabled, the primary shard undergoes a flush and sync process. I’ve observed that:

  • Before the flush, the translog contains uncommitted operations, and isSyncNeeded() returns true.

  • After the flush, isSyncNeeded() becomes false, and the translog has no remaining operations.

  • At this point, the shard proceeds with the closure.

I have also seen that these steps closely align with those in the close index operation; here we are dealing only with the shard close.

From multiple tests, I’ve consistently observed no uncommitted operations post-flush. Therefore, closing the shard appears safe when the scale to zero setting (remove_indexing_shards) is enabled, and only search replicas remain active thereafter.
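
The flush-then-verify sequence observed in the logs can be sketched as follows. This is only an illustration under stated assumptions: `WritableShard`, `SafeShardClose` and `InMemoryShard` are hypothetical stand-ins, not OpenSearch classes, and the real shard API is considerably richer.

```java
// Hypothetical stand-in for the few shard operations this check needs.
interface WritableShard {
    int uncommittedTranslogOps();
    boolean isSyncNeeded();
    void flush();
    void close();
}

final class SafeShardClose {
    /** Flushes if needed, re-checks the translog, and closes only when nothing is pending. */
    static void flushVerifyClose(WritableShard shard) {
        if (shard.uncommittedTranslogOps() > 0 || shard.isSyncNeeded()) {
            shard.flush();
        }
        // Second check: refuse to close if the flush did not clear the translog.
        if (shard.uncommittedTranslogOps() > 0 || shard.isSyncNeeded()) {
            throw new IllegalStateException("uncommitted operations remain after flush; not closing shard");
        }
        shard.close();
    }
}

/** In-memory fake used only to exercise the sketch. */
final class InMemoryShard implements WritableShard {
    private int pendingOps;
    private final boolean flushWorks;
    boolean closed;

    InMemoryShard(int pendingOps, boolean flushWorks) {
        this.pendingOps = pendingOps;
        this.flushWorks = flushWorks;
    }

    public int uncommittedTranslogOps() { return pendingOps; }
    public boolean isSyncNeeded() { return pendingOps > 0; }
    public void flush() { if (flushWorks) { pendingOps = 0; } }
    public void close() { closed = true; }

    public static void main(String[] args) {
        InMemoryShard shard = new InMemoryShard(3, true);
        SafeShardClose.flushVerifyClose(shard);
        System.out.println("closed=" + shard.closed);
    }
}
```

The second check is what turns "no uncommitted operations were observed" into an enforced precondition of the close.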

Question

Is there an existing method or logic in remote store to handle scenarios where a shard closure occurs while the translog still contains uncommitted operations? Specifically:

  • Does the remote store code already include a mechanism to retry, or to alert users, when a shard cannot be closed because some uncommitted operations are left?

  • If this is a valid concern, I propose updating the scale-to-zero logic to first identify the primary shards for an index, then perform a flush and sync, followed by an additional check for any remaining translog operations. Only after ensuring the translog is clear would the process proceed with the routing table update (when a shard is removed from the routing table, the closeShard operation is called automatically). To achieve this, AFAIK I would need to create a new transport action that identifies the primary shards across the nodes and runs the flush and sync early. This action can be invoked from MetadataUpdateSettingsService (which runs on the cluster manager node).

Before I make this change, @shwetathareja, can you please suggest whether remote store already has a mechanism that I can use to find the primary shards for an index across the nodes and run a final, guaranteed sync to the remote store? If so, I can leverage it and just proceed with closing the shard.

Adding @mch2 @msfroh

Thank you

@prudhvigodithi
Member Author

In the latest commit (prudhvigodithi@aa96f64, under the server/src/main/java/org/opensearch/action/admin/indices/scaleToZero/ package) I have introduced a new transport action, TransportPreScaleSyncAction. This action is designed to ensure that all primary shards of an index are properly synchronized before scaling down or removing indexing shards. It is triggered before the routing tables are updated; only once it gets a successful response does the process move forward with updating the routing tables, which eventually closes the shards. So the logic that does the final sync during the shard close can be removed and handled by the transport action instead. The transport action collects all primary shards of the index and identifies which nodes they are assigned to.
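
The "collect primaries and identify their nodes" step can be sketched as a simple grouping. This is not the code from the commit: `PrimaryShardGrouping` and `Routing` are hypothetical simplifications, and the node IDs in the usage are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PrimaryShardGrouping {

    /** Minimal routing entry; hypothetical, far simpler than a real shard routing. */
    static final class Routing {
        final String index;
        final int shardId;
        final boolean primary;
        final String nodeId; // null when the shard copy is unassigned

        Routing(String index, int shardId, boolean primary, String nodeId) {
            this.index = index;
            this.shardId = shardId;
            this.primary = primary;
            this.nodeId = nodeId;
        }
    }

    /** Returns nodeId -> shard ids of the assigned primaries of the given index. */
    static Map<String, List<Integer>> primariesByNode(String index, List<Routing> routingTable) {
        Map<String, List<Integer>> byNode = new TreeMap<>();
        for (Routing r : routingTable) {
            if (r.primary && r.nodeId != null && r.index.equals(index)) {
                byNode.computeIfAbsent(r.nodeId, k -> new ArrayList<>()).add(r.shardId);
            }
        }
        return byNode;
    }

    public static void main(String[] args) {
        List<Routing> table = Arrays.asList(
            new Routing("my_search_index", 0, true, "node-a"),
            new Routing("my_search_index", 0, false, "node-b"),
            new Routing("my_search_index", 1, true, "node-b"));
        // One sync request per node would then cover all of that node's primaries.
        System.out.println(primariesByNode("my_search_index", table));
    }
}
```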

In the log I can see it filter the nodes and primary shards for a given index and do one final sync before proceeding to update the routing table and close the shard.

[2024-12-02T13:33:35,788][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-0] nodeId is tXQCvdmyQgqiKRY9uoOcdw
[2024-12-02T13:33:35,816][DEBUG][o.o.c.c.PublicationTransportHandler] [runTask-1] received diff cluster state version [23] with uuid [pyQn719MT6WrjWojYqrpGQ], diff size [531]
[2024-12-02T13:33:35,789][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-2] The handleShardSyncRequest is called
[2024-12-02T13:33:35,816][DEBUG][o.o.c.c.PublicationTransportHandler] [runTask-3] received diff cluster state version [23] with uuid [pyQn719MT6WrjWojYqrpGQ], diff size [531]
[2024-12-02T13:33:35,789][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-4] The handleShardSyncRequest is called
[2024-12-02T13:33:35,816][DEBUG][o.o.c.c.PublicationTransportHandler] [runTask-5] received diff cluster state version [23] with uuid [pyQn719MT6WrjWojYqrpGQ], diff size [531]
[2024-12-02T13:33:35,788][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-0] shards is [[my_search_index][0]]
[2024-12-02T13:33:35,823][DEBUG][o.o.c.c.Coordinator      ] [runTask-1] handlePublishRequest: handling version [23] from [{runTask-0}{xKbjRg5qSbSMVN-JiB-7Rg}{_c2v0hBqRFi-k1EONAg8fw}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}]
[2024-12-02T13:33:35,791][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-2] Doing final flush before closing shard
[2024-12-02T13:33:35,820][DEBUG][o.o.c.c.Coordinator      ] [runTask-3] handlePublishRequest: handling version [23] from [{runTask-0}{xKbjRg5qSbSMVN-JiB-7Rg}{_c2v0hBqRFi-k1EONAg8fw}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}]
[2024-12-02T13:33:35,791][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-4] Doing final flush before closing shard
[2024-12-02T13:33:35,823][DEBUG][o.o.c.c.Coordinator      ] [runTask-5] handlePublishRequest: handling version [23] from [{runTask-0}{xKbjRg5qSbSMVN-JiB-7Rg}{_c2v0hBqRFi-k1EONAg8fw}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}]
[2024-12-02T13:33:35,789][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-0] nodeId is I2lIdLMRR3C6wCi-4gJ26Q
[2024-12-02T13:33:35,815][DEBUG][o.o.c.c.PublicationTransportHandler] [runTask-2] received diff cluster state version [23] with uuid [pyQn719MT6WrjWojYqrpGQ], diff size [531]
[2024-12-02T13:33:35,815][DEBUG][o.o.c.c.PublicationTransportHandler] [runTask-4] received diff cluster state version [23] with uuid [pyQn719MT6WrjWojYqrpGQ], diff size [531]
[2024-12-02T13:33:35,789][INFO ][o.o.a.a.i.s.TransportPreScaleSyncAction] [runTask-0] shards is [[my_search_index][1]]

By the time it reaches the closeShard method, the translog is empty and isSyncNeeded() is false, which means it is safe to close the shard without any data loss.

[2024-12-02T13:33:36,017][DEBUG][o.o.c.c.Coordinator      ] [runTask-5] handlePublishRequest: handling version [24] from [{runTask-0}{xKbjRg5qSbSMVN-JiB-7Rg}{_c2v0hBqRFi-k1EONAg8fw}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}]
[2024-12-02T13:33:36,126][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] Translog is empty before closing shard [[my_search_index][1]]
[2024-12-02T13:33:36,125][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] Translog is empty before closing shard [[my_search_index][0]]
[2024-12-02T13:33:36,129][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] The 1st indexShard.isSyncNeeded() is false
[2024-12-02T13:33:36,126][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] The 1st indexShard.isSyncNeeded() is false
[2024-12-02T13:33:36,132][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] Doing final flush before closing shard
[2024-12-02T13:33:36,126][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] Doing final flush before closing shard
[2024-12-02T13:33:36,201][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] The indexShard.isSyncNeeded() is false
[2024-12-02T13:33:36,206][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] [my_search_index][1] Primary shard starting final sync. Current segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,segments_9,write.lock
[2024-12-02T13:33:36,209][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] Doing final sync before closing shard, remove_indexing_shards is enabled
[2024-12-02T13:33:36,209][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] Translog is empty before closing shard [[my_search_index][1]]
[2024-12-02T13:33:36,289][DEBUG][o.o.c.c.C.CoordinatorPublication] [runTask-0] publication ended successfully: Publication{term=1, version=24}
[2024-12-02T13:33:36,212][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] [my_search_index][1] Primary shard sync completed, waiting for remote store sync
[2024-12-02T13:33:36,216][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] The indexShard.isSyncNeeded() is false
[2024-12-02T13:33:36,212][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] Waiting for final sync to complete
[2024-12-02T13:33:36,220][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] [my_search_index][0] Primary shard starting final sync. Current segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,_6.cfe,_6.cfs,_6.si,_7.cfe,_7.cfs,_7.si,_8.cfe,_8.cfs,_8.si,segments_9,write.lock
[2024-12-02T13:33:36,213][INFO ][o.o.i.IndexService       ] [runTask-2] [my_search_index] [my_search_index][1] Primary shard final sync completed. Final segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,segments_9,write.lock
[2024-12-02T13:33:36,220][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] Doing final sync before closing shard, remove_indexing_shards is enabled
[2024-12-02T13:33:36,221][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] Translog is empty before closing shard [[my_search_index][0]]
[2024-12-02T13:33:36,223][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] [my_search_index][0] Primary shard sync completed, waiting for remote store sync
[2024-12-02T13:33:36,223][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] Waiting for final sync to complete
[2024-12-02T13:33:36,224][INFO ][o.o.i.IndexService       ] [runTask-4] [my_search_index] [my_search_index][0] Primary shard final sync completed. Final segments: _0.cfe,_0.cfs,_0.si,_1.cfe,_1.cfs,_1.si,_2.cfe,_2.cfs,_2.si,_3.cfe,_3.cfs,_3.si,_4.cfe,_4.cfs,_4.si,_5.cfe,_5.cfs,_5.si,_6.cfe,_6.cfs,_6.si,_7.cfe,_7.cfs,_7.si,_8.cfe,_8.cfs,_8.si,segments_9,write.lock

@shwetathareja
Member

Thanks @prudhvigodithi for the detailed proposal!

Couple of things:

  1. Once the index setting remove_indexing_shards is applied on the index, would it reject any new indexing traffic, or wait for the shard close to reject it? The closeShard operation first adds an INDEX_CLOSED_BLOCK block, which blocks both reads and writes. We may need a different block for scaling to zero for read/write.
  2. I don't think we changed anything in the closeShard behavior between document replication (via local disk) or remote store segment replication. @sachinpkale please correct me if I missed something here.
  3. The current closeShard or close engine behavior is best effort and any exception while flushing may get swallowed.
    public void flushAndClose() throws IOException {
        if (isClosed.get() == false) {
            logger.trace("flushAndClose now acquire writeLock");
            try (ReleasableLock lock = writeLock.acquire()) {
                logger.trace("flushAndClose now acquired writeLock");
                try {
                    logger.debug("flushing shard on close - this might take some time to sync files to disk");
                    try {
                        // TODO we might force a flush in the future since we have the write lock already even though recoveries
                        // are running.
                        flush();
                    } catch (AlreadyClosedException ex) {
                        logger.debug("engine already closed - skipping flushAndClose");
                    }
                } finally {
                    close(); // double close is not a problem
                }
            }
        }
        awaitPendingClose();
    }
    The engine will then proceed with closing regardless. You can simulate this by throwing an exception from the flush method.
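
To make the best-effort behavior concrete, here is a small self-contained sketch that contrasts the try/finally shape used above with a strict variant. `CloseSemantics`, its `Engine` interface and `FailingEngine` are hypothetical and unrelated to OpenSearch's Engine class; the sketch only shows that with try/finally the close still runs when the flush fails.

```java
import java.io.IOException;

final class CloseSemantics {

    /** Hypothetical two-method engine, just enough to show the control flow. */
    interface Engine {
        void flush() throws IOException;
        void close();
    }

    /** Same shape as the try/finally above: close() runs whether or not flush() succeeded. */
    static void bestEffortFlushAndClose(Engine engine) throws IOException {
        try {
            engine.flush();
        } finally {
            engine.close();
        }
    }

    /** Strict variant: a failed flush propagates before close() is reached. */
    static void strictFlushThenClose(Engine engine) throws IOException {
        engine.flush();
        engine.close();
    }

    /** Simulates the failure case by always throwing from flush(). */
    static final class FailingEngine implements Engine {
        boolean closed;

        public void flush() throws IOException {
            throw new IOException("simulated flush failure");
        }

        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        FailingEngine engine = new FailingEngine();
        try {
            bestEffortFlushAndClose(engine);
        } catch (IOException e) {
            System.out.println("flush failed, closed anyway: " + engine.closed);
        }
    }
}
```

For scale to zero, the strict shape is closer to what is needed, because a writer that closes after a failed flush may leave data that the search replicas never see.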

Also, if I understand correctly, search replicas are not capable of applying translog operations. We need a deterministic way to ensure all the segments are created and uploaded before the indexing copies (primary + replica) are closed gracefully.

@prudhvigodithi
Member Author

Thanks @shwetathareja

  1. Once the index setting remove_indexing_shards is applied on the index, would it reject any new indexing traffic, or wait for the shard close to reject it? The closeShard operation first adds an INDEX_CLOSED_BLOCK block, which blocks both reads and writes. We may need a different block for scaling to zero for read/write.

The setting remove_indexing_shards.enabled first checks for blocks.write, which is expected to be enabled as a prerequisite. Once blocks.write is enabled, applying remove_indexing_shards.enabled ensures that any new indexing traffic is rejected.

curl -X PUT "http://localhost:9200/my_search_index/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "blocks.write": true
  }
}'

2. I don't think we changed anything in the closeShard behavior between document replication (via local disk) or remote store segment replication.

Yes. Moving forward, following the latest comment #16720 (comment), the idea is to leave the closeShard method untouched rather than add the flush and final sync logic there. Instead, a transport action does this and then updates the routing table; once the routing tables are updated, the closeShard method is eventually called.

Also, if I understand correctly, search replicas are not capable of applying translog operations. We need a deterministic way to ensure all the segments are created and uploaded before the indexing copies (primary + replica) are closed gracefully.

Does calling indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); followed by indexShard.waitForRemoteStoreSync(); ensure that the segments (pending/new) are created and uploaded to the remote store? waitForRemoteStoreSync blocks the calling thread, waiting for the remote store to synchronize until the internal remote upload timeout. Is there anything else I'm missing to ensure the remote store has all the latest segments?

@mch2
Member

mch2 commented Dec 4, 2024

I think we should probably apply this through an explicit API similar to _close instead of an index setting, given this is a destructive operation, and similarly provide an API to undo it and bring the writers back. The sequence of events when it's applied can then be largely similar to that on close. I also +1 introducing a new block type that can't be explicitly removed through the block API.

So basically:

  1. API call -> _shutdown_writers (or something better :))
  2. Apply new block
  3. Validate before close similar to TransportVerifyShardBeforeCloseAction. Using indexShard.waitForRemoteStoreSync post flush makes sense to me here. I think if any shard fails at this point we should throw, undo the block & fail the request.
  4. update routing to remove primaries & wait for shard removal
  5. ack the call
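
The sequence above, including the "throw, undo the block & fail the request" branch, can be sketched as a small orchestration. `ScaleDownFlow` and its event names are hypothetical; the verification step is passed in as a plain `Runnable` that throws when any shard fails to sync.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical orchestration sketch for a single index; not OpenSearch code.
final class ScaleDownFlow {
    /** Records the steps taken, in order, so the flow can be inspected. */
    final List<String> events = new ArrayList<>();

    /**
     * Runs: apply block -> verify shards -> update routing -> ack.
     * If verification throws, the block is removed again and the error is rethrown.
     */
    void scaleDown(Runnable verifyAllShardsSynced) {
        events.add("apply-block");
        try {
            events.add("verify-shards");
            verifyAllShardsSynced.run();
        } catch (RuntimeException e) {
            events.add("remove-block"); // undo, so the index is writable again
            throw e;                    // fail the request
        }
        events.add("update-routing");
        events.add("ack");
    }

    public static void main(String[] args) {
        ScaleDownFlow flow = new ScaleDownFlow();
        flow.scaleDown(() -> { /* every shard flushed and synced */ });
        System.out.println(flow.events);
    }
}
```

Because the block is applied before verification, a failed verification has to remove it explicitly; otherwise the index would be left blocked for writes with its writers still in place.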

@shwetathareja
Member

shwetathareja commented Dec 4, 2024

+1 to @mch2 for having an explicit API for the scale in or scale out operation, like _scale. We also need to ensure we stop writes, then refresh, verify there are no pending translog operations, and flush to ensure they are backed up on remote.

When you apply the index block, it will not be applied atomically to all the shards at the same time, as it is a cluster state change and some nodes can be slow to apply a cluster state change. In the meantime, writes will keep arriving on some of the shards.

@prudhvigodithi
Member Author

prudhvigodithi commented Dec 24, 2024

I’ve been away for a while, but I'm coming back to the implementation now. Thanks, everyone, for the suggestions! The existing setup uses index settings, but we can transition this to an explicit API (maybe this can be part of the index API https://opensearch.org/docs/latest/api-reference/index-apis/index/) for scaling the shards up or down. We can also add an index block. Since the block won’t be applied atomically to all shards at once, the current POC first checks for blocks.write, which is expected to be enabled as a prerequisite. This ensures that any new indexing traffic is rejected. With this setting, we don’t need to handle the block here. WDYT, @shwetathareja @mch2? The API call will first verify the prerequisites, then flush and sync the shards, update routing to remove primaries, wait for shard removal, and finally acknowledge the call.

Thank you

@prudhvigodithi
Member Author

I was able to make this work with a _scale API, for example curl -X POST "http://localhost:9200/my-index/_scale/down". After some research, I noticed that setting a write block cluster-wide is eventually consistent and relies on each node applying the new cluster state, which can introduce a short delay. It is better to use operation permits, which take effect immediately (I assume they are local to each shard) and which are also what TransportVerifyShardBeforeCloseAction uses.
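
The idea behind operation permits can be sketched with a plain java.util.concurrent.Semaphore: each write holds one permit, and a blocking action must hold all of them, so it waits for in-flight writes and keeps new ones out while it runs. `OperationPermitsSketch` is a hypothetical illustration under that assumption and is not OpenSearch's implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the permit idea; local to one shard, no cluster state involved.
final class OperationPermitsSketch {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS, true);

    /** A write takes one permit; returns false while writes are blocked. */
    boolean tryBeginWrite() {
        return permits.tryAcquire();
    }

    /** Must be called once for every successful tryBeginWrite(). */
    void endWrite() {
        permits.release();
    }

    /**
     * Waits up to the timeout for in-flight writes to finish, then holds every
     * permit while the action (for example flush and sync) runs.
     * Returns false if the permits could not be obtained in time.
     */
    boolean runWithWritesBlocked(Runnable action, long timeout, TimeUnit unit) {
        try {
            if (!permits.tryAcquire(TOTAL_PERMITS, timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            permits.release(TOTAL_PERMITS);
        }
    }

    public static void main(String[] args) {
        OperationPermitsSketch shard = new OperationPermitsSketch();
        boolean ran = shard.runWithWritesBlocked(
            () -> System.out.println("write accepted while blocked: " + shard.tryBeginWrite()),
            1, TimeUnit.SECONDS);
        System.out.println("blocking action ran: " + ran);
    }
}
```

Unlike a cluster-wide block, nothing here depends on when a node applies a new cluster state: once all permits are held on a shard, no write on that shard can start until they are released.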

@getsaurabh02 getsaurabh02 moved this from Todo to In Progress in Performance Roadmap Jan 2, 2025
@getsaurabh02 getsaurabh02 added v2.19.0 Issues and PRs related to version 2.19.0 Roadmap:Search Project-wide roadmap label labels Jan 2, 2025