Cloud Spanner has officially released native change streams support, which is recommended instead of this solution.
For more information see https://cloud.google.com/spanner/docs/change-streams.
Spanner Change Publisher watches a Spanner database for changes and publishes these changes to Pubsub. It can monitor a single table, a set of tables, or all tables in a database.
Spanner Change Publisher can be included in an existing application, or it can be deployed as a standalone application.
A further introduction to Spanner Change Publisher can be found here.
Include the following dependency in your application to use Spanner Change Publisher.
<dependency>
<groupId>com.google.cloudspannerecosystem</groupId>
<artifactId>google-cloud-spanner-change-publisher</artifactId>
<version>1.2.1</version>
</dependency>
Publishes change events from a single table in a Spanner database to a Pubsub topic. The changed record is included in the Pubsub message as an Avro record.
String instance = "my-instance";
String database = "my-database";
String table = "MY_TABLE";
String topicName =
String.format("projects/%s/topics/my-topic", ServiceOptions.getDefaultProjectId());
// Setup a Spanner change watcher.
Spanner spanner = SpannerOptions.getDefaultInstance().getService();
DatabaseId databaseId = DatabaseId.of(SpannerOptions.getDefaultProjectId(), instance, database);
TableId tableId = TableId.of(databaseId, table);
SpannerTableChangeWatcher watcher = SpannerTableTailer.newBuilder(spanner, tableId).build();
// Setup Spanner change publisher.
DatabaseClient client = spanner.getDatabaseClient(databaseId);
SpannerTableChangeEventPublisher eventPublisher =
SpannerTableChangeEventPublisher.newBuilder(watcher, client)
.setTopicName(topicName)
.build();
// Start the change publisher. This will automatically also start the change watcher.
eventPublisher.startAsync().awaitRunning();
Publishes change events from all tables in a Spanner database to a separate Pubsub topic per table. The changed record is included in the Pubsub message as an Avro record.
String instance = "my-instance";
String database = "my-database";
// The %table% placeholder will automatically be replaced with the name of the table where the
// change occurred.
String topicNameFormat =
String.format(
"projects/%s/topics/spanner-update-%%table%%", ServiceOptions.getDefaultProjectId());
// Setup Spanner change watcher.
Spanner spanner = SpannerOptions.newBuilder().build().getService();
DatabaseId databaseId = DatabaseId.of(SpannerOptions.getDefaultProjectId(), instance, database);
SpannerDatabaseChangeWatcher watcher =
SpannerDatabaseTailer.newBuilder(spanner, databaseId).allTables().build();
// Setup Spanner change publisher.
DatabaseClient client = spanner.getDatabaseClient(databaseId);
SpannerDatabaseChangeEventPublisher eventPublisher =
SpannerDatabaseChangeEventPublisher.newBuilder(watcher, client)
.setTopicNameFormat(topicNameFormat)
.build();
// Start the change publisher. This will automatically also start the change watcher.
eventPublisher.startAsync().awaitRunning();
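The topic name format above relies on two substitution steps: String.format turns %%table%% into a literal %table% placeholder, and that placeholder is later replaced with the table name. The following is a minimal sketch of that idea using only the JDK. The class TopicNameFormatSketch and its resolve method are hypothetical and are not part of Spanner Change Publisher; the library's own replacement logic may differ.

```java
import java.util.Map;

/** Illustrative helper only; not part of Spanner Change Publisher. */
final class TopicNameFormatSketch {
  /** Replaces every %key% placeholder in the format with the corresponding value. */
  static String resolve(String format, Map<String, String> values) {
    String result = format;
    for (Map.Entry<String, String> entry : values.entrySet()) {
      result = result.replace("%" + entry.getKey() + "%", entry.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    // %% in String.format yields a literal %, so the %table% placeholder survives formatting.
    String format = String.format("projects/%s/topics/spanner-update-%%table%%", "my-project");
    System.out.println(format);
    // Hypothetical table name, used only to show the substitution.
    System.out.println(resolve(format, Map.of("table", "SINGERS")));
  }
}
```

Because each table resolves to its own topic name, the topics for all monitored tables need to exist (or be created) before changes for those tables can be published.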
Take a look at Samples.java for additional examples of more advanced use cases.
Spanner Change Event Publisher can be run as a standalone application to publish changes from a Spanner database to Pubsub. Follow these steps to build and start the application:
- Build the application including all dependencies by executing
mvn package
in the root of this project. This will generate the file spanner-publisher.jar in the target folder.
- Configure the required properties to specify the Spanner database to monitor and the Pubsub topic to publish to. The configuration can be specified using system properties or a properties file. The example below uses the minimum set of system properties needed to monitor all tables in a database and publish the changes to a separate Pubsub topic per table. See the scep.properties.example file in the resources folder for a full list of properties. You can also copy this file and use it as a starting point for your own properties file.
- Start the application using the command
java -Dscep.spanner.instance=my-instance -Dscep.spanner.database=my-database -Dscep.pubsub.topicNameFormat=spanner-update-%table% -jar target/spanner-publisher.jar
Additional configuration can be specified using system properties or a properties file. By default, the application looks for a file named scep.properties in the current directory, but a different file and/or location can be specified using the system property scep.properties.
Example:
java -Dscep.properties=/path/to/configuration.properties -jar spanner-publisher.jar
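The lookup rule described above (use the file named by the scep.properties system property, otherwise scep.properties in the current directory) can be sketched with the JDK alone. This is only an illustration under that description; the class ConfigFileLocationSketch is hypothetical and is not the application's actual lookup code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Illustrative only; not the application's actual lookup code. */
final class ConfigFileLocationSketch {
  /** Returns the configured properties file, or scep.properties in the current directory. */
  static Path resolve(Properties systemProperties) {
    // The second argument is the default used when the system property is not set.
    return Paths.get(systemProperties.getProperty("scep.properties", "scep.properties"));
  }

  public static void main(String[] args) {
    // Prints the file that would be used for the current JVM invocation.
    System.out.println(resolve(System.getProperties()).toAbsolutePath());
  }
}
```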
The most important configuration parameters are:
# Spanner database to monitor
scep.spanner.project=my-project
scep.spanner.instance=my-instance
scep.spanner.database=my-database
# Specific credentials to use for Spanner
scep.spanner.credentials=/path/to/credentials.json
# Selection of which tables to monitor
# Monitor all tables in the database. Cannot be used in combination with
# scep.spanner.includedTables
scep.spanner.allTables=true
# Exclude these tables. Can only be used in combination with
# scep.spanner.allTables=true
scep.spanner.excludedTables=TABLE1,TABLE2,TABLE3
# Include these tables. Can only be used in combination with
# scep.spanner.allTables=false
scep.spanner.includedTables=TABLE1,TABLE2,TABLE3
# Pubsub project and topic name (format)
scep.pubsub.project=my-pubsub-project
scep.pubsub.topicNameFormat=spanner-update-%database%-%table%
# Specific credentials to use for Pubsub
scep.pubsub.credentials=/path/to/credentials.json
# Converter factory to convert from Spanner to Pubsub.
scep.pubsub.converterFactory=com.google.cloud.spanner.publisher.SpannerToJsonFactory
The spanner-publisher.jar contains a complete example of all possible parameters in a file named scep.properties.example in the root of the .jar.
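The table selection properties listed above constrain each other: scep.spanner.allTables=true cannot be combined with scep.spanner.includedTables, and scep.spanner.excludedTables is only valid together with scep.spanner.allTables=true. The sketch below checks a properties file against those two rules before the application is started. The class TableSelectionCheck is hypothetical, and treating a missing scep.spanner.allTables as false is an assumption, not documented behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Illustrative pre-flight check; not part of Spanner Change Publisher. */
final class TableSelectionCheck {
  /** Returns null when the table selection is consistent, otherwise a description of the conflict. */
  static String validate(Properties props) {
    // Assumption: a missing allTables property is treated as false.
    boolean allTables = Boolean.parseBoolean(props.getProperty("scep.spanner.allTables", "false"));
    boolean hasIncluded = !props.getProperty("scep.spanner.includedTables", "").trim().isEmpty();
    boolean hasExcluded = !props.getProperty("scep.spanner.excludedTables", "").trim().isEmpty();
    if (allTables && hasIncluded) {
      return "scep.spanner.allTables=true cannot be combined with scep.spanner.includedTables";
    }
    if (!allTables && hasExcluded) {
      return "scep.spanner.excludedTables requires scep.spanner.allTables=true";
    }
    return null;
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    // Inline configuration used here instead of a file to keep the example self-contained.
    props.load(new StringReader(
        "scep.spanner.allTables=true\n"
            + "scep.spanner.excludedTables=TABLE1,TABLE2\n"));
    String problem = validate(props);
    System.out.println(problem == null ? "Table selection is consistent" : problem);
  }
}
```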
The standalone application also has a simple demo mode that can be used to test the application. The demo mode only works with the emulators of Spanner and PubSub and is only intended for testing.
Follow these steps to execute the application in demo mode:
- Build and install all dependencies:
cd spanner-change-watcher
mvn clean install -DskipTests
- Copy the scep.properties.example file from the google-cloud-spanner-change-publisher/src/main/resources folder to the root folder of the google-cloud-spanner-change-publisher project and name it scep.properties. Change the value of scep.demoMode from false to true.
cd google-cloud-spanner-change-publisher
cp src/main/resources/scep.properties.example ./scep.properties
sed -i "" 's/scep.demoMode=false/scep.demoMode=true/' scep.properties
- Execute the following commands to start and configure the Spanner and PubSub emulators:
(gcloud beta emulators spanner start)&
(gcloud beta emulators pubsub start --project=my-pubsub-project --host-port=localhost:8085)&
export SPANNER_EMULATOR_HOST=localhost:9010
export PUBSUB_EMULATOR_HOST=localhost:8085
- Package and run the application in demo mode (execute in the google-cloud-spanner-change-publisher folder):
mvn package -DskipTests
java -jar target/spanner-publisher.jar
The application will automatically connect to the emulators that you have started and create a couple of Spanner tables, PubSub topics and a PubSub subscriber. The application should print two changes to the console.
Any application can subscribe to the changes that are published to Pubsub and take any appropriate action based on the data that is received.
String instance = "my-instance";
String database = "my-database";
String subscription = "projects/my-project/subscriptions/my-subscription";
// Create a Spanner client as we need this to decode the Avro records.
Spanner spanner = SpannerOptions.getDefaultInstance().getService();
DatabaseId databaseId = DatabaseId.of(SpannerOptions.getDefaultProjectId(), instance, database);
DatabaseClient client = spanner.getDatabaseClient(databaseId);
// Keep a cache of converters from Avro to Spanner as these are expensive to create.
// The receiver may be called from multiple threads, so use a concurrent map.
Map<TableId, SpannerToAvro> converters = new ConcurrentHashMap<>();
// Start a subscriber.
Subscriber subscriber =
Subscriber.newBuilder(
ProjectSubscriptionName.parse(subscription),
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
// Get the change metadata.
DatabaseId database = DatabaseId.of(message.getAttributesOrThrow("Database"));
TableId table = TableId.of(database, message.getAttributesOrThrow("Table"));
Timestamp commitTimestamp =
Timestamp.parseTimestamp(message.getAttributesOrThrow("Timestamp"));
// Get the changed row and decode the data.
SpannerToAvro converter = converters.get(table);
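if (converter == null) {
converter = new SpannerToAvro(client, table);
// Store the converter so it is reused for later messages from the same table.
converters.put(table, converter);
}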
try {
GenericRecord record = converter.decodeRecord(message.getData());
System.out.println("--- Received changed record ---");
System.out.printf("Database: %s%n", database);
System.out.printf("Table: %s%n", table);
System.out.printf("Commit timestamp: %s%n", commitTimestamp);
System.out.printf("Data: %s%n", record);
} catch (Exception e) {
System.err.printf("Failed to decode avro record: %s%n", e.getMessage());
} finally {
consumer.ack();
}
}
})
.build();
subscriber.startAsync().awaitRunning();
- Cloud Spanner has officially released native change streams support, which is recommended instead of this solution. For more information see https://cloud.google.com/spanner/docs/change-streams.
- Spanner Change Publisher does not support PostgreSQL dialect databases.
- Spanner Change Publisher uses commit timestamps to determine when a change has occurred. It cannot be used on tables that do not include a commit timestamp column.
- Deletes are not detected, unless these are soft deletes that only update a deleted flag in the corresponding table.
- Spanner Change Publisher polls tables for changes. Polling on larger tables can take some time and cause some delay before a change is detected. The default poll interval is 1 second and is configurable. It does support sharding to lower the load on large tables. Take a look at the samples for Spanner Change Watcher for more information.
- Spanner Change Publisher emits changes on a row level basis, including the commit timestamp of the change. It does not emit an event containing all changes of a single transaction. If that is needed, the client application will need to group the row level changes together based on the commit timestamp.
- Spanner Change Publisher is not a managed solution and does not come with Cloud Spanner's SLO.
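The grouping of row level changes by commit timestamp mentioned in the list above could look roughly like the sketch below. It uses only the JDK; the RowChange class is a hypothetical, simplified stand-in for whatever the client application extracts from the received Pubsub messages, and the table names and timestamps are made up for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative client-side grouping; not part of Spanner Change Publisher. */
final class CommitTimestampGrouping {
  /** Hypothetical minimal view of one received row-level change. */
  static final class RowChange {
    final String table;
    final Instant commitTimestamp;
    final String data;

    RowChange(String table, String commitTimestamp, String data) {
      this.table = table;
      this.commitTimestamp = Instant.parse(commitTimestamp);
      this.data = data;
    }
  }

  /** Groups changes by commit timestamp; the TreeMap keeps the groups in commit order. */
  static Map<Instant, List<RowChange>> group(List<RowChange> changes) {
    Map<Instant, List<RowChange>> grouped = new TreeMap<>();
    for (RowChange change : changes) {
      grouped.computeIfAbsent(change.commitTimestamp, ts -> new ArrayList<>()).add(change);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<RowChange> changes = List.of(
        new RowChange("ALBUMS", "2020-06-10T12:00:01.000000Z", "album 1"),
        new RowChange("SINGERS", "2020-06-10T12:00:00.500000Z", "singer 1"),
        new RowChange("SINGERS", "2020-06-10T12:00:01.000000Z", "singer 2"));
    group(changes).forEach(
        (ts, rows) -> System.out.printf("%s: %d row(s)%n", ts, rows.size()));
  }
}
```

Note that changes from different tables may arrive at different times, so a real client would also need its own rule for deciding when a group is complete; that decision is outside the scope of this sketch.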
Please feel free to report issues and send pull requests, but note that this application is not officially supported as part of the Cloud Spanner product.