Finish source tests
jeffxiang committed Dec 12, 2024
1 parent d8d23e8 · commit 8b6cea8
Showing 9 changed files with 48 additions and 36 deletions.
@@ -148,7 +148,7 @@ public PscSourceBuilder<OUT> setGroupId(String groupId) {
* @see com.pinterest.psc.consumer.PscConsumer#subscribe(Collection)
*/
public PscSourceBuilder<OUT> setTopicUris(List<String> topicUris) {
ensureSubscriberIsNull("topics");
ensureSubscriberIsNull("topicUris");
subscriber = PscSubscriber.getTopicUriListSubscriber(topicUris);
return this;
}
@@ -639,7 +639,9 @@ public Map<TopicUriPartition, Long> beginningOffsets(Collection<TopicUriPartitio
@Override
public Map<TopicUriPartition, Long> offsetsForTimes(
Map<TopicUriPartition, Long> timestampsToSearch) {
return listOffsetsForTimestamps(timestampsToSearch);
return listOffsetsForTimestamps(timestampsToSearch).entrySet().stream()
.filter(entry -> entry.getValue() >= 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
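
The offsetsForTimes change in this hunk filters the resolved offsets so that partitions without a matching record are omitted from the result. A minimal, plain-JDK sketch of that filtering (not part of this commit; the real map is keyed by TopicUriPartition, and the string keys and the -1 marker below are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

Map<String, Long> raw = new HashMap<>();
raw.put("topicUri0-0", 42L);   // a record exists at or after the requested timestamp
raw.put("topicUri1-0", -1L);   // no matching record, reported as a negative offset
Map<String, Long> resolved = raw.entrySet().stream()
        .filter(entry -> entry.getValue() >= 0)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// resolved contains only "topicUri0-0" -> 42L; unresolved partitions are simply
// absent from the map returned to the caller.
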
@@ -160,28 +160,29 @@ public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
}

@Test
public void testSettingCustomKafkaSubscriber() {
public void testSettingCustomPscSubscriber() {
ExampleCustomSubscriber exampleCustomSubscriber = new ExampleCustomSubscriber();
PscSourceBuilder<String> customPscSubscriberBuilder =
new PscSourceBuilder<String>()
.setPscSubscriber(exampleCustomSubscriber)
.setDeserializer(
PscRecordDeserializationSchema.valueOnly(
StringDeserializer.class));
StringDeserializer.class))
.setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX);

assertThat(customPscSubscriberBuilder.build().getPscSubscriber())
.isEqualTo(exampleCustomSubscriber);

assertThatThrownBy(() -> customPscSubscriberBuilder.setTopicUris(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + "topic"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"Cannot use topics for consumption because a ExampleCustomSubscriber is already set for consumption.");
"Cannot use topicUris for consumption because a ExampleCustomSubscriber is already set for consumption.");

assertThatThrownBy(
() -> customPscSubscriberBuilder.setTopicUriPattern(Pattern.compile(".+")))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"Cannot use topic pattern for consumption because a ExampleCustomSubscriber is already set for consumption.");
"Cannot use topicUri pattern for consumption because a ExampleCustomSubscriber is already set for consumption.");

assertThatThrownBy(
() ->
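
For orientation, a hedged sketch of the builder chain the updated test exercises: alongside the custom subscriber and the deserializer, the test now also supplies a cluster URI before calling build(). It reuses only calls that appear in the test above and is not meant as authoritative API documentation.

// Hedged sketch mirroring the test above; the custom subscriber, deserializer, and
// cluster URI are all set on the builder before build() is invoked.
new PscSourceBuilder<String>()
        .setPscSubscriber(new ExampleCustomSubscriber())
        .setDeserializer(
                PscRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX)
        .build();
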
@@ -23,6 +23,7 @@
import com.pinterest.flink.streaming.connectors.psc.PscProducerTestBase;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSubImpl;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
@@ -89,6 +90,7 @@ public void testMultipleSourcesOnePartition() throws Exception {
// --- broker failure ---

@Test
@Ignore("FLINK-28267")
public void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
@@ -106,7 +108,7 @@ public void testMultipleTopicsWithLegacySerializer() throws Exception {
}

@Test
public void testMultipleTopicsWithKafkaSerializer() throws Exception {
public void testMultipleTopicsWithPscSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}

@@ -308,12 +308,12 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {

// new partitions use EARLIEST_OFFSET, while initial partitions use LATEST_OFFSET
List<PscTopicUriPartitionSplit> initialPartitionAssign =
getAllAssignSplits(context, PRE_EXISTING_TOPICS);
getAllAssignSplits(context, PRE_EXISTING_TOPIC_URIS);
assertThat(initialPartitionAssign)
.extracting(PscTopicUriPartitionSplit::getStartingOffset)
.containsOnly((long) PscSourceTestEnv.NUM_RECORDS_PER_PARTITION);
List<PscTopicUriPartitionSplit> newPartitionAssign =
getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME));
getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_URI));
assertThat(newPartitionAssign)
.extracting(PscTopicUriPartitionSplit::getStartingOffset)
.containsOnly(PscTopicUriPartitionSplit.EARLIEST_OFFSET);
@@ -729,9 +729,9 @@ private void verifySplitAssignmentWithPartitions(
}


/** get all assigned partition splits of topics. */
/** get all assigned partition splits of topicUris. */
private List<PscTopicUriPartitionSplit> getAllAssignSplits(
MockSplitEnumeratorContext<PscTopicUriPartitionSplit> context, Set<String> topics) {
MockSplitEnumeratorContext<PscTopicUriPartitionSplit> context, Set<String> topicUris) {

List<PscTopicUriPartitionSplit> allSplits = new ArrayList<>();
List<SplitsAssignment<PscTopicUriPartitionSplit>> splitsAssignmentSequence =
@@ -740,7 +740,7 @@ private List<PscTopicUriPartitionSplit> getAllAssignSplits(
List<PscTopicUriPartitionSplit> splitsOfOnceAssignment =
splitsAssignment.assignment().values().stream()
.flatMap(splits -> splits.stream())
.filter(split -> topics.contains(split.getTopicUri()))
.filter(split -> topicUris.contains(split.getTopicUri()))
.collect(Collectors.toList());
allSplits.addAll(splitsOfOnceAssignment);
}
@@ -90,7 +90,7 @@ public void testLatestOffsetsInitializer() {
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
for (long offset : offsets.values()) {
assertThat(offset).isEqualTo(PscTopicUriPartitionSplit.LATEST_OFFSET);
assertThat(offset).isEqualTo(PscSourceTestEnv.NUM_RECORDS_PER_PARTITION);
}
assertThat(initializer.getAutoOffsetResetStrategy())
.isEqualTo(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST);
@@ -123,7 +123,7 @@ public void testTimestampOffsetsInitializer() {
@Test
public void testTimestampOffsetsInitializerForEmptyPartitions() {
OffsetsInitializer initializer = OffsetsInitializer.timestamp(2001);
List<TopicUriPartition> partitions = PscSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC3);
List<TopicUriPartition> partitions = PscSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC_URI3);
Map<TopicUriPartition, Long> expectedOffsets =
partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
assertThat(initializer.getPartitionOffsets(partitions, retriever))
@@ -313,7 +313,7 @@ public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() throws Config
new TopicUriPartition(TOPIC_URI1, 0),
PscTopicUriPartitionSplit
.COMMITTED_OFFSET)))))
.isInstanceOf(RuntimeException.class)
.cause()
.hasCauseInstanceOf(NoOffsetForPartitionException.class)
.hasMessageContaining("Undefined offset with no reset policy for partition");
}
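
The assertion rewrite in this hunk moves the check one level down the exception chain: .cause() makes the subsequent assertions run against the thrown exception's cause rather than the top-level exception. A standalone AssertJ sketch of that navigation, using deliberately generic exceptions rather than the connector's own:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

assertThatThrownBy(() -> {
        throw new RuntimeException("fetch failed",
                new RuntimeException(new IllegalStateException("no reset policy")));
})
        .cause()                                          // the inner RuntimeException
        .hasCauseInstanceOf(IllegalStateException.class)  // its cause, two levels down
        .hasMessageContaining("no reset policy");
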
@@ -352,7 +352,7 @@ public void testConsumerClientRackSupplier() throws ConfigurationException, Clie
// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
assertThat(properties.get(PscConfiguration.PSC_CONSUMER_CLIENT_RACK)).isEqualTo(rackId);
}

@ParameterizedTest
@@ -368,7 +368,7 @@ public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) throws C
// Here we call the helper function directly, because the KafkaPartitionSplitReader
// doesn't allow us to examine the final ConsumerConfig object
reader.setConsumerClientRack(properties, rackId);
assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
assertThat(properties.containsKey(PscConfiguration.PSC_CONSUMER_CLIENT_RACK)).isFalse();
}

// ------------------
@@ -784,16 +784,16 @@ public void runStartFromTimestamp() throws Exception {
*/
@RetryOnException(times = 2, exception = org.apache.kafka.common.errors.NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID();
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;
final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID();
final String additionalEmptyTopicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + additionalEmptyTopic;

final int parallelism = 3;
final int elementsPerPartition = 100;
final int totalElements = parallelism * elementsPerPartition;

createTestTopic(topicUri, parallelism, 2);
createTestTopic(topicUri, parallelism, 1);
createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will
// remain empty all the time

@@ -1211,7 +1211,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;
topicUris.add(topicUri);
// create topic
createTestTopic(topic, i + 1 /* partitions */, 1);
createTestTopic(topicUri, i + 1 /* partitions */, 1);
}

// before FLINK-6078 the RemoteExecutionEnvironment set the parallelism to 1 as
@@ -1433,19 +1433,26 @@ public void runBrokerFailureTest() throws Exception {
final String topic = "brokerFailureTestTopic";
final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER0_URI_PREFIX + topic;

// Start a temporary multi-broker cluster.
// This test case relies on stopping a broker and switching partition leader to another
// during the test, so single-broker cluster (kafkaServer) could not fulfill the
// requirement.
PscTestEnvironmentWithKafkaAsPubSub multiBrokerCluster = constructKafkaTestEnvionment();
multiBrokerCluster.prepare(PscTestEnvironmentWithKafkaAsPubSub.createConfig().setKafkaServersNumber(3));

final int parallelism = 2;
final int numElementsPerPartition = 1000;
final int totalElements = parallelism * numElementsPerPartition;
final int failAfterElements = numElementsPerPartition / 3;

createTestTopic(topic, parallelism, 2);
multiBrokerCluster.createTestTopic(topicUri, parallelism, 2);

DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(), pscTestEnvWithKafka, topicUri, parallelism,
StreamExecutionEnvironment.getExecutionEnvironment(), multiBrokerCluster, topicUri, parallelism,
numElementsPerPartition, true);

// find leader to shut down
int leaderId = pscTestEnvWithKafka.getLeaderToShutDown(topic);
int leaderId = multiBrokerCluster.getLeaderToShutDown(topic);

LOG.info("Leader to shutdown {}", leaderId);

@@ -1460,21 +1467,22 @@ public void runBrokerFailureTest() throws Exception {
env.setRestartStrategy(RestartStrategies.noRestart());

Properties pscConsumerConfiguration = new Properties();
pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
pscConsumerConfiguration.putAll(multiBrokerCluster.getStandardPscConsumerConfiguration());
pscConsumerConfiguration.putAll(multiBrokerCluster.getSecurePscConsumerConfiguration());
pscConsumerConfiguration.putAll(multiBrokerCluster.getPscDiscoveryConfiguration());

getStream(env, topicUri, schema, pscConsumerConfiguration)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

BrokerKillingMapper.killedLeaderBefore = false;
tryExecute(env, "Broker failure once test");

// start a new broker:
pscTestEnvWithKafka.restartBroker(leaderId);
//deleteTestTopic(topic);
try {
BrokerKillingMapper.killedLeaderBefore = false;
tryExecute(env, "Broker failure once test");
} finally {
// Tear down the temporary cluster anyway
multiBrokerCluster.shutdown();
}
}

public void runKeyValueTest() throws Exception {
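
The broker-failure test now provisions its own multi-broker environment and tears it down unconditionally. A minimal sketch of that pattern, reusing only the calls visible in the diff (topic creation, the data generator, and the actual leader kill are elided):

// Hedged sketch: per-test multi-broker cluster with guaranteed teardown, so a killed
// broker cannot leak into tests that share the default single-broker fixture.
PscTestEnvironmentWithKafkaAsPubSub multiBrokerCluster = constructKafkaTestEnvionment();
multiBrokerCluster.prepare(
        PscTestEnvironmentWithKafkaAsPubSub.createConfig().setKafkaServersNumber(3));
try {
    // ... create the topic on multiBrokerCluster, produce data, kill the leader ...
} finally {
    multiBrokerCluster.shutdown();   // always tear down the temporary cluster
}
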
@@ -23,6 +23,7 @@
import com.pinterest.flink.connector.psc.testutils.DockerImageVersions;
import com.pinterest.flink.connector.psc.testutils.PscUtil;
import com.pinterest.flink.streaming.connectors.psc.partitioner.FlinkPscPartitioner;
import com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema;
import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.TopicUriPartition;
@@ -47,7 +48,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -78,7 +78,6 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
@@ -494,7 +493,7 @@ private void startKafkaContainerCluster(int numBrokers) {
brokers.put(brokerID, broker);
}
new ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start);
LOG.info("{} brokers started", numBrokers);
LOG.info("{} brokers started: {}", numBrokers, brokers.keySet());
brokerConnectionString =
brokers.values().stream()
.map(KafkaContainer::getBootstrapServers)