diff --git a/build.sbt b/build.sbt
index 0e22f1fd7c..81fe695816 100644
--- a/build.sbt
+++ b/build.sbt
@@ -814,9 +814,10 @@ lazy val `scio-google-cloud-platform` = project
       "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion,
       "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
       "org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
+      "org.apache.commons" % "commons-lang3" % commonsLang3Version,
       "org.slf4j" % "slf4j-api" % slf4jVersion,
+      "com.google.cloud" % "google-cloud-storage" % googleCloudStorageVersion,
       // test
-      "com.google.cloud" % "google-cloud-storage" % googleCloudStorageVersion % Test,
       "com.spotify" %% "magnolify-cats" % magnolifyVersion % Test,
       "com.spotify" %% "magnolify-scalacheck" % magnolifyVersion % Test,
       "org.hamcrest" % "hamcrest" % hamcrestVersion % Test,
diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala
index bebbadfba6..60599bdf74 100644
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala
+++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala
@@ -33,6 +33,7 @@ import com.google.auth.http.HttpCredentialsAdapter
 import com.google.auth.oauth2.{GoogleCredentials, ImpersonatedCredentials}
 import com.google.cloud.bigquery.storage.v1beta1.{BigQueryStorageClient, BigQueryStorageSettings}
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer
+import com.google.cloud.storage.{Storage, StorageOptions}
 import com.spotify.scio.bigquery.{Table => STable}
 import com.spotify.scio.bigquery.client.BigQuery.Client
 import com.spotify.scio.bigquery.client.BigQueryConfig.ImpersonationInfo
@@ -331,5 +332,8 @@ object BigQuery {
         .build()
       BigQueryStorageClient.create(settings)
     }
+
+    lazy val blobStorage: Storage =
+      StorageOptions.newBuilder.setProjectId(project).setCredentials(_credentials).build.getService
   }
 }
diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala
index 3ef534a5a8..646e123810 100644
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala
+++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/LoadOps.scala
@@ -18,15 +18,22 @@
 package com.spotify.scio.bigquery.client
 
 import com.google.api.services.bigquery.model._
+import com.google.cloud.storage.{BlobId, BlobInfo, Storage}
 import com.spotify.scio.bigquery.client.BigQuery.Client
-import com.spotify.scio.bigquery.{BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND}
+import com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation
+import com.spotify.scio.bigquery.{BigQueryType, BigQueryUtil, CREATE_IF_NEEDED, WRITE_APPEND}
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
 import org.apache.beam.sdk.io.gcp.{bigquery => bq}
+import org.apache.commons.lang3.RandomStringUtils
 import org.slf4j.LoggerFactory
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.util.Try
+import scala.reflect.runtime.universe.TypeTag
 
 private[client] object LoadOps {
   private val Logger = LoggerFactory.getLogger(this.getClass)
@@ -119,6 +126,60 @@ final private[client] class LoadOps(client: Client, jobService: JobOps) {
       location = location
     )
 
+  /**
+   * Upload List of rows to Cloud Storage as Avro file and load to BigQuery table. Note that element
+   * type `T` must be annotated with [[BigQueryType]].
+   */
+  def uploadTypedRows[T <: HasAnnotation: TypeTag](
+    tableSpec: String,
+    rows: List[T],
+    tempLocation: String,
+    writeDisposition: WriteDisposition = WriteDisposition.WRITE_APPEND,
+    createDisposition: CreateDisposition = CreateDisposition.CREATE_IF_NEEDED
+  ): Try[TableReference] = {
+    val bqt = BigQueryType[T]
+
+    Try {
+      val out = new ByteArrayOutputStream()
+      val datumWriter = new GenericDatumWriter[GenericRecord]()
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      try {
+        dataFileWriter.create(bqt.avroSchema, out)
+        rows.foreach { row =>
+          dataFileWriter.append(bqt.toAvro(row))
+        }
+      } finally {
+        dataFileWriter.close()
+      }
+
+      val blobId =
+        BlobId.fromGsUtilUri(
+          s"${tempLocation.stripSuffix("/")}/upload_${RandomStringUtils.randomAlphanumeric(10)}.avro"
+        )
+      val blobInfo = BlobInfo.newBuilder(blobId).build
+      client.blobStorage.createFrom(
+        blobInfo,
+        new ByteArrayInputStream(out.toByteArray),
+        Storage.BlobWriteOption.doesNotExist(),
+        Storage.BlobWriteOption.crc32cMatch()
+      )
+
+      blobId
+    }.flatMap { blobId =>
+      try {
+        avro(
+          List(blobId.toGsUtilUri),
+          tableSpec,
+          schema = Some(bqt.schema),
+          createDisposition = createDisposition,
+          writeDisposition = writeDisposition
+        )
+      } finally {
+        client.blobStorage.delete(blobId)
+      }
+    }
+  }
+
   @nowarn("msg=private default argument in class LoadOps is never used")
   private def execute(
     sources: List[String],