I still don't see anything wrong. Googling the error leads to https://bugs.eclipse.org/bugs/show_bug.cgi?id=516620. Is it possible that you're compiling on a different JDK than the one running the cluster?
A workaround would be to eliminate the lambda from ByTopicRecordTranslator, by using an anonymous class instead? Den tor. 6. jun. 2019 kl. 17.25 skrev aurelien violette < [email protected]>: > Hey, > > Well, here it is. It extends ConfigurableTopology from storm-crawler. And > I've tried many simplifications to get rid of any lambda potential code. > The only place I see it, is in the RecordTranslator defaults. > > Thank you for you ideas if any about where or what to search for. > > > import com.digitalpebble.stormcrawler.ConfigurableTopology; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.storm.kafka.spout.ByTopicRecordTranslator; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > > import java.io.Serializable; > > public abstract class AntConfigurableTopology extends ConfigurableTopology > implements Serializable { > > String topologyName; > // ZkHosts zkHosts; > String bootstrapServers = "localhost:9092"; // Default local configuration > int parallel = 1; > > void init(String topologyId) { > topologyName = (String) this.getConf().get(topologyId); > bootstrapServers = (String) this.getConf().get("metadata.broker"); > final Integer parallelismHint = (Integer) > this.getConf().getOrDefault("parallelism", 1); > parallel = parallelismHint; > if (!this.getConf().containsKey("zkhost")) { > this.getConf().put("zkhost", "localhost:2181"); > } > } > > > /** > * Get a spout config by topic > * @param topic > * @return > */ > KafkaSpoutConfig getSpoutConfig(String topic) { > return getSpoutConfig(topic, null); //new > AntmarkTupleBuilder(this.getConf())); > } > > > /** > * Get a spout config by topic, define the scheme. > * @param topic > * @param deserializer deserializer to use from bytes to value. > * @return > */ > KafkaSpoutConfig getSpoutConfig( > String topic, > Object deserializer) > { > > String addWeblinkTopic = (String) this.getConf().get(topic); > > // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder > KafkaSpoutConfig kafkaAddWeblinkConfig = > KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic) > // Consummer will start from the latest uncommitted offset, or > the earliest offset if any. > > .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) > // Setup serializers from bytes to string. > // careful the key is dropped from here. > .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringSerializer") > .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringSerializer") > // Setup deserialization to fields : (String key, String json > value) => (String Key, Unpacked object from json) > // .setRecordTranslator(new > ByTopicRecordTranslator<>((TargetInterface Serializable)(r) -> new > Values(r.value()), new Fields(FieldNames.ANTMARK))) > //.setRecordTranslator(deserializer, deserializer.getFields()) > .build(); > > > return kafkaAddWeblinkConfig; > } > } > > > } > > > Le jeu. 6 juin 2019 à 16:39, Stig Rohde Døssing <[email protected]> > a écrit : > >> I don't see anything wrong with the code you posted. Could you post the >> full AntConfigurableTopology code? It's hard to tell from that snippet what >> your topology setup looks like. >> >> Den tor. 6. jun. 2019 kl. 12.33 skrev aurelien violette < >> [email protected]>: >> >>> Hello, >>> >>> I was sucessfully using Kafka 0.8.x in a storm topology based on Storm >>> Crawler. I needed though to upgrade to Kafka 0.10.x >>> >>> I tried to simulate my enviroment using a Docker environment : >>> Storm 1.1 and Kafka 2.11-0.10.2.2 >>> >>> Unfortunately, at the deploy, I get an error on : >>> >>> Caused by: java.lang.IllegalArgumentException: Invalid lambda >>> deserialization >>> at >>> c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) >>> ~[stormjar.jar:?] >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> ~[?:1.8.0_212] >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> ~[?:1.8.0_212] >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> ~[?:1.8.0_212] >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> ~[?:1.8.0_212] >>> at >>> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) >>> ~[?:1.8.0_212] >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> ~[?:1.8.0_212] >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> ~[?:1.8.0_212] >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> ~[?:1.8.0_212] >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) >>> ~[?:1.8.0_212] >>> at >>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) >>> ~[?:1.8.0_212] >>> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) >>> ~[storm-core-1.1.3.jar:1.1.3] >>> >>> Where my ConfigurableTopology is only gathering some config utils for >>> building topology. In particular, it defines the SpoutConfig. >>> >>> /** >>> * Get a spout config by topic, define the scheme. >>> * @param topic >>> * @param deserializer deserializer to use from bytes to value. >>> * @return >>> */ >>> KafkaSpoutConfig getSpoutConfig( >>> String topic, >>> Object deserializer) >>> { >>> >>> String topic = (String) this.getConf().get(topic); >>> >>> // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder >>> KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, >>> topic) >>> // Consummer will start from the latest uncommitted offset, or >>> the earliest offset if any. >>> >>> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) >>> // Setup serializers from bytes to string. >>> // careful the key is dropped from here. >>> .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >>> "org.apache.kafka.common.serialization.StringSerializer") >>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >>> "org.apache.kafka.common.serialization.StringSerializer") >>> // Setup deserialization to fields : (String key, String json >>> value) => (String Key, Unpacked object from json) >>> // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new >>> Values(r.value()), new Fields("FieldNames"))) >>> .build(); >>> >>> >>> return kafkaConfig; >>> >>> } >>> >>> I don't understand the origin of the issue. My Maven sets java to 1.8. >>> Any idea on this issue ? >>> >>> Actually, I wanted to set up a RecordTranslator to handle the transition >>> from the input JSON String to my deserialized JSON object. Deserialization >>> is handled by Gson. >>> >>> Thank you for your help, >>> BR, >>> Aurelien >>> >>> >>> -- >>> BR, >>> Aurelien Violette >>> >> > > -- > BR, > Aurelien Violette >
