From 2a9f0d20c737f77cfee6a9d315ae681e078cf3e8 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 15:51:18 -0500 Subject: [PATCH 1/6] Disable SSL reset by default --- .../java/com/pinterest/psc/config/PscConfiguration.java | 5 +++++ .../pinterest/psc/config/PscConfigurationInternal.java | 9 +++++++++ .../pinterest/psc/consumer/kafka/PscKafkaConsumer.java | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java index 59f9dec..62e5e88 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java @@ -89,6 +89,11 @@ public class PscConfiguration extends PropertiesConfiguration { */ public final static String PCS_AUTO_RESOLUTION_RETRY_COUNT = "psc.auto.resolution.retry.count"; + /** + * Whether to proactively reset consumer or producer based on approaching SSL certificate expiry + */ + public final static String PSC_PROACTIVE_SSL_RESET_ENABLED = "psc.proactive.ssl.reset.enabled"; + private final static String PSC_CLIENT_TYPE = "psc.client.type"; public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer"; public final static String PSC_CLIENT_TYPE_PRODUCER = "producer"; diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java index ed4384f..7086c16 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java @@ -53,6 +53,7 @@ public class PscConfigurationInternal { private boolean autoResolutionEnabled; private int autoResolutionRetryCount; private MetricsReporterConfiguration metricsReporterConfiguration; + private boolean proactiveSslResetEnabled; public PscConfigurationInternal() { } @@ -206,6 +207,10 @@ private void validateGenericConfiguration(Map invalidConfigs) Integer autoResolutionRetryCount = verifyConfigHasValue(pscConfiguration, PscConfiguration.PCS_AUTO_RESOLUTION_RETRY_COUNT, Integer.class, invalidConfigs); this.autoResolutionRetryCount = autoResolutionRetryCount != null ? autoResolutionRetryCount : 5; } + + // SSL reset + Boolean proactiveSslResetEnabled = verifyConfigHasValue(pscConfiguration, PscConfiguration.PSC_PROACTIVE_SSL_RESET_ENABLED, Boolean.class, invalidConfigs); + this.proactiveSslResetEnabled = proactiveSslResetEnabled != null ? proactiveSslResetEnabled : false; // false by default } public void logConfiguration() { @@ -733,6 +738,10 @@ public boolean isAutoResolutionEnabled() { return autoResolutionEnabled; } + public boolean isProactiveSslResetEnabled() { + return proactiveSslResetEnabled; + } + public int getAutoResolutionRetryCount() { return autoResolutionRetryCount; } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java index 388e50b..ca43310 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java @@ -95,6 +95,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t properties, pscConfigurationInternal, Collections.singleton(topicUri)); logger.info("Initialized PscKafkaConsumer with SSL cert expiry time at " + sslCertificateExpiryTimeInMillis); } + logger.info("Proactive SSL reset enabled: {}", pscConfigurationInternal.isProactiveSslResetEnabled()); } @Override @@ -1042,6 +1043,11 @@ protected void maybeResetBackendClient() throws ConsumerException { // reset if SSL enabled && cert is expired if (isSslEnabledInAnyActiveSusbcriptionOrAssignment && (System.currentTimeMillis() >= sslCertificateExpiryTimeInMillis)) { + if (!pscConfigurationInternal.isProactiveSslResetEnabled()) { + logger.info("Skipping reset of client even though SSL certificate is approaching expiry at {}" + + " because proactive reset is disabled", sslCertificateExpiryTimeInMillis); + return; + } if (KafkaSslUtils.keyStoresExist(properties)) { logger.info("Resetting backend Kafka client due to cert expiry at " + sslCertificateExpiryTimeInMillis); From 4d9a99bd03f679665d4c3d5a0ad2ab012a2485b2 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 15:58:47 -0500 Subject: [PATCH 2/6] Swallow ConcurrentModificationException when stack trace matches Kafka JmxReporter in consumer poll --- .../com/pinterest/psc/common/kafka/KafkaErrors.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java b/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java index 417cde5..c6a2329 100644 --- a/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java +++ b/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import java.util.ConcurrentModificationException; import java.util.LinkedHashMap; import java.util.Map; @@ -165,6 +166,17 @@ ImmutableMap., Map(1) {{ + put( + "org.apache.kafka.common.metrics.JmxReporter.getMBeanName", // known case of CME - we will swallow it + new PscErrorHandler.ConsumerAction(PscErrorHandler.ActionType.NONE, ConsumerException.class) + ); + }} + ) + .build(); /** From c266781a5432d2e59a47988a7a6107ad63dda202 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 16:10:16 -0500 Subject: [PATCH 3/6] Add config into psc.conf --- psc/src/main/resources/psc.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/psc/src/main/resources/psc.conf b/psc/src/main/resources/psc.conf index 724b8a1..fb4ffda 100644 --- a/psc/src/main/resources/psc.conf +++ b/psc/src/main/resources/psc.conf @@ -6,6 +6,7 @@ psc.config.topic.uri= psc.auto.resolution.enabled=true psc.auto.resolution.retry.count=5 +psc.proactive.ssl.reset.enabled=false #psc.metrics #valid options com.pinterest.psc.metrics.NullMetricsReporter, com.pinterest.psc.metrics.OpenTSDBMetricsReporter From fccbf7fdd96d543c13ea85d19f777675cf52fd71 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 17:36:02 -0500 Subject: [PATCH 4/6] Add build action --- .github/workflows/maven.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .github/workflows/maven.yml diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..827cdfb --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,16 @@ +name: PSC-Java Build + +on: [pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Build with Maven + run: mvn -B package --file pom.xml \ No newline at end of file From 9195a718d06387ed1c974c11a933f63db8592523 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 17:38:56 -0500 Subject: [PATCH 5/6] Revert "Add build action" This reverts commit fccbf7fdd96d543c13ea85d19f777675cf52fd71. --- .github/workflows/maven.yml | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 .github/workflows/maven.yml diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml deleted file mode 100644 index 827cdfb..0000000 --- a/.github/workflows/maven.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: PSC-Java Build - -on: [pull_request] - -jobs: - build: - - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v1 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Build with Maven - run: mvn -B package --file pom.xml \ No newline at end of file From 5fcc4bb0a6ee8a9e28988d99da2cf6e4d4160b9f Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 20:10:09 -0500 Subject: [PATCH 6/6] Remove duplicate metrics reporting for psc.producer.produce.messages --- .../main/java/com/pinterest/psc/producer/PscProducer.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java index dc55288..8ff1fcc 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java @@ -316,13 +316,6 @@ public Future send(PscProducerMessage pscProducerMessage, Callb Future future = backendProducer.send(pscProducerMessage, callback); - PscMetricRegistryManager.getInstance().incrementCounterMetric( - pscProducerMessage.getTopicUriPartition().getTopicUri(), - pscProducerMessage.getPartition(), - PscMetrics.PSC_PRODUCER_PRODUCE_MESSAGES_METRIC, - pscConfigurationInternal - ); - return future; }