I still don't see anything wrong.

Googling the error leads to
https://bugs.eclipse.org/bugs/show_bug.cgi?id=516620. Is it possible that
you're compiling on a different JDK than the one running the cluster?
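
One quick way to compare the two sides is to print the runtime JVM version and the class-file version of a class from the topology jar, once on the build machine and once on a cluster node. The following is only a minimal sketch: the class name JdkCheckSketch is made up for illustration, and it can only rule out a gross version mismatch. It will not tell you which compiler (javac or the Eclipse compiler) produced the jar.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class JdkCheckSketch {

    // Reads the major version from the header of a compiled class file.
    // Layout: 4-byte magic (0xCAFEBABE), 2-byte minor, 2-byte major.
    static int classFileMajorVersion(Class<?> cls) throws IOException {
        String resource = "/" + cls.getName().replace('.', '/') + ".class";
        try (InputStream raw = cls.getResourceAsStream(resource)) {
            if (raw == null) {
                throw new IOException("Class file not found: " + resource);
            }
            DataInputStream in = new DataInputStream(raw);
            if (in.readInt() != 0xCAFEBABE) {
                throw new IOException("Not a class file: " + resource);
            }
            in.readUnsignedShort();        // minor version, ignored here
            return in.readUnsignedShort(); // major version, 52 means Java 8
        }
    }

    public static void main(String[] args) throws IOException {
        // Version of the JVM that is running this code.
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.vendor  = " + System.getProperty("java.vendor"));
        // Version the class was compiled for.
        System.out.println("class major  = " + classFileMajorVersion(JdkCheckSketch.class));
    }
}
```

To check the topology itself, pass one of the classes from stormjar.jar instead of JdkCheckSketch.class.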

A possible workaround is to eliminate the lambda passed to
ByTopicRecordTranslator and use an anonymous class instead.
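
To illustrate the idea without pulling in Storm, here is a minimal sketch of a serializable anonymous class surviving a Java serialization round trip. SerializableFunc is a hypothetical stand-in for the function interface that the record translator accepts, and valueTranslator and roundTrip are names made up for this example. The anonymous class is created in a static method on purpose, so it does not capture an enclosing topology instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonymousTranslatorSketch {

    // Hypothetical stand-in for a serializable single-method interface.
    interface SerializableFunc<V, R> extends Serializable {
        R apply(V value);
    }

    // Anonymous class instead of a lambda: it is deserialized like any
    // ordinary class, without going through SerializedLambda.
    static SerializableFunc<String, String> valueTranslator() {
        return new SerializableFunc<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(String value) {
                return "value=" + value;
            }
        };
    }

    // Serializes and deserializes an object in memory with plain Java
    // serialization, the same mechanism that appears in the stack trace.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableFunc<String, String> copy = roundTrip(valueTranslator());
        System.out.println(copy.apply("some json"));
    }
}
```

In the real topology the anonymous class would implement whatever function type ByTopicRecordTranslator expects in your storm-kafka-client version. A named static nested class would work the same way and is usually easier to keep stable across builds.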

On Thu, Jun 6, 2019 at 17:25, aurelien violette <
[email protected]> wrote:

> Hey,
>
> Well, here it is. It extends ConfigurableTopology from storm-crawler. I've
> tried many simplifications to get rid of any code that could involve a
> lambda. The only place I still see one is in the RecordTranslator defaults.
>
> Thank you for any ideas about where or what to search for.
>
>
> import com.digitalpebble.stormcrawler.ConfigurableTopology;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
>
> import java.io.Serializable;
>
> public abstract class AntConfigurableTopology extends ConfigurableTopology 
> implements Serializable {
>
>   String topologyName;
>   // ZkHosts zkHosts;
>   String bootstrapServers = "localhost:9092"; // Default local configuration
>   int parallel = 1;
>
>   void init(String topologyId) {
>     topologyName = (String) this.getConf().get(topologyId);
>     bootstrapServers = (String) this.getConf().get("metadata.broker");
>     final Integer parallelismHint = (Integer) this.getConf().getOrDefault("parallelism", 1);
>     parallel = parallelismHint;
>     if (!this.getConf().containsKey("zkhost")) {
>       this.getConf().put("zkhost", "localhost:2181");
>     }
>   }
>
>
>   /**
>    * Get a spout config by topic
>    * @param topic
>    * @return
>    */
>   KafkaSpoutConfig getSpoutConfig(String topic) {
>     return getSpoutConfig(topic, null); // new AntmarkTupleBuilder(this.getConf()));
>   }
>
>
>   /**
>    * Get a spout config by topic, define the scheme.
>    * @param topic
>    * @param deserializer deserializer to use from bytes to value.
>    * @return
>    */
>   KafkaSpoutConfig getSpoutConfig(
>           String topic,
>           Object deserializer)
>   {
>
>     String addWeblinkTopic = (String) this.getConf().get(topic);
>
>     // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>     KafkaSpoutConfig kafkaAddWeblinkConfig = KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic)
>             // Consumer starts from the last committed offset, or from the earliest offset if nothing has been committed.
>             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             // Setup deserializers from bytes to string.
>             // careful the key is dropped from here.
>             .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             // Setup deserialization to fields : (String key, String json value) => (String Key, Unpacked object from json)
>             // .setRecordTranslator(new ByTopicRecordTranslator<>((TargetInterface Serializable)(r) -> new Values(r.value()), new Fields(FieldNames.ANTMARK)))
>             //.setRecordTranslator(deserializer, deserializer.getFields())
>             .build();
>
>
>     return kafkaAddWeblinkConfig;
>   }
> }
>
>
>
>
> On Thu, Jun 6, 2019 at 16:39, Stig Rohde Døssing <[email protected]>
> wrote:
>
>> I don't see anything wrong with the code you posted. Could you post the
>> full AntConfigurableTopology code? It's hard to tell from that snippet what
>> your topology setup looks like.
>>
>> On Thu, Jun 6, 2019 at 12:33, aurelien violette <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> I was successfully using Kafka 0.8.x in a Storm topology based on Storm
>>> Crawler, but I needed to upgrade to Kafka 0.10.x.
>>>
>>> I tried to simulate my environment using Docker:
>>> Storm 1.1 and Kafka 2.11-0.10.2.2
>>>
>>> Unfortunately, when I deploy the topology, I get this error:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>> deserialization
>>>        at
>>> c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24)
>>> ~[stormjar.jar:?]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> ~[?:1.8.0_212]
>>>        at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> ~[?:1.8.0_212]
>>>        at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_212]
>>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>>> ~[?:1.8.0_212]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> ~[?:1.8.0_212]
>>>        at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> ~[?:1.8.0_212]
>>>        at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_212]
>>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>> ~[?:1.8.0_212]
>>>        at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>> ~[?:1.8.0_212]
>>>        at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253)
>>> ~[storm-core-1.1.3.jar:1.1.3]
>>>
>>> My ConfigurableTopology only gathers some config utilities for building
>>> the topology. In particular, it defines the SpoutConfig.
>>>
>>> /**
>>>  * Get a spout config by topic, define the scheme.
>>>  * @param topic
>>>  * @param deserializer deserializer to use from bytes to value.
>>>  * @return
>>>  */
>>> KafkaSpoutConfig getSpoutConfig(
>>>         String topic,
>>>         Object deserializer)
>>> {
>>>
>>>   // Resolve the actual topic name from the configuration key.
>>>   String topicName = (String) this.getConf().get(topic);
>>>
>>>   // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>>>   KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topicName)
>>>           // Consumer starts from the last committed offset, or from the earliest offset if nothing has been committed.
>>>           .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>           // Setup deserializers from bytes to string.
>>>           // careful the key is dropped from here.
>>>           .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>>           .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>>           // Setup deserialization to fields : (String key, String json value) => (String Key, Unpacked object from json)
>>>           // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
>>>           .build();
>>>
>>>
>>>   return kafkaConfig;
>>>
>>> }
>>>
>>> I don't understand the origin of the issue. My Maven build targets Java
>>> 1.8. Any idea what could cause it?
>>>
>>> Actually, I wanted to set up a RecordTranslator to handle the transition
>>> from the input JSON String to my deserialized JSON object. Deserialization
>>> is handled by Gson.
>>>
>>> Thank you for your help,
>>> BR,
>>> Aurelien
>>>
>>>
>>> --
>>> BR,
>>> Aurelien Violette
>>>
>>
>
> --
> BR,
> Aurelien Violette
>
