Issue-70 Fix the repartitioning logic to handle statement IDs #81

Open · wants to merge 3 commits into base: master
54 changes: 54 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/MergeSuite.scala
@@ -162,6 +162,60 @@ class MergeSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
assert(thrown.getMessage().contains("UPDATE on the partition columns are not allowed"))
}

test("Check for Merge update on multi statements with 1 bucket") {
/** In this test the following is done:
* ** Insert a DF into the target table with statement id 1. The insert should create just one bucket file, i.e., bucket0000.
* ** Insert a DF into the target table with no statement id. This insert should also create just one bucket file, i.e., bucket0000.
* ** Note that the encoded bucket ids of the rows from the two inserts differ, because their statement ids differ.
* ** Try to update one row from each of the above transactions. Both rows are expected to be updated.
*/
val spark = helper.spark
import spark.sqlContext.implicits._
val targetTable = s"$DEFAULT_DBNAME.target_bucket1"
val sourceTable = s"$DEFAULT_DBNAME.source_bucket1"

helper.hiveExecute(s"create table $targetTable (i int) stored as orc tblproperties('transactional'='true')")
val df1 = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF().repartition(1)
val htable = HiveAcidTable.fromSparkSession(spark, targetTable)
htable.insertInto(df1, Some(1))
val df2 = spark.sparkContext.parallelize(Seq(4, 5, 6)).toDF().repartition(1)
htable.insertInto(df2)
helper.hiveExecute(s"create table $sourceTable (i int) stored as orc tblproperties('transactional' = 'true')")
helper.sparkSQL(s"insert into $sourceTable values (1), (4)")
helper.sparkSQL(s"Merge into $targetTable t using $sourceTable s on t.i = s.i when matched then update set i = s.i + 1")

val res = helper.sparkCollect(s"select * from $targetTable order by i")
val expected = s"2\n2\n3\n5\n5\n6"
helper.compareResult(expected, res)
}

test("Check for Merge update on multi statements with 2 buckets") {
/** In this test following is done:
* ** Insert into target table DF with statement id 1. Insert should just create two bucket file i.e., bucket0000, bucket0001
* ** Insert into target table DF with no statement id. Insert should just create one bucket file i.e., bucket0000, bucket0001
* ** Note encoded bucket id in rows of different transaction will be different due to difference in statement Id.
* ** Try to update all the rows from each of the above transaction. It is expected that all rows are updated.
*/
val spark = helper.spark
import spark.sqlContext.implicits._
val targetTable = s"$DEFAULT_DBNAME.target_bucket2"
val sourceTable = s"$DEFAULT_DBNAME.source_bucket2"

helper.hiveExecute(s"create table $targetTable (i int) stored as orc tblproperties('transactional'='true')")
val df1 = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).toDF().repartition(2)
val htable = HiveAcidTable.fromSparkSession(spark, targetTable)
htable.insertInto(df1, Some(1))
val df2 = spark.sparkContext.parallelize(Seq(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)).toDF().repartition(2)
htable.insertInto(df2)
helper.hiveExecute(s"create table $sourceTable (i int) stored as orc tblproperties('transactional' = 'true')")
helper.sparkSQL(s"insert into $sourceTable select * from $targetTable")
helper.sparkSQL(s"Merge into $targetTable t using $sourceTable s on t.i = s.i when matched then update set i = s.i + 1")

val res = helper.sparkCollect(s"select * from $targetTable order by i")
val expected = s"2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14\n15\n16\n17\n18\n19\n20\n21"
helper.compareResult(expected, res)
}

// Merge test for full acid tables
def mergeTestWithJustInsert(tType: String, isPartitioned: Boolean): Unit = {
val tableNameSpark = if (isPartitioned) {
29 changes: 22 additions & 7 deletions src/main/scala/com/qubole/spark/hiveacid/writer/TableWriter.scala
@@ -21,20 +21,18 @@ package com.qubole.spark.hiveacid.writer

import scala.collection.JavaConverters._
import scala.language.implicitConversions

import com.qubole.spark.hiveacid._
import com.qubole.spark.hiveacid.hive.HiveAcidMetadata
import com.qubole.spark.hiveacid.writer.hive.{HiveAcidFullAcidWriter, HiveAcidInsertOnlyWriter, HiveAcidWriterOptions}
import com.qubole.spark.hiveacid.transaction._
import com.qubole.spark.hiveacid.util.SerializableConfiguration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command.AlterTableAddPartitionCommand
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

/**
@@ -151,19 +149,36 @@ private[hiveacid] class TableWriter(sparkSession: SparkSession,

val resultRDD =
operationType match {
// In order to read data from delete delts, hive uses mergesort, which requires
// In order to read data from delete deltas, hive uses mergesort, which requires
// originalWriteId, bucket, and rowId in ascending order, and currentWriteId in descending order.
// We take care of originalWriteId, bucket, and rowId in asc order here. We only write file per bucket-transaction,
// hence currentWriteId remains same throughout the file and doesn't need ordering.
//
// Deleted rowId needs to be in same bucketed file name as the original row. To achieve this,
// we repartition into 4096 partitions (i.e maximum number of buckets).
// we repartition into 4096 partitions (i.e maximum number of buckets) based on bucket Id.
// This ensures all rows of one bucket goes to same partition.
//
// ************** Repartitioning Logic *******************
//
// rowId.bucketId is composed of the following:
//
// top 3 bits - version code.
// next 1 bit - reserved for future use.
// next 12 bits - the bucket ID.
// next 4 bits - reserved for future use.
// remaining 12 bits - the statement ID - 0-based numbering of all statements within a
// transaction. Each leg of a multi-insert statement gets a separate statement ID.
// The reserved bits align the fields so that the value is easier to interpret in hex.
//
// We need to repartition only on the 12 bits representing the bucket ID.
// We extract them with:
// (rowId.bucketId AND 268369920 (0b00001111111111110000000000000000)) >>> (right shift by) 16 bits
//
// Rows from multiple buckets may still land in the same partition; that is fine, since each
// row is still routed to its own bucket file by the writer.
case HiveAcidOperation.DELETE | HiveAcidOperation.UPDATE =>
df.repartition(MAX_NUMBER_OF_BUCKETS, col("rowId.bucketId"))
.toDF.sortWithinPartitions("rowId.bucketId", "rowId.writeId", "rowId.rowId")
logInfo("New repartitioning logic in play")
df.repartition(MAX_NUMBER_OF_BUCKETS, functions.expr("shiftright(rowId.bucketId & 268369920, 16)"))
.toDF.sortWithinPartitions("rowId.writeId", "rowId.bucketId", "rowId.rowId")
.toDF.queryExecution.executedPlan.execute()
case HiveAcidOperation.INSERT_OVERWRITE | HiveAcidOperation.INSERT_INTO =>
df.queryExecution.executedPlan.execute()
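To make the repartitioning change above concrete, here is a standalone sketch (not code from this PR; `BucketIdSketch` and `encodeBucketProperty` are hypothetical names, and the bit packing simply follows the layout described in the comment): two rows of the same bucket written under different statement IDs carry different raw `rowId.bucketId` values, while the masked-and-shifted value that the new `shiftright(rowId.bucketId & 268369920, 16)` expression repartitions on is identical for both.

```scala
// Standalone illustration only; not part of this PR.
object BucketIdSketch {
  // Bit layout per the TableWriter comment:
  // 3 bits version | 1 bit reserved | 12 bits bucket id | 4 bits reserved | 12 bits statement id
  private val Version = 1

  // Hypothetical helper: pack a bucket id and a statement id into one encoded int.
  def encodeBucketProperty(bucketId: Int, statementId: Int): Int =
    (Version << 29) | (bucketId << 16) | statementId

  // Mirrors the repartition expression: keep only the 12 bucket-id bits.
  // 268369920 == 0x0FFF0000; the mask clears the high bits, so >>> and >> agree here.
  def extractBucketId(bucketProperty: Int): Int =
    (bucketProperty & 268369920) >>> 16

  def main(args: Array[String]): Unit = {
    val stmt0 = encodeBucketProperty(bucketId = 0, statementId = 0) // 0x20000000
    val stmt1 = encodeBucketProperty(bucketId = 0, statementId = 1) // 0x20000001

    // The raw values differ, so repartitioning directly on rowId.bucketId can split
    // one bucket's rows across partitions ...
    println(f"raw bucket properties: 0x$stmt0%08X vs 0x$stmt1%08X")
    // ... but the extracted bucket id is identical, so both rows land in the same
    // partition and hence in the same delete-delta bucket file.
    println(s"extracted bucket id: ${extractBucketId(stmt0)} == ${extractBucketId(stmt1)}")
  }
}
```

This appears to be the crux of the fix: hashing the raw bucket property, as the old code did, treats rows written with different statement IDs as if they belonged to different buckets.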
@@ -178,7 +178,7 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
}

override protected def createWriter(path: Path, acidBucketId: Int): Any = {

logInfo(s"Create Writer for path:$path and bucketId: $acidBucketId")
val tableDesc = HiveAcidOptions.getFileSinkDesc.getTableInfo

val recordUpdater = HiveFileFormatUtils.getAcidRecordUpdater(
@@ -224,8 +224,10 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
def safeDeletePath(fs: FileSystem, path: Path): Unit = {
try {
if (path != null && fs.exists(path)) {
logInfo(s"safeDelete: $path exists")
fs.delete(path, false)
} else {
logInfo(s"safeDelete: $path doesn't exist")
}
} catch {
case e: Exception =>
logWarning(s"Error while trying to delete $path" +
@@ -235,13 +237,15 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,

if (createDelta) {
// Delete delta bucket file if exists. It can exist in the cases of task retries.
logInfo(s"Trying to safe remove delta path: ${path.toString}")
safeDeletePath(fs, AcidUtils.createFilename(path, acidOutputFormatOptions))
createVersionFile(acidOutputFormatOptions)
}

if (createDeleteDelta) {
// Delete delete_delta bucket file if it exists. It can exist in the cases of task retries.
val deleteDeltaOptions = acidOutputFormatOptions.clone().writingDeleteDelta(true)
logInfo(s"Trying to safe remove delete delta path: ${path.toString}")
safeDeletePath(fs, AcidUtils.createFilename(path, deleteDeltaOptions))
createVersionFile(deleteDeltaOptions)
}
@@ -279,9 +283,11 @@ private[writer] class HiveAcidFullAcidWriter(options: WriterOptions,
val partitionColRow = getPartitionValues(row)
val dataColRow = getDataValues(row)

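// Bucket id for this row, computed once and reused below to pick the per-bucket RecordUpdater.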
val bucketId = getBucketID(dataColRow)

// Get the recordWriter for this partitionedRow
val recordUpdater =
getOrCreateWriter(partitionColRow, getBucketID(dataColRow)).asInstanceOf[RecordUpdater]
getOrCreateWriter(partitionColRow, bucketId).asInstanceOf[RecordUpdater]

val recordValue = sparkHiveRowConverter.toHiveRow(dataColRow, hiveRow)
