From 8b6cea86ab0e21757a7ed4a0ce835d0226cbc0f1 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 12 Dec 2024 16:15:25 -0500 Subject: [PATCH] Finish source tests --- .../psc/source/PscSourceBuilder.java | 2 +- .../enumerator/PscSourceEnumerator.java | 4 +- .../psc/source/PscSourceBuilderTest.java | 9 +++-- .../psc/source/PscSourceLegacyITCase.java | 4 +- .../source/enumerator/PscEnumeratorTest.java | 10 ++--- .../initializer/OffsetsInitializerTest.java | 4 +- .../PscTopicUriPartitionSplitReaderTest.java | 6 +-- .../PscConsumerTestBaseWithKafkaAsPubSub.java | 40 +++++++++++-------- ...cTestEnvironmentWithKafkaAsPubSubImpl.java | 5 +-- 9 files changed, 48 insertions(+), 36 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java index 830a5a9..b2d57fc 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/PscSourceBuilder.java @@ -148,7 +148,7 @@ public PscSourceBuilder setGroupId(String groupId) { * @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection) */ public PscSourceBuilder setTopicUris(List topicUris) { - ensureSubscriberIsNull("topics"); + ensureSubscriberIsNull("topicUris"); subscriber = PscSubscriber.getTopicUriListSubscriber(topicUris); return this; } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java index 4452d72..eebd6fa 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java @@ -639,7 +639,9 @@ public Map beginningOffsets(Collection offsetsForTimes( Map timestampsToSearch) { - return listOffsetsForTimestamps(timestampsToSearch); + return listOffsetsForTimestamps(timestampsToSearch).entrySet().stream() + .filter(entry -> entry.getValue() >= 0) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceBuilderTest.java index 4f76561..e3069ff 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceBuilderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceBuilderTest.java @@ -160,14 +160,15 @@ public void testUsingCommittedOffsetsInitializerWithoutGroupId() { } @Test - public void testSettingCustomKafkaSubscriber() { + public void testSettingCustomPscSubscriber() { ExampleCustomSubscriber exampleCustomSubscriber = new ExampleCustomSubscriber(); PscSourceBuilder customPscSubscriberBuilder = new PscSourceBuilder() .setPscSubscriber(exampleCustomSubscriber) .setDeserializer( PscRecordDeserializationSchema.valueOnly( - StringDeserializer.class)); + StringDeserializer.class)) + .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX); assertThat(customPscSubscriberBuilder.build().getPscSubscriber()) .isEqualTo(exampleCustomSubscriber); @@ -175,13 +176,13 @@ public void testSettingCustomKafkaSubscriber() { assertThatThrownBy(() -> 
customPscSubscriberBuilder.setTopicUris(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + "topic")) .isInstanceOf(IllegalStateException.class) .hasMessageContaining( - "Cannot use topics for consumption because a ExampleCustomSubscriber is already set for consumption."); + "Cannot use topicUris for consumption because a ExampleCustomSubscriber is already set for consumption."); assertThatThrownBy( () -> customPscSubscriberBuilder.setTopicUriPattern(Pattern.compile(".+"))) .isInstanceOf(IllegalStateException.class) .hasMessageContaining( - "Cannot use topic pattern for consumption because a ExampleCustomSubscriber is already set for consumption."); + "Cannot use topicUri pattern for consumption because a ExampleCustomSubscriber is already set for consumption."); assertThatThrownBy( () -> diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java index cdff2bc..44c1b39 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceLegacyITCase.java @@ -23,6 +23,7 @@ import com.pinterest.flink.streaming.connectors.psc.PscProducerTestBase; import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSubImpl; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; /** @@ -89,6 +90,7 @@ public void testMultipleSourcesOnePartition() throws Exception { // --- broker failure --- @Test + @Ignore("FLINK-28267") public void testBrokerFailure() throws Exception { runBrokerFailureTest(); } @@ -106,7 +108,7 @@ public void testMultipleTopicsWithLegacySerializer() throws Exception { } @Test - public void testMultipleTopicsWithKafkaSerializer() throws Exception { + public void testMultipleTopicsWithPscSerializer() throws Exception { runProduceConsumeMultipleTopics(false); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscEnumeratorTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscEnumeratorTest.java index f4f37c0..afc8f4e 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscEnumeratorTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/PscEnumeratorTest.java @@ -308,12 +308,12 @@ public void testDiscoverPartitionsPeriodically() throws Throwable { // new partitions use EARLIEST_OFFSET, while initial partitions use LATEST_OFFSET List initialPartitionAssign = - getAllAssignSplits(context, PRE_EXISTING_TOPICS); + getAllAssignSplits(context, PRE_EXISTING_TOPIC_URIS); assertThat(initialPartitionAssign) .extracting(PscTopicUriPartitionSplit::getStartingOffset) .containsOnly((long) PscSourceTestEnv.NUM_RECORDS_PER_PARTITION); List newPartitionAssign = - getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME)); + getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_URI)); assertThat(newPartitionAssign) .extracting(PscTopicUriPartitionSplit::getStartingOffset) .containsOnly(PscTopicUriPartitionSplit.EARLIEST_OFFSET); @@ -729,9 +729,9 @@ private void verifySplitAssignmentWithPartitions( } - /** get all assigned partition splits of topics. */ + /** get all assigned partition splits of topicUris. 
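+     * Walks the captured splits-assignment sequence, flattens each assignment,
+     * and keeps only the splits whose topic URI is contained in the given set.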
*/ private List getAllAssignSplits( - MockSplitEnumeratorContext context, Set topics) { + MockSplitEnumeratorContext context, Set topicUris) { List allSplits = new ArrayList<>(); List> splitsAssignmentSequence = @@ -740,7 +740,7 @@ private List getAllAssignSplits( List splitsOfOnceAssignment = splitsAssignment.assignment().values().stream() .flatMap(splits -> splits.stream()) - .filter(split -> topics.contains(split.getTopicUri())) + .filter(split -> topicUris.contains(split.getTopicUri())) .collect(Collectors.toList()); allSplits.addAll(splitsOfOnceAssignment); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerTest.java index 9285b22..a14f873 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/initializer/OffsetsInitializerTest.java @@ -90,7 +90,7 @@ public void testLatestOffsetsInitializer() { assertThat(offsets).hasSameSizeAs(partitions); assertThat(offsets.keySet()).containsAll(partitions); for (long offset : offsets.values()) { - assertThat(offset).isEqualTo(PscTopicUriPartitionSplit.LATEST_OFFSET); + assertThat(offset).isEqualTo(PscSourceTestEnv.NUM_RECORDS_PER_PARTITION); } assertThat(initializer.getAutoOffsetResetStrategy()) .isEqualTo(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST); @@ -123,7 +123,7 @@ public void testTimestampOffsetsInitializer() { @Test public void testTimestampOffsetsInitializerForEmptyPartitions() { OffsetsInitializer initializer = OffsetsInitializer.timestamp(2001); - List partitions = PscSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC3); + List partitions = PscSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC_URI3); Map expectedOffsets = partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L)); assertThat(initializer.getPartitionOffsets(partitions, retriever)) diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java index f3af820..275ccf1 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java @@ -313,7 +313,7 @@ public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() throws Config new TopicUriPartition(TOPIC_URI1, 0), PscTopicUriPartitionSplit .COMMITTED_OFFSET))))) - .isInstanceOf(RuntimeException.class) + .cause() .hasCauseInstanceOf(NoOffsetForPartitionException.class) .hasMessageContaining("Undefined offset with no reset policy for partition"); } @@ -352,7 +352,7 @@ public void testConsumerClientRackSupplier() throws ConfigurationException, Clie // Here we call the helper function directly, because the KafkaPartitionSplitReader // doesn't allow us to examine the final ConsumerConfig object reader.setConsumerClientRack(properties, rackId); - assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId); + assertThat(properties.get(PscConfiguration.PSC_CONSUMER_CLIENT_RACK)).isEqualTo(rackId); } @ParameterizedTest @@ -368,7 +368,7 @@ public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) 
throws C // Here we call the helper function directly, because the KafkaPartitionSplitReader // doesn't allow us to examine the final ConsumerConfig object reader.setConsumerClientRack(properties, rackId); - assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse(); + assertThat(properties.containsKey(PscConfiguration.PSC_CONSUMER_CLIENT_RACK)).isFalse(); } // ------------------ diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java index 413bca8..a6cb87a 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java @@ -784,16 +784,16 @@ public void runStartFromTimestamp() throws Exception { */ @RetryOnException(times = 2, exception = org.apache.kafka.common.errors.NotLeaderForPartitionException.class) public void runSimpleConcurrentProducerConsumerTopology() throws Exception { - final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString(); + final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID(); final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic; - final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString(); + final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID(); final String additionalEmptyTopicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + additionalEmptyTopic; final int parallelism = 3; final int elementsPerPartition = 100; final int totalElements = parallelism * elementsPerPartition; - createTestTopic(topicUri, parallelism, 2); + createTestTopic(topicUri, parallelism, 1); createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will // remain empty all the time @@ -1211,7 +1211,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic; topicUris.add(topicUri); // create topic - createTestTopic(topic, i + 1 /* partitions */, 1); + createTestTopic(topicUri, i + 1 /* partitions */, 1); } // before FLINK-6078 the RemoteExecutionEnvironment set the parallelism to 1 as @@ -1433,19 +1433,26 @@ public void runBrokerFailureTest() throws Exception { final String topic = "brokerFailureTestTopic"; final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic; + // Start a temporary multi-broker cluster. + // This test case relies on stopping a broker and switching partition leader to another + // during the test, so single-broker cluster (kafkaServer) could not fulfill the + // requirement. 
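+        // Three brokers and replication factor 2 (see createTestTopic below) ensure a
+        // surviving replica exists to take over partition leadership when the leader
+        // broker is shut down.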
+ PscTestEnvironmentWithKafkaAsPubSub multiBrokerCluster = constructKafkaTestEnvionment(); + multiBrokerCluster.prepare(PscTestEnvironmentWithKafkaAsPubSub.createConfig().setKafkaServersNumber(3)); + final int parallelism = 2; final int numElementsPerPartition = 1000; final int totalElements = parallelism * numElementsPerPartition; final int failAfterElements = numElementsPerPartition / 3; - createTestTopic(topic, parallelism, 2); + multiBrokerCluster.createTestTopic(topicUri, parallelism, 2); DataGenerators.generateRandomizedIntegerSequence( - StreamExecutionEnvironment.getExecutionEnvironment(), pscTestEnvWithKafka, topicUri, parallelism, + StreamExecutionEnvironment.getExecutionEnvironment(), multiBrokerCluster, topicUri, parallelism, numElementsPerPartition, true); // find leader to shut down - int leaderId = pscTestEnvWithKafka.getLeaderToShutDown(topic); + int leaderId = multiBrokerCluster.getLeaderToShutDown(topic); LOG.info("Leader to shutdown {}", leaderId); @@ -1460,21 +1467,22 @@ public void runBrokerFailureTest() throws Exception { env.setRestartStrategy(RestartStrategies.noRestart()); Properties pscConsumerConfiguration = new Properties(); - pscConsumerConfiguration.putAll(standardPscConsumerConfiguration); - pscConsumerConfiguration.putAll(securePscConsumerConfiguration); - pscConsumerConfiguration.putAll(pscDiscoveryConfiguration); + pscConsumerConfiguration.putAll(multiBrokerCluster.getStandardPscConsumerConfiguration()); + pscConsumerConfiguration.putAll(multiBrokerCluster.getSecurePscConsumerConfiguration()); + pscConsumerConfiguration.putAll(multiBrokerCluster.getPscDiscoveryConfiguration()); getStream(env, topicUri, schema, pscConsumerConfiguration) .map(new PartitionValidatingMapper(parallelism, 1)) .map(new BrokerKillingMapper(leaderId, failAfterElements)) .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1); - BrokerKillingMapper.killedLeaderBefore = false; - tryExecute(env, "Broker failure once test"); - - // start a new broker: - pscTestEnvWithKafka.restartBroker(leaderId); - //deleteTestTopic(topic); + try { + BrokerKillingMapper.killedLeaderBefore = false; + tryExecute(env, "Broker failure once test"); + } finally { + // Tear down the temporary cluster anyway + multiBrokerCluster.shutdown(); + } } public void runKeyValueTest() throws Exception { diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java index e5f358a..3f1c2ea 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java @@ -23,6 +23,7 @@ import com.pinterest.flink.connector.psc.testutils.DockerImageVersions; import com.pinterest.flink.connector.psc.testutils.PscUtil; import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner; +import com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema; import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.MessageId; import com.pinterest.psc.common.TopicUriPartition; @@ -47,7 +48,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; -import 
com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; @@ -78,7 +78,6 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -494,7 +493,7 @@ private void startKafkaContainerCluster(int numBrokers) { brokers.put(brokerID, broker); } new ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start); - LOG.info("{} brokers started", numBrokers); + LOG.info("{} brokers started: {}", numBrokers, brokers.keySet()); brokerConnectionString = brokers.values().stream() .map(KafkaContainer::getBootstrapServers)
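--
A note on the PscSourceEnumerator change above: offsetsForTimes now drops
partitions whose timestamp lookup produced a negative offset. A minimal sketch of
that contract, assuming (it is not shown in this patch) that
listOffsetsForTimestamps returns a negative sentinel when a partition has no
message at or after the requested timestamp:

    // Hedged sketch mirroring the hunk above, not the shipped code. Requires
    // java.util.Map, java.util.stream.Collectors, and
    // com.pinterest.psc.common.TopicUriPartition.
    public Map<TopicUriPartition, Long> offsetsForTimes(
            Map<TopicUriPartition, Long> timestampsToSearch) {
        // The ">= 0" filter discards partitions whose lookup found no matching
        // offset, so callers only ever see real, resolvable positions.
        return listOffsetsForTimestamps(timestampsToSearch).entrySet().stream()
                .filter(entry -> entry.getValue() >= 0) // negative => "not found"
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

This is consistent with the OffsetsInitializerTest change above, where the
timestamp initializer resolves the partitions of EMPTY_TOPIC_URI3 to offset 0,
presumably by falling back to the end offset of each empty partition rather than
surfacing a negative sentinel.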