lujiajing1126 commented on PR #659:
URL: https://github.com/apache/skywalking-java/pull/659#issuecomment-1867193746
With `jad`,
```java
[arthas@42125]$ jad org.apache.kafka.clients.producer.KafkaProducer
ClassLoader:
+-org.springframework.boot.loader.LaunchedURLClassLoader@8c46918
+-jdk.internal.loader.ClassLoaders$AppClassLoader@251a69d7
+-jdk.internal.loader.ClassLoaders$PlatformClassLoader@42b64ab8
Location:
file:/Users/megrez/Code/opensource/dynamic-redefine-test/target/dynamic-redefine-test-1.0-SNAPSHOT.jar!/BOOT-INF/lib/kafka-clients-3.1.
2.jar!/
/*
* Decompiled with CFR.
*
* Could not load the following classes:
* org.apache.kafka.clients.ApiVersions
* org.apache.kafka.clients.ClientUtils
* org.apache.kafka.clients.KafkaClient
* org.apache.kafka.clients.Metadata
* org.apache.kafka.clients.NetworkClient
* org.apache.kafka.clients.consumer.ConsumerGroupMetadata
* org.apache.kafka.clients.consumer.OffsetAndMetadata
* org.apache.kafka.clients.producer.Callback
*
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback$$sw$auxiliary$4j3p8q3
* org.apache.kafka.clients.producer.Partitioner
* org.apache.kafka.clients.producer.Producer
* org.apache.kafka.clients.producer.ProducerConfig
* org.apache.kafka.clients.producer.ProducerInterceptor
* org.apache.kafka.clients.producer.ProducerRecord
* org.apache.kafka.clients.producer.RecordMetadata
* org.apache.kafka.clients.producer.internals.BufferPool
* org.apache.kafka.clients.producer.internals.KafkaProducerMetrics
* org.apache.kafka.clients.producer.internals.ProducerInterceptors
* org.apache.kafka.clients.producer.internals.ProducerMetadata
* org.apache.kafka.clients.producer.internals.ProducerMetrics
* org.apache.kafka.clients.producer.internals.RecordAccumulator
*
org.apache.kafka.clients.producer.internals.RecordAccumulator$RecordAppendResult
* org.apache.kafka.clients.producer.internals.Sender
*
org.apache.kafka.clients.producer.internals.SenderMetricsRegistry
* org.apache.kafka.clients.producer.internals.TransactionManager
*
org.apache.kafka.clients.producer.internals.TransactionalRequestResult
* org.apache.kafka.common.Cluster
* org.apache.kafka.common.KafkaException
* org.apache.kafka.common.Metric
* org.apache.kafka.common.MetricName
* org.apache.kafka.common.PartitionInfo
* org.apache.kafka.common.TopicPartition
* org.apache.kafka.common.config.AbstractConfig
* org.apache.kafka.common.config.ConfigException
* org.apache.kafka.common.errors.ApiException
* org.apache.kafka.common.errors.InterruptException
* org.apache.kafka.common.errors.InvalidTopicException
* org.apache.kafka.common.errors.ProducerFencedException
* org.apache.kafka.common.errors.RecordTooLargeException
* org.apache.kafka.common.errors.SerializationException
* org.apache.kafka.common.errors.TimeoutException
* org.apache.kafka.common.header.Header
* org.apache.kafka.common.header.Headers
* org.apache.kafka.common.header.internals.RecordHeaders
* org.apache.kafka.common.internals.ClusterResourceListeners
* org.apache.kafka.common.metrics.JmxReporter
* org.apache.kafka.common.metrics.KafkaMetricsContext
* org.apache.kafka.common.metrics.MetricConfig
* org.apache.kafka.common.metrics.Metrics
* org.apache.kafka.common.metrics.MetricsContext
* org.apache.kafka.common.metrics.MetricsReporter
* org.apache.kafka.common.metrics.Sensor
* org.apache.kafka.common.metrics.Sensor$RecordingLevel
* org.apache.kafka.common.network.ChannelBuilder
* org.apache.kafka.common.network.Selectable
* org.apache.kafka.common.network.Selector
* org.apache.kafka.common.record.AbstractRecords
* org.apache.kafka.common.record.CompressionType
* org.apache.kafka.common.serialization.Serializer
* org.apache.kafka.common.utils.AppInfoParser
* org.apache.kafka.common.utils.KafkaThread
* org.apache.kafka.common.utils.LogContext
* org.apache.kafka.common.utils.Time
* org.apache.kafka.common.utils.Utils
*
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ConstructorInter
*
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
*
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter
*
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInterWithOverrideArgs
*
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.OverrideCallable
* org.slf4j.Logger
*/
package org.apache.kafka.clients.producer;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import
org.apache.kafka.clients.producer.KafkaProducer$$sw$auxiliary$12kr501;
import
org.apache.kafka.clients.producer.KafkaProducer$$sw$auxiliary$o9jqgj1;
import
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback$;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import
org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import
org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import
org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import
org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import
org.apache.kafka.clients.producer.internals.TransactionManager;
import
org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ConstructorInter;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInterWithOverrideArgs;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.OverrideCallable;
import org.slf4j.Logger;
public class KafkaProducer<K, V>
implements Producer<K, V>,
EnhancedInstance {
private final Logger log;
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX =
"kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME =
"producer-metrics";
private final String clientId;
final Metrics metrics;
private final KafkaProducerMetrics producerMetrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
private volatile Object _$EnhancedClassField_ws;
public static volatile /* synthetic */ ConstructorInter
$sw$delegate$0jj6ri2$6qta5p1$f22j6m3;
public static volatile /* synthetic */
InstMethodsInterWithOverrideArgs $sw$delegate$0jj6ri2$hg7pk30$bb7a8p2;
public static volatile /* synthetic */ ConstructorInter
$sw$delegate$0jj6ri2$hg7pk30$3upq2f1;
private static final /* synthetic */ Method
cachedValue$$sw$0jj6ri2$bt6qud2;
public KafkaProducer(Map<String, Object> map) {
this(map, null);
$sw$delegate$0jj6ri2$6qta5p1$f22j6m3.intercept(this, new
Object[]{map});
}
/*
* WARNING - void declaration
*/
private /* synthetic */ KafkaProducer(Map map,
$sw$auxiliary$o9jqgj1 o9jqgj12) {
/* 275*/ this((Map<String, Object>)configs, null, null);
void configs;
}
public KafkaProducer(Map<String, Object> map, Serializer<K>
serializer, Serializer<V> serializer2) {
this(map, serializer, serializer2, null);
$sw$delegate$0jj6ri2$6qta5p1$f22j6m3.intercept(this, new
Object[]{map, serializer, serializer2});
}
/*
* WARNING - void declaration
*/
private /* synthetic */ KafkaProducer(Map map, Serializer
serializer, Serializer serializer2, $sw$auxiliary$o9jqgj1 o9jqgj12) {
/* 292*/ this(new
ProducerConfig(ProducerConfig.appendSerializerToConfig((Map)configs,
(Serializer)keySerializer, (Serializer)valueSerializer)),
(Serializer<K>)keySerializer, (Serializer<V>)valueSerializer, null, null, null,
Time.SYSTEM);
void valueSerializer;
void keySerializer;
void configs;
}
public KafkaProducer(Properties properties) {
/* 304*/ this(properties, (Serializer<K>)null, (Serializer<V>)null);
}
public KafkaProducer(Properties properties, Serializer<K>
keySerializer, Serializer<V> valueSerializer) {
/* 319*/ this(Utils.propsToMap((Properties)properties),
keySerializer, valueSerializer);
}
KafkaProducer(ProducerConfig producerConfig, Serializer<K>
serializer, Serializer<V> serializer2, ProducerMetadata producerMetadata,
KafkaClient kafkaClient, ProducerInterceptors<K, V> producerInterceptors, Time
time) {
this(producerConfig, serializer, serializer2,
producerMetadata, kafkaClient, producerInterceptors, time, null);
$sw$delegate$0jj6ri2$hg7pk30$3upq2f1.intercept(this, new
Object[]{producerConfig, serializer, serializer2, producerMetadata,
kafkaClient, producerInterceptors, time});
}
/*
* WARNING - void declaration
*/
private /* synthetic */ KafkaProducer(ProducerConfig
producerConfig, Serializer serializer, Serializer serializer2, ProducerMetadata
producerMetadata, KafkaClient kafkaClient, ProducerInterceptors
producerInterceptors, Time time, $sw$auxiliary$o9jqgj1 o9jqgj12) {
try {
void kafkaClient2;
void metadata;
void interceptors;
void valueSerializer;
void keySerializer;
void time2;
void config;
/* 332*/ this.producerConfig = config;
/* 333*/ this.time = time2;
/* 335*/ String transactionalId =
config.getString("transactional.id");
/* 337*/ this.clientId = config.getString("client.id");
/* 340*/ LogContext logContext = transactionalId == null ? new
LogContext(String.format("[Producer clientId=%s] ", this.clientId)) : new
LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ",
this.clientId, transactionalId));
/* 344*/ this.log = logContext.logger(KafkaProducer.class);
/* 345*/ this.log.trace("Starting the Kafka producer");
/* 347*/ Map<String, String> metricTags =
Collections.singletonMap("client-id", this.clientId);
MetricConfig metricConfig = new
MetricConfig().samples(config.getInt("metrics.num.samples").intValue()).timeWindow(config.getLong("metrics.sample.window.ms").longValue(),
TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName((String)config.getString("metrics.recording.level"))).tags(metricTags);
/* 352*/ List reporters =
config.getConfiguredInstances("metric.reporters", MetricsReporter.class,
Collections.singletonMap("client.id", this.clientId));
JmxReporter jmxReporter = new JmxReporter();
/* 356*/
jmxReporter.configure(config.originals(Collections.singletonMap("client.id",
this.clientId)));
/* 357*/ reporters.add(jmxReporter);
KafkaMetricsContext metricsContext = new
KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix("metrics.context."));
/* 360*/ this.metrics = new Metrics(metricConfig, reporters,
(Time)time2, (MetricsContext)metricsContext);
/* 361*/ this.producerMetrics = new
KafkaProducerMetrics(this.metrics);
/* 362*/ this.partitioner =
(Partitioner)config.getConfiguredInstance("partitioner.class",
Partitioner.class, Collections.singletonMap("client.id", this.clientId));
/* 366*/ long retryBackoffMs =
config.getLong("retry.backoff.ms");
/* 367*/ if (keySerializer == null) {
/* 368*/ this.keySerializer =
(Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
/* 370*/
this.keySerializer.configure(config.originals(Collections.singletonMap("client.id",
this.clientId)), true);
} else {
/* 372*/ config.ignore("key.serializer");
/* 373*/ this.keySerializer = keySerializer;
}
/* 375*/ if (valueSerializer == null) {
/* 376*/ this.valueSerializer =
(Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
/* 378*/
this.valueSerializer.configure(config.originals(Collections.singletonMap("client.id",
this.clientId)), false);
} else {
/* 380*/ config.ignore("value.serializer");
/* 381*/ this.valueSerializer = valueSerializer;
}
/* 384*/ List interceptorList =
config.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class,
Collections.singletonMap("client.id", this.clientId));
/* 388*/ this.interceptors = interceptors != null ? interceptors
: new ProducerInterceptors(interceptorList);
/* 392*/ ClusterResourceListeners clusterResourceListeners =
this.configureClusterResourceListeners((Serializer<K>)keySerializer,
(Serializer<V>)valueSerializer, interceptorList, reporters);
/* 394*/ this.maxRequestSize = config.getInt("max.request.size");
/* 395*/ this.totalMemorySize = config.getLong("buffer.memory");
/* 396*/ this.compressionType =
CompressionType.forName((String)config.getString("compression.type"));
/* 398*/ this.maxBlockTimeMs = config.getLong("max.block.ms");
/* 399*/ int deliveryTimeoutMs =
KafkaProducer.configureDeliveryTimeout((ProducerConfig)config, this.log);
/* 401*/ this.apiVersions = new ApiVersions();
/* 402*/ this.transactionManager =
this.configureTransactionState((ProducerConfig)config, logContext);
/* 403*/ this.accumulator = new RecordAccumulator(logContext,
config.getInt("batch.size").intValue(), this.compressionType,
KafkaProducer.lingerMs((ProducerConfig)config), retryBackoffMs,
deliveryTimeoutMs, this.metrics, PRODUCER_METRIC_GROUP_NAME, (Time)time2,
this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize,
config.getInt("batch.size").intValue(), this.metrics, (Time)time2,
PRODUCER_METRIC_GROUP_NAME));
/* 416*/ List addresses =
ClientUtils.parseAndValidateAddresses((List)config.getList("bootstrap.servers"),
(String)config.getString("client.dns.lookup"));
/* 419*/ if (metadata != null) {
/* 420*/ this.metadata = metadata;
} else {
/* 422*/ this.metadata = new
ProducerMetadata(retryBackoffMs,
config.getLong("metadata.max.age.ms").longValue(),
config.getLong("metadata.max.idle.ms").longValue(), logContext,
clusterResourceListeners, Time.SYSTEM);
/* 428*/ this.metadata.bootstrap(addresses);
}
/* 430*/ this.errors = this.metrics.sensor("errors");
/* 431*/ this.sender = this.newSender(logContext,
(KafkaClient)kafkaClient2, this.metadata);
String ioThreadName = "kafka-producer-network-thread |
" + this.clientId;
/* 433*/ this.ioThread = new KafkaThread(ioThreadName,
(Runnable)this.sender, true);
/* 434*/ this.ioThread.start();
/* 435*/ config.logUnused();
/* 436*/ AppInfoParser.registerAppInfo((String)JMX_PREFIX,
(String)this.clientId, (Metrics)this.metrics, (long)time2.milliseconds());
/* 437*/ this.log.debug("Kafka producer started");
}
catch (Throwable t) {
/* 440*/ this.close(Duration.ofMillis(0L), true);
throw new KafkaException("Failed to construct kafka
producer", t);
}
}
KafkaProducer(ProducerConfig producerConfig, LogContext
logContext, Metrics metrics, Serializer<K> serializer, Serializer<V>
serializer2, ProducerMetadata producerMetadata, RecordAccumulator
recordAccumulator, TransactionManager transactionManager, Sender sender,
ProducerInterceptors<K, V> producerInterceptors, Partitioner partitioner, Time
time, KafkaThread kafkaThread) {
this(producerConfig, logContext, metrics, serializer,
serializer2, producerMetadata, recordAccumulator, transactionManager, sender,
producerInterceptors, partitioner, time, kafkaThread, null);
$sw$delegate$0jj6ri2$hg7pk30$3upq2f1.intercept(this, new
Object[]{producerConfig, logContext, metrics, serializer, serializer2,
producerMetadata, recordAccumulator, transactionManager, sender,
producerInterceptors, partitioner, time, kafkaThread});
}
/*
* WARNING - void declaration
*/
private /* synthetic */ KafkaProducer(ProducerConfig
producerConfig, LogContext logContext, Metrics metrics, Serializer serializer,
Serializer serializer2, ProducerMetadata producerMetadata, RecordAccumulator
recordAccumulator, TransactionManager transactionManager, Sender sender,
ProducerInterceptors producerInterceptors, Partitioner partitioner, Time time,
KafkaThread kafkaThread, $sw$auxiliary$o9jqgj1 o9jqgj12) {
void ioThread;
void sender2;
void metadata;
void accumulator;
void transactionManager2;
void interceptors;
void valueSerializer;
void keySerializer;
void partitioner2;
void metrics2;
void logContext2;
void time2;
void config;
/* 460*/ this.producerConfig = config;
/* 461*/ this.time = time2;
/* 462*/ this.clientId = config.getString("client.id");
/* 463*/ this.log = logContext2.logger(KafkaProducer.class);
/* 464*/ this.metrics = metrics2;
/* 465*/ this.producerMetrics = new
KafkaProducerMetrics((Metrics)metrics2);
/* 466*/ this.partitioner = partitioner2;
/* 467*/ this.keySerializer = keySerializer;
/* 468*/ this.valueSerializer = valueSerializer;
/* 469*/ this.interceptors = interceptors;
/* 470*/ this.maxRequestSize = config.getInt("max.request.size");
/* 471*/ this.totalMemorySize = config.getLong("buffer.memory");
/* 472*/ this.compressionType =
CompressionType.forName((String)config.getString("compression.type"));
/* 473*/ this.maxBlockTimeMs = config.getLong("max.block.ms");
/* 474*/ this.apiVersions = new ApiVersions();
/* 475*/ this.transactionManager = transactionManager2;
/* 476*/ this.accumulator = accumulator;
/* 477*/ this.errors = this.metrics.sensor("errors");
/* 478*/ this.metadata = metadata;
/* 479*/ this.sender = sender2;
/* 480*/ this.ioThread = ioThread;
}
Sender newSender(LogContext logContext, KafkaClient
kafkaClient, ProducerMetadata metadata) {
/* 485*/ int maxInflightRequests =
this.producerConfig.getInt("max.in.flight.requests.per.connection");
/* 486*/ int requestTimeoutMs =
this.producerConfig.getInt("request.timeout.ms");
/* 487*/ ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder((AbstractConfig)this.producerConfig,
(Time)this.time, (LogContext)logContext);
ProducerMetrics metricsRegistry = new
ProducerMetrics(this.metrics);
/* 489*/ Sensor throttleTimeSensor =
Sender.throttleTimeSensor((SenderMetricsRegistry)metricsRegistry.senderMetrics);
/* 490*/ KafkaClient client = kafkaClient != null ? kafkaClient :
new NetworkClient((Selectable)new
Selector(this.producerConfig.getLong("connections.max.idle.ms").longValue(),
this.metrics, this.time, "producer", channelBuilder, logContext),
(Metadata)metadata, this.clientId, maxInflightRequests,
this.producerConfig.getLong("reconnect.backoff.ms").longValue(),
this.producerConfig.getLong("reconnect.backoff.max.ms").longValue(),
this.producerConfig.getInt("send.buffer.bytes").intValue(),
this.producerConfig.getInt("receive.buffer.bytes").intValue(),
requestTimeoutMs,
this.producerConfig.getLong("socket.connection.setup.timeout.ms").longValue(),
this.producerConfig.getLong("socket.connection.setup.timeout.max.ms").longValue(),
this.time, true, this.apiVersions, throttleTimeSensor, logContext);
/* 509*/ short acks =
Short.parseShort(this.producerConfig.getString("acks"));
return new Sender(logContext, client, metadata,
this.accumulator, maxInflightRequests == 1,
this.producerConfig.getInt("max.request.size").intValue(), acks,
this.producerConfig.getInt("retries").intValue(),
metricsRegistry.senderMetrics, this.time, requestTimeoutMs,
this.producerConfig.getLong("retry.backoff.ms").longValue(),
this.transactionManager, this.apiVersions);
}
private static int lingerMs(ProducerConfig config) {
/* 527*/ return (int)Math.min(config.getLong("linger.ms"),
Integer.MAX_VALUE);
}
private static int configureDeliveryTimeout(ProducerConfig
config, Logger log) {
int requestTimeoutMs;
int lingerMs;
int lingerAndRequestTimeoutMs;
int deliveryTimeoutMs =
config.getInt("delivery.timeout.ms");
/* 532*/ if (deliveryTimeoutMs < (lingerAndRequestTimeoutMs =
(int)Math.min((long)(lingerMs = KafkaProducer.lingerMs(config)) +
(long)(requestTimeoutMs = config.getInt("request.timeout.ms").intValue()),
Integer.MAX_VALUE))) {
/* 537*/ if
(config.originals().containsKey("delivery.timeout.ms")) {
throw new ConfigException("delivery.timeout.ms
should be equal to or larger than linger.ms + request.timeout.ms");
}
/* 544*/ deliveryTimeoutMs = lingerAndRequestTimeoutMs;
/* 545*/ log.warn("{} should be equal to or larger than {} + {}.
Setting it to {}.", new Object[]{"delivery.timeout.ms", "linger.ms",
"request.timeout.ms", deliveryTimeoutMs});
}
/* 550*/ return deliveryTimeoutMs;
}
private TransactionManager
configureTransactionState(ProducerConfig config, LogContext logContext) {
/* 555*/ TransactionManager transactionManager = null;
/* 557*/ if (config.getBoolean("enable.idempotence").booleanValue())
{
long retryBackoffMs;
int transactionTimeoutMs;
String transactionalId =
config.getString("transactional.id");
/* 559*/ transactionManager = new TransactionManager(logContext,
transactionalId, transactionTimeoutMs =
config.getInt("transaction.timeout.ms").intValue(), retryBackoffMs =
config.getLong("retry.backoff.ms").longValue(), this.apiVersions);
/* 569*/ if (transactionManager.isTransactional()) {
/* 570*/ this.log.info("Instantiated a transactional
producer.");
} else {
/* 572*/ this.log.info("Instantiated an idempotent
producer.");
}
}
/* 574*/ return transactionManager;
}
public void initTransactions() {
/* 603*/ this.throwIfNoTransactionManager();
/* 604*/ this.throwIfProducerClosed();
/* 605*/ long now = this.time.nanoseconds();
/* 606*/ TransactionalRequestResult result =
this.transactionManager.initializeTransactions();
/* 607*/ this.sender.wakeup();
/* 608*/ result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
/* 609*/ this.producerMetrics.recordInit(this.time.nanoseconds() -
now);
}
public void beginTransaction() throws ProducerFencedException {
/* 628*/ this.throwIfNoTransactionManager();
/* 629*/ this.throwIfProducerClosed();
/* 630*/ long now = this.time.nanoseconds();
/* 631*/ this.transactionManager.beginTransaction();
/* 632*/ this.producerMetrics.recordBeginTxn(this.time.nanoseconds()
- now);
}
@Deprecated
public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets, String consumerGroupId) throws
ProducerFencedException {
/* 666*/ this.sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(consumerGroupId));
}
public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws
ProducerFencedException {
/* 713*/ this.throwIfInvalidGroupMetadata(groupMetadata);
/* 714*/ this.throwIfNoTransactionManager();
/* 715*/ this.throwIfProducerClosed();
/* 717*/ if (!offsets.isEmpty()) {
/* 718*/ long start = this.time.nanoseconds();
/* 719*/ TransactionalRequestResult result =
this.transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
/* 720*/ this.sender.wakeup();
/* 721*/ result.await(this.maxBlockTimeMs,
TimeUnit.MILLISECONDS);
/* 722*/
this.producerMetrics.recordSendOffsets(this.time.nanoseconds() - start);
}
}
public void commitTransaction() throws ProducerFencedException {
/* 752*/ this.throwIfNoTransactionManager();
/* 753*/ this.throwIfProducerClosed();
/* 754*/ long commitStart = this.time.nanoseconds();
/* 755*/ TransactionalRequestResult result =
this.transactionManager.beginCommit();
/* 756*/ this.sender.wakeup();
/* 757*/ result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
/* 758*/
this.producerMetrics.recordCommitTxn(this.time.nanoseconds() - commitStart);
}
public void abortTransaction() throws ProducerFencedException {
/* 784*/ this.throwIfNoTransactionManager();
/* 785*/ this.throwIfProducerClosed();
/* 786*/ this.log.info("Aborting incomplete transaction");
/* 787*/ long abortStart = this.time.nanoseconds();
/* 788*/ TransactionalRequestResult result =
this.transactionManager.beginAbort();
/* 789*/ this.sender.wakeup();
/* 790*/ result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
/* 791*/ this.producerMetrics.recordAbortTxn(this.time.nanoseconds()
- abortStart);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
{
/* 800*/ return this.send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record,
Callback callback) {
/* 913*/ ProducerRecord interceptedRecord =
this.interceptors.onSend(record);
/* 914*/ return this.doSend(interceptedRecord, callback);
}
private void throwIfProducerClosed() {
/* 920*/ if (this.sender == null || !this.sender.isRunning()) {
throw new IllegalStateException("Cannot perform
operation after producer has been closed");
}
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V>
producerRecord, Callback callback) {
return
(Future)$sw$delegate$0jj6ri2$hg7pk30$bb7a8p2.intercept((Object)this, new
Object[]{producerRecord, callback}, cachedValue$$sw$0jj6ri2$bt6qud2,
(OverrideCallable)new KafkaProducer$$sw$auxiliary$12kr501(this));
}
private /* synthetic */ Future
$sw$original$doSend$v18i882(ProducerRecord record, Callback callback) {
/* 928*/ TopicPartition tp = null;
try {
long timestamp;
byte[] serializedValue;
byte[] serializedKey;
ClusterAndWaitTime clusterAndWaitTime;
/* 930*/ this.throwIfProducerClosed();
/* 932*/ long nowMs = this.time.milliseconds();
try {
/* 935*/ clusterAndWaitTime =
this.waitOnMetadata(record.topic(), record.partition(), nowMs,
this.maxBlockTimeMs);
}
catch (KafkaException e) {
/* 937*/ if (this.metadata.isClosed()) {
throw new KafkaException("Producer closed while
send in progress", (Throwable)e);
}
/* 939*/ throw e;
}
/* 941*/ nowMs += clusterAndWaitTime.waitedOnMetadataMs;
/* 942*/ long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs
- clusterAndWaitTime.waitedOnMetadataMs);
/* 943*/ Cluster cluster = clusterAndWaitTime.cluster;
try {
/* 946*/ serializedKey =
this.keySerializer.serialize(record.topic(), record.headers(), record.key());
}
catch (ClassCastException cce) {
throw new SerializationException("Can't convert key
of class " + record.key().getClass().getName() + " to class " +
this.producerConfig.getClass("key.serializer").getName() + " specified in
key.serializer", (Throwable)cce);
}
try {
/* 954*/ serializedValue =
this.valueSerializer.serialize(record.topic(), record.headers(),
record.value());
}
catch (ClassCastException cce) {
throw new SerializationException("Can't convert
value of class " + record.value().getClass().getName() + " to class " +
this.producerConfig.getClass("value.serializer").getName() + " specified in
value.serializer", (Throwable)cce);
}
/* 960*/ int partition = this.partition(record, serializedKey,
serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
/* 963*/ this.setReadOnly(record.headers());
/* 964*/ Header[] headers = record.headers().toArray();
/* 966*/ int serializedSize =
AbstractRecords.estimateSizeInBytesUpperBound((byte)this.apiVersions.maxUsableProduceMagic(),
(CompressionType)this.compressionType, (byte[])serializedKey,
(byte[])serializedValue, (Header[])headers);
/* 968*/ this.ensureValidRecordSize(serializedSize);
/* 969*/ long l = timestamp = record.timestamp() == null ? nowMs
: record.timestamp();
/* 970*/ if (this.log.isTraceEnabled()) {
/* 971*/ this.log.trace("Attempting to append record {} with
callback {} to topic {} partition {}", new Object[]{record, callback,
record.topic(), partition});
}
InterceptorCallback interceptCallback = new
InterceptorCallback(callback, this.interceptors, tp);
/* 976*/ RecordAccumulator.RecordAppendResult result =
this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers,
interceptCallback, remainingWaitMs, true, nowMs);
/* 979*/ if (result.abortForNewBatch) {
/* 980*/ int prevPartition = partition;
/* 981*/ this.partitioner.onNewBatch(record.topic(),
cluster, prevPartition);
/* 982*/ partition = this.partition(record, serializedKey,
serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
/* 984*/ if (this.log.isTraceEnabled()) {
/* 985*/ this.log.trace("Retrying append due to new
batch creation for topic {} partition {}. The old partition was {}", new
Object[]{record.topic(), partition, prevPartition});
}
interceptCallback = new
InterceptorCallback(callback, this.interceptors, tp);
/* 990*/ result = this.accumulator.append(tp, timestamp,
serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs,
false, nowMs);
}
/* 999*/ if (this.transactionManager != null) {
/*1000*/ this.transactionManager.maybeAddPartition(tp);
}
/*1003*/ if (result.batchIsFull || result.newBatchCreated) {
/*1004*/ this.log.trace("Waking up the sender since topic {}
partition {} is either full or getting a new batch", (Object)record.topic(),
(Object)partition);
/*1005*/ this.sender.wakeup();
}
/*1007*/ return result.future;
}
catch (ApiException e) {
/*1012*/ this.log.debug("Exception occurred during message
send:", (Throwable)e);
/*1014*/ if (tp == null) {
/*1016*/ tp =
ProducerInterceptors.extractTopicPartition((ProducerRecord)record);
}
InterceptorCallback interceptCallback = new
InterceptorCallback(callback, this.interceptors, tp);
/*1023*/ interceptCallback.onCompletion(null,
(Exception)((Object)e));
/*1024*/ this.errors.record();
/*1025*/ this.interceptors.onSendError(record, tp,
(Exception)((Object)e));
return new FutureFailure((Exception)((Object)e));
}
catch (InterruptedException e) {
/*1028*/ this.errors.record();
/*1029*/ this.interceptors.onSendError(record, tp, (Exception)e);
throw new InterruptException(e);
}
catch (KafkaException e) {
/*1032*/ this.errors.record();
/*1033*/ this.interceptors.onSendError(record, tp,
(Exception)((Object)e));
/*1034*/ throw e;
}
catch (Exception e) {
/*1037*/ this.interceptors.onSendError(record, tp, e);
/*1038*/ throw e;
}
}
private void setReadOnly(Headers headers) {
/*1043*/ if (headers instanceof RecordHeaders) {
/*1044*/ ((RecordHeaders)headers).setReadOnly();
}
}
private ClusterAndWaitTime waitOnMetadata(String topic, Integer
partition, long nowMs, long maxWaitMs) throws InterruptedException {
Cluster cluster = this.metadata.fetch();
/*1062*/ if (cluster.invalidTopics().contains(topic)) {
throw new InvalidTopicException(topic);
}
/*1065*/ this.metadata.add(topic, nowMs);
/*1067*/ Integer partitionsCount =
cluster.partitionCountForTopic(topic);
/*1070*/ if (partitionsCount != null && (partition == null ||
partition < partitionsCount)) {
return new ClusterAndWaitTime(cluster, 0L);
}
/*1073*/ long remainingWaitMs = maxWaitMs;
/*1074*/ long elapsed = 0L;
do {
/*1079*/ if (partition != null) {
/*1080*/ this.log.trace("Requesting metadata update for
partition {} of topic {}.", (Object)partition, (Object)topic);
} else {
/*1082*/ this.log.trace("Requesting metadata update for
topic {}.", (Object)topic);
}
/*1084*/ this.metadata.add(topic, nowMs + elapsed);
/*1085*/ int version =
this.metadata.requestUpdateForTopic(topic);
/*1086*/ this.sender.wakeup();
try {
/*1088*/ this.metadata.awaitUpdate(version, remainingWaitMs);
}
catch (TimeoutException ex) {
throw new TimeoutException(String.format("Topic %s
not present in metadata after %d ms.", topic, maxWaitMs));
}
/*1095*/ cluster = this.metadata.fetch();
/*1096*/ elapsed = this.time.milliseconds() - nowMs;
/*1097*/ if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null
? String.format("Topic %s not present in metadata after %d ms.", topic,
maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is
not present in metadata after %d ms.", partition, topic, partitionsCount,
maxWaitMs));
}
/*1104*/ this.metadata.maybeThrowExceptionForTopic(topic);
/*1105*/ remainingWaitMs = maxWaitMs - elapsed;
} while ((partitionsCount =
cluster.partitionCountForTopic(topic)) == null || partition != null &&
partition >= partitionsCount);
return new ClusterAndWaitTime(cluster, elapsed);
}
private void ensureValidRecordSize(int size) {
/*1116*/ if (size > this.maxRequestSize) {
throw new RecordTooLargeException("The message is " +
size + " bytes when serialized which is larger than " + this.maxRequestSize +
", which is the value of the " + "max.request.size" + " configuration.");
}
/*1120*/ if ((long)size > this.totalMemorySize) {
throw new RecordTooLargeException("The message is " +
size + " bytes when serialized which is larger than the total memory buffer you
have configured with the " + "buffer.memory" + " configuration.");
}
}
public void flush() {
/*1163*/ this.log.trace("Flushing accumulated records in producer.");
/*1165*/ long start = this.time.nanoseconds();
/*1166*/ this.accumulator.beginFlush();
/*1167*/ this.sender.wakeup();
try {
/*1169*/ this.accumulator.awaitFlushCompletion();
}
catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
}
finally {
this.producerMetrics.recordFlush(this.time.nanoseconds() - start);
}
}
public List<PartitionInfo> partitionsFor(String topic) {
/*1187*/ Objects.requireNonNull(topic, "topic cannot be null");
try {
/*1189*/ return this.waitOnMetadata((String)topic, null,
(long)this.time.milliseconds(),
(long)this.maxBlockTimeMs).cluster.partitionsForTopic(topic);
}
catch (InterruptedException e) {
throw new InterruptException(e);
}
}
public Map<MetricName, ? extends Metric> metrics() {
/*1200*/ return Collections.unmodifiableMap(this.metrics.metrics());
}
public void close() {
/*1218*/ this.close(Duration.ofMillis(Long.MAX_VALUE));
}
public void close(Duration timeout) {
/*1242*/ this.close(timeout, false);
}
private void close(Duration timeout, boolean swallowException) {
boolean invokedFromCallback;
long timeoutMs = timeout.toMillis();
/*1247*/ if (timeoutMs < 0L) {
throw new IllegalArgumentException("The timeout cannot
be negative.");
}
/*1249*/ this.log.info("Closing the Kafka producer with
timeoutMillis = {} ms.", (Object)timeoutMs);
AtomicReference<InterruptException> firstException = new
AtomicReference<InterruptException>();
/*1253*/ boolean bl = invokedFromCallback = Thread.currentThread()
== this.ioThread;
/*1254*/ if (timeoutMs > 0L) {
/*1255*/ if (invokedFromCallback) {
/*1256*/ this.log.warn("Overriding close timeout {} ms to 0
ms in order to prevent useless blocking due to self-join. This means you have
incorrectly invoked close with a non-zero timeout from the producer
call-back.", (Object)timeoutMs);
} else {
/*1261*/ if (this.sender != null) {
/*1262*/ this.sender.initiateClose();
}
/*1263*/ if (this.ioThread != null) {
try {
/*1265*/ this.ioThread.join(timeoutMs);
}
catch (InterruptedException t) {
/*1267*/ firstException.compareAndSet(null, new
InterruptException(t));
/*1268*/ this.log.error("Interrupted while joining
ioThread", (Throwable)t);
}
}
}
}
/*1274*/ if (this.sender != null && this.ioThread != null &&
this.ioThread.isAlive()) {
/*1275*/ this.log.info("Proceeding to force close the producer
since pending requests could not be completed within timeout {} ms.",
(Object)timeoutMs);
/*1277*/ this.sender.forceClose();
/*1279*/ if (!invokedFromCallback) {
try {
/*1281*/ this.ioThread.join();
}
catch (InterruptedException e) {
/*1283*/ firstException.compareAndSet(null, new
InterruptException(e));
}
}
}
/*1288*/ Utils.closeQuietly(this.interceptors, (String)"producer
interceptors", firstException);
/*1289*/ Utils.closeQuietly((AutoCloseable)this.producerMetrics,
(String)"producer metrics wrapper", firstException);
/*1290*/ Utils.closeQuietly((AutoCloseable)this.metrics,
(String)"producer metrics", firstException);
/*1291*/ Utils.closeQuietly(this.keySerializer, (String)"producer
keySerializer", firstException);
/*1292*/ Utils.closeQuietly(this.valueSerializer, (String)"producer
valueSerializer", firstException);
/*1293*/ Utils.closeQuietly((AutoCloseable)this.partitioner,
(String)"producer partitioner", firstException);
/*1294*/ AppInfoParser.unregisterAppInfo((String)JMX_PREFIX,
(String)this.clientId, (Metrics)this.metrics);
/*1295*/ Throwable exception = (Throwable)firstException.get();
/*1296*/ if (exception != null && !swallowException) {
/*1297*/ if (exception instanceof InterruptException) {
/*1298*/ throw (InterruptException)exception;
}
throw new KafkaException("Failed to close kafka
producer", exception);
}
/*1302*/ this.log.debug("Kafka producer has been closed");
}
private ClusterResourceListeners
configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V>
valueSerializer, List<?> ... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new
ClusterResourceListeners();
/*1307*/ for (List<?> candidateList : candidateLists) {
/*1308*/ clusterResourceListeners.maybeAddAll(candidateList);
}
/*1310*/ clusterResourceListeners.maybeAdd(keySerializer);
/*1311*/ clusterResourceListeners.maybeAdd(valueSerializer);
/*1312*/ return clusterResourceListeners;
}
private int partition(ProducerRecord<K, V> record, byte[]
serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition.intValue() :
this.partitioner.partition(record.topic(), record.key(), serializedKey,
record.value(), serializedValue, cluster);
}
private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata
groupMetadata) {
/*1329*/ if (groupMetadata == null) {
throw new IllegalArgumentException("Consumer group
metadata could not be null");
}
/*1331*/ if (groupMetadata.generationId() > 0 &&
"".equals(groupMetadata.memberId())) {
throw new IllegalArgumentException("Passed in group
metadata " + groupMetadata + " has generationId > 0 but member.id ");
}
}
private void throwIfNoTransactionManager() {
/*1338*/ if (this.transactionManager == null) {
throw new IllegalStateException("Cannot use
transactional methods without enabling transactions by setting the
transactional.id configuration property");
}
}
String getClientId() {
/*1345*/ return this.clientId;
}
public void setSkyWalkingDynamicField(Object object) {
this._$EnhancedClassField_ws = object;
}
public Object getSkyWalkingDynamicField() {
return this._$EnhancedClassField_ws;
}
static {
ClassLoader.getSystemClassLoader().loadClass("org.apache.skywalking.apm.dependencies.net.bytebuddy.dynamic.Nexus").getMethod("initialize",
Class.class, Integer.TYPE).invoke(null, KafkaProducer.class, 1381404450);
cachedValue$$sw$0jj6ri2$bt6qud2 =
KafkaProducer.class.getDeclaredMethod("doSend", ProducerRecord.class,
Callback.class);
}
final /* synthetic */ Future
$sw$original$doSend$v18i882$accessor$$sw$0jj6ri2(ProducerRecord producerRecord,
Callback callback) {
return this.$sw$original$doSend$v18i882(producerRecord,
callback);
}
private static class InterceptorCallback<K, V>
implements Callback,
EnhancedInstance {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
private final TopicPartition tp;
private volatile Object _$EnhancedClassField_ws;
public static volatile /* synthetic */ InstMethodsInter
$sw$delegate$69ob5a2$rqepec0$qenpom2;
private static final /* synthetic */ Method
cachedValue$$sw$69ob5a2$km196e1;
private InterceptorCallback(Callback userCallback,
ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
this.userCallback = userCallback;
this.interceptors = interceptors;
this.tp = tp;
}
public void onCompletion(RecordMetadata recordMetadata,
Exception exception) {
$sw$delegate$69ob5a2$rqepec0$qenpom2.intercept((Object)this, new
Object[]{recordMetadata, exception}, (Callable)new
InterceptorCallback$$sw$auxiliary$4j3p8q3(this, recordMetadata, exception),
cachedValue$$sw$69ob5a2$km196e1);
}
private /* synthetic */ void
$sw$original$onCompletion$ibg2i91(RecordMetadata metadata, Exception exception)
{
metadata = metadata != null ? metadata : new
RecordMetadata(this.tp, -1L, -1, -1L, -1, -1);
this.interceptors.onAcknowledgement(metadata,
exception);
if (this.userCallback != null) {
this.userCallback.onCompletion(metadata, exception);
}
}
public void setSkyWalkingDynamicField(Object object) {
this._$EnhancedClassField_ws = object;
}
public Object getSkyWalkingDynamicField() {
return this._$EnhancedClassField_ws;
}
static {
ClassLoader.getSystemClassLoader().loadClass("org.apache.skywalking.apm.dependencies.net.bytebuddy.dynamic.Nexus").getMethod("initialize",
Class.class, Integer.TYPE).invoke(null, InterceptorCallback.class, 1720479049);
cachedValue$$sw$69ob5a2$km196e1 =
InterceptorCallback.class.getMethod("onCompletion", RecordMetadata.class,
Exception.class);
}
final /* synthetic */ void
$sw$original$onCompletion$ibg2i91$accessor$$sw$69ob5a2(RecordMetadata
recordMetadata, Exception exception) {
this.$sw$original$onCompletion$ibg2i91(recordMetadata,
exception);
}
}
private static class FutureFailure
implements Future<RecordMetadata> {
private final ExecutionException exception;
public FutureFailure(Exception exception) {
this.exception = new ExecutionException(exception);
}
@Override
public boolean cancel(boolean interrupt) {
return false;
}
@Override
public RecordMetadata get() throws ExecutionException {
throw this.exception;
}
@Override
public RecordMetadata get(long timeout, TimeUnit unit)
throws ExecutionException {
throw this.exception;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
}
private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
ClusterAndWaitTime(Cluster cluster, long
waitedOnMetadataMs) {
this.cluster = cluster;
this.waitedOnMetadataMs = waitedOnMetadataMs;
}
}
}
Affect(row-cnt:6) cost in 876 ms.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]