Hi Amber,

Here is the code.

Properties prop = new Properties();
prop.put("bootstrap.servers", kafkaBrokers);
prop.put("ack", ack);
prop.put("key.serializer", keySerializer);
prop.put("value.serializer", valueSerializer);

KafkaBolt kafkaBolt = new KafkaBolt<String, JSONObject>()
                .withProducerProperties(prop)
                .withTopicSelector(new DefaultTopicSelector(outputTopic))
                .withTupleToKafkaMapper(new
FieldNameBasedTupleToKafkaMapper("conn","httpstream"));

builder.setBolt("kafkabolt", kafkaBolt,
kafkaBoltParallelism).shuffleGrouping("normalizeLog", "origin");




And I have found where the Exception throws, that's in
*KafkaProduce.java. *That's
a ClassCastException. I am confused about the class casting, because there
seems no class casting. My own serializer was posted in my last mail.

byte[] serializedValue;
    try {
        serializedValue = valueSerializer.serialize(record.topic(),
record.value());
    } catch (ClassCastException cce) {
      throw new SerializationException("Can't convert value of class "
+ record.value().getClass().getName() +
         " to class " +
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()
+
         " specified in value.serializer");}

Any suggestion?

Thanks.


2016-11-16 12:53 GMT+08:00 Amber Kulkarni <[email protected]>:

> Hey,
>
> You want to post json string to kafka right ?
> Also can you post code you are using to post to kafka.
>
> On Tue, Nov 15, 2016 at 12:18 PM, Zhechao Ma
> <[email protected]> wrote:
> > Here is the code. I can only get  log in the constructor and configure
> > method.
> >
> > import org.json.simple.JSONObject;
> > import org.apache.kafka.common.errors.SerializationException;
> > import org.apache.kafka.common.serialization.Serializer;
> >
> > import java.util.Map;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory
> >
> > public class JsonSerializer implements Serializer<JSONObject> {
> >     private static final Logger LOG =
> > LoggerFactory.getLogger(JsonSerializer.class);
> >     /**
> >      * Default constructor needed by Kafka
> >      */
> >     public JsonSerializer() {
> >         LOG.info("===> JsonSerializer constructor !!");
> >     }
> >
> >     @Override
> >     public void configure(Map<String, ?> config, boolean isKey) {
> >         LOG.info("===> JsonSerializer configure");
> >     }
> >
> >     @Override
> >     public byte[] serialize(String topic, JSONObject data) {
> >         LOG.info("===> JsonSerializer serialize !!");
> >         if (data == null)
> >             return null;
> >         try {
> >             return data.toString().getBytes("utf-8");
> >         } catch (Exception e) {
> >             LOG.error("===> JsonSerializer serialize EXCEPTION");
> >             throw new SerializationException("Error serializing JSON
> > message", e);
> >         }
> >     }
> >
> >     @Override
> >     public void close() {
> >         LOG.error("===> JsonSerializer close");
> >     }
> > }
> >
> >
> >
> >
> > 2016-11-14 20:01 GMT+08:00 Andrew Xor <[email protected]>:
> >>
> >> Hi,
> >>
> >>  Since you can't cast one type to  another and you are not getting a
> Null
> >> exception in order to be better able to help you could you give us the
> >> implementation of your serializer?
> >>
> >> Cheers,
> >>
> >> A.
> >>
> >> On Mon, Nov 14, 2016 at 9:17 AM, Zhechao Ma <
> [email protected]>
> >> wrote:
> >>>
> >>> Even when I implement my own json serializer, it still throws the
> similar
> >>> exception, but no more details for debug.:
> >>>
> >>> org.apache.kafka.common.errors.SerializationException: Can't convert
> >>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
> xxxxxx
> >>> specified in value.serializer
> >>>
> >>> I add a LOG in the overriding method byte[] serialize(String topic,
> >>> JSONObject data), and found no logs in worker.log. That is to say this
> >>> exception is throwed before method serialize is called.
> >>>
> >>> 2016-11-07 16:50 GMT+08:00 Zhechao Ma <[email protected]>:
> >>>>
> >>>> I'm using KafkaBolt to write data to kafka. Tuple to kafka map is
> >>>> <String, JSONObject>.
> >>>> I set both key.serializer and value.serializer as
> >>>> "org.apache.kafka.common.serialization.StringSerializer". I get the
> >>>> following Exception:
> >>>>
> >>>> org.apache.kafka.common.errors.SerializationException: Can't convert
> >>>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
> class
> >>>> org.apache.kafka.common.serialization.StringSerializer specified in
> >>>> value.serializer
> >>>>
> >>>>
> >>>> I cannot find other serializers related to JSON, and I'm using storm
> >>>> 1.0.2 and kafka 0.8.1.1.
> >>>>
> >>>> Could anyone help ?
> >>>>
> >>>> --
> >>>> Thanks
> >>>> Zhechao Ma
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks
> >>> Zhechao Ma
> >>
> >>
> >
> >
> >
> > --
> > Thanks
> > Zhechao Ma
>
>
>
> --
> Regards,
> Amber Kulkarni
>



-- 
Thanks
Zhechao Ma

Reply via email to