Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARKC-710: Update component versions #1367

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,38 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
scala: [2.12.11, 2.13.11]
db-version: [3.11.10, 4.0-rc2, 6.8.13]
scala: [2.12.19, 2.13.13]
db-version: [3.11.17, 4.0.12, 4.1.4, 5.0-beta1, dse-6.8.44]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: ccm pip installation
uses: BSFishy/pip-action@v1
with:
packages: git+https://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4
packages: git+https://github.com/riptano/ccm.git@d74db63d75112908a77b6c80757df9343fdc3338

- name: Setup Scala
uses: olafurpg/setup-scala@v10
- name: Setup Java
uses: actions/setup-java@v4
with:
java-version: "[email protected]"
distribution: "temurin"
java-version: | # order is important, the last one is the default which will be used by SBT
11
8

- name: sbt tests
env:
TEST_PARALLEL_TASKS: 1
CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
PUBLISH_VERSION: test
JAVA8_HOME: ${{ env.JAVA_HOME_8_X64 }}
JAVA11_HOME: ${{ env.JAVA_HOME_11_X64 }}
run: sbt/sbt ++${{ matrix.scala }} test it:test

- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
uses: mikepenz/action-junit-report@v4
if: always()
with:
report_paths: '**/target/test-reports/*.xml'
Expand Down
44 changes: 22 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,27 @@ Currently, the following branches are actively supported:
3.0.x ([b3.0](https://github.com/datastax/spark-cassandra-connector/tree/b3.0)) and
2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)).

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |
| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|----------------------------|-----------------------|----------------------|--------------------------|
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18 | 8 | 2.12, 2.13 |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |

**Compatible with 2.1.X where X >= 5*

Expand Down Expand Up @@ -193,14 +194,13 @@ Note that the integration tests require [CCM](https://github.com/riptano/ccm) to
See [Tips for Developing the Spark Cassandra Connector](doc/developers.md) for details.

By default, integration tests start up a separate, single Cassandra instance and run Spark in local mode.
It is possible to run integration tests with your own Cassandra and/or Spark cluster.
It is possible to run integration tests with your own Spark cluster.
First, prepare a jar with testing code:

./sbt/sbt test:package

Then copy the generated test jar to your Spark nodes and run:

export IT_TEST_CASSANDRA_HOST=<IP of one of the Cassandra nodes>
export IT_TEST_SPARK_MASTER=<Spark Master URL>
./sbt/sbt it:test

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import sbt.Keys.parallelExecution
import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly

lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.11"
lazy val scala212 = "2.12.19"
lazy val scala213 = "2.13.13"
lazy val supportedScalaVersions = List(scala212, scala213)

// factor out common settings
Expand Down
29 changes: 29 additions & 0 deletions connector/src/it/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %5p %d{HH:mm:ss,SSS} [T%X{TEST_GROUP_NO}] %C (%F:%L) - %m%n

logger.ccm.name = com.datastax.spark.connector.ccm
logger.ccm.level = info
1 change: 1 addition & 0 deletions connector/src/it/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="com.datastax.spark.connector.ccm" level="INFO"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ trait AuthCluster extends SingleClusterFixture {
"authentication_options.enabled" -> "true"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
if (defaultConfig.getCassandraVersion.compareTo(CcmConfig.V5_0_0) >= 0) {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator.class_name" -> "PasswordAuthenticator"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.datastax.spark.connector.cql.sai

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
import com.datastax.spark.connector.cluster.DefaultCluster
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources._

class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiBaseSpec {

override def beforeClass {
dseFrom(V6_8_3) {
dseFrom(DSE_V6_8_3) {
conn.withSessionDo { session =>
createKeyspace(session, ks)
session.execute(
Expand Down Expand Up @@ -46,7 +46,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
}

"Index on partition key columns" should {
"allow for predicate push down for indexed parts of the partition key" in dseFrom(V6_8_3) {
"allow for predicate push down for indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_1") === 1),
pushedPredicate = EqualTo("pk_1", 1))
Expand All @@ -64,13 +64,13 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = GreaterThanOrEqual("pk_2", 1))
}

"allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(V6_8_3) {
"allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_3") > 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_3", 0))
}

"allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(V6_8_3) {
"allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0))
Expand All @@ -82,7 +82,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = EqualTo("pk_3", 10), LessThan("v_1", 1))
}

"allow for range predicate push down for the partition key" in dseFrom(V6_8_3) {
"allow for range predicate push down for the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0 and col("pk_2") >= 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0), GreaterThanOrEqual("pk_2", 0))
Expand All @@ -91,7 +91,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = EqualTo("pk_3", 10), LessThan("pk_1", 6), EqualTo("pk_2", 1))
}

"not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(V6_8_3) {
"not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(DSE_V6_8_3) {
assertNonPushedColumns(
df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 in(1, 3) and v_1 < 5"),
nonPushedColumns = "v_1")
Expand All @@ -103,32 +103,32 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
nonPushedColumns = "v_1")
}

"allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(V6_8_3) {
"allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 = 3 and ck_1 in (1,2) and v_1 < 5"),
pushedPredicate = EqualTo("pk_1", 1), EqualTo("pk_2", 2), EqualTo("pk_3", 3), In("ck_1", Array(1, 2)), LessThan("v_1", 5))
}

"not allow for push down if more than one equality predicate is defined" in dseFrom(V6_8_3) {
"not allow for push down if more than one equality predicate is defined" in dseFrom(DSE_V6_8_3) {
val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") === 10)
assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
}

"allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(V6_8_3) {
"allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(DSE_V6_8_3) {
val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") < 10)
assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
data.count() shouldBe 2
}
}

"Index on clustering key columns" should {
"allow for predicate push down for indexed parts of the clustering key" in dseFrom(V6_8_3) {
"allow for predicate push down for indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("ck_2") === 1),
pushedPredicate = EqualTo("ck_2", 1))
}

"not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(V6_8_3) {
"not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
assertNoPushDown(df("pk_test").filter(col("ck_3") === 1))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.datastax.spark.connector.cql.sai

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
import com.datastax.spark.connector.cluster.DefaultCluster


class IndexedListSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiCollectionBaseSpec {

override def beforeClass {
dseFrom(V6_8_3) {
dseFrom(DSE_V6_8_3) {
conn.withSessionDo { session =>
createKeyspace(session, ks)
session.execute(
Expand Down
Loading
Loading