Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add test for aborted mutations #1005

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,100 @@ def unit_of_work(txn, *args, **kw):
* 2,
)

def test_run_in_transaction_w_mutations_before_abort(self):
import datetime
from google.api_core.exceptions import Aborted
from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from google.cloud.spanner_v1 import CommitRequest
from google.cloud.spanner_v1 import CommitResponse
from google.cloud.spanner_v1 import (
Transaction as TransactionPB,
TransactionOptions,
)
from google.cloud._helpers import UTC
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud.spanner_v1.transaction import Transaction

TABLE_NAME = "citizens"
COLUMNS = ["email", "first_name", "last_name", "age"]
VALUES = [
["[email protected]", "Phred", "Phlyntstone", 32],
["[email protected]", "Bharney", "Rhubble", 31],
]
TRANSACTION_ID = b"FACEDACE"
RETRY_SECONDS = 1
RETRY_NANOS = 3456
transaction_pb = TransactionPB(id=TRANSACTION_ID)
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
now_pb = _datetime_to_pb_timestamp(now)
response = CommitResponse(commit_timestamp=now_pb)
retry_info = RetryInfo(
retry_delay=Duration(seconds=RETRY_SECONDS, nanos=RETRY_NANOS)
)
trailing_metadata = [
("google.rpc.retryinfo-bin", retry_info.SerializeToString())
]
gax_api = self._make_spanner_api()
gax_api.begin_transaction.return_value = transaction_pb
gax_api.commit.side_effect = [response]
database = self._make_database()
database.spanner_api = gax_api
session = self._make_one(database)
session._session_id = self.SESSION_ID

called_with = []

def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)
if len(called_with) < 2:
raise _make_rpc_error(Aborted, trailing_metadata)
txn.insert(TABLE_NAME, COLUMNS, VALUES)

with mock.patch("time.sleep") as sleep_mock:
session.run_in_transaction(unit_of_work)

sleep_mock.assert_called_once_with(RETRY_SECONDS + RETRY_NANOS / 1.0e9)
self.assertEqual(len(called_with), 2)
for index, (txn, args, kw) in enumerate(called_with):
self.assertIsInstance(txn, Transaction)
if index == 0:
# Before abort should have two mutations.
self.assertEqual(len(txn._mutations), 1)
self.assertIsNone(txn.committed)
else:
# Commit request should have two mutations.
self.assertEqual(len(txn._mutations), 2)
self.assertEqual(txn.committed, now)
self.assertEqual(args, ())
self.assertEqual(kw, {})

expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())

# First call was aborted before commit operation, therefore no begin rpc was made during first attempt.
gax_api.begin_transaction.assert_called_once_with(
session=self.SESSION_NAME,
options=expected_options,
metadata=[
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
)
request = CommitRequest(
session=self.SESSION_NAME,
mutations=txn._mutations,
transaction_id=TRANSACTION_ID,
request_options=RequestOptions(),
)
gax_api.commit.assert_called_once_with(
request=request,
metadata=[
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
)

def test_run_in_transaction_w_callback_raises_abort_wo_metadata(self):
import datetime
from google.api_core.exceptions import Aborted
Expand Down