This does not sound Flink-related; it looks more like a
Jackson/JSON question than a Flink question.
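
For context, Jackson reports "No content to map due to end-of-input" when the bytes it is asked to parse contain no JSON at all, and the "line: 1, column: 1" position in the trace below is consistent with an empty Kafka message value. One way to check for that is a small guard run on the raw bytes before they reach the JSON parser. The following is only a sketch under that assumption: `PayloadGuard` and `hasContent` are hypothetical names and are not part of Flink or Jackson.

```java
/** Hypothetical helper (not part of Flink or Jackson) for spotting empty record values. */
final class PayloadGuard {

    private PayloadGuard() {
        // Static utility; not meant to be instantiated.
    }

    /**
     * Returns true if the raw message holds at least one byte that is not
     * JSON whitespace (space, tab, line feed, carriage return).
     * A null or zero-length array returns false.
     */
    static boolean hasContent(byte[] message) {
        if (message == null) {
            return false;
        }
        for (byte b : message) {
            if (b != ' ' && b != '\t' && b != '\n' && b != '\r') {
                return true;
            }
        }
        return false;
    }
}
```

A custom deserialization schema could call `PayloadGuard.hasContent(message)` before handing the bytes to an `ObjectMapper`. Whether an empty record is then skipped, logged, or mapped to a placeholder depends on the job.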


On Thu, Oct 13, 2016 at 5:56 PM, PedroMrChaves <pedro.mr.cha...@gmail.com>
wrote:

> Hello,
>
> I recently started programming with the Apache Flink API. I am trying to
> read JSON input directly from Kafka with the following code:
>
> private void kafkaConsumer(String server, String topic) {
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", server);
>     properties.setProperty("group.id", "Demo");
>     stream = environment
>             .addSource(new FlinkKafkaConsumer09<>(topic, new JSONDeserializationSchema(), properties))
>             .map(new MapFunction<ObjectNode, Event>() {
>                 @Override
>                 public Event map(ObjectNode value) throws Exception {
>                     return new Event(
>                             Integer.parseInt(value.get("id").asText()),
>                             value.get("user").asText(),
>                             value.get("action").asText(),
>                             value.get("ip").asText());
>                 }
>             });
> }
>
> But I always get the following error:
>
>
> 17:56:46,335 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
> com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
>  at [Source: [B@69a90966; line: 1, column: 1]
>         at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
>         at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3095)
>         at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3036)
>         at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
>         at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:38)
>         at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:30)
>         at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
>         at java.lang.Thread.run(Thread.java:745)
>
> What am I doing wrong?
>
> Attached is the JSON sample that I am using.
>
> Thank you and Regards.
>
> log.json
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9536/log.json>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JsonMappingException-No-content-to-map-due-to-end-of-input-tp9536.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
