yaojingguo opened a new issue #6456:
URL: https://github.com/apache/skywalking/issues/6456


   Please answer these questions before submitting your issue.
   
   - Why do you submit this issue?
   - [ ] Question or discussion
   - [x] Bug
   - [ ] Requirement
   - [ ] Feature or performance improvement
   ___
   ### Bug
   - Which version of SkyWalking, OS, and JRE?
   SkyWalking: v8.4.0, OS: CentOS Linux release 7.4.1708 (Core), JRE: 1.8.0_191, spring-kafka 2.2.6.RELEASE
   
   - Which company or project?
   
   - What happened?
   My code uses KafkaTemplate to send messages to Kafka in two ways, without and with a callback:
   
   ```java
   // Without callback
   template.send(topic, msg);
   
   // With callback
   template.execute(
       (Producer<String, String> producer) -> {
         producer.send(
             new ProducerRecord<>(topic, msg),
             (RecordMetadata metadata, Exception e) -> {
               if (e != null) {
                 System.out.println("message sending failed");
               } else {
                 System.out.println("message sending succeeded");
               }
             });
         return 0;
       });
   ```
   
   agent/logs/skywalking-api.log has the following exception:
   
   ```
   ERROR 2021-02-25 20:36:23:117 kafka-producer-network-thread | producer-2 InstMethodsInter : class[class org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback] before method[onCompletion] intercept failure
   java.lang.ClassCastException: org.apache.skywalking.apm.plugin.kafka.CallbackAdapterInterceptor cannot be cast to org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
   at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.getSnapshot(CallbackInterceptor.java:83)
   at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.beforeMethod(CallbackInterceptor.java:44)
   at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:76)
   at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java)
   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
   at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:599)
   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:575)
   at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
   at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
   at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
   at java.lang.Thread.run(Thread.java:748)
   ```
   
   ___
   ### Possible fix
   
   My guess is that the following branch of `KafkaProducerInterceptor.beforeMethod` was executed:
   
   ```java
               } else if (shouldCallbackInstance instanceof Callback) {
                   Callback callback = (Callback) shouldCallbackInstance;
                   ContextSnapshot snapshot = ContextManager.capture();
                   if (null != snapshot) {
                       CallbackCache cache = new CallbackCache();
                       cache.setSnapshot(snapshot);
                       cache.setCallback(callback);
                       allArguments[1] = new CallbackAdapterInterceptor(cache);
                   }
               }
   ```
   
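   This branch replaces `allArguments[1]` with a `CallbackAdapterInterceptor`, which is not an `EnhancedInstance`. So when `CallbackInterceptor.getSnapshot` tried to cast `cache.getCallback()` to `EnhancedInstance`, a `ClassCastException` was thrown. One possible fix is to add a type check to `CallbackConstructorInterceptor.onConstruct`: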
   
   ```java
       @Override
       public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
           Callback callback = (Callback) allArguments[0];
           if (callback instanceof EnhancedInstance) {
               CallbackCache cache;
               if (null != objInst.getSkyWalkingDynamicField()) {
                   cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
               } else {
                   cache = new CallbackCache();
               }
               cache.setCallback(callback);
               objInst.setSkyWalkingDynamicField(cache);
           }
       }
   ```
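
To illustrate the failure mode in isolation, here is a minimal, self-contained sketch. All types in it (`Callback`, `EnhancedInstance`, `AdapterCallback`, `EnhancedCallback`) are hypothetical stand-ins written for this example, not the real SkyWalking or Kafka classes; the sketch only assumes that the adapter wrapping the user callback does not implement the enhanced-instance interface, as the stack trace suggests.

```java
// Hypothetical stand-ins; none of these are the real SkyWalking or Kafka types.
public class CallbackCastSketch {

    /** Stand-in for Kafka's producer Callback. */
    interface Callback {
        void onCompletion(String metadata, Exception e);
    }

    /** Stand-in for the agent's EnhancedInstance interface. */
    interface EnhancedInstance {
        Object getSkyWalkingDynamicField();
    }

    /** Plays the role of CallbackAdapterInterceptor: a Callback that is NOT enhanced. */
    static final class AdapterCallback implements Callback {
        private final Callback delegate;

        AdapterCallback(Callback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(String metadata, Exception e) {
            delegate.onCompletion(metadata, e);
        }
    }

    /** Plays the role of a user callback whose class the agent has enhanced. */
    static final class EnhancedCallback implements Callback, EnhancedInstance {
        @Override
        public void onCompletion(String metadata, Exception e) {
            // no-op for the sketch
        }

        @Override
        public Object getSkyWalkingDynamicField() {
            return "snapshot";
        }
    }

    /** Mirrors the failing pattern: cast without checking the runtime type. */
    static Object snapshotUnchecked(Callback cached) {
        return ((EnhancedInstance) cached).getSkyWalkingDynamicField();
    }

    /** Guarded variant: returns null when the callback is not enhanced. */
    static Object snapshotChecked(Callback cached) {
        if (cached instanceof EnhancedInstance) {
            return ((EnhancedInstance) cached).getSkyWalkingDynamicField();
        }
        return null;
    }

    /** Returns true if the unchecked cast throws ClassCastException. */
    static boolean uncheckedCastFails(Callback cached) {
        try {
            snapshotUnchecked(cached);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Callback adapter = new AdapterCallback((metadata, e) -> { });
        System.out.println(uncheckedCastFails(adapter));              // true
        System.out.println(snapshotChecked(adapter));                 // null
        System.out.println(snapshotChecked(new EnhancedCallback()));  // snapshot
    }
}
```

The guard can sit either where the callback is cached (as in the proposed `onConstruct` change) or where it is read back; which location suits the plugin better is a question for the committers.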
   
   
   If some committers can give me some guidance, I will try to fix this issue.



