Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor stream to notify first and next #13792

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Unreleased
- [changed] Internal change to stream implementation, to prepare for handshake. (#13792)

# 11.3.0
- [changed] Improve efficiency of memory persistence when processing a large number of writes. (#13572)

Expand Down
6 changes: 5 additions & 1 deletion Firestore/core/src/remote/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ bool Stream::IsStarted() const {

void Stream::Start() {
EnsureOnQueue();
response_count_ = 0;

if (state_ == State::Error) {
BackoffAndTryRestarting();
Expand Down Expand Up @@ -258,7 +259,10 @@ void Stream::OnStreamRead(const grpc::ByteBuffer& message) {
grpc_stream_->GetResponseHeaders()));
}

Status read_status = NotifyStreamResponse(message);
Status read_status = (++response_count_ == 1)
? NotifyFirstStreamResponse(message)
: NotifyNextStreamResponse(message);

if (!read_status.ok()) {
grpc_stream_->FinishImmediately();
// Don't expect gRPC to produce status -- since the error happened on the
Expand Down
5 changes: 4 additions & 1 deletion Firestore/core/src/remote/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ class Stream : public GrpcStreamObserver,
const std::string& app_check_token) = 0;
virtual void TearDown(GrpcStream* stream) = 0;
virtual void NotifyStreamOpen() = 0;
virtual util::Status NotifyStreamResponse(
virtual util::Status NotifyFirstStreamResponse(
const grpc::ByteBuffer& message) = 0;
virtual util::Status NotifyNextStreamResponse(
const grpc::ByteBuffer& message) = 0;
virtual void NotifyStreamClose(const util::Status& status) = 0;
// PORTING NOTE: C++ cannot rely on RTTI, unlike other platforms.
Expand Down Expand Up @@ -260,6 +262,7 @@ class Stream : public GrpcStreamObserver,
// Used to prevent auth if the stream happens to be restarted before token is
// received.
int close_count_ = 0;
int response_count_ = 0;
};

} // namespace remote
Expand Down
6 changes: 5 additions & 1 deletion Firestore/core/src/remote/watch_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ void WatchStream::NotifyStreamOpen() {
callback_->OnWatchStreamOpen();
}

Status WatchStream::NotifyStreamResponse(const grpc::ByteBuffer& message) {
Status WatchStream::NotifyFirstStreamResponse(const grpc::ByteBuffer& message) {
return NotifyNextStreamResponse(message);
}

Status WatchStream::NotifyNextStreamResponse(const grpc::ByteBuffer& message) {
ByteBufferReader reader{message};
auto response = watch_serializer_.ParseResponse(&reader);
if (!reader.ok()) {
Expand Down
5 changes: 4 additions & 1 deletion Firestore/core/src/remote/watch_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ class WatchStream : public Stream {
void TearDown(GrpcStream* grpc_stream) override;

void NotifyStreamOpen() override;
util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override;
util::Status NotifyFirstStreamResponse(
const grpc::ByteBuffer& message) override;
util::Status NotifyNextStreamResponse(
const grpc::ByteBuffer& message) override;
void NotifyStreamClose(const util::Status& status) override;

std::string GetDebugName() const override {
Expand Down
62 changes: 40 additions & 22 deletions Firestore/core/src/remote/write_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const ByteString& WriteStream::last_stream_token() const {
return last_stream_token_;
}

void WriteStream::Start() {
handshake_complete_ = false;
Stream::Start();
}

void WriteStream::WriteHandshake() {
EnsureOnQueue();
HARD_ASSERT(IsOpen(), "Writing handshake requires an opened stream");
Expand Down Expand Up @@ -121,44 +126,57 @@ void WriteStream::NotifyStreamOpen() {

void WriteStream::NotifyStreamClose(const Status& status) {
callback_->OnWriteStreamClose(status);
// Delegate's logic might depend on whether handshake was completed, so only
// reset it after notifying.
handshake_complete_ = false;
}

Status WriteStream::NotifyStreamResponse(const grpc::ByteBuffer& message) {
Status WriteStream::NotifyFirstStreamResponse(const grpc::ByteBuffer& message) {
ByteBufferReader reader{message};
Message<google_firestore_v1_WriteResponse> response =
write_serializer_.ParseResponse(&reader);
if (!reader.ok()) {
return reader.status();
}

LOG_DEBUG("%s response: %s", GetDebugDescription(), response.ToString());
LOG_DEBUG("%s first response: %s", GetDebugDescription(),
response.ToString());

// Always capture the last stream token.
set_last_stream_token(ByteString::Take(response->stream_token));
response->stream_token = nullptr;

if (!handshake_complete()) {
// The first response is the handshake response
handshake_complete_ = true;
callback_->OnWriteStreamHandshakeComplete();
} else {
// A successful first write response means the stream is healthy.
// Note that we could consider a successful handshake healthy, however, the
// write itself might be causing an error we want to back off from.
backoff_.Reset();

auto version = write_serializer_.DecodeCommitVersion(&reader, *response);
auto results = write_serializer_.DecodeMutationResults(&reader, *response);
if (!reader.ok()) {
return reader.status();
}

callback_->OnWriteStreamMutationResult(version, std::move(results));
// The first response is the handshake response
handshake_complete_ = true;
callback_->OnWriteStreamHandshakeComplete();

return Status::OK();
}

Status WriteStream::NotifyNextStreamResponse(const grpc::ByteBuffer& message) {
ByteBufferReader reader{message};
Message<google_firestore_v1_WriteResponse> response =
write_serializer_.ParseResponse(&reader);
if (!reader.ok()) {
return reader.status();
}

LOG_DEBUG("%s next response: %s", GetDebugDescription(), response.ToString());

// Always capture the last stream token.
set_last_stream_token(ByteString::Take(response->stream_token));
response->stream_token = nullptr;

// A successful first write response means the stream is healthy.
// Note that we could consider a successful handshake healthy, however, the
// write itself might be causing an error we want to back off from.
backoff_.Reset();

auto version = write_serializer_.DecodeCommitVersion(&reader, *response);
auto results = write_serializer_.DecodeMutationResults(&reader, *response);
if (!reader.ok()) {
return reader.status();
}

callback_->OnWriteStreamMutationResult(version, std::move(results));

return Status::OK();
}

Expand Down
7 changes: 6 additions & 1 deletion Firestore/core/src/remote/write_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class WriteStream : public Stream {
return handshake_complete_;
}

void Start() override;

/**
* Sends an initial stream token to the server, performing the handshake
* required to make the StreamingWrite RPC work.
Expand All @@ -143,7 +145,10 @@ class WriteStream : public Stream {
void TearDown(GrpcStream* grpc_stream) override;

void NotifyStreamOpen() override;
util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override;
util::Status NotifyFirstStreamResponse(
const grpc::ByteBuffer& message) override;
util::Status NotifyNextStreamResponse(
const grpc::ByteBuffer& message) override;
void NotifyStreamClose(const util::Status& status) override;

std::string GetDebugName() const override {
Expand Down
27 changes: 22 additions & 5 deletions Firestore/core/test/unit/remote/stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,31 @@ class TestStream : public Stream {
observed_states_.push_back("NotifyStreamOpen");
}

util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override {
util::Status NotifyFirstStreamResponse(
const grpc::ByteBuffer& message) override {
std::string str = ByteBufferToString(message);
if (str.empty()) {
observed_states_.push_back("NotifyStreamResponse");
observed_states_.push_back("NotifyFirstStreamResponse");
} else {
observed_states_.push_back(StringFormat("NotifyStreamResponse(%s)", str));
observed_states_.push_back(
StringFormat("NotifyFirstStreamResponse(%s)", str));
}
return ResolveStreamResponse();
}

util::Status NotifyNextStreamResponse(
const grpc::ByteBuffer& message) override {
std::string str = ByteBufferToString(message);
if (str.empty()) {
observed_states_.push_back("NotifyNextStreamResponse");
} else {
observed_states_.push_back(
StringFormat("NotifyNextStreamResponse(%s)", str));
}
return ResolveStreamResponse();
}

util::Status ResolveStreamResponse() {
if (fail_next_stream_read_) {
fail_next_stream_read_ = false;
// The parent stream will issue a finish operation and block until it's
Expand Down Expand Up @@ -294,8 +311,8 @@ TEST_F(StreamTest, ObserverReceivesStreamRead) {
EXPECT_TRUE(firestore_stream->IsStarted());
EXPECT_TRUE(firestore_stream->IsOpen());
EXPECT_EQ(observed_states(),
States({"NotifyStreamOpen", "NotifyStreamResponse(foo)",
"NotifyStreamResponse(bar)"}));
States({"NotifyStreamOpen", "NotifyFirstStreamResponse(foo)",
"NotifyNextStreamResponse(bar)"}));
});
}

Expand Down
Loading