Skip to content

Commit

Permalink
Merge pull request #11 from pinterest/updates
Browse files Browse the repository at this point in the history
Disable SSL reset by default; Swallow ConcurrentModificationExceptoin when stack trace matches JmxReporter in native Kafka library; Remove duplicate metrics emission for PscProducer
  • Loading branch information
jeffxiang authored Nov 7, 2023
2 parents 0328073 + 5fcc4bb commit 2ac8e42
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 7 deletions.
12 changes: 12 additions & 0 deletions psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

Expand Down Expand Up @@ -165,6 +166,17 @@ ImmutableMap.<Class<? extends Exception>, Map<String, PscErrorHandler.ConsumerAc
}}
)

// ConcurrentModificationException
.put(
ConcurrentModificationException.class,
new LinkedHashMap<String, PscErrorHandler.ConsumerAction>(1) {{
put(
"org.apache.kafka.common.metrics.JmxReporter.getMBeanName", // known case of CME - we will swallow it
new PscErrorHandler.ConsumerAction(PscErrorHandler.ActionType.NONE, ConsumerException.class)
);
}}
)

.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public class PscConfiguration extends PropertiesConfiguration {
*/
public final static String PCS_AUTO_RESOLUTION_RETRY_COUNT = "psc.auto.resolution.retry.count";

/**
* Whether to proactively reset consumer or producer based on approaching SSL certificate expiry
*/
public final static String PSC_PROACTIVE_SSL_RESET_ENABLED = "psc.proactive.ssl.reset.enabled";

private final static String PSC_CLIENT_TYPE = "psc.client.type";
public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer";
public final static String PSC_CLIENT_TYPE_PRODUCER = "producer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class PscConfigurationInternal {
private boolean autoResolutionEnabled;
private int autoResolutionRetryCount;
private MetricsReporterConfiguration metricsReporterConfiguration;
private boolean proactiveSslResetEnabled;

public PscConfigurationInternal() {
}
Expand Down Expand Up @@ -206,6 +207,10 @@ private void validateGenericConfiguration(Map<String, Exception> invalidConfigs)
Integer autoResolutionRetryCount = verifyConfigHasValue(pscConfiguration, PscConfiguration.PCS_AUTO_RESOLUTION_RETRY_COUNT, Integer.class, invalidConfigs);
this.autoResolutionRetryCount = autoResolutionRetryCount != null ? autoResolutionRetryCount : 5;
}

// SSL reset
Boolean proactiveSslResetEnabled = verifyConfigHasValue(pscConfiguration, PscConfiguration.PSC_PROACTIVE_SSL_RESET_ENABLED, Boolean.class, invalidConfigs);
this.proactiveSslResetEnabled = proactiveSslResetEnabled != null ? proactiveSslResetEnabled : false; // false by default
}

public void logConfiguration() {
Expand Down Expand Up @@ -733,6 +738,10 @@ public boolean isAutoResolutionEnabled() {
return autoResolutionEnabled;
}

public boolean isProactiveSslResetEnabled() {
return proactiveSslResetEnabled;
}

public int getAutoResolutionRetryCount() {
return autoResolutionRetryCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
properties, pscConfigurationInternal, Collections.singleton(topicUri));
logger.info("Initialized PscKafkaConsumer with SSL cert expiry time at " + sslCertificateExpiryTimeInMillis);
}
logger.info("Proactive SSL reset enabled: {}", pscConfigurationInternal.isProactiveSslResetEnabled());
}

@Override
Expand Down Expand Up @@ -1042,6 +1043,11 @@ protected void maybeResetBackendClient() throws ConsumerException {
// reset if SSL enabled && cert is expired
if (isSslEnabledInAnyActiveSusbcriptionOrAssignment &&
(System.currentTimeMillis() >= sslCertificateExpiryTimeInMillis)) {
if (!pscConfigurationInternal.isProactiveSslResetEnabled()) {
logger.info("Skipping reset of client even though SSL certificate is approaching expiry at {}" +
" because proactive reset is disabled", sslCertificateExpiryTimeInMillis);
return;
}
if (KafkaSslUtils.keyStoresExist(properties)) {
logger.info("Resetting backend Kafka client due to cert expiry at " +
sslCertificateExpiryTimeInMillis);
Expand Down
7 changes: 0 additions & 7 deletions psc/src/main/java/com/pinterest/psc/producer/PscProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,6 @@ public Future<MessageId> send(PscProducerMessage<K, V> pscProducerMessage, Callb

Future<MessageId> future = backendProducer.send(pscProducerMessage, callback);

PscMetricRegistryManager.getInstance().incrementCounterMetric(
pscProducerMessage.getTopicUriPartition().getTopicUri(),
pscProducerMessage.getPartition(),
PscMetrics.PSC_PRODUCER_PRODUCE_MESSAGES_METRIC,
pscConfigurationInternal
);

return future;
}

Expand Down
1 change: 1 addition & 0 deletions psc/src/main/resources/psc.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ psc.config.topic.uri=

psc.auto.resolution.enabled=true
psc.auto.resolution.retry.count=5
psc.proactive.ssl.reset.enabled=false

#psc.metrics
#valid options com.pinterest.psc.metrics.NullMetricsReporter, com.pinterest.psc.metrics.OpenTSDBMetricsReporter
Expand Down

0 comments on commit 2ac8e42

Please sign in to comment.