Skip to content

Commit

Permalink
SPARKC-710: Update component versions (datastax#1367)
Browse files Browse the repository at this point in the history
Updated Cassandra driver version and Cassandra versions used for testing
  • Loading branch information
jacek-lewandowski authored and tarzanek committed Aug 28, 2024
1 parent f861241 commit a5c9911
Show file tree
Hide file tree
Showing 24 changed files with 230 additions and 160 deletions.
22 changes: 14 additions & 8 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,38 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
scala: [2.12.11, 2.13.11]
db-version: [3.11.10, 4.0-rc2, 6.8.13]
scala: [2.12.19, 2.13.13]
db-version: [3.11.17, 4.0.12, 4.1.4, 5.0-beta1, dse-6.8.44]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: ccm pip installation
uses: BSFishy/pip-action@v1
with:
packages: git+https://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4
packages: git+https://github.com/riptano/ccm.git@d74db63d75112908a77b6c80757df9343fdc3338

- name: Setup Scala
uses: olafurpg/setup-scala@v10
- name: Setup Java
uses: actions/setup-java@v4
with:
java-version: "[email protected]"
distribution: "temurin"
java-version: | # order is important, the last one is the default which will be used by SBT
11
8
- name: sbt tests
env:
TEST_PARALLEL_TASKS: 1
CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
PUBLISH_VERSION: test
JAVA8_HOME: ${{ env.JAVA_HOME_8_X64 }}
JAVA11_HOME: ${{ env.JAVA_HOME_11_X64 }}
run: sbt/sbt ++${{ matrix.scala }} test it:test

- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
uses: mikepenz/action-junit-report@v4
if: always()
with:
report_paths: '**/target/test-reports/*.xml'
Expand Down
44 changes: 22 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,27 @@ Currently, the following branches are actively supported:
3.0.x ([b3.0](https://github.com/datastax/spark-cassandra-connector/tree/b3.0)) and
2.5.x ([b2.5](https://github.com/datastax/spark-cassandra-connector/tree/b2.5)).

| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|-----------------------| --------------------- | -------------------- | ----------------------- |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |
| Connector | Spark | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
|-----------|---------------|----------------------------|-----------------------|----------------------|--------------------------|
| 3.5.1 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x, 5.0 | 4.18 | 8 | 2.12, 2.13 |
| 3.5 | 3.5 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.4 | 3.4 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12, 2.13 |
| 3.3 | 3.3 | 2.1.5*, 2.2, 3.x, 4.x | 4.13 | 8 | 2.12 |
| 3.2 | 3.2 | 2.1.5*, 2.2, 3.x, 4.0 | 4.13 | 8 | 2.12 |
| 3.1 | 3.1 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 3.0 | 3.0 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.12 |
| 2.5 | 2.4 | 2.1.5*, 2.2, 3.x, 4.0 | 4.12 | 8 | 2.11, 2.12 |
| 2.4.2 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11, 2.12 |
| 2.4 | 2.4 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.3 | 2.3 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.11 |
| 2.0 | 2.0, 2.1, 2.2 | 2.1.5*, 2.2, 3.x | 3.0 | 8 | 2.10, 2.11 |
| 1.6 | 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.5 | 1.5, 1.6 | 2.1.5*, 2.2, 3.0 | 3.0 | 7 | 2.10, 2.11 |
| 1.4 | 1.4 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.3 | 1.3 | 2.1.5* | 2.1 | 7 | 2.10, 2.11 |
| 1.2 | 1.2 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.1 | 1.1, 1.0 | 2.1, 2.0 | 2.1 | 7 | 2.10, 2.11 |
| 1.0 | 1.0, 0.9 | 2.0 | 2.0 | 7 | 2.10, 2.11 |

**Compatible with 2.1.X where X >= 5*

Expand Down Expand Up @@ -199,14 +200,13 @@ Note that the integration tests require [CCM](https://github.com/riptano/ccm) to
See [Tips for Developing the Spark Cassandra Connector](doc/developers.md) for details.

By default, integration tests start up a separate, single Cassandra instance and run Spark in local mode.
It is possible to run integration tests with your own Cassandra and/or Spark cluster.
It is possible to run integration tests with your own Spark cluster.
First, prepare a jar with testing code:

./sbt/sbt test:package

Then copy the generated test jar to your Spark nodes and run:

export IT_TEST_CASSANDRA_HOST=<IP of one of the Cassandra nodes>
export IT_TEST_SPARK_MASTER=<Spark Master URL>
./sbt/sbt it:test

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import sbt.Keys.parallelExecution
import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly

lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.11"
lazy val scala212 = "2.12.19"
lazy val scala213 = "2.13.13"
lazy val supportedScalaVersions = List(scala212, scala213)

// factor out common settings
Expand Down
29 changes: 29 additions & 0 deletions connector/src/it/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %5p %d{HH:mm:ss,SSS} [T%X{TEST_GROUP_NO}] %C (%F:%L) - %m%n

logger.ccm.name = com.datastax.spark.connector.ccm
logger.ccm.level = info
1 change: 1 addition & 0 deletions connector/src/it/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="com.datastax.spark.connector.ccm" level="INFO"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ trait AuthCluster extends SingleClusterFixture {
"authentication_options.enabled" -> "true"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
if (defaultConfig.getCassandraVersion.compareTo(CcmConfig.V5_0_0) >= 0) {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator.class_name" -> "PasswordAuthenticator"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.datastax.spark.connector.cql.sai

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
import com.datastax.spark.connector.cluster.DefaultCluster
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources._

class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiBaseSpec {

override def beforeClass {
dseFrom(V6_8_3) {
dseFrom(DSE_V6_8_3) {
conn.withSessionDo { session =>
createKeyspace(session, ks)
session.execute(
Expand Down Expand Up @@ -46,7 +46,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
}

"Index on partition key columns" should {
"allow for predicate push down for indexed parts of the partition key" in dseFrom(V6_8_3) {
"allow for predicate push down for indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_1") === 1),
pushedPredicate = EqualTo("pk_1", 1))
Expand All @@ -64,13 +64,13 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = GreaterThanOrEqual("pk_2", 1))
}

"allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(V6_8_3) {
"allow for multiple predicate push down for the same indexed part of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_3") > 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_3", 0))
}

"allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(V6_8_3) {
"allow for multiple range predicate push down for different indexed parts of the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0))
Expand All @@ -82,7 +82,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = EqualTo("pk_3", 10), LessThan("v_1", 1))
}

"allow for range predicate push down for the partition key" in dseFrom(V6_8_3) {
"allow for range predicate push down for the partition key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("pk_3") < 10 and col("pk_1") > 0 and col("pk_2") >= 0),
pushedPredicate = LessThan("pk_3", 10), GreaterThan("pk_1", 0), GreaterThanOrEqual("pk_2", 0))
Expand All @@ -91,7 +91,7 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
pushedPredicate = EqualTo("pk_3", 10), LessThan("pk_1", 6), EqualTo("pk_2", 1))
}

"not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(V6_8_3) {
"not allow for regular column predicate push down if any part of the partition key has an IN clause" in dseFrom(DSE_V6_8_3) {
assertNonPushedColumns(
df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 in(1, 3) and v_1 < 5"),
nonPushedColumns = "v_1")
Expand All @@ -103,32 +103,32 @@ class IndexedKeySpec extends SparkCassandraITWordSpecBase with DefaultCluster wi
nonPushedColumns = "v_1")
}

"allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(V6_8_3) {
"allow for regular column predicate push down if a part of the clustering key has an IN clause" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter("pk_1 = 1 and pk_2 = 2 and pk_3 = 3 and ck_1 in (1,2) and v_1 < 5"),
pushedPredicate = EqualTo("pk_1", 1), EqualTo("pk_2", 2), EqualTo("pk_3", 3), In("ck_1", Array(1, 2)), LessThan("v_1", 5))
}

"not allow for push down if more than one equality predicate is defined" in dseFrom(V6_8_3) {
"not allow for push down if more than one equality predicate is defined" in dseFrom(DSE_V6_8_3) {
val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") === 10)
assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
}

"allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(V6_8_3) {
"allow only for equality push down if equality and range predicates are defined for the same pk column" in dseFrom(DSE_V6_8_3) {
val data = df("pk_test").filter(col("pk_1") === 7 and col("pk_1") < 10)
assertPushedPredicate(data, pushedPredicate = EqualTo("pk_1", 7))
data.count() shouldBe 2
}
}

"Index on clustering key columns" should {
"allow for predicate push down for indexed parts of the clustering key" in dseFrom(V6_8_3) {
"allow for predicate push down for indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
assertPushedPredicate(
df("pk_test").filter(col("ck_2") === 1),
pushedPredicate = EqualTo("ck_2", 1))
}

"not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(V6_8_3) {
"not allow for predicate push down for non-indexed parts of the clustering key" in dseFrom(DSE_V6_8_3) {
assertNoPushDown(df("pk_test").filter(col("ck_3") === 1))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.datastax.spark.connector.cql.sai

import com.datastax.spark.connector.SparkCassandraITWordSpecBase
import com.datastax.spark.connector.ccm.CcmConfig.V6_8_3
import com.datastax.spark.connector.ccm.CcmConfig.DSE_V6_8_3
import com.datastax.spark.connector.cluster.DefaultCluster


class IndexedListSpec extends SparkCassandraITWordSpecBase with DefaultCluster with SaiCollectionBaseSpec {

override def beforeClass {
dseFrom(V6_8_3) {
dseFrom(DSE_V6_8_3) {
conn.withSessionDo { session =>
createKeyspace(session, ks)
session.execute(
Expand Down
Loading

0 comments on commit a5c9911

Please sign in to comment.