From 56e48c75856847bd47106919b048f22a576dd2a8 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 17 Aug 2023 17:12:46 +0530 Subject: [PATCH] test: add test for aborted mutations --- tests/unit/test_session.py | 94 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 3125e33f21..2fe881e0a0 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1187,6 +1187,100 @@ def unit_of_work(txn, *args, **kw): * 2, ) + def test_run_in_transaction_w_mutations_before_abort(self): + import datetime + from google.api_core.exceptions import Aborted + from google.protobuf.duration_pb2 import Duration + from google.rpc.error_details_pb2 import RetryInfo + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import CommitResponse + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + TransactionOptions, + ) + from google.cloud._helpers import UTC + from google.cloud._helpers import _datetime_to_pb_timestamp + from google.cloud.spanner_v1.transaction import Transaction + + TABLE_NAME = "citizens" + COLUMNS = ["email", "first_name", "last_name", "age"] + VALUES = [ + ["phred@exammple.com", "Phred", "Phlyntstone", 32], + ["bharney@example.com", "Bharney", "Rhubble", 31], + ] + TRANSACTION_ID = b"FACEDACE" + RETRY_SECONDS = 1 + RETRY_NANOS = 3456 + transaction_pb = TransactionPB(id=TRANSACTION_ID) + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_pb = _datetime_to_pb_timestamp(now) + response = CommitResponse(commit_timestamp=now_pb) + retry_info = RetryInfo( + retry_delay=Duration(seconds=RETRY_SECONDS, nanos=RETRY_NANOS) + ) + trailing_metadata = [ + ("google.rpc.retryinfo-bin", retry_info.SerializeToString()) + ] + gax_api = self._make_spanner_api() + gax_api.begin_transaction.return_value = transaction_pb + gax_api.commit.side_effect = [response] + database = self._make_database() + database.spanner_api = gax_api + session = self._make_one(database) + session._session_id = self.SESSION_ID + + called_with = [] + + def unit_of_work(txn, *args, **kw): + called_with.append((txn, args, kw)) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + if len(called_with) < 2: + raise _make_rpc_error(Aborted, trailing_metadata) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + + with mock.patch("time.sleep") as sleep_mock: + session.run_in_transaction(unit_of_work) + + sleep_mock.assert_called_once_with(RETRY_SECONDS + RETRY_NANOS / 1.0e9) + self.assertEqual(len(called_with), 2) + for index, (txn, args, kw) in enumerate(called_with): + self.assertIsInstance(txn, Transaction) + if index == 0: + # Before abort should have two mutations. + self.assertEqual(len(txn._mutations), 1) + self.assertIsNone(txn.committed) + else: + # Commit request should have two mutations. + self.assertEqual(len(txn._mutations), 2) + self.assertEqual(txn.committed, now) + self.assertEqual(args, ()) + self.assertEqual(kw, {}) + + expected_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + + # First call was aborted before commit operation, therefore no begin rpc was made during first attempt. + gax_api.begin_transaction.assert_called_once_with( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + request_options=RequestOptions(), + ) + gax_api.commit.assert_called_once_with( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + def test_run_in_transaction_w_callback_raises_abort_wo_metadata(self): import datetime from google.api_core.exceptions import Aborted