Re: Json KAFKA producer

2018-04-14 Thread Fabian Hueske
Hi,

SerializationSchema is a public interface that you can implement.
It has a single method to turn an object into a byte array.

I would suggest implementing your own SerializationSchema.
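
A minimal sketch of such a schema for ObjectNode could look like the following. The class name ObjectNodeSerializationSchema is made up for illustration, and the import paths assume Flink 1.4 or later with the shaded Jackson classes used in the code quoted below; adjust them to your version. It writes each ObjectNode as UTF-8 JSON bytes via Jackson's ObjectMapper.

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical class name; not part of Flink.
class ObjectNodeSerializationSchema implements SerializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;

    // Created lazily on first use so the mapper is not shipped with the job graph.
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(ObjectNode element) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            // Jackson writes the JSON tree as UTF-8 encoded bytes.
            return mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Could not serialize ObjectNode to JSON", e);
        }
    }
}
```

An instance of this class would then go where *XXX* is in your code, i.e. as the third constructor argument of FlinkKafkaProducer011.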

Best, Fabian

2018-04-11 15:56 GMT+02:00 Luigi Sgaglione :

> Hi,
>
> I'm trying to create a Flink example with a Kafka consumer and producer
> using the JSON data format. In particular, I'm able to consume and process
> JSON data published on a Kafka topic, but not to publish the results.
>
> The problem is that I don't know which serialization schema should be
> used to publish an ObjectNode (Jackson).
>
>
> This is an excerpt of my test code
>
> import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
>
> FlinkKafkaConsumer011<ObjectNode> myConsumer =
>     new FlinkKafkaConsumer011<>("test2", new JSONDeserializationSchema(), properties);
> myConsumer.setStartFromEarliest();
>
> DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
> SingleOutputStreamOperator<ObjectNode> out1 = stream.filter(new FilterFunction<ObjectNode>() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public boolean filter(ObjectNode arg0) throws Exception {
>         // Keep only records whose "value" field is not "1".
>         String temp = arg0.get("value").asText();
>         return !temp.equals("1");
>     }
> });
> // *XXX* marks the serialization schema this question is about.
> FlinkKafkaProducer011<ObjectNode> producer =
>     new FlinkKafkaProducer011<>("192.168.112.128:9092", "flinkOut", *XXX*);
> out1.addSink(producer);
>
> Can you help me to understand how I can publish an ObjectNode?
>
> Thanks
>
>
>


Json KAFKA producer

2018-04-11 Thread Luigi Sgaglione
Hi,

I'm trying to create a Flink example with a Kafka consumer and producer using
the JSON data format. In particular, I'm able to consume and process JSON data
published on a Kafka topic, but not to publish the results.

The problem is that I don't know which serialization schema should be
used to publish an ObjectNode (Jackson).


This is an excerpt of my test code

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

FlinkKafkaConsumer011<ObjectNode> myConsumer =
    new FlinkKafkaConsumer011<>("test2", new JSONDeserializationSchema(), properties);
myConsumer.setStartFromEarliest();

DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
SingleOutputStreamOperator<ObjectNode> out1 = stream.filter(new FilterFunction<ObjectNode>() {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(ObjectNode arg0) throws Exception {
        // Keep only records whose "value" field is not "1".
        String temp = arg0.get("value").asText();
        return !temp.equals("1");
    }
});
// *XXX* marks the serialization schema this question is about.
FlinkKafkaProducer011<ObjectNode> producer =
    new FlinkKafkaProducer011<>("192.168.112.128:9092", "flinkOut", *XXX*);
out1.addSink(producer);

Can you help me to understand how I can publish an ObjectNode?

Thanks