From bc2c9652aaae38d0d130e0aa869b4882e0e0512f Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 16 Aug 2024 11:31:37 +0200 Subject: [PATCH 1/3] Refactor BQ to expose all beam's configurations --- .../scio/bigquery/BigQueryClientIT.scala | 2 +- .../spotify/scio/bigquery/BigQueryIOIT.scala | 4 +- .../scio/bigquery/TypedBigQueryIT.scala | 35 +- .../bigquery/types/BigQueryStorageIT.scala | 16 +- .../scio/bigquery/types/BigQueryTypeIT.scala | 7 +- .../scala/com/spotify/scio/ScioContext.scala | 5 + .../coders/instances/BeamTypeCoders.scala | 6 + .../com/spotify/scio/values/ErrorSink.scala | 56 + .../scio/examples/complete/AutoComplete.scala | 2 +- .../complete/StreamingWordExtract.scala | 2 +- .../complete/TrafficMaxLaneFlow.scala | 2 +- .../examples/complete/TrafficRoutes.scala | 2 +- .../examples/complete/game/GameStats.scala | 4 +- .../complete/game/HourlyTeamScore.scala | 2 +- .../examples/complete/game/LeaderBoard.scala | 4 +- .../examples/complete/game/UserScore.scala | 2 +- .../examples/cookbook/BigQueryTornadoes.scala | 4 +- .../cookbook/CombinePerKeyExamples.scala | 4 +- .../cookbook/DistinctByKeyExample.scala | 4 +- .../examples/cookbook/FilterExamples.scala | 4 +- .../scio/examples/cookbook/JoinExamples.scala | 20 +- .../examples/cookbook/MaxPerKeyExamples.scala | 4 +- .../cookbook/StorageBigQueryTornadoes.scala | 14 +- .../examples/cookbook/TriggerExample.scala | 2 +- .../extra/TypedBigQueryTornadoes.scala | 2 +- .../extra/TypedStorageBigQueryTornadoes.scala | 2 +- .../StorageBigQueryTornadoesTest.scala | 16 +- .../TypedStorageBigQueryTornadoesTest.scala | 9 +- .../spotify/scio/bigquery/BigQueryIO.scala | 1050 +++++------------ .../spotify/scio/bigquery/BigQueryTypes.scala | 125 +- .../spotify/scio/bigquery/MockBigQuery.scala | 4 +- .../scio/bigquery/client/BigQuery.scala | 4 +- .../scio/bigquery/client/QueryOps.scala | 2 +- .../dynamic/syntax/SCollectionSyntax.scala | 8 +- .../bigquery/syntax/SCollectionSyntax.scala | 119 +- .../bigquery/syntax/ScioContextSyntax.scala | 325 +++-- .../com/spotify/scio/bigquery/taps.scala | 107 +- .../scio/bigquery/BigQueryIOTest.scala | 231 ++-- .../scio/bigquery/BigQueryTypesTest.scala | 4 +- site/src/main/paradox/FAQ.md | 2 +- site/src/main/paradox/io/BigQuery.md | 2 +- 41 files changed, 1001 insertions(+), 1218 deletions(-) create mode 100644 scio-core/src/main/scala/com/spotify/scio/values/ErrorSink.scala diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala index 02d2914e66..295c671ea0 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala @@ -154,7 +154,7 @@ class BigQueryClientIT extends AnyFlatSpec with Matchers { "TableService.getRows" should "work" in { val rows = - bq.tables.rows(Table.Spec("bigquery-public-data:samples.shakespeare")).take(10).toList + bq.tables.rows(Table("bigquery-public-data:samples.shakespeare")).take(10).toList val columns = Set("word", "word_count", "corpus", "corpus_date") all(rows.map(_.keySet().asScala)) shouldBe columns } diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala index b29dc500dc..4e6c46f2b2 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala @@ -45,7 +45,7 @@ class BigQueryIOIT extends PipelineSpec { "Select" should "read typed values from a SQL query" in runWithRealContext(options) { sc => - val scoll = sc.read(BigQueryTyped[ShakespeareFromQuery]) + val scoll = sc.typedBigQueryStorage[ShakespeareFromQuery]() scoll should haveSize(10) scoll should satisfy[ShakespeareFromQuery] { _.forall(_.getClass == classOf[ShakespeareFromQuery]) @@ -54,7 +54,7 @@ class BigQueryIOIT extends PipelineSpec { "TableRef" should "read typed values from table" in runWithRealContext(options) { sc => - val scoll = sc.read(BigQueryTyped[ShakespeareFromTable]) + val scoll = sc.typedBigQueryStorage[ShakespeareFromTable]() scoll.take(10) should haveSize(10) scoll should satisfy[ShakespeareFromTable] { _.forall(_.getClass == classOf[ShakespeareFromTable]) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index 74cba90165..78930009e4 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -20,7 +20,6 @@ package com.spotify.scio.bigquery import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.avro._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.testing._ import magnolify.scalacheck.auto._ @@ -69,7 +68,7 @@ object TypedBigQueryIT { val now = Instant.now().toString(TIME_FORMATTER) val spec = s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}" - Table.Spec(spec) + Table(spec) } private val tableRowTable = table("records_tablerow") private val avroTable = table("records_avro") @@ -101,37 +100,25 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { BigQuery.defaultInstance().tables.delete(avroLogicalTypeTable.ref) } - "TypedBigQuery" should "read records" in { + "typedBigQuery" should "read records" in { val sc = ScioContext(options) sc.typedBigQuery[Record](tableRowTable) should containInAnyOrder(records) sc.run() } - it should "convert to avro format" in { + "bigQueryTableFormat" should "read TableRow records" in { val sc = ScioContext(options) - implicit val coder = avroGenericRecordCoder(Record.avroSchema) - sc.typedBigQuery[Record](tableRowTable) - .map(Record.toAvro) - .map(Record.fromAvro) should containInAnyOrder( - records - ) + val format = BigQueryIO.Format.Default(BigQueryType[Record]) + val data = sc.bigQueryTableFormat(tableRowTable, format) + data should containInAnyOrder(records) sc.run() } - "BigQueryTypedTable" should "read TableRow records" in { + it should "read GenericRecord records" in { val sc = ScioContext(options) - sc - .bigQueryTable(tableRowTable) - .map(Record.fromTableRow) should containInAnyOrder(records) - sc.run() - } - - it should "read GenericRecord recors" in { - val sc = ScioContext(options) - implicit val coder = avroGenericRecordCoder(Record.avroSchema) - sc - .bigQueryTable(tableRowTable, Format.GenericRecord) - .map(Record.fromAvro) should containInAnyOrder(records) + val format = BigQueryIO.Format.Avro(BigQueryType[Record]) + val data = sc.bigQueryTableFormat(tableRowTable, format) + data should containInAnyOrder(records) sc.run() } @@ -157,7 +144,7 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { |} """.stripMargin) val tap = sc - .bigQueryTable(tableRowTable, Format.GenericRecord) + .bigQueryTableFormat(tableRowTable, BigQueryIO.Format.Avro()) .saveAsBigQueryTable(avroTable, schema = schema, createDisposition = CREATE_IF_NEEDED) val result = sc.run().waitUntilDone() diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala index 0f12f40135..20f86fce81 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala @@ -125,7 +125,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQuery[NestedWithFields]() + .typedBigQueryStorage[NestedWithFields]() .map(r => (r.required.int, r.required.string, r.optional.get.int)) .internal PAssert.that(p).containsInAnyOrder(expected) @@ -139,7 +139,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQuery[NestedWithRestriction]() + .typedBigQueryStorage[NestedWithRestriction]() .map { r => val (req, opt, rep) = (r.required, r.optional.get, r.repeated.head) (req.int, req.string, opt.int, opt.string, rep.int, rep.string) @@ -155,8 +155,10 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) + val bqt = BigQueryType[NestedWithRestriction] + val source = Table(bqt.table.get, "required.int < 3") val p = sc - .typedBigQueryStorage[NestedWithRestriction](rowRestriction = "required.int < 3") + .typedBigQueryStorage[NestedWithRestriction](source) .map { r => val (req, opt, rep) = (r.required, r.optional.get, r.repeated.head) (req.int, req.string, opt.int, opt.string, rep.int, rep.string) @@ -172,7 +174,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQuery[NestedWithAll](Table.Spec(NestedWithAll.table.format("nested"))) + .typedBigQueryStorage[NestedWithAll](Table(NestedWithAll.table.format("nested"))) .map(r => (r.required.int, r.required.string, r.optional.get.int)) .internal PAssert.that(p).containsInAnyOrder(expected) @@ -232,7 +234,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc.typedBigQuery[FromTable]().internal + val p = sc.typedBigQueryStorage[FromTable]().internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } @@ -243,7 +245,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) val p = sc - .typedBigQueryStorage[ToTableRequired](Table.Spec("data-integration-test:storage.required")) + .typedBigQueryStorage[ToTableRequired](Table("data-integration-test:storage.required")) .internal PAssert.that(p).containsInAnyOrder(expected) sc.run() @@ -272,7 +274,7 @@ class BigQueryStorageIT extends AnyFlatSpec with Matchers { val (sc, _) = ContextAndArgs( Array("--project=data-integration-test", "--tempLocation=gs://data-integration-test-eu/temp") ) - val p = sc.typedBigQuery[FromQuery]().internal + val p = sc.typedBigQueryStorage[FromQuery]().internal PAssert.that(p).containsInAnyOrder(expected) sc.run() } diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala index c08ea44056..78b36c29d5 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala @@ -187,10 +187,9 @@ class BigQueryTypeIT extends AnyFlatSpec with Matchers { tableReference.setProjectId("data-integration-test") tableReference.setDatasetId("partition_a") tableReference.setTableId("table_$LATEST") - Table.Ref(tableReference).latest().ref.getTableId shouldBe "table_20170302" + Table(tableReference).latest().ref.getTableId shouldBe "table_20170302" - Table - .Spec("data-integration-test:partition_a.table_$LATEST") + Table("data-integration-test:partition_a.table_$LATEST") .latest() .ref .getTableId shouldBe "table_20170302" @@ -210,7 +209,7 @@ class BigQueryTypeIT extends AnyFlatSpec with Matchers { val bqt = BigQueryType[FromTableT] bqt.isQuery shouldBe false bqt.isTable shouldBe true - bqt.query shouldBe None + bqt.queryRaw shouldBe None bqt.table shouldBe Some("bigquery-public-data:samples.shakespeare") val fields = bqt.schema.getFields.asScala fields.size shouldBe 4 diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index ce3e169b6a..0668087905 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -851,6 +851,11 @@ class ScioContext private[scio] ( this.applyTransform(Create.timestamped(v.asJava).withCoder(coder)) } + // ======================================================================= + // Error handler + // ======================================================================= + def errorSink(): ErrorSink = ErrorSink(this) + // ======================================================================= // Metrics // ======================================================================= diff --git a/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala b/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala index df8cca640b..9e77d0c23d 100644 --- a/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala +++ b/scio-core/src/main/scala/com/spotify/scio/coders/instances/BeamTypeCoders.scala @@ -29,6 +29,7 @@ import org.apache.beam.sdk.io.FileIO.ReadableFile import org.apache.beam.sdk.io.fs.{MatchResult, MetadataCoderV2, ResourceId, ResourceIdCoder} import org.apache.beam.sdk.io.ReadableFileCoder import org.apache.beam.sdk.schemas.{Schema => BSchema} +import org.apache.beam.sdk.transforms.errorhandling.BadRecord import org.apache.beam.sdk.transforms.windowing.{ BoundedWindow, GlobalWindow, @@ -66,6 +67,11 @@ trait BeamTypeCoders extends CoderGrammar { str => DefaultJsonObjectParser.parseAndClose(new StringReader(str), ScioUtil.classOf[T]), DefaultJsonObjectParser.getJsonFactory().toString(_) ) + + // rely on serializable + implicit val badRecordCoder: Coder[BadRecord] = kryo + implicit val badRecordRecordCoder: Coder[BadRecord.Record] = kryo + implicit val badRecordFailurCoder: Coder[BadRecord.Failure] = kryo } private[coders] object BeamTypeCoders extends BeamTypeCoders { diff --git a/scio-core/src/main/scala/com/spotify/scio/values/ErrorSink.scala b/scio-core/src/main/scala/com/spotify/scio/values/ErrorSink.scala new file mode 100644 index 0000000000..d8f9959f5f --- /dev/null +++ b/scio-core/src/main/scala/com/spotify/scio/values/ErrorSink.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.values + +import com.spotify.scio.ScioContext +import org.apache.beam.sdk.transforms.PTransform +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} +import org.apache.beam.sdk.values.{PCollection, PCollectionTuple, TupleTag} + +/** + * A sink for error records. + * + * Once the [[sink]] is materialized, the [[handler]] must not be used anymore. + */ +sealed trait ErrorSink { + def handler: ErrorHandler[BadRecord, _] + def sink: SCollection[BadRecord] +} + +object ErrorSink { + + private class SinkSideOutput(tag: TupleTag[BadRecord]) + extends PTransform[PCollection[BadRecord], PCollectionTuple] { + override def expand(input: PCollection[BadRecord]): PCollectionTuple = + PCollectionTuple.of(tag, input) + } + + private[scio] def apply(context: ScioContext): ErrorSink = { + new ErrorSink { + private val tupleTag: TupleTag[BadRecord] = new TupleTag[BadRecord]() + + override val handler: ErrorHandler[BadRecord, PCollectionTuple] = + context.pipeline.registerBadRecordErrorHandler(new SinkSideOutput(tupleTag)) + + override def sink: SCollection[BadRecord] = { + handler.close() + val output = handler.getOutput + context.wrap(output.get(tupleTag)) + } + } + } +} diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala index 62ffb8440a..a70839ccae 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/AutoComplete.scala @@ -145,7 +145,7 @@ object AutoComplete { if (outputToBigqueryTable) { tags .map(kv => Record(kv._1, kv._2.map(p => Tag(p._1, p._2)).toList)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) } if (outputToDatastore) { val kind = args.getOrElse("kind", "autocomplete-demo") diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala index e6d428cc04..0eafc26805 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/StreamingWordExtract.scala @@ -49,7 +49,7 @@ object StreamingWordExtract { .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .map(_.toUpperCase) .map(s => TableRow("string_field" -> s)) - .saveAsBigQueryTable(Table.Spec(args("output")), schema) + .saveAsBigQueryTable(Table(args("output")), schema) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala index 34fc0f9f5b..78cdecc7b4 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficMaxLaneFlow.scala @@ -126,7 +126,7 @@ object TrafficMaxLaneFlow { ts ) } - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala index c24b283b34..f9f4b3511d 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/TrafficRoutes.scala @@ -111,7 +111,7 @@ object TrafficRoutes { .map { case (r, ts) => Record(r.route, r.avgSpeed, r.slowdownEvent, ts) } - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala index 1750c3c3ef..01d846c6fd 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/GameStats.scala @@ -113,7 +113,7 @@ object GameStats { // Done using windowing information, convert back to regular `SCollection` .toSCollection // Save to the BigQuery table defined by "output" in the arguments passed in + "_team" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_team")) + .saveAsTypedBigQueryTable(Table(args("output") + "_team")) userEvents // Window over a variable length of time - sessions end after sessionGap minutes no activity @@ -141,7 +141,7 @@ object GameStats { AvgSessionLength(mean, fmt.print(w.start())) } // Save to the BigQuery table defined by "output" + "_sessions" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_sessions")) + .saveAsTypedBigQueryTable(Table(args("output") + "_sessions")) // Execute the pipeline val result = sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala index 3baf8c8c0e..95fb9ebd29 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/HourlyTeamScore.scala @@ -91,7 +91,7 @@ object HourlyTeamScore { TeamScoreSums(team, score, start) } // Save to the BigQuery table defined by "output" in the arguments passed in - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala index a77e34219a..e7533ba9d9 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/LeaderBoard.scala @@ -96,7 +96,7 @@ object LeaderBoard { // Done with windowing information, convert back to regular `SCollection` .toSCollection // Save to the BigQuery table defined by "output" in the arguments passed in + "_team" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_team")) + .saveAsTypedBigQueryTable(Table(args("output") + "_team")) gameEvents // Use a global window for unbounded data, which updates calculation every 10 minutes, @@ -126,7 +126,7 @@ object LeaderBoard { // Map summed results from tuples into `UserScoreSums` case class, so we can save to BQ .map(kv => UserScoreSums(kv._1, kv._2, fmt.print(Instant.now()))) // Save to the BigQuery table defined by "output" in the arguments passed in + "_user" suffix - .saveAsTypedBigQueryTable(Table.Spec(args("output") + "_user")) + .saveAsTypedBigQueryTable(Table(args("output") + "_user")) // Execute the pipeline val result = sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala index 946ac76620..590314e2f3 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/complete/game/UserScore.scala @@ -62,7 +62,7 @@ object UserScore { // Map summed results from tuples into `UserScoreSums` case class, so we can save to BQ .map(UserScoreSums.tupled) // Save to the BigQuery table defined by "output" in the arguments passed in - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala index 13125695e4..12bcf84817 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/BigQueryTornadoes.scala @@ -45,7 +45,7 @@ object BigQueryTornadoes { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) val resultTap = sc .bigQueryTable(table) // Extract months with tornadoes @@ -55,7 +55,7 @@ object BigQueryTornadoes { // Map `(Long, Long)` tuples into result `TableRow`s .map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Access the loaded tables resultTap diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala index cc032fe712..209861d479 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/CombinePerKeyExamples.scala @@ -47,7 +47,7 @@ object CombinePerKeyExamples { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) sc.bigQueryTable(table) // Extract words and corresponding play names .flatMap { row => @@ -64,7 +64,7 @@ object CombinePerKeyExamples { // Map `(String, String)` tuples into result `TableRow`s .map(kv => TableRow("word" -> kv._1, "all_plays" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala index ded1fd8551..ba145ba180 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/DistinctByKeyExample.scala @@ -46,7 +46,7 @@ object DistinctByKeyExample { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.SHAKESPEARE_TABLE)) sc.bigQueryTable(table) // Extract words and corresponding play names .flatMap { row => @@ -59,7 +59,7 @@ object DistinctByKeyExample { // Map `(String, String)` tuples into result `TableRow`s .map(kv => TableRow("word" -> kv._1, "reference_play" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala index 31fe4b0541..0708fd1960 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/FilterExamples.scala @@ -51,7 +51,7 @@ object FilterExamples { val monthFilter = args.int("monthFilter", 7) // Open BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) val pipe = sc .bigQueryTable(table) // Map `TableRow`s into `Record`s @@ -81,7 +81,7 @@ object FilterExamples { TableRow("year" -> r.year, "month" -> r.month, "day" -> r.day, "mean_temp" -> r.meanTemp) } // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala index 61bc7a1fd7..2126d853f9 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/JoinExamples.scala @@ -63,9 +63,9 @@ object JoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Left outer join to produce `SCollection[(String, (String, Option[String]))] @@ -92,9 +92,9 @@ object SideInputJoinExamples { // Extract both sides as `SCollection[(String, String)]`s, and then convert right hand side as // a `SideInput` of `Map[String, String]` val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = sc - .bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)) + .bigQueryTable(Table(ExampleData.COUNTRY_TABLE)) .map(extractCountryInfo) .asMapSideInput @@ -127,9 +127,9 @@ object HashJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Hash join uses side input under the hood and is a drop-in replacement for regular join @@ -155,9 +155,9 @@ object SkewedJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)) .map(extractCountryInfo) eventsInfo @@ -219,9 +219,9 @@ object SparseJoinExamples { // Extract both sides as `SCollection[(String, String)]`s val eventsInfo = - sc.bigQueryTable(Table.Spec(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) + sc.bigQueryTable(Table(ExampleData.EVENT_TABLE)).flatMap(extractEventInfo) val countryInfo = - sc.bigQueryTable(Table.Spec(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) + sc.bigQueryTable(Table(ExampleData.COUNTRY_TABLE)).map(extractCountryInfo) eventsInfo // Sparse Join is useful when LHS is much larger than the RHS which cannot fit in memory, but diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala index 3ca121027e..5d5f61b7d7 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/MaxPerKeyExamples.scala @@ -44,7 +44,7 @@ object MaxPerKeyExamples { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) sc.bigQueryTable(table) // Extract month and mean temperature as `(Long, Double)` tuples .map(row => (row.getLong("month"), row.getDouble("mean_temp"))) @@ -54,7 +54,7 @@ object MaxPerKeyExamples { // Map `(Long, Double)` tuples into result `TableRow`s .map(kv => TableRow("month" -> kv._1, "max_mean_temp" -> kv._2)) // Save result as a BigQuery table - .saveAsBigQueryTable(Table.Spec(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) + .saveAsBigQueryTable(Table(args("output")), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED) // Execute the pipeline sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala index f020f0efb0..445186ae04 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoes.scala @@ -45,13 +45,13 @@ object StorageBigQueryTornadoes { ) // Open a BigQuery table as a `SCollection[TableRow]` - val table = Table.Spec(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE)) + val table = Table( + args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE), + selectedFields = List("tornado", "month"), + rowRestriction = "tornado = true" + ) val resultTap = sc - .bigQueryStorage( - table, - selectedFields = List("tornado", "month"), - rowRestriction = "tornado = true" - ) + .bigQueryStorage(table) .map(_.getLong("month")) // Count occurrences of each unique month to get `(Long, Long)` .countByValue @@ -59,7 +59,7 @@ object StorageBigQueryTornadoes { .map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2)) // Save result as a BigQuery table .saveAsBigQueryTable( - table = Table.Spec(args("output")), + table = Table(args("output")), schema = schema, writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED, diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala index 5834d745ae..e233ade6ee 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/cookbook/TriggerExample.scala @@ -137,7 +137,7 @@ object TriggerExample { sequentialResults ) ) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) val result = sc.run() exampleUtils.waitToFinish(result.pipelineResult) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala index ffc7493d2e..e6cbe51df8 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedBigQueryTornadoes.scala @@ -52,7 +52,7 @@ object TypedBigQueryTornadoes { .map(kv => Result(kv._1, kv._2)) // Convert elements from Result to TableRow and save output to BigQuery. .saveAsTypedBigQueryTable( - Table.Spec(args("output")), + Table(args("output")), writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED ) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala index 2c2b1b345e..3cbfab3e28 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoes.scala @@ -57,7 +57,7 @@ object TypedStorageBigQueryTornadoes { .map(kv => Result(kv._1, kv._2)) // Convert elements from Result to TableRow and save output to BigQuery. .saveAsTypedBigQueryTable( - Table.Spec(args("output")), + Table(args("output")), method = Method.STORAGE_WRITE_API, writeDisposition = WRITE_TRUNCATE, createDisposition = CREATE_IF_NEEDED, diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala index d79b8282b9..ed4e12afc5 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/cookbook/StorageBigQueryTornadoesTest.scala @@ -32,16 +32,14 @@ final class StorageBigQueryTornadoesTest extends PipelineSpec { .map(t => TableRow("month" -> t._1, "tornado_count" -> t._2)) "BigQueryTornadoes" should "work" in { + val table = Table( + "bigquery-public-data:samples.gsod", + List("tornado", "month"), + "tornado = true" + ) JobTest[com.spotify.scio.examples.cookbook.StorageBigQueryTornadoes.type] - .args("--input=bigquery-public-data:samples.gsod", "--output=dataset.table") - .input( - BigQueryIO( - "bigquery-public-data:samples.gsod", - List("tornado", "month"), - Some("tornado = true") - ), - inData - ) + .args(s"--input=${table.spec}", "--output=dataset.table") + .input(BigQueryIO(table), inData) .output(BigQueryIO[TableRow]("dataset.table")) { coll => coll should containInAnyOrder(expected) } diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala index 78288bd8df..10d265d529 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/TypedStorageBigQueryTornadoesTest.scala @@ -35,14 +35,7 @@ final class TypedStorageBigQueryTornadoesTest extends PipelineSpec { "StorageTypedBigQueryTornadoes" should "work" in { JobTest[com.spotify.scio.examples.extra.TypedStorageBigQueryTornadoes.type] .args("--output=dataset.table") - .input( - BigQueryIO( - TypedStorageBigQueryTornadoes.Row.table, - List("tornado", "month"), - Some("tornado = true") - ), - inData - ) + .input(BigQueryIO[TypedStorageBigQueryTornadoes.Row], inData) .output(BigQueryIO[Result]("dataset.table")) { coll => coll should containInAnyOrder(expected) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 3cc3f6ab03..6e7bcd27ba 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -18,16 +18,16 @@ package com.spotify.scio.bigquery import com.google.api.services.bigquery.model.TableSchema +import com.google.cloud.bigquery.storage.v1.DataFormat import com.spotify.scio.ScioContext -import com.spotify.scio.bigquery.client.BigQuery +import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation import com.spotify.scio.coders._ import com.spotify.scio.io._ import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections} -import com.twitter.chill.ClosureCleaner +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions import org.apache.beam.sdk.io.Compression import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.{Method => ReadMethod} import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ @@ -37,85 +37,317 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ } import org.apache.beam.sdk.io.gcp.bigquery._ import org.apache.beam.sdk.io.gcp.{bigquery => beam} -import org.apache.beam.sdk.transforms.SerializableFunction +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} import org.apache.beam.sdk.values.{PCollection, PCollectionTuple} import org.joda.time.Duration -import java.util.concurrent.ConcurrentHashMap -import java.util.function +import scala.util.chaining._ import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ -import scala.util.chaining._ -private object Reads { +case class BigQueryIO[T: Coder](source: Source) extends ScioIO[T] with WriteResultIO[T] { + import BigQueryIO._ + + final override val tapT: TapT.Aux[T, T] = TapOf[T] - private[this] val cache = new ConcurrentHashMap[ScioContext, BigQuery]() + override type ReadP = ReadParam[T] + override type WriteP = WriteParam[T] - @inline private def client(sc: ScioContext): BigQuery = - cache.computeIfAbsent( - sc, - new function.Function[ScioContext, BigQuery] { - override def apply(context: ScioContext): BigQuery = { - val opts = context.optionsAs[GcpOptions] - BigQuery(opts.getProject, opts.getGcpCredential) - } - } + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { + val coder = CoderMaterializer.beam(sc, Coder[T]) + val t = beam.BigQueryIO + .read(Functions.serializableFn(params.format.parseFn)) + .withCoder(coder) + .pipe(r => withSource(r)(source)) + .withMethod(params.method) + .withFormat(params.format.dataFormat) + .pipe(r => withResultFlattening(r)(params)) + .pipe(r => Option(params.errorHandler).fold(r)(r.withErrorHandler)) + .pipe(r => Option(params.configOverride).fold(r)(_.apply(r))) + + sc.applyTransform(t) + } + + override protected def writeWithResult( + data: SCollection[T], + params: WriteP + ): (Tap[T], SideOutputCollections) = { + val method: WriteMethod = resolveMethod( + params.method, + data.context.optionsAs[BigQueryOptions], + data.internal.isBounded ) + val t = beam.BigQueryIO + .write[T]() + .withMethod(method) + .pipe(w => withSink(w)(source)) + .pipe(w => withFormatFunction(w)(params.format)) + .pipe(w => Option(params.schema).fold(w)(w.withSchema)) + .pipe(w => Option(params.createDisposition).fold(w)(w.withCreateDisposition)) + .pipe(w => Option(params.writeDisposition).fold(w)(w.withWriteDisposition)) + .pipe(w => Option(params.tableDescription).fold(w)(w.withTableDescription)) + .pipe(w => Option(params.timePartitioning).map(_.asJava).fold(w)(w.withTimePartitioning)) + .pipe(w => Option(params.clustering).map(_.asJava).fold(w)(w.withClustering)) + .pipe(w => Option(params.triggeringFrequency).fold(w)(w.withTriggeringFrequency)) + .pipe(w => Option(params.sharding).fold(w)(withSharding(method, w))) + .pipe(w => Option(params.failedInsertRetryPolicy).fold(w)(w.withFailedInsertRetryPolicy)) + .pipe(w => withSuccessfulInsertsPropagation(method, w)(params.successfulInsertsPropagation)) + .pipe(w => if (params.extendedErrorInfo) w.withExtendedErrorInfo() else w) + .pipe(w => Option(params.errorHandler).fold(w)(w.withErrorHandler)) + .pipe(w => Option(params.configOverride).fold(w)(_.apply(w))) - private[scio] def bqReadQuery[T](sc: ScioContext)( - typedRead: beam.BigQueryIO.TypedRead[T], - sqlQuery: String, - flattenResults: Boolean = false - ): SCollection[T] = { - val bigQueryClient = client(sc) - val labels = sc.labels - val read = bigQueryClient.query - .newQueryJob(sqlQuery, flattenResults, labels) - .map { job => - sc.onClose(_ => bigQueryClient.waitForJobs(job)) - typedRead.from(job.table).withoutValidation() - } + val wr = data.applyInternal(t) + val outputs = sideOutputs( + data, + method, + params.successfulInsertsPropagation, + params.extendedErrorInfo, + wr + ) - sc.applyTransform(read.get) + (tap(ReadParam(params)), outputs) } - // TODO: support labels Inheritance like in bqReadQuery - private[scio] def bqReadStorage[T](sc: ScioContext)( - typedRead: beam.BigQueryIO.TypedRead[T], - table: Table, - selectedFields: List[String] = BigQueryStorage.ReadParam.DefaultSelectFields, - rowRestriction: Option[String] = BigQueryStorage.ReadParam.DefaultRowRestriction - ): SCollection[T] = { - val read = typedRead - .from(table.spec) - .withMethod(ReadMethod.DIRECT_READ) - .withSelectedFields(selectedFields.asJava) - .pipe(r => rowRestriction.fold(r)(r.withRowRestriction)) - - sc.applyTransform(read) + override def tap(read: ReadP): Tap[T] = { + val table = ensureTable(source) + BigQueryTap(table, read) } } -private[bigquery] object Writes { - def resolveMethod( - method: WriteMethod, - options: BigQueryOptions, - isBounded: PCollection.IsBounded - ): WriteMethod = (method, isBounded) match { - case (WriteMethod.DEFAULT, _) - if options.getUseStorageWriteApi && options.getUseStorageWriteApiAtLeastOnce => - WriteMethod.STORAGE_API_AT_LEAST_ONCE - case (WriteMethod.DEFAULT, _) if options.getUseStorageWriteApi => - WriteMethod.STORAGE_WRITE_API - case (WriteMethod.DEFAULT, PCollection.IsBounded.BOUNDED) => - WriteMethod.FILE_LOADS - case (WriteMethod.DEFAULT, PCollection.IsBounded.UNBOUNDED) => - WriteMethod.STREAMING_INSERTS - case _ => - method +object BigQueryIO { + implicit lazy val coderTableDestination: Coder[TableDestination] = Coder.kryo + + lazy val SuccessfulTableLoads: SideOutput[TableDestination] = SideOutput() + lazy val SuccessfulInserts: SideOutput[TableRow] = SideOutput() + lazy val SuccessfulStorageApiInserts: SideOutput[TableRow] = SideOutput() + + implicit lazy val coderBigQueryInsertError: Coder[BigQueryInsertError] = Coder.kryo + implicit lazy val coderBigQueryStorageApiInsertError: Coder[BigQueryStorageApiInsertError] = + Coder.kryo + + lazy val FailedInserts: SideOutput[TableRow] = SideOutput() + lazy val FailedInsertsWithErr: SideOutput[BigQueryInsertError] = SideOutput() + lazy val FailedStorageApiInserts: SideOutput[BigQueryStorageApiInsertError] = SideOutput() + + @inline def apply[T](id: String): TestIO[T] = + new TestIO[T] { + final override val tapT: TapT.Aux[T, T] = TapOf[T] + override def testId: String = s"BigQueryIO($id)" + } + + def apply[T <: HasAnnotation: TypeTag: Coder]: BigQueryIO[T] = { + val bqt = BigQueryType[T] + val source = if (bqt.isQuery) { + Query(bqt.queryRaw.get) + } else if (bqt.isStorage) { + val selectedFields = bqt.selectedFields + val rowRestriction = bqt.rowRestriction + if (selectedFields.isEmpty && rowRestriction.isEmpty) { + Table(bqt.table.get) + } else { + val filter = Table.Filter(selectedFields.getOrElse(Nil), rowRestriction) + Table(bqt.table.get, filter) + } + } else { + Table(bqt.table.get) + } + BigQueryIO(source) + } + + /** Defines the format in which BigQuery can be read and written to. */ + sealed trait Format[T] extends Serializable { + type BqType + + def dataFormat: DataFormat + + protected def fromSchemaAndRecord(input: SchemaAndRecord): BqType + + def from(x: BqType): T + def to(x: T): BqType + + private[bigquery] def parseFn(input: SchemaAndRecord): T = + from(fromSchemaAndRecord(input)) + } + + object Format { + + class Default[T](_from: TableRow => T, _to: T => TableRow) extends Format[T] { + override type BqType = TableRow + + override def dataFormat: DataFormat = DataFormat.AVRO + + override def fromSchemaAndRecord(input: SchemaAndRecord): TableRow = + BigQueryAvroUtilsWrapper.convertGenericRecordToTableRow( + input.getRecord, + input.getTableSchema + ) + + override def from(x: TableRow): T = _from(x) + override def to(x: T): TableRow = _to(x) + } + + object Default { + def apply(): Default[TableRow] = new Default(identity, identity) + def apply[T](from: TableRow => T, to: T => TableRow): Default[T] = new Default(from, to) + def apply[T](bqt: BigQueryType[T]): Default[T] = new Default(bqt.fromTableRow, bqt.toTableRow) + } + + class Avro[T](_from: GenericRecord => T, _to: T => GenericRecord) extends Format[T] { + override type BqType = GenericRecord + override def dataFormat: DataFormat = DataFormat.AVRO + + override def fromSchemaAndRecord(input: SchemaAndRecord): GenericRecord = input.getRecord + override def from(x: GenericRecord): T = _from(x) + override def to(x: T): GenericRecord = _to(x) + + private[bigquery] def formatFunction(x: AvroWriteRequest[T]): GenericRecord = to(x.getElement) + private[bigquery] def avroSchemaFactory(tableSchema: TableSchema): Schema = + BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", tableSchema.getFields) + } + + object Avro { + def apply(): Avro[GenericRecord] = new Avro(identity, identity) + def apply[T](from: GenericRecord => T, to: T => GenericRecord): Avro[T] = new Avro(from, to) + def apply[T](bqt: BigQueryType[T]): Avro[T] = new Avro(bqt.fromAvro, bqt.toAvro) + } + } + + object ReadParam { + type ConfigOverride[T] = beam.BigQueryIO.TypedRead[T] => beam.BigQueryIO.TypedRead[T] + + val DefaultFlattenResults: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultConfigOverride: Null = null + + private[scio] def apply[T](params: WriteParam[T]): ReadParam[T] = { + val format = params.format + // select read method matching with write method + val method = params.method match { + case WriteMethod.DEFAULT | WriteMethod.STREAMING_INSERTS => ReadMethod.DEFAULT + case WriteMethod.FILE_LOADS => ReadMethod.EXPORT + case WriteMethod.STORAGE_WRITE_API | WriteMethod.STORAGE_API_AT_LEAST_ONCE => + ReadMethod.DIRECT_READ + } + // after write, we'll always read the whole table + TableReadParam(format, method) + } + } + + sealed trait ReadParam[T] { + def format: Format[T] + def method: ReadMethod + + def errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler + def configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + } + + final case class QueryReadParam[T]( + override val format: Format[T], + override val method: ReadMethod, + flattenResults: Boolean = ReadParam.DefaultFlattenResults, + override val errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler, + override val configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + ) extends ReadParam[T] + + final case class TableReadParam[T]( + override val format: Format[T], + override val method: ReadMethod, + override val errorHandler: ErrorHandler[BadRecord, _] = ReadParam.DefaultErrorHandler, + override val configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + ) extends ReadParam[T] + + object WriteParam { + type ConfigOverride[T] = beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] + + val DefaultMethod: WriteMethod = WriteMethod.DEFAULT + val DefaultSchema: TableSchema = null + val DefaultWriteDisposition: WriteDisposition = null + val DefaultCreateDisposition: CreateDisposition = null + val DefaultTableDescription: String = null + val DefaultTimePartitioning: TimePartitioning = null + val DefaultClustering: Clustering = null + val DefaultTriggeringFrequency: Duration = null + val DefaultSharding: Sharding = null + val DefaultFailedInsertRetryPolicy: InsertRetryPolicy = null + val DefaultSuccessfulInsertsPropagation: Boolean = false + val DefaultExtendedErrorInfo: Boolean = false + val DefaultErrorHandler: ErrorHandler[BadRecord, _] = null + val DefaultConfigOverride: Null = null + } + + case class WriteParam[T] private ( + format: Format[T], + method: WriteMethod = WriteParam.DefaultMethod, + schema: TableSchema = WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = WriteParam.DefaultCreateDisposition, + tableDescription: String = WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = WriteParam.DefaultTimePartitioning, + clustering: Clustering = WriteParam.DefaultClustering, + triggeringFrequency: Duration = WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = WriteParam.DefaultSharding, + failedInsertRetryPolicy: InsertRetryPolicy = WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler, + configOverride: WriteParam.ConfigOverride[T] = WriteParam.DefaultConfigOverride + ) + + private[bigquery] def withSource[T]( + r: beam.BigQueryIO.TypedRead[T] + )(source: Source): beam.BigQueryIO.TypedRead[T] = + source match { + case q: Query => + r.fromQuery(q.underlying) + .pipe { r => + // TODO dryRun ? + q.underlying.trim.split("\n")(0).trim.toLowerCase match { + case "#legacysql" => r + case "#standardsql" => r.usingStandardSql() + case _ => r.usingStandardSql() + } + } + case t: Table => + val selectedFields = t.filter.map(_.selectedFields).filter(_.nonEmpty).map(_.asJava) + val rowRestriction = t.filter.flatMap(_.rowRestriction) + r + .from(t.spec) + .pipe(r => selectedFields.fold(r)(r.withSelectedFields)) + .pipe(r => rowRestriction.fold(r)(r.withRowRestriction)) + } + + private[bigquery] def withResultFlattening[T]( + r: beam.BigQueryIO.TypedRead[T] + )(param: ReadParam[T]): beam.BigQueryIO.TypedRead[T] = { + param match { + case p: QueryReadParam[_] if !p.flattenResults => r.withoutResultFlattening() + case _ => r + } + } + + private def ensureTable(source: Source): Table = + source match { + case t: Table => t + case _: Query => throw new IllegalArgumentException("Cannot write with query") + } + + private[bigquery] def withSink[T](w: beam.BigQueryIO.Write[T])( + source: Source + ): beam.BigQueryIO.Write[T] = + w.to(ensureTable(source).spec) + + private[bigquery] def withFormatFunction[T]( + w: beam.BigQueryIO.Write[T] + )(format: Format[T]): beam.BigQueryIO.Write[T] = format match { + case f: Format.Default[T] => + w.withFormatFunction(Functions.serializableFn(f.to)) + case f: Format.Avro[T] => + w.useAvroLogicalTypes() + .withAvroFormatFunction(Functions.serializableFn(f.formatFunction)) + .withAvroSchemaFactory(Functions.serializableFn(f.avroSchemaFactory)) } - def withSharding[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( + private[bigquery] def withSharding[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( sharding: Sharding ): beam.BigQueryIO.Write[T] = { import WriteMethod._ @@ -131,7 +363,10 @@ private[bigquery] object Writes { } } - def withSuccessfulInsertsPropagation[T](method: WriteMethod, w: beam.BigQueryIO.Write[T])( + private[bigquery] def withSuccessfulInsertsPropagation[T]( + method: WriteMethod, + w: beam.BigQueryIO.Write[T] + )( successfulInsertsPropagation: Boolean ): beam.BigQueryIO.Write[T] = { import WriteMethod._ @@ -145,7 +380,7 @@ private[bigquery] object Writes { } } - def sideOutputs( + private[bigquery] def sideOutputs( data: SCollection[_], method: WriteMethod, successfulInsertsPropagation: Boolean, @@ -185,393 +420,25 @@ private[bigquery] object Writes { SideOutputCollections(tuple, sc) } - trait WriteParam[T] { - def configOverride: beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] - } - - trait WriteParamDefaults { - - type ConfigOverride[T] = beam.BigQueryIO.Write[T] => beam.BigQueryIO.Write[T] - - val DefaultMethod: WriteMethod = WriteMethod.DEFAULT - val DefaultSchema: TableSchema = null - val DefaultWriteDisposition: WriteDisposition = null - val DefaultCreateDisposition: CreateDisposition = null - val DefaultTableDescription: String = null - val DefaultTimePartitioning: TimePartitioning = null - val DefaultClustering: Clustering = null - val DefaultTriggeringFrequency: Duration = null - val DefaultSharding: Sharding = null - val DefaultFailedInsertRetryPolicy: InsertRetryPolicy = null - val DefaultSuccessfulInsertsPropagation: Boolean = false - val DefaultExtendedErrorInfo: Boolean = false - val DefaultConfigOverride: Null = null - } -} - -sealed trait BigQueryIO[T] extends ScioIO[T] { - final override val tapT: TapT.Aux[T, T] = TapOf[T] -} - -object BigQueryIO { - implicit lazy val coderTableDestination: Coder[TableDestination] = Coder.kryo - - lazy val SuccessfulTableLoads: SideOutput[TableDestination] = SideOutput() - lazy val SuccessfulInserts: SideOutput[TableRow] = SideOutput() - lazy val SuccessfulStorageApiInserts: SideOutput[TableRow] = SideOutput() - - implicit lazy val coderBigQueryInsertError: Coder[BigQueryInsertError] = Coder.kryo - implicit lazy val coderBigQueryStorageApiInsertError: Coder[BigQueryStorageApiInsertError] = - Coder.kryo - - lazy val FailedInserts: SideOutput[TableRow] = SideOutput() - lazy val FailedInsertsWithErr: SideOutput[BigQueryInsertError] = SideOutput() - lazy val FailedStorageApiInserts: SideOutput[BigQueryStorageApiInsertError] = SideOutput() - - private[bigquery] val storageWriteMethod = - Set(WriteMethod.STORAGE_WRITE_API, WriteMethod.STORAGE_API_AT_LEAST_ONCE) - - @inline final def apply[T](id: String): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = s"BigQueryIO($id)" - } - - @inline final def apply[T](source: Source): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = source match { - case t: Table => s"BigQueryIO(${t.spec})" - case q: Query => s"BigQueryIO(${q.underlying})" - } - } - - @inline final def apply[T]( - id: String, - selectedFields: List[String], - rowRestriction: Option[String] - ): BigQueryIO[T] = - new BigQueryIO[T] with TestIO[T] { - override def testId: String = - s"BigQueryIO($id, List(${selectedFields.mkString(",")}), $rowRestriction)" - } -} - -object BigQueryTypedSelect { - object ReadParam { - val DefaultFlattenResults: Boolean = false - } - - final case class ReadParam private (flattenResults: Boolean = ReadParam.DefaultFlattenResults) -} - -final case class BigQueryTypedSelect[T: Coder]( - reader: beam.BigQueryIO.TypedRead[T], - sqlQuery: Query, - fromTableRow: TableRow => T -) extends BigQueryIO[T] { - override type ReadP = BigQueryTypedSelect.ReadParam - override type WriteP = Nothing // ReadOnly - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val rc = reader.withCoder(coder) - Reads.bqReadQuery(sc)(rc, sqlQuery.underlying, params.flattenResults) - } - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[T] = { - val tableReference = BigQuery - .defaultInstance() - .query - .run(sqlQuery.underlying, flattenResults = params.flattenResults) - BigQueryTap(tableReference).map(fromTableRow) - } -} - -/** - * Get an SCollection for a BigQuery SELECT query. Both - * [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. - */ -final case class BigQuerySelect(sqlQuery: Query) extends BigQueryIO[TableRow] { - override type ReadP = BigQuerySelect.ReadParam - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = - BigQueryTypedSelect(beam.BigQueryIO.readTableRows(), sqlQuery, identity)(coders.tableRowCoder) - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = - sc.read(underlying)(params) - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[TableRow] = underlying.tap(params) -} - -object BigQuerySelect { - type ReadParam = BigQueryTypedSelect.ReadParam - val ReadParam = BigQueryTypedSelect.ReadParam - - @inline final def apply(sqlQuery: String): BigQuerySelect = new BigQuerySelect(Query(sqlQuery)) -} - -object BigQueryTypedTable { - - /** Defines the format in which BigQuery can be read and written to. */ - sealed abstract class Format[F] - object Format { - case object GenericRecord extends Format[GenericRecord] - case object TableRow extends Format[TableRow] - } - - final case class WriteParam[T] private ( + private def resolveMethod( method: WriteMethod, - schema: TableSchema, - writeDisposition: WriteDisposition, - createDisposition: CreateDisposition, - tableDescription: String, - timePartitioning: TimePartitioning, - clustering: Clustering, - triggeringFrequency: Duration, - sharding: Sharding, - failedInsertRetryPolicy: InsertRetryPolicy, - successfulInsertsPropagation: Boolean, - extendedErrorInfo: Boolean, - configOverride: WriteParam.ConfigOverride[T] - ) extends Writes.WriteParam[T] - - object WriteParam extends Writes.WriteParamDefaults { - @inline final def apply[T]( - method: WriteMethod = DefaultMethod, - schema: TableSchema = DefaultSchema, - writeDisposition: WriteDisposition = DefaultWriteDisposition, - createDisposition: CreateDisposition = DefaultCreateDisposition, - tableDescription: String = DefaultTableDescription, - timePartitioning: TimePartitioning = DefaultTimePartitioning, - clustering: Clustering = DefaultClustering, - triggeringFrequency: Duration = DefaultTriggeringFrequency, - sharding: Sharding = DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = DefaultExtendedErrorInfo, - configOverride: ConfigOverride[T] = DefaultConfigOverride - ): WriteParam[T] = new WriteParam( - method, - schema, - writeDisposition, - createDisposition, - tableDescription, - timePartitioning, - clustering, - triggeringFrequency, - sharding, - failedInsertRetryPolicy, - successfulInsertsPropagation, - extendedErrorInfo, - configOverride - ) - } - - private[this] def tableRow(table: Table): BigQueryTypedTable[TableRow] = - BigQueryTypedTable( - beam.BigQueryIO.readTableRows(), - beam.BigQueryIO.writeTableRows(), - table, - BigQueryUtils.convertGenericRecordToTableRow(_, _) - )(coders.tableRowCoder) - - private[this] def genericRecord( - table: Table - )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = - BigQueryTypedTable( - _.getRecord(), - identity[GenericRecord], - (genericRecord: GenericRecord, _: TableSchema) => genericRecord, - table - ) - - /** - * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]]. - * - * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery - * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE - * and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683 - */ - def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] = - format match { - case Format.GenericRecord => genericRecord(table) - case Format.TableRow => tableRow(table) - } - - def apply[T: Coder]( - readerFn: SchemaAndRecord => T, - writerFn: T => TableRow, - tableRowFn: TableRow => T, - table: Table - ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(Functions.serializableFn(rFn)) - val writer = beam.BigQueryIO - .write[T]() - .withFormatFunction(Functions.serializableFn(wFn)) - val fn: (GenericRecord, TableSchema) => T = (gr, ts) => - tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts)) - - BigQueryTypedTable(reader, writer, table, fn) - } - - def apply[T: Coder]( - readerFn: SchemaAndRecord => T, - writerFn: T => GenericRecord, - fn: (GenericRecord, TableSchema) => T, - table: Table - ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(rFn(_)) - val writer = beam.BigQueryIO - .write[T]() - .useAvroLogicalTypes() - .withAvroFormatFunction(input => wFn(input.getElement())) - .withAvroSchemaFactory { ts => - BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields()) - } - - BigQueryTypedTable(reader, writer, table, fn) - } -} - -final case class BigQueryTypedTable[T: Coder]( - reader: beam.BigQueryIO.TypedRead[T], - writer: beam.BigQueryIO.Write[T], - table: Table, - fn: (GenericRecord, TableSchema) => T -) extends BigQueryIO[T] - with WriteResultIO[T] { - override type ReadP = Unit - override type WriteP = BigQueryTypedTable.WriteParam[T] - - override def testId: String = s"BigQueryIO(${table.spec})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val io = reader.from(table.ref).withCoder(coder) - sc.applyTransform(s"Read BQ table ${table.spec}", io) - } - - override protected def writeWithResult( - data: SCollection[T], - params: WriteP - ): (Tap[T], SideOutputCollections) = { - val method = Writes.resolveMethod( - params.method, - data.context.optionsAs[BigQueryOptions], - data.internal.isBounded - ) - - val transform = writer - .to(table.ref) - .withMethod(params.method) - .pipe(w => Option(params.schema).fold(w)(w.withSchema)) - .pipe(w => Option(params.createDisposition).fold(w)(w.withCreateDisposition)) - .pipe(w => Option(params.writeDisposition).fold(w)(w.withWriteDisposition)) - .pipe(w => Option(params.tableDescription).fold(w)(w.withTableDescription)) - .pipe(w => Option(params.timePartitioning).map(_.asJava).fold(w)(w.withTimePartitioning)) - .pipe(w => Option(params.clustering).map(_.asJava).fold(w)(w.withClustering)) - .pipe(w => Option(params.triggeringFrequency).fold(w)(w.withTriggeringFrequency)) - .pipe(w => Option(params.sharding).fold(w)(Writes.withSharding(method, w))) - .pipe(w => - Writes.withSuccessfulInsertsPropagation(method, w)(params.successfulInsertsPropagation) - ) - .pipe(w => if (params.extendedErrorInfo) w.withExtendedErrorInfo() else w) - .pipe(w => Option(params.failedInsertRetryPolicy).fold(w)(w.withFailedInsertRetryPolicy)) - .pipe(w => Option(params.configOverride).fold(w)(_(w))) - - val wr = data.applyInternal(transform) - val outputs = Writes.sideOutputs( - data, - method, - params.successfulInsertsPropagation, - params.extendedErrorInfo, - wr - ) - - (tap(()), outputs) - } - - override def tap(read: ReadP): Tap[T] = BigQueryTypedTap(table, fn) -} - -/** Get an IO for a BigQuery table using the storage API. */ -final case class BigQueryStorage( - table: Table, - selectedFields: List[String], - rowRestriction: Option[String] -) extends BigQueryIO[TableRow] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - override def testId: String = - s"BigQueryIO(${table.spec}, List(${selectedFields.mkString(",")}), $rowRestriction)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = { - val coder = CoderMaterializer.beam(sc, coders.tableRowCoder) - val read = beam.BigQueryIO.readTableRows().withCoder(coder) - Reads.bqReadStorage(sc)( - read, - table, - selectedFields, - rowRestriction - ) - } - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQueryStorage is read-only") - - override def tap(read: ReadP): Tap[TableRow] = { - val readOptions = StorageUtil.tableReadOptions(selectedFields, rowRestriction) - BigQueryStorageTap(table, readOptions) - } -} - -object BigQueryStorage { - object ReadParam { - val DefaultSelectFields: List[String] = Nil - val DefaultRowRestriction: Option[String] = None + options: BigQueryOptions, + isBounded: PCollection.IsBounded + ): WriteMethod = (method, isBounded) match { + case (WriteMethod.DEFAULT, _) + if options.getUseStorageWriteApi && options.getUseStorageWriteApiAtLeastOnce => + WriteMethod.STORAGE_API_AT_LEAST_ONCE + case (WriteMethod.DEFAULT, _) if options.getUseStorageWriteApi => + WriteMethod.STORAGE_WRITE_API + case (WriteMethod.DEFAULT, PCollection.IsBounded.BOUNDED) => + WriteMethod.FILE_LOADS + case (WriteMethod.DEFAULT, PCollection.IsBounded.UNBOUNDED) => + WriteMethod.STREAMING_INSERTS + case _ => + method } } -final case class BigQueryStorageSelect(sqlQuery: Query) extends BigQueryIO[TableRow] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = - BigQueryTypedSelect( - beam.BigQueryIO.readTableRows().withMethod(ReadMethod.DIRECT_READ), - sqlQuery, - identity - )(coders.tableRowCoder) - - override def testId: String = s"BigQueryIO(${sqlQuery.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[TableRow] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[TableRow], params: WriteP): Tap[TableRow] = - throw new UnsupportedOperationException("BigQuerySelect is read-only") - - override def tap(params: ReadP): Tap[TableRow] = underlying.tap(BigQueryTypedSelect.ReadParam()) -} - /** Get an IO for a BigQuery TableRow JSON file. */ final case class TableRowJsonIO(path: String) extends ScioIO[TableRow] { override type ReadP = TableRowJsonIO.ReadParam @@ -630,289 +497,4 @@ object TableRowJsonIO { ) } } - -} - -object BigQueryTyped { - import com.spotify.scio.bigquery.{Table => STable} - - @annotation.implicitNotFound( - """ - Can't find annotation for type ${T}. - Make sure this class is annotated with BigQueryType.fromStorage, BigQueryType.fromTable or - BigQueryType.fromQuery. - Alternatively, use BigQueryTyped.Storage(""), BigQueryTyped.Table("
"), or - BigQueryTyped.Query("") to get a ScioIO instance. - """ - ) - sealed trait IO[T <: HasAnnotation] { - type F[_ <: HasAnnotation] <: ScioIO[_] - def impl: F[T] - } - - object IO { - type Aux[T <: HasAnnotation, F0[_ <: HasAnnotation] <: ScioIO[_]] = - IO[T] { type F[A <: HasAnnotation] = F0[A] } - - implicit def tableIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.Table[T] - ): Aux[T, Table] = - new IO[T] { - type F[A <: HasAnnotation] = Table[A] - def impl: Table[T] = Table(STable.Spec(t.table)) - } - - implicit def queryIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.Query[T] - ): Aux[T, Select] = - new IO[T] { - type F[A <: HasAnnotation] = Select[A] - def impl: Select[T] = Select(Query(t.queryRaw)) - } - - implicit def storageIO[T <: HasAnnotation: TypeTag: Coder](implicit - t: BigQueryType.StorageOptions[T] - ): Aux[T, Storage] = - new IO[T] { - type F[A <: HasAnnotation] = Storage[A] - def impl: Storage[T] = Storage(STable.Spec(t.table), Nil, None) - } - } - - /** - * Get a typed SCollection for a BigQuery table or a SELECT query. - * - * Note that `T` must be annotated with - * [[com.spotify.scio.bigquery.types.BigQueryType.fromTable BigQueryType.fromStorage]], - * [[com.spotify.scio.bigquery.types.BigQueryType.fromTable BigQueryType.fromTable]], or - * [[com.spotify.scio.bigquery.types.BigQueryType.fromQuery BigQueryType.fromQuery]] - * - * The source (table) specified in the annotation will be used - */ - @inline final def apply[T <: HasAnnotation](implicit t: IO[T]): t.F[T] = - t.impl - - /** - * Get a typed SCollection for a BigQuery SELECT query. - * - * Both [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. - */ - final case class Select[T <: HasAnnotation: TypeTag: Coder](query: Query) extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = { - val fromAvro = BigQueryType[T].fromAvro - val fromTableRow = BigQueryType[T].fromTableRow - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - BigQueryTypedSelect(reader, query, fromTableRow) - } - - override def testId: String = s"BigQueryIO(${query.underlying})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Select queries are read-only") - - override def tap(params: ReadP): Tap[T] = underlying.tap(BigQueryTypedSelect.ReadParam()) - } - - object Select { - @inline final def apply[T <: HasAnnotation: TypeTag: Coder]( - query: String - ): Select[T] = new Select[T](Query(query)) - } - - /** Get a typed SCollection for a BigQuery table. */ - final case class Table[T <: HasAnnotation: TypeTag: Coder](table: STable) - extends BigQueryIO[T] - with WriteResultIO[T] { - override type ReadP = Unit - override type WriteP = Table.WriteParam[T] - - private val underlying = BigQueryTypedTable[T]( - (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord), - BigQueryType[T].toTableRow, - BigQueryType[T].fromTableRow, - table - ) - - override def testId: String = s"BigQueryIO(${table.spec})" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying) - - override protected def writeWithResult( - data: SCollection[T], - params: WriteP - ): (Tap[T], SideOutputCollections) = { - val outputs = data - .withName(s"${data.tfName}$$Write") - .write(underlying)(params) - .outputs - .get - - (tap(()), outputs) - } - - override def tap(read: ReadP): Tap[T] = - BigQueryTypedTap[T](table, underlying.fn) - } - - object Table { - final case class WriteParam[T] private ( - method: WriteMethod, - writeDisposition: WriteDisposition, - createDisposition: CreateDisposition, - timePartitioning: TimePartitioning, - clustering: Clustering, - triggeringFrequency: Duration, - sharding: Sharding, - failedInsertRetryPolicy: InsertRetryPolicy, - successfulInsertsPropagation: Boolean, - extendedErrorInfo: Boolean, - configOverride: WriteParam.ConfigOverride[T] - ) extends Writes.WriteParam[T] - - object WriteParam extends Writes.WriteParamDefaults { - - @inline final def apply[T]( - method: WriteMethod = DefaultMethod, - writeDisposition: WriteDisposition = DefaultWriteDisposition, - createDisposition: CreateDisposition = DefaultCreateDisposition, - timePartitioning: TimePartitioning = DefaultTimePartitioning, - clustering: Clustering = DefaultClustering, - triggeringFrequency: Duration = DefaultTriggeringFrequency, - sharding: Sharding = DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = DefaultExtendedErrorInfo, - configOverride: ConfigOverride[T] = DefaultConfigOverride - ): WriteParam[T] = new WriteParam( - method, - writeDisposition, - createDisposition, - timePartitioning, - clustering, - triggeringFrequency, - sharding, - failedInsertRetryPolicy, - successfulInsertsPropagation, - extendedErrorInfo, - configOverride - ) - - implicit private[Table] def typedTableWriteParam[T: TypeTag, Info]( - params: Table.WriteParam[T] - ): BigQueryTypedTable.WriteParam[T] = - BigQueryTypedTable.WriteParam( - params.method, - BigQueryType[T].schema, - params.writeDisposition, - params.createDisposition, - BigQueryType[T].tableDescription.orNull, - params.timePartitioning, - params.clustering, - params.triggeringFrequency, - params.sharding, - params.failedInsertRetryPolicy, - params.successfulInsertsPropagation, - params.extendedErrorInfo, - params.configOverride - ) - } - - } - - /** Get a typed SCollection for a BigQuery table using the storage API. */ - final case class Storage[T <: HasAnnotation: TypeTag: Coder]( - table: STable, - selectedFields: List[String], - rowRestriction: Option[String] - ) extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - override def testId: String = - s"BigQueryIO(${table.spec}, List(${selectedFields.mkString(",")}), $rowRestriction)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { - val coder = CoderMaterializer.beam(sc, Coder[T]) - val fromAvro = BigQueryType[T].fromAvro - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - .withCoder(coder) - Reads.bqReadStorage(sc)(reader, table, selectedFields, rowRestriction) - } - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Storage API is read-only") - - override def tap(read: ReadP): Tap[T] = { - val fn = BigQueryType[T].fromTableRow - val readOptions = StorageUtil.tableReadOptions(selectedFields, rowRestriction) - BigQueryStorageTap(table, readOptions).map(fn) - } - } - - final case class StorageQuery[T <: HasAnnotation: TypeTag: Coder](sqlQuery: Query) - extends BigQueryIO[T] { - override type ReadP = Unit - override type WriteP = Nothing // ReadOnly - - private[this] lazy val underlying = { - val fromAvro = BigQueryType[T].fromAvro - val fromTableRow = BigQueryType[T].fromTableRow - val reader = beam.BigQueryIO - .read(new SerializableFunction[SchemaAndRecord, T] { - override def apply(input: SchemaAndRecord): T = fromAvro(input.getRecord) - }) - .withMethod(ReadMethod.DIRECT_READ) - BigQueryTypedSelect(reader, sqlQuery, fromTableRow) - } - - override def testId: String = s"BigQueryIO($sqlQuery)" - - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.read(underlying)(BigQueryTypedSelect.ReadParam()) - - override protected def write(data: SCollection[T], params: WriteP): Tap[T] = - throw new UnsupportedOperationException("Storage API is read-only") - - override def tap(read: ReadP): Tap[T] = underlying.tap(BigQueryTypedSelect.ReadParam()) - } - - private[scio] def dynamic[T <: HasAnnotation: TypeTag: Coder]( - newSource: Option[Source] - ): ScioIO.ReadOnly[T, Unit] = { - val bqt = BigQueryType[T] - newSource match { - // newSource is missing, T's companion object must have either table or query - // The case where newSource is null is only there - // for legacy support and should not exists once - // BigQueryScioContext.typedBigQuery is removed - case None if bqt.isTable => - val table = STable.Spec(bqt.table.get) - ScioIO.ro[T](Table[T](table)) - case None if bqt.isQuery => - val query = Query(bqt.queryRaw.get) - Select[T](query) - case Some(s: STable) => - ScioIO.ro(Table[T](s)) - case Some(s: Query) => - Select[T](s) - case _ => - throw new IllegalArgumentException(s"Missing table or query field in companion object") - } - } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 47818e3bba..919768598a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -34,10 +34,15 @@ import java.math.MathContext import java.nio.ByteBuffer import scala.jdk.CollectionConverters._ -sealed trait Source +sealed trait Source { + protected type Impl <: Source + def latest(bq: BigQuery): Impl + def latest(): Impl = latest(BigQuery.defaultInstance()) +} /** A wrapper type [[Query]] which wraps a SQL String. */ final case class Query(underlying: String) extends Source { + override protected type Impl = Query /** * A helper method to replace the "$LATEST" placeholder in query to the latest common partition. @@ -61,64 +66,110 @@ final case class Query(underlying: String) extends Source { * @return * [[Query]] with "$LATEST" replaced */ - def latest(bq: BigQuery): Query = - Query(BigQueryPartitionUtil.latestQuery(bq, underlying)) + override def latest(bq: BigQuery): Query = + copy(BigQueryPartitionUtil.latestQuery(bq, underlying)) - def latest(): Query = latest(BigQuery.defaultInstance()) + override def toString: String = underlying } /** - * [[Table]] abstracts the multiple ways of referencing Bigquery tables. Tables can be referenced by - * a table spec `String` or by a table reference [[GTableReference]]. - * - * Example: - * {{{ - * val table = Table.Spec("bigquery-public-data:samples.shakespeare") - * sc.bigQueryTable(table) - * .filter(r => "hamlet".equals(r.getString("corpus")) && "Polonius".equals(r.getString("word"))) - * .saveAsTextFile("./output.txt") - * sc.run() - * }}} + * Bigquery [[Table]]. Tables can be referenced by a table spec `String` or by a table reference + * [[GTableReference]]. An additional [[Table.Filter]] can be given to specify selected fields and + * row restrictions when used with the BQ storage read API. * - * Or create a [[Table]] from a [[GTableReference]]: + * Example: Create a [[Table]] from a [[GTableReference]]: * {{{ * val tableReference = new TableReference * tableReference.setProjectId("bigquery-public-data") * tableReference.setDatasetId("samples") * tableReference.setTableId("shakespeare") - * val table = Table.Ref(tableReference) + * val table = Table(tableReference) + * }}} + * or with a spec string with filtering: + * {{{ + * val table = Table( + * "bigquery-public-data:samples.shakespeare", + * List("word", "word_count"), + * "word_count > 10" + * ) * }}} * * A helper method is provided to replace the "$LATEST" placeholder in the table name to the latest * common partition. * {{{ - * val table = Table.Spec("some_project:some_data.some_table_$LATEST").latest() + * val table = Table("some_project:some_data.some_table_$LATEST").latest() * }}} */ -sealed trait Table extends Source { - def spec: String - - def ref: GTableReference - - def latest(bg: BigQuery): Table +case class Table private (ref: GTableReference, filter: Option[Table.Filter]) extends Source { + override protected type Impl = Table + lazy val spec: String = BigQueryHelpers.toTableSpec(ref) + def latest(bq: BigQuery): Table = { + val latestSpec = BigQueryPartitionUtil.latestTable(bq, spec) + val latestRef = BigQueryHelpers.parseTableSpec(latestSpec) + copy(latestRef) + } - def latest(): Table + override def toString: String = filter match { + case None => spec + case Some(Table.Filter(selectedFields, rowRestriction)) => + val sb = new StringBuilder("SELECT ") + selectedFields match { + case Nil => sb.append("*") + case _ => sb.append(selectedFields.mkString(",")) + } + sb.append(" FROM `") + .append(ref.getProjectId) + .append(".") + .append(ref.getDatasetId) + .append(".") + .append(ref.getTableId) + .append("`") + rowRestriction.foreach(r => sb.append(" WHERE ").append(r)) + sb.result() + } } object Table { - final case class Ref(ref: GTableReference) extends Table { - override lazy val spec: String = BigQueryHelpers.toTableSpec(ref) - def latest(bq: BigQuery): Ref = - Ref(Spec(spec).latest(bq).ref) - def latest(): Ref = latest(BigQuery.defaultInstance()) + final case class Filter( + selectedFields: List[String], + rowRestriction: Option[String] + ) - } - final case class Spec(spec: String) extends Table { - override val ref: GTableReference = BigQueryHelpers.parseTableSpec(spec) - def latest(bq: BigQuery): Spec = - Spec(BigQueryPartitionUtil.latestTable(bq, spec)) - def latest(): Spec = latest(BigQuery.defaultInstance()) - } + def apply(ref: GTableReference): Table = + new Table(ref, None) + + def apply(ref: GTableReference, selectedFields: List[String]): Table = + new Table(ref, Some(Filter(selectedFields, None))) + + def apply(ref: GTableReference, rowRestriction: String): Table = + new Table(ref, Some(Filter(List.empty, Some(rowRestriction)))) + + def apply(ref: GTableReference, selectedFields: List[String], rowRestriction: String): Table = + new Table(ref, Some(Filter(selectedFields, Some(rowRestriction)))) + + def apply(ref: GTableReference, filter: Table.Filter): Table = + new Table(ref, Some(filter)) + + def apply(spec: String): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), None) + + def apply(spec: String, selectedFields: List[String]): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(selectedFields, None))) + + def apply(spec: String, rowRestriction: String): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(List.empty, Some(rowRestriction)))) + + def apply(spec: String, selectedFields: List[String], rowRestriction: String): Table = + new Table( + BigQueryHelpers.parseTableSpec(spec), + Some(Filter(selectedFields, Some(rowRestriction))) + ) + + def apply(spec: String, filter: Table.Filter): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), Some(filter)) + + def apply(spec: String, filter: Option[Table.Filter]): Table = + new Table(BigQueryHelpers.parseTableSpec(spec), filter) } /** diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala index d45ce08afb..1c7e6f3b79 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala @@ -181,7 +181,7 @@ class MockTable( */ def withSample(numRows: Int): Unit = { ensureUnique() - val rows = bq.tables.rows(Table.Ref(original)).take(numRows).toList + val rows = bq.tables.rows(Table(original)).take(numRows).toList require(rows.length == numRows, s"Sample size ${rows.length} != requested $numRows") writeRows(rows) () @@ -193,7 +193,7 @@ class MockTable( */ def withSample(minNumRows: Int, maxNumRows: Int): Unit = { ensureUnique() - val rows = bq.tables.rows(Table.Ref(original)).take(maxNumRows).toList + val rows = bq.tables.rows(Table(original)).take(maxNumRows).toList require( rows.length >= minNumRows && rows.length <= maxNumRows, s"Sample size ${rows.length} < requested minimal $minNumRows" diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index bebbadfba6..d9cab5a134 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -89,7 +89,7 @@ final class BigQuery private (val client: Client) { val rows = if (newSource == null) { // newSource is missing, T's companion object must have either table or query if (bqt.isTable) { - tables.rows(STable.Spec(bqt.table.get)) + tables.rows(STable(bqt.table.get)) } else if (bqt.isQuery) { query.rows(bqt.queryRaw.get) } else { @@ -98,7 +98,7 @@ final class BigQuery private (val client: Client) { } else { // newSource can be either table or query Try(BigQueryHelpers.parseTableSpec(newSource)).toOption - .map(STable.Ref) + .map(STable.apply) .map(tables.rows) .getOrElse(query.rows(newSource)) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala index ab8395f320..34ee176c4c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala @@ -111,7 +111,7 @@ final private[client] class QueryOps(client: Client, tableService: TableOps, job newQueryJob(config).map { job => jobService.waitForJobs(job) - tableService.rows(STable.Ref(job.table)) + tableService.rows(STable(job.table)) }.get } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala index b17542f267..65eee8b4d3 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/dynamic/syntax/SCollectionSyntax.scala @@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableSchema import com.spotify.scio.bigquery.dynamic._ import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.{TableRow, Writes} +import com.spotify.scio.bigquery.{BigQueryIO, TableRow} import com.spotify.scio.io.{ClosedTap, EmptyTap} import com.spotify.scio.util.Functions import com.spotify.scio.values.SCollection @@ -68,12 +68,14 @@ final class DynamicBigQueryOps[T](private val self: SCollection[T]) extends AnyV .withFormatFunction(Functions.serializableFn(formatFn)) .pipe(w => Option(createDisposition).fold(w)(w.withCreateDisposition)) .pipe(w => Option(writeDisposition).fold(w)(w.withWriteDisposition)) - .pipe(w => Writes.withSuccessfulInsertsPropagation(method, w)(successfulInsertsPropagation)) + .pipe(w => + BigQueryIO.withSuccessfulInsertsPropagation(method, w)(successfulInsertsPropagation) + ) .pipe(w => if (extendedErrorInfo) w.withExtendedErrorInfo() else w) val wr = self.applyInternal(t) val outputs = - Writes.sideOutputs(self, method, successfulInsertsPropagation, extendedErrorInfo, wr) + BigQueryIO.sideOutputs(self, method, successfulInsertsPropagation, extendedErrorInfo, wr) ClosedTap[Nothing](EmptyTap, Some(outputs)) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala index ff2a6ee079..e7962e4c8d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala @@ -18,11 +18,8 @@ package com.spotify.scio.bigquery.syntax import com.google.api.services.bigquery.model.TableSchema -import com.spotify.scio.bigquery.BigQueryTyped.Table.{WriteParam => TableWriteParam} -import com.spotify.scio.bigquery.BigQueryTypedTable.{Format, WriteParam => TypedTableWriteParam} import com.spotify.scio.bigquery.TableRowJsonIO.{WriteParam => TableRowJsonWriteParam} import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.coders import com.spotify.scio.bigquery._ import com.spotify.scio.coders.Coder import com.spotify.scio.io._ @@ -36,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{ WriteDisposition } import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} import org.joda.time.Duration import scala.reflect.runtime.universe._ @@ -63,23 +61,26 @@ final class SCollectionTableRowOps[T <: TableRow](private val self: SCollection[ */ def saveAsBigQueryTable( table: Table, - schema: TableSchema = TypedTableWriteParam.DefaultSchema, - writeDisposition: WriteDisposition = TypedTableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TypedTableWriteParam.DefaultCreateDisposition, - tableDescription: String = TypedTableWriteParam.DefaultTableDescription, - timePartitioning: TimePartitioning = TypedTableWriteParam.DefaultTimePartitioning, - clustering: Clustering = TypedTableWriteParam.DefaultClustering, - method: Method = TypedTableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TypedTableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TypedTableWriteParam.DefaultSharding, + schema: TableSchema = BigQueryIO.WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + tableDescription: String = BigQueryIO.WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, failedInsertRetryPolicy: InsertRetryPolicy = - TypedTableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TypedTableWriteParam.DefaultExtendedErrorInfo, - configOverride: TypedTableWriteParam.ConfigOverride[TableRow] = - TableWriteParam.DefaultConfigOverride + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[TableRow] = + BigQueryIO.WriteParam.DefaultConfigOverride ): ClosedTap[TableRow] = { - val param = TypedTableWriteParam( + val param = BigQueryIO.WriteParam[TableRow]( + BigQueryIO.Format.Default(), method, schema, writeDisposition, @@ -92,12 +93,13 @@ final class SCollectionTableRowOps[T <: TableRow](private val self: SCollection[ failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) self .covary[TableRow] - .write(BigQueryTypedTable(table, Format.TableRow)(coders.tableRowCoder))(param) + .write(BigQueryIO(table))(param) } /** @@ -169,23 +171,26 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC */ def saveAsBigQueryTable( table: Table, - schema: TableSchema = TypedTableWriteParam.DefaultSchema, - writeDisposition: WriteDisposition = TypedTableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TypedTableWriteParam.DefaultCreateDisposition, - tableDescription: String = TypedTableWriteParam.DefaultTableDescription, - timePartitioning: TimePartitioning = TypedTableWriteParam.DefaultTimePartitioning, - clustering: Clustering = TypedTableWriteParam.DefaultClustering, - method: Method = TypedTableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TypedTableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TypedTableWriteParam.DefaultSharding, + schema: TableSchema = BigQueryIO.WriteParam.DefaultSchema, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + tableDescription: String = BigQueryIO.WriteParam.DefaultTableDescription, + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, failedInsertRetryPolicy: InsertRetryPolicy = - TypedTableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TypedTableWriteParam.DefaultExtendedErrorInfo, - configOverride: TypedTableWriteParam.ConfigOverride[GenericRecord] = - TypedTableWriteParam.DefaultConfigOverride + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[GenericRecord] = + BigQueryIO.WriteParam.DefaultConfigOverride ): ClosedTap[GenericRecord] = { - val param = TypedTableWriteParam( + val param = BigQueryIO.WriteParam[GenericRecord]( + BigQueryIO.Format.Avro(), method, schema, writeDisposition, @@ -198,15 +203,12 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) self .covary[GenericRecord] - .write( - BigQueryTypedTable(table, Format.GenericRecord)( - self.coder.asInstanceOf[Coder[GenericRecord]] - ) - )(param) + .write(BigQueryIO[GenericRecord](table)(self.coder.asInstanceOf[Coder[GenericRecord]]))(param) } } @@ -227,7 +229,7 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio * case class Result(name: String, score: Double) * * val p: SCollection[Result] = // process data and convert elements to Result - * p.saveAsTypedBigQueryTable(Table.Spec("myproject:mydataset.mytable")) + * p.saveAsTypedBigQueryTable(Table("myproject:mydataset.mytable")) * }}} * * It could also be an empty class with schema from @@ -241,7 +243,7 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio * * sc.typedBigQuery[Row]() * .sample(withReplacement = false, fraction = 0.1) - * .saveAsTypedBigQueryTable(Table.Spec("myproject:samples.gsod")) + * .saveAsTypedBigQueryTable(Table("myproject:samples.gsod")) * }}} * * * @@ -260,22 +262,30 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio */ def saveAsTypedBigQueryTable( table: Table, - timePartitioning: TimePartitioning = TableWriteParam.DefaultTimePartitioning, - writeDisposition: WriteDisposition = TableWriteParam.DefaultWriteDisposition, - createDisposition: CreateDisposition = TableWriteParam.DefaultCreateDisposition, - clustering: Clustering = TableWriteParam.DefaultClustering, - method: Method = TableWriteParam.DefaultMethod, - triggeringFrequency: Duration = TableWriteParam.DefaultTriggeringFrequency, - sharding: Sharding = TableWriteParam.DefaultSharding, - failedInsertRetryPolicy: InsertRetryPolicy = TableWriteParam.DefaultFailedInsertRetryPolicy, - successfulInsertsPropagation: Boolean = TableWriteParam.DefaultSuccessfulInsertsPropagation, - extendedErrorInfo: Boolean = TableWriteParam.DefaultExtendedErrorInfo, - configOverride: TableWriteParam.ConfigOverride[T] = TableWriteParam.DefaultConfigOverride + timePartitioning: TimePartitioning = BigQueryIO.WriteParam.DefaultTimePartitioning, + writeDisposition: WriteDisposition = BigQueryIO.WriteParam.DefaultWriteDisposition, + createDisposition: CreateDisposition = BigQueryIO.WriteParam.DefaultCreateDisposition, + clustering: Clustering = BigQueryIO.WriteParam.DefaultClustering, + method: Method = BigQueryIO.WriteParam.DefaultMethod, + triggeringFrequency: Duration = BigQueryIO.WriteParam.DefaultTriggeringFrequency, + sharding: Sharding = BigQueryIO.WriteParam.DefaultSharding, + failedInsertRetryPolicy: InsertRetryPolicy = + BigQueryIO.WriteParam.DefaultFailedInsertRetryPolicy, + successfulInsertsPropagation: Boolean = + BigQueryIO.WriteParam.DefaultSuccessfulInsertsPropagation, + extendedErrorInfo: Boolean = BigQueryIO.WriteParam.DefaultExtendedErrorInfo, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.WriteParam.DefaultErrorHandler, + configOverride: BigQueryIO.WriteParam.ConfigOverride[T] = + BigQueryIO.WriteParam.DefaultConfigOverride )(implicit tt: TypeTag[T], coder: Coder[T]): ClosedTap[T] = { - val param = TableWriteParam[T]( + val bqt = BigQueryType[T] + val param = BigQueryIO.WriteParam[T]( + BigQueryIO.Format.Avro[T](bqt), method, + bqt.schema, writeDisposition, createDisposition, + BigQueryIO.WriteParam.DefaultTableDescription, timePartitioning, clustering, triggeringFrequency, @@ -283,9 +293,10 @@ final class SCollectionTypedOps[T <: HasAnnotation](private val self: SCollectio failedInsertRetryPolicy, successfulInsertsPropagation, extendedErrorInfo, + errorHandler, configOverride ) - self.write(BigQueryTyped.Table[T](table))(param) + self.write(BigQueryIO[T](table))(param) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index fa82cffe93..0626cc542d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -19,30 +19,26 @@ package com.spotify.scio.bigquery.syntax import com.spotify.scio.ScioContext import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation -import com.spotify.scio.bigquery.{ - BigQuerySelect, - BigQueryStorage, - BigQueryStorageSelect, - BigQueryType, - BigQueryTyped, - Query, - Source, - Table, - TableRow, - TableRowJsonIO -} +import com.spotify.scio.bigquery.{BigQueryIO, Query, Source, Table, TableRow, TableRowJsonIO} import com.spotify.scio.coders.Coder import com.spotify.scio.values._ import scala.reflect.runtime.universe._ -import com.spotify.scio.bigquery.BigQueryTypedTable -import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.bigquery.coders.tableRowCoder +import com.spotify.scio.bigquery.types.BigQueryType import org.apache.beam.sdk.io.Compression +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method import org.apache.beam.sdk.io.fs.EmptyMatchTreatment +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} +import org.slf4j.{Logger, LoggerFactory} + +object ScioContextOps { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) +} /** Enhanced version of [[ScioContext]] with BigQuery methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { + import ScioContextOps._ /** * Get an SCollection for a BigQuery SELECT query. Both @@ -53,36 +49,79 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { */ def bigQuerySelect( sqlQuery: Query, - flattenResults: Boolean - ): SCollection[TableRow] = - self.read(BigQuerySelect(sqlQuery))(BigQuerySelect.ReadParam(flattenResults)) + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.QueryReadParam( + BigQueryIO.Format.Default(), + Method.DEFAULT, + flattenResults, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](sqlQuery))(params) + } - /** - * Get an SCollection for a BigQuery SELECT query. Both - * [[https://cloud.google.com/bigquery/docs/reference/legacy-sql Legacy SQL]] and - * [[https://cloud.google.com/bigquery/docs/reference/standard-sql/ Standard SQL]] dialects are - * supported. By default the query dialect will be automatically detected. To override this - * behavior, start the query string with `#legacysql` or `#standardsql`. - */ - def bigQuerySelect( - sqlQuery: Query - ): SCollection[TableRow] = - bigQuerySelect(sqlQuery, BigQuerySelect.ReadParam.DefaultFlattenResults) + def bigQuerySelectFormat[T: Coder]( + sqlQuery: Query, + format: BigQueryIO.Format[T], + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.QueryReadParam( + format, + Method.DEFAULT, + flattenResults, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) + self.read(BigQueryIO[T](sqlQuery))(params) + } - /** Get an SCollection for a BigQuery table. */ - def bigQueryTable(table: Table): SCollection[TableRow] = - bigQueryTable(table, BigQueryTypedTable.Format.TableRow)(tableRowCoder) + def bigQueryTable( + table: Table, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + if (table.filter.nonEmpty) { + logger.warn( + "Using filtered table with standard API. " + + "selectedFields and rowRestriction are ignored. " + + "Use bigQueryStorage instead" + ) + } + val params = BigQueryIO.TableReadParam( + BigQueryIO.Format.Default(), + Method.DEFAULT, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](table))(params) + } - /** - * Get an SCollection for a BigQuery table using the specified [[Format]]. - * - * Reading records as GenericRecord **should** offer better performance over TableRow records. - * - * Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as - * STRING. - */ - def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] = - self.read(BigQueryTypedTable(table, format)) + def bigQueryTableFormat[T: Coder]( + table: Table, + format: BigQueryIO.Format[T], + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + if (table.filter.nonEmpty) { + logger.warn( + "Using filtered table with standard API. " + + "selectedFields and rowRestriction are ignored. " + + "Use bigQueryStorage instead" + ) + } + val params = BigQueryIO.TableReadParam( + format, + Method.DEFAULT, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) + self.read(BigQueryIO[T](table))(params) + } /** * Get an SCollection for a BigQuery table using the storage API. @@ -103,10 +142,34 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { */ def bigQueryStorage( table: Table, - selectedFields: List[String] = BigQueryStorage.ReadParam.DefaultSelectFields, - rowRestriction: String = null - ): SCollection[TableRow] = - self.read(BigQueryStorage(table, selectedFields, Option(rowRestriction))) + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.TableReadParam( + BigQueryIO.Format.Default(), + Method.DIRECT_READ, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](table))(params) + } + + def bigQueryStorageFormat[T: Coder]( + table: Table, + format: BigQueryIO.Format[T], + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.TableReadParam( + format, + Method.DIRECT_READ, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](table))(params) + } /** * Get an SCollection for a BigQuery SELECT query using the storage API. @@ -114,99 +177,108 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * @param query * SQL query */ - def bigQueryStorage(query: Query): SCollection[TableRow] = - self.read(BigQueryStorageSelect(query)) - - def typedBigQuery[T <: HasAnnotation: TypeTag: Coder](): SCollection[T] = - typedBigQuery(None) + def bigQuerySelectStorage( + query: Query, + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[TableRow] = { + val params = BigQueryIO.QueryReadParam( + BigQueryIO.Format.Default(), + Method.DIRECT_READ, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[TableRow](query))(params) + } - def typedBigQuery[T <: HasAnnotation: TypeTag: Coder]( - newSource: Source - ): SCollection[T] = typedBigQuery(Option(newSource)) + def bigQuerySelectStorageFormat[T: Coder]( + query: Query, + format: BigQueryIO.Format[T], + flattenResults: Boolean = BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride + ): SCollection[T] = { + val params = BigQueryIO.QueryReadParam( + format, + Method.DIRECT_READ, + flattenResults, + errorHandler, + configOverride + ) + self.read(BigQueryIO[T](query))(params) + } - /** Get a typed SCollection for BigQuery Table or a SELECT query using the Storage API. */ + /** Get a typed SCollection for BigQuery Table or a SELECT query. */ def typedBigQuery[T <: HasAnnotation: TypeTag: Coder]( - newSource: Option[Source] + source: Source = null, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[T] = { - val bqt = BigQueryType[T] - if (bqt.isStorage) { - newSource - .asInstanceOf[Option[Table]] - .map(typedBigQueryStorage(_)) - .getOrElse(typedBigQueryStorage()) - } else { - self.read(BigQueryTyped.dynamic[T](newSource)) + val format = BigQueryIO.Format.Avro(BigQueryType[T]) + val io = Option(source) match { + case Some(s) => BigQueryIO[T](s) + case None => BigQueryIO[T] } - } - /** - * Get a typed SCollection for a BigQuery storage API. - * - * Note that `T` must be annotated with - * [[com.spotify.scio.bigquery.types.BigQueryType.fromSchema BigQueryType.fromStorage]] or - * [[com.spotify.scio.bigquery.types.BigQueryType.fromQuery BigQueryType.fromQuery]] - */ - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder](): SCollection[T] = { - val bqt = BigQueryType[T] - if (bqt.isQuery) { - self.read(BigQueryTyped.StorageQuery[T](Query(bqt.queryRaw.get))) - } else { - val table = Table.Spec(bqt.table.get) - val rr = bqt.rowRestriction - val fields = bqt.selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields) - self.read(BigQueryTyped.Storage[T](table, fields, rr)) + val params = io.source match { + case _: Query => + BigQueryIO.QueryReadParam[T]( + format, + Method.DEFAULT, + BigQueryIO.ReadParam.DefaultFlattenResults, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) + case t: Table => + if (t.filter.nonEmpty) { + logger.warn( + "Using filtered table with standard API. " + + "selectedFields and rowRestriction are ignored. " + + "Use typedBigQueryStorage instead" + ) + } + BigQueryIO.TableReadParam[T]( + format, + Method.DEFAULT, + BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride + ) } + self.read(io)(params) } + /** Get a typed SCollection for a BigQuery storage API. */ def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - BigQueryType[T].selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - BigQueryType[T].rowRestriction - ) - ) - - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - rowRestriction: String + source: Source = null, + errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, + configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = + BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[T] = { - val bqt = BigQueryType[T] - val table = Table.Spec(bqt.table.get) - self.read( - BigQueryTyped.Storage[T]( - table, - bqt.selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - Option(rowRestriction) - ) - ) - } + val io = Option(source) match { + case Some(s) => BigQueryIO[T](s) + case None => BigQueryIO[T] + } - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table, - rowRestriction: String - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - BigQueryType[T].selectedFields.getOrElse(BigQueryStorage.ReadParam.DefaultSelectFields), - Option(rowRestriction) - ) - ) + val format = BigQueryIO.Format.Avro(BigQueryType[T]) + val params = io.source match { + case _: Query => + BigQueryIO.QueryReadParam[T]( + format, + Method.DIRECT_READ, + BigQueryIO.ReadParam.DefaultFlattenResults, + errorHandler, + configOverride + ) + case _: Table => + BigQueryIO.TableReadParam[T](format, Method.DIRECT_READ, errorHandler, configOverride) + } - def typedBigQueryStorage[T <: HasAnnotation: TypeTag: Coder]( - table: Table, - selectedFields: List[String], - rowRestriction: String - ): SCollection[T] = - self.read( - BigQueryTyped.Storage[T]( - table, - selectedFields, - Option(rowRestriction) - ) - ) + self.read(io)(params) + } /** Get an SCollection for a BigQuery TableRow JSON file. */ def tableRowJsonFile( @@ -214,10 +286,11 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { compression: Compression = TableRowJsonIO.ReadParam.DefaultCompression, emptyMatchTreatment: EmptyMatchTreatment = TableRowJsonIO.ReadParam.DefaultEmptyMatchTreatment, suffix: String = TableRowJsonIO.ReadParam.DefaultSuffix - ): SCollection[TableRow] = + ): SCollection[TableRow] = { self.read(TableRowJsonIO(path))( TableRowJsonIO.ReadParam(compression, emptyMatchTreatment, suffix) ) + } } trait ScioContextSyntax { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala index 8c83dd24dd..12b490e6bb 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala @@ -17,21 +17,19 @@ package com.spotify.scio.bigquery +import com.google.api.services.bigquery.model.TableReference import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions -import com.google.api.services.bigquery.model.{TableReference, TableSchema} import com.spotify.scio.ScioContext -import com.spotify.scio.avro._ +import com.spotify.scio.bigquery.BigQueryIO.Format import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.coders.Coder import com.spotify.scio.io.{FileStorage, Tap, Taps} import com.spotify.scio.values.SCollection -import org.apache.avro.generic.GenericRecord +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method -import scala.jdk.CollectionConverters._ import scala.concurrent.Future import scala.reflect.runtime.universe._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format -import com.twitter.chill.Externalizer +import scala.jdk.CollectionConverters._ /** Tap for BigQuery TableRow JSON files. */ final case class TableRowJsonTap(path: String, params: TableRowJsonIO.ReadParam) @@ -42,43 +40,19 @@ final case class TableRowJsonTap(path: String, params: TableRowJsonIO.ReadParam) sc.read(TableRowJsonIO(path))(params) } -final case class BigQueryTypedTap[T: Coder](table: Table, fn: (GenericRecord, TableSchema) => T) +/** Tap for BigQuery tables. */ +final case class BigQueryTap[T: Coder](table: Table, params: BigQueryIO.ReadParam[T]) extends Tap[T] { - lazy val client: BigQuery = BigQuery.defaultInstance() - lazy val ts: TableSchema = client.tables.table(table.spec).getSchema - - override def value: Iterator[T] = - client.tables.avroRows(table).map(gr => fn(gr, ts)) - - override def open(sc: ScioContext): SCollection[T] = { - val ser = Externalizer(ts) - // TODO this is inefficient. Migrate to TableRow API ? - val coder = avroGenericRecordCoder - sc.read(BigQueryTypedTable(table, Format.GenericRecord)(coder)).map(gr => fn(gr, ser.get)) + override def value: Iterator[T] = { + val tables = BigQuery.defaultInstance().tables + params.format match { + case f: Format.Default[T] => tables.rows(table).map(f.from) + case f: Format.Avro[T] => tables.avroRows(table).map(f.from) + } } -} - -/** Tap for BigQuery tables. */ -final case class BigQueryTap(table: TableReference) extends Tap[TableRow] { - override def value: Iterator[TableRow] = - BigQuery.defaultInstance().tables.rows(Table.Ref(table)) - override def open(sc: ScioContext): SCollection[TableRow] = - sc.read(BigQueryTypedTable(Table.Ref(table), Format.TableRow)) -} -/** Tap for BigQuery tables using storage api. */ -final case class BigQueryStorageTap(table: Table, readOptions: TableReadOptions) - extends Tap[TableRow] { - override def value: Iterator[TableRow] = - BigQuery.defaultInstance().tables.storageRows(table, readOptions) - override def open(sc: ScioContext): SCollection[TableRow] = - sc.read( - BigQueryStorage( - table, - readOptions.getSelectedFieldsList.asScala.toList, - Option(readOptions.getRowRestriction) - ) - ) + override def open(sc: ScioContext): SCollection[T] = + sc.read(BigQueryIO[T](table))(params) } final case class BigQueryTaps(self: Taps) { @@ -90,11 +64,13 @@ final case class BigQueryTaps(self: Taps) { private lazy val bqc = BigQuery.defaultInstance() /** Get a `Future[Tap[TableRow]]` for BigQuery SELECT query. */ - def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] = + def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] = mkTap( s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), - () => BigQuerySelect(Query(sqlQuery)).tap(BigQuerySelect.ReadParam(flattenResults)) + () => + BigQueryIO[TableRow](Query(sqlQuery)) + .tap(BigQueryIO.QueryReadParam(BigQueryIO.Format.Default(), Method.DEFAULT)) ) /** Get a `Future[Tap[TableRow]]` for BigQuery table. */ @@ -102,7 +78,9 @@ final case class BigQueryTaps(self: Taps) { mkTap( s"BigQuery Table: $table", () => bqc.tables.exists(table), - () => BigQueryTypedTable(Table.Ref(table), Format.TableRow).tap(()) + () => + BigQueryIO[TableRow](Table(table)) + .tap(BigQueryIO.TableReadParam(BigQueryIO.Format.Default(), Method.DEFAULT)) ) /** Get a `Future[Tap[TableRow]]` for BigQuery table. */ @@ -111,25 +89,15 @@ final case class BigQueryTaps(self: Taps) { /** Get a `Future[Tap[T]]` for typed BigQuery source. */ def typedBigQuery[T <: HasAnnotation: TypeTag: Coder]( - newSource: String = null + newSource: Option[Source] = None ): Future[Tap[T]] = { val bqt = BigQueryType[T] - lazy val table = - scala.util.Try(BigQueryHelpers.parseTableSpec(newSource)).toOption - val rows = - newSource match { - // newSource is missing, T's companion object must have either table or query - case null if bqt.isTable => - bigQueryTable(bqt.table.get) - case null if bqt.isQuery => - bigQuerySelect(bqt.queryRaw.get) - case null => - throw new IllegalArgumentException(s"Missing table or query field in companion object") - case _ if table.isDefined => - bigQueryTable(table.get) - case _ => - bigQuerySelect(newSource) - } + val rows = newSource match { + case Some(q: Query) => bigQuerySelect(q.underlying) + case Some(t: Table) => bigQueryTable(t.ref) + case None if bqt.isQuery => bigQuerySelect(bqt.queryRaw.get) + case _ => bigQueryTable(bqt.table.get) + } import scala.concurrent.ExecutionContext.Implicits.global rows.map(_.map(bqt.fromTableRow)) } @@ -145,6 +113,12 @@ final case class BigQueryTaps(self: Taps) { () => TableRowJsonTap(path, params) ) + def bigQueryStorage( + tableSpec: String, + readOptions: TableReadOptions + ): Future[Tap[TableRow]] = + bigQueryStorage(BigQueryHelpers.parseTableSpec(tableSpec), readOptions) + def bigQueryStorage( table: TableReference, readOptions: TableReadOptions @@ -153,9 +127,12 @@ final case class BigQueryTaps(self: Taps) { s"BigQuery direct read table: $table", () => bqc.tables.exists(table), () => { + val format = BigQueryIO.Format.Default() val selectedFields = readOptions.getSelectedFieldsList.asScala.toList val rowRestriction = Option(readOptions.getRowRestriction) - BigQueryStorage(Table.Ref(table), selectedFields, rowRestriction).tap(()) + val filter = Table.Filter(selectedFields, rowRestriction) + val source = Table(table, filter) + BigQueryIO[TableRow](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ)) } ) @@ -163,16 +140,16 @@ final case class BigQueryTaps(self: Taps) { table: TableReference, readOptions: TableReadOptions ): Future[Tap[T]] = { - val fn = BigQueryType[T].fromTableRow mkTap( s"BigQuery direct read table: $table", () => bqc.tables.exists(table), () => { + val format = BigQueryIO.Format.Avro(BigQueryType[T]) val selectedFields = readOptions.getSelectedFieldsList.asScala.toList val rowRestriction = Option(readOptions.getRowRestriction) - BigQueryStorage(Table.Ref(table), selectedFields, rowRestriction) - .tap(()) - .map(fn) + val filter = Table.Filter(selectedFields, rowRestriction) + val source = Table(table, filter) + BigQueryIO[T](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ)) } ) } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala index 45fbe21966..60c00d371b 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala @@ -17,21 +17,25 @@ package com.spotify.scio.bigquery +import com.google.api.services.bigquery.model.{Table => GTAble, TableReference} +import com.google.cloud.bigquery.storage.v1._ +import com.google.protobuf.ByteString import com.spotify.scio.avro._ -import com.spotify.scio.bigquery.BigQueryTypedTable.Format -import com.spotify.scio.coders.Coder -import com.spotify.scio.{ContextAndArgs, ScioContext} +import com.spotify.scio.bigquery.types.BigQueryType +import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.testing._ +import com.spotify.scio.{ContextAndArgs, ScioContext} import org.apache.avro.generic.GenericRecord -import org.apache.beam.sdk.Pipeline.PipelineVisitor -import org.apache.beam.sdk.io.gcp.{bigquery => beam} -import org.apache.beam.sdk.io.Read import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition -import org.apache.beam.sdk.runners.TransformHierarchy +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient +import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream +import org.apache.beam.sdk.io.gcp.testing.{FakeBigQueryServices, FakeDatasetService, FakeJobService} +import org.apache.beam.sdk.io.gcp.{bigquery => beam} import org.apache.beam.sdk.transforms.display.DisplayData -import org.apache.beam.sdk.values.PValue +import org.apache.beam.sdk.util.CoderUtils +import org.scalatest.BeforeAndAfterAll -import scala.collection.mutable import scala.jdk.CollectionConverters._ object BigQueryIOTest { @@ -45,53 +49,72 @@ object BigQueryIOTest { "tableDescription" ) - /** - * Return `Read` Transforms that do not have another transform using it as an input. - * - * To do this, we visit all PTransforms, and find the inputs at each stage, and mark those inputs - * as consumed by putting them in `consumedOutputs`. We also check if each transform is a `Read` - * and if so we extract them as well. - * - * This is copied from Beam's test for UnconsumedReads. - */ - def unconsumedReads(context: ScioContext): Set[PValue] = { - val consumedOutputs = mutable.HashSet[PValue]() - val allReads = mutable.HashSet[PValue]() - - context.pipeline.traverseTopologically( - new PipelineVisitor.Defaults { - override def visitPrimitiveTransform(node: TransformHierarchy#Node): Unit = - consumedOutputs ++= node.getInputs.values().asScala - - override def visitValue( - value: PValue, - producer: TransformHierarchy#Node - ): Unit = - producer.getTransform match { - case _: Read.Bounded[_] | _: Read.Unbounded[_] => - allReads += value - case _ => - } - } - ) + final class FakeStorageClient(data: Seq[BQRecord]) extends StorageClient with Serializable { + + @transient + private lazy val bqt: BigQueryType[BQRecord] = BigQueryType[BQRecord] + + override def createReadSession(request: CreateReadSessionRequest): ReadSession = ReadSession + .newBuilder() + .setName("session") + .setAvroSchema(AvroSchema.newBuilder().setSchema(bqt.avroSchema.toString())) + .addStreams(ReadStream.newBuilder().setName("stream")) + .setDataFormat(DataFormat.AVRO) + .build() + + override def readRows( + request: ReadRowsRequest + ): BigQueryServices.BigQueryServerStream[ReadRowsResponse] = { + val bcoder = CoderMaterializer.beamWithDefault(avroGenericRecordCoder(bqt.avroSchema)) + val bytes = data + .foldLeft(ByteString.newOutput()) { (bs, r) => + bs.write(CoderUtils.encodeToByteArray(bcoder, bqt.toAvro(r))) + bs + } + .toByteString + + new FakeBigQueryServerStream( + List( + ReadRowsResponse + .newBuilder() + .setAvroRows(AvroRows.newBuilder().setSerializedBinaryRows(bytes).build()) + .setRowCount(data.size.toLong) + .build() + ).asJava + ) + } + + override def readRows( + request: ReadRowsRequest, + fullTableId: String + ): BigQueryServices.BigQueryServerStream[ReadRowsResponse] = readRows(request) + + override def splitReadStream(request: SplitReadStreamRequest): SplitReadStreamResponse = ??? + override def splitReadStream( + request: SplitReadStreamRequest, + fullTableId: String + ): SplitReadStreamResponse = ??? - allReads.diff(consumedOutputs).toSet + override def close(): Unit = () } } -final class BigQueryIOTest extends ScioIOSpec { +final class BigQueryIOTest extends ScioIOSpec with BeforeAndAfterAll { import BigQueryIOTest._ + override def beforeAll(): Unit = + FakeDatasetService.setUp() + "BigQueryIO" should "apply config override" in { val name = "saveAsBigQueryTable" val desc = "table-description" val sc = ScioContext() implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder - val io = BigQueryTypedTable[GenericRecord]( - table = Table.Spec("project:dataset.out_table"), - format = Format.GenericRecord + val io = BigQueryIO[GenericRecord]( + Table("project:dataset.out_table") ) - val params = BigQueryTypedTable.WriteParam[GenericRecord]( + val params = BigQueryIO.WriteParam[GenericRecord]( + format = BigQueryIO.Format.Avro(), createDisposition = CreateDisposition.CREATE_NEVER, configOverride = _.withTableDescription(desc) ) @@ -111,78 +134,96 @@ final class BigQueryIOTest extends ScioIOSpec { val xs = (1 to 100).map(x => TableRow("x" -> x.toString)) testJobTest(xs, in = "project:dataset.in_table", out = "project:dataset.out_table")( BigQueryIO(_) - )((sc, s) => sc.bigQueryTable(Table.Spec(s)))((coll, s) => - coll.saveAsBigQueryTable(Table.Spec(s)) - ) - } - - /** - * The `BigQueryIO`'s write, runs Beam's BQ IO which creates a `Read` Transform to return the - * insert errors. - * - * The `saveAsBigQuery` or `saveAsTypedBigQuery` in Scio is designed to return a `ClosedTap` and - * by default drops insert errors. - * - * The following tests make sure that the dropped insert errors do not appear as an unconsumed - * read outside the transform writing to Big Query. - */ - it should "not have unconsumed errors with saveAsBigQuery" in { - val xs = (1 to 100).map(x => TableRow("x" -> x.toString)) - - val context = ScioContext() - context - .parallelize(xs) - .saveAsBigQueryTable(Table.Spec("project:dataset.dummy"), createDisposition = CREATE_NEVER) - // We want to validate on the job graph, and we need not actually execute the pipeline. - - unconsumedReads(context) shouldBe empty - } - - it should "not have unconsumed errors with saveAsTypedBigQuery" in { - val xs = (1 to 100).map(x => BQRecord(x, x.toString, (1 to x).map(_.toString).toList)) - - val context = ScioContext() - context - .parallelize(xs) - .saveAsTypedBigQueryTable( - Table.Spec("project:dataset.dummy"), - createDisposition = CREATE_NEVER - ) - // We want to validate on the job graph, and we need not actually execute the pipeline. - - unconsumedReads(context) shouldBe empty + )((sc, s) => sc.bigQueryTable(Table(s)))((coll, s) => coll.saveAsBigQueryTable(Table(s))) } it should "read the same input table with different predicate and projections using bigQueryStorage" in { - JobTest[JobWithDuplicateInput.type] .args("--input=table.in") .input( - BigQueryIO[TableRow]("table.in", List("a"), Some("a > 0")), + BigQueryIO[TableRow](Table("table.in", List("a"), "a > 0")), (1 to 3).map(x => TableRow("x" -> x.toString)) ) .input( - BigQueryIO[TableRow]("table.in", List("b"), Some("b > 0")), + BigQueryIO[TableRow](Table("table.in", List("b"), "b > 0")), (1 to 3).map(x => TableRow("x" -> x.toString)) ) .run() - } it should "read the same input table with different predicate and projections using typedBigQueryStorage" in { - JobTest[TypedJobWithDuplicateInput.type] .args("--input=table.in") .input( - BigQueryIO[BQRecord]("table.in", List("a"), Some("a > 0")), + BigQueryIO[BQRecord](Table("table.in", List("a"), "a > 0")), (1 to 3).map(x => BQRecord(x, x.toString, (1 to x).map(_.toString).toList)) ) .input( - BigQueryIO[BQRecord]("table.in", List("b"), Some("b > 0")), + BigQueryIO[BQRecord](Table("table.in", List("b"), "b > 0")), (1 to 3).map(x => BQRecord(x, x.toString, (1 to x).map(_.toString).toList)) ) .run() + } + + it should "propagate errors if handler is set" in { + val ref = new TableReference() + .setProjectId("project") + .setDatasetId("dataset") + .setTableId("table") + val bqt = BigQueryType[BQRecord] + val data = Seq( + BQRecord(1, "a", List("1")), + BQRecord(2, "b", List("2")) + ) + + val failingFormat = BigQueryIO.Format.Avro( + bqt.fromAvro.andThen(r => if (r.i % 2 == 0) throw new Exception("fail") else r), + bqt.toAvro + ) + + val fakeDatasetService = new FakeDatasetService() + val fakeJobService = new FakeJobService() + val fakeStorageClient = new FakeStorageClient(data) + + val fakeBqServices = new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withJobService(fakeJobService) + .withStorageClient(fakeStorageClient) + + try { + fakeDatasetService.createDataset(ref.getProjectId, ref.getDatasetId, "US", "desc", -1) + fakeDatasetService.createTable(new GTAble().setTableReference(ref).setSchema(bqt.schema)) + fakeDatasetService.insertAll(ref, data.map(bqt.toTableRow).asJava, null) + + runWithRealContext() { sc => + val table = Table(ref) + val errors = sc.errorSink() + sc.bigQueryStorageFormat[BQRecord]( + table, + failingFormat, + errorHandler = errors.handler, + configOverride = _.withTestServices(fakeBqServices) + ) + + val recordWithFailure = errors.sink.map { br => + val record = br.getRecord.getHumanReadableJsonRecord + val desc = br.getFailure.getDescription + val exception = br.getFailure.getException + (record, desc, exception) + } + recordWithFailure should containSingleValue( + ( + """{"i": 2, "s": "b", "r": ["2"]}""", + "Unable to parse record reading from BigQuery", + "java.lang.Exception: fail" + ) + ) + } + } finally { + fakeDatasetService.close() + fakeJobService.close() + } } "TableRowJsonIO" should "work" in { @@ -196,8 +237,8 @@ final class BigQueryIOTest extends ScioIOSpec { object JobWithDuplicateInput { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.bigQueryStorage(Table.Spec(args("input")), List("a"), "a > 0") - sc.bigQueryStorage(Table.Spec(args("input")), List("b"), "b > 0") + sc.bigQueryStorage(Table(args("input"), List("a"), "a > 0")) + sc.bigQueryStorage(Table(args("input"), List("b"), "b > 0")) sc.run() () } @@ -208,8 +249,8 @@ object TypedJobWithDuplicateInput { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.typedBigQueryStorage[BQRecord](Table.Spec(args("input")), List("a"), "a > 0") - sc.typedBigQueryStorage[BQRecord](Table.Spec(args("input")), List("b"), "b > 0") + sc.typedBigQueryStorage[BQRecord](Table(args("input"), List("a"), "a > 0")) + sc.typedBigQueryStorage[BQRecord](Table(args("input"), List("b"), "b > 0")) sc.run() () } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala index 137eb836c6..b15297b21a 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryTypesTest.scala @@ -21,7 +21,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AnyFlatSpec class BigQueryTypesTest extends AnyFlatSpec with Matchers { - "Table.Spec" should "fail malformed spec" in { - an[IllegalArgumentException] shouldBe thrownBy(Table.Spec("bad spec")) + "Table" should "fail malformed spec" in { + an[IllegalArgumentException] shouldBe thrownBy(Table("bad spec")) } } diff --git a/site/src/main/paradox/FAQ.md b/site/src/main/paradox/FAQ.md index 9db67a0bfe..82fcf9674b 100644 --- a/site/src/main/paradox/FAQ.md +++ b/site/src/main/paradox/FAQ.md @@ -553,7 +553,7 @@ def main(cmdlineArgs: Array[String]): Unit = { val p: SCollection[(String, Int)] = ??? p.map(kv => Result(kv._1, kv._2)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) + .saveAsTypedBigQueryTable(Table(args("output"))) } ``` diff --git a/site/src/main/paradox/io/BigQuery.md b/site/src/main/paradox/io/BigQuery.md index 7bf3b718cf..65fd4afcca 100644 --- a/site/src/main/paradox/io/BigQuery.md +++ b/site/src/main/paradox/io/BigQuery.md @@ -222,7 +222,7 @@ def main(cmdlineArgs: Array[String]): Unit = { .flatMap(r => if (r.tornado.getOrElse(false)) Seq(r.month) else Nil) .countByValue .map(kv => Result(kv._1, kv._2)) - .saveAsTypedBigQueryTable(Table.Spec(args("output"))) // schema from Row.schema + .saveAsTypedBigQueryTable(Table(args("output"))) // schema from Row.schema sc.run() () } From 513118fb01f3b5f8dad4a31e70a6c0dc63292aed Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 21 Aug 2024 16:54:51 +0200 Subject: [PATCH 2/3] Expose java beam error handler API --- scio-core/src/main/scala/com/spotify/scio/ScioContext.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala index 0668087905..9153695cd2 100644 --- a/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala +++ b/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala @@ -51,6 +51,7 @@ import scala.reflect.ClassTag import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions +import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} /** Runner specific context. */ trait RunnerContext { @@ -854,6 +855,11 @@ class ScioContext private[scio] ( // ======================================================================= // Error handler // ======================================================================= + def registerBadRecordErrorHandler[T <: POutput]( + sinkTransform: PTransform[PCollection[BadRecord], T] + ): ErrorHandler[BadRecord, T] = + pipeline.registerBadRecordErrorHandler(sinkTransform) + def errorSink(): ErrorSink = ErrorSink(this) // ======================================================================= From 2d664403cb46b17341ac1567bd4fa32a539adb69 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 21 Aug 2024 16:58:31 +0200 Subject: [PATCH 3/3] Strict table expectation on export API --- .../spotify/scio/bigquery/BigQueryTypes.scala | 16 ++++++ .../bigquery/syntax/ScioContextSyntax.scala | 56 ++++--------------- 2 files changed, 28 insertions(+), 44 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 919768598a..4c49a08585 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -130,6 +130,22 @@ case class Table private (ref: GTableReference, filter: Option[Table.Filter]) ex } object Table { + + /** + * @param selectedFields + * names of the fields in the table that should be read. If empty, all fields will be read. If + * the specified field is a nested field, all the sub-fields in the field will be selected. + * Fields will always appear in the generated class in the same order as they appear in the + * table, regardless of the order specified in selectedFields. + * @param rowRestriction + * SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support + * combinations of predicates that are a comparison between a column and a constant value in SQL + * statement. Aggregates are not supported. For example: + * + * {{{ + * "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')" + * }}} + */ final case class Filter( selectedFields: List[String], rowRestriction: Option[String] diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index 0626cc542d..d8e3b19354 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -30,15 +30,9 @@ import org.apache.beam.sdk.io.Compression import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method import org.apache.beam.sdk.io.fs.EmptyMatchTreatment import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} -import org.slf4j.{Logger, LoggerFactory} - -object ScioContextOps { - @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) -} /** Enhanced version of [[ScioContext]] with BigQuery methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { - import ScioContextOps._ /** * Get an SCollection for a BigQuery SELECT query. Both @@ -85,13 +79,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[TableRow] = { - if (table.filter.nonEmpty) { - logger.warn( - "Using filtered table with standard API. " + - "selectedFields and rowRestriction are ignored. " + - "Use bigQueryStorage instead" - ) - } + require( + table.filter.isEmpty, + "Cannot use filtered table with standard API. Use bigQueryStorage instead" + ) val params = BigQueryIO.TableReadParam( BigQueryIO.Format.Default(), Method.DEFAULT, @@ -107,13 +98,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[T] = { - if (table.filter.nonEmpty) { - logger.warn( - "Using filtered table with standard API. " + - "selectedFields and rowRestriction are ignored. " + - "Use bigQueryStorage instead" - ) - } + require( + table.filter.isEmpty, + "Cannot use filtered table with standard API. Use bigQueryStorageFormat instead" + ) val params = BigQueryIO.TableReadParam( format, Method.DEFAULT, @@ -123,23 +111,6 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { self.read(BigQueryIO[T](table))(params) } - /** - * Get an SCollection for a BigQuery table using the storage API. - * - * @param selectedFields - * names of the fields in the table that should be read. If empty, all fields will be read. If - * the specified field is a nested field, all the sub-fields in the field will be selected. - * Fields will always appear in the generated class in the same order as they appear in the - * table, regardless of the order specified in selectedFields. - * @param rowRestriction - * SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support - * combinations of predicates that are a comparison between a column and a constant value in SQL - * statement. Aggregates are not supported. For example: - * - * {{{ - * "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')" - * }}} - */ def bigQueryStorage( table: Table, errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler, @@ -234,13 +205,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { configOverride ) case t: Table => - if (t.filter.nonEmpty) { - logger.warn( - "Using filtered table with standard API. " + - "selectedFields and rowRestriction are ignored. " + - "Use typedBigQueryStorage instead" - ) - } + require( + t.filter.isEmpty, + "Cannot use filtered table with standard API. Use typedBigQuery instead" + ) BigQueryIO.TableReadParam[T]( format, Method.DEFAULT,