Thanks Pushkar for your response.
I tried to send my own byte array; however the Kafka Producer Class does not
take byte [] as input type. Do you have an example of this? Please share if you
do; really appreciate.
Here is my code:
public class TestEventProducer {
public static void main(String[] args) {
String topic = "test-topic";
long eventsNum = 10;
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.DefaultEncoder ");
props.put("request.required.acks", "0");
ProducerConfig config = new ProducerConfig(props);
byte [] rawData;
Producer<String, rawData> producer = new Producer<String,
rawData>(config); //compillation error rawData cannot be resolved to a type
long start = System.currentTimeMillis();
for (long nEvents = 0; nEvents < eventsNum; nEvents++) {
SimulateEvent event = new SimulateEvent();
try {
rawData = Serializer.serialize(event);
} catch (IOException e) {
e.printStackTrace();
}
KeyedMessage<String, rawData> data = new
KeyedMessage<String, rawData>(topic, event);
producer.send(data);
System.out.println("produced event#:" + nEvents + " "+
data);
}
System.out.println("Took " + (System.currentTimeMillis() - start) +
"to produce " + eventsNum + "messages");
producer.close();
}
}
public class Serializer {
public static byte[] serialize(Object obj) throws IOException {
ByteArrayOutputStream b = new ByteArrayOutputStream();
ObjectOutputStream o = new ObjectOutputStream(b);
o.writeObject(obj);
return b.toByteArray();
}
public static Object deserialize(byte[] bytes) throws IOException,
ClassNotFoundException {
ByteArrayInputStream b = new ByteArrayInputStream(bytes);
ObjectInputStream o = new ObjectInputStream(b);
return o.readObject();
}
}
>>> pushkar priyadarshi <[email protected]> 5/20/2014 5:11 PM >>>
you can send byte[] that you get by using your own serializer ; through
kafka ().On the reciving side u can deseraialize from the byte[] and read
back your object.for using this you will have to
supply serializer.class=kafka.serializer.DefaultEncoder in the properties.
On Tue, May 20, 2014 at 4:23 PM, Kumar Pradeep <[email protected]> wrote:
> I am trying to build a POC with Kafka 0.8.1. I am using my own java class
> as a Kafka message which has a bunch of String data types. For
> serializer.class property in my producer, I cannot use the default
> serializer class or the String serializer class that comes with Kafka
> library. I guess I need to write my own serializer and feed it to the
> producer properties. If you are aware of writing an example custom
> serializer in Kafka (in java), please do share. Appreciate a lot, thanks
> much.
>
> I tried to use something like below, but I get the exception: Exception in
> thread "main" java.lang.NoSuchMethodException:
> test.EventsDataSerializer.<init>(kafka.utils.VerifiableProperties)
> at java.lang.Class.getConstructor0(Class.java:2971)
>
>
> package test;
>
> import java.io.IOException;
>
> import com.fasterxml.jackson.core.JsonFactory;
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import kafka.message.Message;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
>
> public class EventsDataSerializer implements Encoder<SimulateEvent>,
> Decoder<SimulateEvent> {
>
> public Message toMessage(SimulateEvent eventDetails) {
> try {
> ObjectMapper mapper = new ObjectMapper(new
> JsonFactory());
> byte[] serialized =
> mapper.writeValueAsBytes(eventDetails);
> return new Message(serialized);
> } catch (IOException e) {
> e.printStackTrace();
> return null; // TODO
> }
> }
> public SimulateEvent toEvent(Message message) {
> SimulateEvent event = new SimulateEvent();
>
> ObjectMapper mapper = new ObjectMapper(new JsonFactory());
> try {
> //TODO handle error
> return mapper.readValue(message.payload().array(),
> SimulateEvent.class);
> } catch (IOException e) {
> e.printStackTrace();
> return null;
> }
>
> }
>
> public byte[] toBytes(SimulateEvent arg0) {
> // TODO Auto-generated method stub
> return null;
> }
> public SimulateEvent fromBytes(byte[] arg0) {
> // TODO Auto-generated method stub
> return null;
> }
> }
>
>
>