Change the JSONObject import to import org.apache.storm.shade.org.json.simple.JSONObject;
Resolved. The missing exception stack information quiet misleads me ! 2016-11-17 12:38 GMT+08:00 Zhechao Ma <[email protected]>: > I modify code in KafkaProducer.java to catch the exception stack, and > finally get the real Exception cause. > > java.lang.ClassCastException: > org.apache.storm.shade.org.json.simple.JSONObject cannot be cast to > org.json.simple.JSONObject > > > So this problem is related to the maven shade plugin. > > 2016-11-16 19:20 GMT+08:00 Zhechao Ma <[email protected]>: > >> Hi Amber, >> >> Here is the code. >> >> Properties prop = new Properties(); >> prop.put("bootstrap.servers", kafkaBrokers); >> prop.put("ack", ack); >> prop.put("key.serializer", keySerializer); >> prop.put("value.serializer", valueSerializer); >> >> KafkaBolt kafkaBolt = new KafkaBolt<String, JSONObject>() >> .withProducerProperties(prop) >> .withTopicSelector(new DefaultTopicSelector(outputTopic)) >> .withTupleToKafkaMapper(new >> FieldNameBasedTupleToKafkaMapper("conn","httpstream")); >> >> builder.setBolt("kafkabolt", kafkaBolt, >> kafkaBoltParallelism).shuffleGrouping("normalizeLog", "origin"); >> >> >> >> >> And I have found where the Exception throws, that's in *KafkaProduce.java. >> *That's a ClassCastException. I am confused about the class casting, >> because there seems no class casting. My own serializer was posted in my >> last mail. >> >> byte[] serializedValue; >> try { >> serializedValue = valueSerializer.serialize(record.topic(), >> record.value()); >> } catch (ClassCastException cce) { >> throw new SerializationException("Can't convert value of class " + >> record.value().getClass().getName() + >> " to class " + >> producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() >> + >> " specified in value.serializer");} >> >> Any suggestion? >> >> Thanks. >> >> >> 2016-11-16 12:53 GMT+08:00 Amber Kulkarni <[email protected]>: >> >>> Hey, >>> >>> You want to post json string to kafka right ? >>> Also can you post code you are using to post to kafka. >>> >>> On Tue, Nov 15, 2016 at 12:18 PM, Zhechao Ma >>> <[email protected]> wrote: >>> > Here is the code. I can only get log in the constructor and configure >>> > method. >>> > >>> > import org.json.simple.JSONObject; >>> > import org.apache.kafka.common.errors.SerializationException; >>> > import org.apache.kafka.common.serialization.Serializer; >>> > >>> > import java.util.Map; >>> > import org.slf4j.Logger; >>> > import org.slf4j.LoggerFactory >>> > >>> > public class JsonSerializer implements Serializer<JSONObject> { >>> > private static final Logger LOG = >>> > LoggerFactory.getLogger(JsonSerializer.class); >>> > /** >>> > * Default constructor needed by Kafka >>> > */ >>> > public JsonSerializer() { >>> > LOG.info("===> JsonSerializer constructor !!"); >>> > } >>> > >>> > @Override >>> > public void configure(Map<String, ?> config, boolean isKey) { >>> > LOG.info("===> JsonSerializer configure"); >>> > } >>> > >>> > @Override >>> > public byte[] serialize(String topic, JSONObject data) { >>> > LOG.info("===> JsonSerializer serialize !!"); >>> > if (data == null) >>> > return null; >>> > try { >>> > return data.toString().getBytes("utf-8"); >>> > } catch (Exception e) { >>> > LOG.error("===> JsonSerializer serialize EXCEPTION"); >>> > throw new SerializationException("Error serializing JSON >>> > message", e); >>> > } >>> > } >>> > >>> > @Override >>> > public void close() { >>> > LOG.error("===> JsonSerializer close"); >>> > } >>> > } >>> > >>> > >>> > >>> > >>> > 2016-11-14 20:01 GMT+08:00 Andrew Xor <[email protected]>: >>> >> >>> >> Hi, >>> >> >>> >> Since you can't cast one type to another and you are not getting a >>> Null >>> >> exception in order to be better able to help you could you give us the >>> >> implementation of your serializer? >>> >> >>> >> Cheers, >>> >> >>> >> A. >>> >> >>> >> On Mon, Nov 14, 2016 at 9:17 AM, Zhechao Ma < >>> [email protected]> >>> >> wrote: >>> >>> >>> >>> Even when I implement my own json serializer, it still throws the >>> similar >>> >>> exception, but no more details for debug.: >>> >>> >>> >>> org.apache.kafka.common.errors.SerializationException: Can't convert >>> >>> value of class org.apache.storm.shade.org.json.simple.JSONObject to >>> xxxxxx >>> >>> specified in value.serializer >>> >>> >>> >>> I add a LOG in the overriding method byte[] serialize(String topic, >>> >>> JSONObject data), and found no logs in worker.log. That is to say >>> this >>> >>> exception is throwed before method serialize is called. >>> >>> >>> >>> 2016-11-07 16:50 GMT+08:00 Zhechao Ma <[email protected]>: >>> >>>> >>> >>>> I'm using KafkaBolt to write data to kafka. Tuple to kafka map is >>> >>>> <String, JSONObject>. >>> >>>> I set both key.serializer and value.serializer as >>> >>>> "org.apache.kafka.common.serialization.StringSerializer". I get the >>> >>>> following Exception: >>> >>>> >>> >>>> org.apache.kafka.common.errors.SerializationException: Can't >>> convert >>> >>>> value of class org.apache.storm.shade.org.json.simple.JSONObject >>> to class >>> >>>> org.apache.kafka.common.serialization.StringSerializer specified in >>> >>>> value.serializer >>> >>>> >>> >>>> >>> >>>> I cannot find other serializers related to JSON, and I'm using storm >>> >>>> 1.0.2 and kafka 0.8.1.1. >>> >>>> >>> >>>> Could anyone help ? >>> >>>> >>> >>>> -- >>> >>>> Thanks >>> >>>> Zhechao Ma >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> -- >>> >>> Thanks >>> >>> Zhechao Ma >>> >> >>> >> >>> > >>> > >>> > >>> > -- >>> > Thanks >>> > Zhechao Ma >>> >>> >>> >>> -- >>> Regards, >>> Amber Kulkarni >>> >> >> >> >> -- >> Thanks >> Zhechao Ma >> > > > > -- > Thanks > Zhechao Ma > -- Thanks Zhechao Ma
