Issue 86 : Add support for Datasource V2 : ORC #85

Open · wants to merge 5 commits into base: master
2 changes: 2 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/LockSuite.scala
@@ -183,6 +183,8 @@ class TestLockHelper extends TestHelper {
.config("spark.hadoop.hive.txn.timeout", "6")
//.config("spark.ui.enabled", "true")
//.config("spark.ui.port", "4041")
// Run all existing (V1) tests against the V2 datasource
.config("spark.hive.acid.datasource.version", "v2")
.enableHiveSupport()
.getOrCreate()
}
32 changes: 20 additions & 12 deletions src/it/scala/com/qubole/spark/hiveacid/ReadSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest._

import scala.util.control.NonFatal

@Ignore
//@Ignore
class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {

val log: Logger = LogManager.getLogger(this.getClass)
@@ -222,9 +222,10 @@ class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
// Special case of comparing result read before conversion
// and after conversion.
log.info("++ Compare result across conversion")
val (dfFromSql, dfFromScala) = helper.sparkGetDF(table)
val (dfFromSql, dfFromScala, dfFromSqlV2) = helper.sparkGetDF(table)
helper.compareResult(hiveResStr, dfFromSql.collect())
helper.compareResult(hiveResStr, dfFromScala.collect())
helper.compareResult(hiveResStr, dfFromSqlV2.collect())

helper.verify(table, insertOnly = false)
}
@@ -272,21 +273,22 @@ class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll

val hiveResStr = helper.hiveExecuteQuery(table.hiveSelect)

val (df1, df2) = helper.sparkGetDF(table)
val (df1, df2, dfV2) = helper.sparkGetDF(table)

// Materialize it once
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())

helper.hiveExecute(table.insertIntoHiveTableKey(11))
helper.hiveExecute(table.insertIntoHiveTableKey(12))
helper.hiveExecute(table.insertIntoHiveTableKey(13))
helper.hiveExecute(table.insertIntoHiveTableKey(14))
helper.hiveExecute(table.insertIntoHiveTableKey(15))
if (isPartitioned) {
compactPartitionedAndTest(hiveResStr, df1, df2, Seq(11,12,13,14,15))
compactPartitionedAndTest(hiveResStr, df1, df2, dfV2, Seq(11,12,13,14,15))
} else {
compactAndTest(hiveResStr, df1, df2)
compactAndTest(hiveResStr, df1, df2, dfV2)
}

// Shortcut for insert Only
@@ -296,43 +298,49 @@ class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
helper.hiveExecute(table.deleteFromHiveTableKey(5))
helper.hiveExecute(table.deleteFromHiveTableKey(6))
if (isPartitioned) {
compactPartitionedAndTest(hiveResStr, df1, df2, Seq(3,4,5,6))
compactPartitionedAndTest(hiveResStr, df1, df2, dfV2, Seq(3,4,5,6))
} else {
compactAndTest(hiveResStr, df1, df2)
compactAndTest(hiveResStr, df1, df2, dfV2)
}

helper.hiveExecute(table.updateInHiveTableKey(7))
helper.hiveExecute(table.updateInHiveTableKey(8))
helper.hiveExecute(table.updateInHiveTableKey(9))
helper.hiveExecute(table.updateInHiveTableKey(10))
if (isPartitioned) {
compactPartitionedAndTest(hiveResStr, df1, df2, Seq(7,8,9,10))
compactPartitionedAndTest(hiveResStr, df1, df2, dfV2, Seq(7,8,9,10))
} else {
compactAndTest(hiveResStr, df1, df2)
compactAndTest(hiveResStr, df1, df2, dfV2)
}
}
}

def compactAndTest(hiveResStr: String, df1: DataFrame, df2: DataFrame): Unit = {
def compactAndTest(hiveResStr: String, df1: DataFrame, df2: DataFrame, dfV2: DataFrame): Unit = {
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
helper.hiveExecute(table.minorCompaction)
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
helper.hiveExecute(table.majorCompaction)
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
}

def compactPartitionedAndTest(hiveResStr: String, df1: DataFrame, df2: DataFrame, keys: Seq[Int]): Unit = {
def compactPartitionedAndTest(hiveResStr: String, df1: DataFrame, df2: DataFrame, dfV2: DataFrame, keys: Seq[Int]): Unit = {
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
keys.foreach(k => helper.hiveExecute(table.minorPartitionCompaction(k)))
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
keys.foreach((k: Int) => helper.hiveExecute(table.majorPartitionCompaction(k)))
helper.compareResult(hiveResStr, df1.collect())
helper.compareResult(hiveResStr, df2.collect())
helper.compareResult(hiveResStr, dfV2.collect())
}

helper.myRun(testName, code)
@@ -365,7 +373,7 @@ class ReadSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll
helper.hiveExecute(table2.insertIntoHiveTableKeyRange(10, 25))

var hiveResStr = helper.hiveExecuteQuery(Table.hiveJoin(table1, table2))
val sparkRes1 = helper.sparkCollect(Table.sparkJoin(table1, table2))
val sparkRes1 = helper.sparkCollect(Table.hiveJoin(table1, table2))
helper.compareResult(hiveResStr, sparkRes1)
}

24 changes: 15 additions & 9 deletions src/it/scala/com/qubole/spark/hiveacid/TestHelper.scala
@@ -76,26 +76,29 @@ class TestHelper extends SQLImplicits {
def compare(table: Table, msg: String): Unit = {
log.info(s"Verify simple $msg")
val hiveResStr = hiveExecuteQuery(table.hiveSelect)
val (dfFromSql, dfFromScala) = sparkGetDF(table)
val (dfFromSql, dfFromScala, dfFromSqlV2) = sparkGetDF(table)
compareResult(hiveResStr, dfFromSql.collect())
compareResult(hiveResStr, dfFromScala.collect())
compareResult(hiveResStr, dfFromSqlV2.collect())
}

// With Predicate
private def compareWithPred(table: Table, msg: String, pred: String): Unit = {
log.info(s"Verify with predicate $msg")
val hiveResStr = hiveExecuteQuery(table.hiveSelectWithPred(pred))
val (dfFromSql, dfFromScala) = sparkGetDFWithPred(table, pred)
val (dfFromSql, dfFromScala, dfFromSqlV2) = sparkGetDFWithPred(table, pred)
compareResult(hiveResStr, dfFromSql.collect())
compareResult(hiveResStr, dfFromScala.collect())
compareResult(hiveResStr, dfFromSqlV2.collect())
}
// With Projection
private def compareWithProj(table: Table, msg: String): Unit = {
log.info(s"Verify with projection $msg")
val hiveResStr = hiveExecuteQuery(table.hiveSelectWithProj)
val (dfFromSql, dfFromScala) = sparkGetDFWithProj(table)
val (dfFromSql, dfFromScala, dfFromSqlV2) = sparkGetDFWithProj(table)
compareResult(hiveResStr, dfFromSql.collect())
compareResult(hiveResStr, dfFromScala.collect())
compareResult(hiveResStr, dfFromSqlV2.collect())
}

// Compare result of 2 tables via hive
@@ -198,28 +201,31 @@ class TestHelper extends SQLImplicits {
compareWithProj(table, "After Delete")
}

def sparkGetDFWithProj(table: Table): (DataFrame, DataFrame) = {
def sparkGetDFWithProj(table: Table): (DataFrame, DataFrame, DataFrame) = {
val dfSql = sparkSQL(table.sparkSelect)
val dfSqlV2 = sparkSQL(table.hiveSelect)

var dfScala = spark.read.format("HiveAcid").options(Map("table" -> table.hiveTname)).load().select(table.sparkDFProj)
dfScala = totalOrderBy(table, dfScala)
(dfSql, dfScala)
(dfSql, dfScala, dfSqlV2)
}

def sparkGetDFWithPred(table: Table, pred: String): (DataFrame, DataFrame) = {
def sparkGetDFWithPred(table: Table, pred: String): (DataFrame, DataFrame, DataFrame) = {
val dfSql = sparkSQL(table.sparkSelectWithPred(pred))
val dfSqlV2 = sparkSQL(table.hiveSelectWithPred(pred))

var dfScala = spark.read.format("HiveAcid").options(Map("table" -> table.hiveTname)).load().where(col("intCol") < "5")
dfScala = totalOrderBy(table, dfScala)
(dfSql, dfScala)
(dfSql, dfScala, dfSqlV2)
}

def sparkGetDF(table: Table): (DataFrame, DataFrame) = {
def sparkGetDF(table: Table): (DataFrame, DataFrame, DataFrame) = {
val dfSql = sparkSQL(table.sparkSelect)
val dfSqlV2 = sparkSQL(table.hiveSelect)

var dfScala = spark.read.format("HiveAcid").options(Map("table" -> table.hiveTname)).load()
dfScala = totalOrderBy(table, dfScala)
(dfSql, dfScala)
(dfSql, dfScala, dfSqlV2)
}

def sparkSQL(cmd: String): DataFrame = {
2 changes: 2 additions & 0 deletions src/it/scala/com/qubole/spark/hiveacid/TestSparkSession.scala
@@ -30,6 +30,8 @@ private[hiveacid] object TestSparkSession {
.config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
//.config("spark.ui.enabled", "true")
//.config("spark.ui.port", "4041")
// Run all existing (V1) tests against the V2 datasource
.config("spark.hive.acid.datasource.version", "v2")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
28 changes: 24 additions & 4 deletions src/main/scala/com/qubole/spark/hiveacid/HiveAcidAutoConvert.scala
@@ -28,7 +28,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoTable, Log
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import com.qubole.spark.hiveacid.datasource.HiveAcidDataSource
import com.qubole.spark.hiveacid.datasource.{HiveAcidDataSource, HiveAcidDataSourceV2}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.SparkContext
import org.apache.spark.sql.internal.HiveSerDe


/**
@@ -43,14 +46,27 @@ case class HiveAcidAutoConvert(spark: SparkSession) extends Rule[LogicalPlan] {
relation.tableMeta.properties.getOrElse("transactional", "false").toBoolean
}

private def convert(relation: HiveTableRelation): LogicalRelation = {
private def convert(relation: HiveTableRelation): LogicalPlan = {
val options = relation.tableMeta.properties ++
relation.tableMeta.storage.properties ++ Map("table" -> relation.tableMeta.qualifiedName)

val newRelation = new HiveAcidDataSource().createRelation(spark.sqlContext, options)
LogicalRelation(newRelation, isStreaming = false)
}

private def convertV2(relation: HiveTableRelation): LogicalPlan = {
val serde = relation.tableMeta.storage.serde.getOrElse("")
if (!serde.equalsIgnoreCase(HiveSerDe.sourceToSerDe("orc").get.serde.get)) {
// Only the ORC format is supported as of now. If it is not ORC, fall back to
// datasource V1.
logInfo("Falling back to datasource v1 as " + serde + " is not supported by v2 reader.")
return convert(relation)
}

Contributor: Add a log line stating the reason to fall back to v1?
Contributor (Author): done
val dbName = relation.tableMeta.identifier.database.getOrElse("default")
val tableName = relation.tableMeta.identifier.table
val tableOpts = Map("database" -> dbName, "table" -> tableName)
DataSourceV2Relation.create(new HiveAcidDataSourceV2, tableOpts, None, None)
}

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
@@ -61,7 +77,11 @@ case class HiveAcidAutoConvert(spark: SparkSession) extends Rule[LogicalPlan] {
// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
if (spark.conf.get("spark.hive.acid.datasource.version", "v1").equals("v2")) {
convertV2(relation)
} else {
convert(relation)
}
}
}
}
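
For context, a minimal sketch (not part of this PR's diff) of how the new config is meant to be used from an application, assuming the extension class and config key introduced above; the database and table names are hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch only: enable the HiveAcid extension and select the v2 read path via config.
// With "v1" (the default) the existing LogicalRelation-based path is used instead.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")
  .config("spark.hive.acid.datasource.version", "v2")
  .enableHiveSupport()
  .getOrCreate()

// A plain SQL read on a transactional ORC table is rewritten by HiveAcidAutoConvert
// into a DataSourceV2Relation; non-ORC transactional tables fall back to the v1 path.
spark.sql("SELECT * FROM acid_db.acid_orc_tbl").show()  // hypothetical database/table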
@@ -0,0 +1,126 @@
/*
* Copyright 2019 Qubole, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qubole.spark.hiveacid

import java.lang.String.format
import java.io.IOException
import java.util.{ArrayList, List, Map}

import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import com.qubole.spark.hiveacid.hive.{HiveAcidMetadata, HiveConverter}
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.sources.v2._
import com.qubole.spark.hiveacid.transaction.HiveAcidTxn
import com.qubole.spark.hiveacid.util.{SerializableConfiguration, Util}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader._
import com.qubole.spark.hiveacid.reader.v2.HiveAcidInputPartitionV2
import com.qubole.spark.hiveacid.reader.TableReader
import com.qubole.spark.hiveacid.reader.hive.HiveAcidSearchArgument
import com.qubole.spark.hiveacid.reader.hive.HiveAcidSearchArgument.{buildTree, castLiteralValue, getPredicateLeafType, isSearchableType, quoteAttributeNameIfNeeded}

/**
* Data source V2 implementation for HiveACID
*/
class HiveAcidDataSourceV2Reader
extends DataSourceV2 with DataSourceReader with SupportsScanColumnarBatch
with SupportsPushDownRequiredColumns
with SupportsPushDownFilters with Logging {

def this(options: java.util.Map[String, String],
sparkSession : SparkSession,
dbName : String,
tblName : String) {
this()
this.options = options
this.sparkSession = sparkSession
if (dbName != null) {
hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, dbName + "." + tblName)
} else {
// If db name is null, default db is chosen.
hiveAcidMetadata = HiveAcidMetadata.fromSparkSession(sparkSession, tblName)
}

// This is a hack to prevent the following situation:
// Spark(v 2.4.0) creates one instance of DataSourceReader to call readSchema()
// and then a new instance of DataSourceReader to call pushFilters(),
// planBatchInputPartitions() etc. Since it uses different DataSourceReader instances,
// and reads schema in former instance, schema remains null in the latter instance
// (which causes problems for other methods). More discussion:
// http://apache-spark-user-list.1001560.n3.nabble.com/DataSourceV2-APIs-creating-multiple-instances-of-DataSourceReader-and-hence-not-preserving-the-state-tc33646.html
// Also a null check on schema is already there in readSchema() to prevent initialization
// more than once just in case.
readSchema
}

private var options: java.util.Map[String, String] = null
private var sparkSession : SparkSession = null

//The pruned schema
private var schema: StructType = null

private var pushedFilterArray : Array[Filter] = null

private var hiveAcidMetadata: HiveAcidMetadata = _

override def readSchema: StructType = {
if (schema == null) {
schema = hiveAcidMetadata.tableSchema
}
schema
}

override def planBatchInputPartitions() : java.util.List[InputPartition[ColumnarBatch]] = {
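// The partitions are planned inside a Hive ACID transaction (inTxn below) so that
// every split is resolved against a single, consistent snapshot of the table.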
val factories = new java.util.ArrayList[InputPartition[ColumnarBatch]]
inTxn {
txn: HiveAcidTxn => {
import scala.collection.JavaConversions._
val reader = new TableReader(sparkSession, txn, hiveAcidMetadata)
val hiveReader = reader.getPartitionsV2(schema.fieldNames,
pushedFilterArray, new SparkAcidConf(sparkSession, options.toMap))
factories.addAll(hiveReader)
}
}
factories
}

private def inTxn(f: HiveAcidTxn => Unit): Unit = {
new HiveTxnWrapper(sparkSession).inTxn(f)
}

override def pushFilters (filters: Array[Filter]): Array[Filter] = {
this.pushedFilterArray = HiveAcidSearchArgument.
getSupportedFilters(hiveAcidMetadata.tableSchema, filters.toSeq).toArray
// ORC does not do row-level filtering, so the filters have to be applied again.
filters
}

override def pushedFilters(): Array[Filter] = this.pushedFilterArray

override def pruneColumns(requiredSchema: StructType): Unit = {
this.schema = requiredSchema
}
}
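
The DataSourceV2 entry point (HiveAcidDataSourceV2) referenced by HiveAcidAutoConvert is not included in this diff. The following is only a sketch of how such an entry point could hand the "database"/"table" options over to the reader above, assuming Spark 2.4's ReadSupport interface; the class name HiveAcidDataSourceV2Sketch is illustrative, not the PR's implementation.

import com.qubole.spark.hiveacid.HiveAcidDataSourceV2Reader
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

// Illustrative sketch: wires the options passed by DataSourceV2Relation.create(...)
// into the HiveAcidDataSourceV2Reader defined above.
class HiveAcidDataSourceV2Sketch extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = {
    new HiveAcidDataSourceV2Reader(
      options.asMap(),                       // raw option map as supplied by Spark
      SparkSession.active,                   // session the scan will run in
      options.get("database").orElse(null),  // null lets the reader pick the default db
      options.get("table").orElse(null))
  }
}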
@@ -231,7 +231,7 @@ object HiveAcidTable {
* This wrapper can be used just once for running an operation. That operation is not allowed to call this recursively.
* @param sparkSession
*/
Contributor: @amoghmargoor @maheshk114: Do we need to make changes to the createDF API in HiveAcidTable to use v2 readers if enabled?

Contributor (Author): I think it should pick the reader based on the config. I will create a Jira for that.

private class HiveTxnWrapper(sparkSession: SparkSession) extends Logging {
private[hiveacid] class HiveTxnWrapper(sparkSession: SparkSession) extends Logging {

private var isLocalTxn: Boolean = _
private var curTxn: HiveAcidTxn = _