Please check that you are implementing SerializableDeserializer<T> for some type T and not the raw type.
2017-09-26 0:07 GMT+02:00 Manish Sharma <[email protected]>: > Thanks Stig, > So I tried using > > >> .setValue(EmailObjectDeserializer.class) > > and the EmailObjectDeserializer class is implementing the interface > org.apache.storm.kafka.spout.SerializableDeserializer > > >> public class EmailObjectDeserializer implements > SerializableDeserializer {...} > > > I see the following compilation error.. > > > > [ERROR] COMPILATION ERROR : > [INFO] ------------------------------------------------------------- > [ERROR] /xxx/comms/topology/SmtpInjectionTopology.java:[72,18] no > suitable method found for setValue(java.lang.Class<xxx.comms.utils. > EmailObjectDeserializer>) > method org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder.<NV> > setValue(org.apache.storm.kafka.spout.SerializableDeserializer<NV>) is > not applicable > (cannot infer type-variable(s) NV > (argument mismatch; > java.lang.Class<xxx.comms.utils.EmailObjectDeserializer> > cannot be converted to org.apache.storm.kafka.spout. > SerializableDeserializer<NV>)) > method > org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder.<NV>setValue(java.lang.Class<? > extends org.apache.kafka.common.serialization.Deserializer<NV>>) is not > applicable > (cannot infer type-variable(s) NV > (argument mismatch; > java.lang.Class<xxx.comms.utils.EmailObjectDeserializer> > cannot be converted to java.lang.Class<? extends org.apache.kafka.common. > serialization.Deserializer<NV>>)) > > > I tried implementing the org.apache.kafka.common.serialization.Deserializer > class too and got the same error.. > > Thanks for your help. /Manish > > > > > > On Sun, Sep 24, 2017 at 6:46 AM, Stig Rohde Døssing <[email protected]> > wrote: > >> Hi Manish, >> >> The setProp method will not work for setting deserializers until Storm >> 1.2.0. For 1.1.1 you will need to use setKey/setValue to set a different >> deserializer. >> >> e.g. >> KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig >> .builder(property.getKafka_consumer_bootstrap_servers(), topics) >> .setValue(TestDeserializer.class) >> .build() >> >> Also when you upgrade to 1.2.0 please note that you can either do >> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> EmailObjectDeserializer.class) >> >> or >> >> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> "com.example.EmailObjectDeserializer") >> >> that is, you need to use the fully qualified class name of the >> deserializer class if you're setting it as a string. >> >> 2017-09-24 <20%2017%2009%2024> 1:38 GMT+02:00 Manish Sharma < >> [email protected]>: >> >>> Hello, >>> I am trying to use a custom ValueDeserializer when consuming from kafka, >>> I tried the following >>> >>> >>> --snip-- >>> KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig >>> .builder(property.getKafka_consumer_bootstrap_servers(), topics) >>> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffset >>> Strategy.EARLIEST) >>> .setGroupId(property.getKafka_consumer_groupid()) >>> .setProp(ConsumerConfig.CLIENT_ID_CONFIG, "StormKafkaConsumer") >>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >>> "EmailObjectDeserializer") >>> .build(); >>> --snip-- >>> >>> >>> It didn't take, In the logs I still see spout executor instantiated with >>> default "StringDeserializer" class. >>> >>> >>> --snip-- >>> 6348 [Thread-18-SMTPInjectionKafkaSpout-executor[2 2]] INFO >>> o.a.k.c.c.ConsumerConfig - ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [XXXX.XXXX.XXXX:9092] >>> check.crcs = true >>> client.id = StormKafkaConsumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 <52%2042%2088%2000> >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = dev_worker >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.StringDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 300000 >>> max.poll.records = 100 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [class org.apache.kafka.clients.consu >>> mer.RangeAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 305000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 10000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class >>> org.apache.kafka.common.serialization.StringDeserializer >>> <------- >>> --snip-- >>> >>> >>> Any thoughts on how to get custom value.deserializer working with >>> storm-kafka-client-1.1.1? >>> >> >> >
