My Flink job reads from a Kafka stream and does some processing.

Code snippet:

> DataStream<Row> flatternedDnsStream = filteredNonNullStream.rebalance()
>     .map(node -> {
>         JsonNode source = node.path("source");
>         JsonNode destination = node.path("destination");
>         JsonNode dns = node.path("dns");
>         JsonNode event = node.path("event");
>         JsonNode client = node.path("client");
>         JsonNode organization = node.path("organization");
>         JsonNode timestamp_received = node.path("timestamp_received");
>         JsonNode transaction = node.path("transaction");
>         JsonNode timestamp = node.path("@timestamp");
>         JsonNode message = node.path("message");
>         JsonNode network = node.path("network");
>         JsonNode ecs = node.path("ecs");
>
>         return Row.of(
>             String.join(",", getParsed(ecs)),
>             String.join(",", getParsed(source)),
>             String.join(",", getParsed(destination)),
>             String.join(",", getParsed(event)),
>             String.join(",", getParsed(organization)),
>             String.join(",", getParsed(timestamp_received)),
>             String.join(",", getParsed(client)),
>             String.join(",", getParsed(transaction)),
>             String.join(",", getParsed(timestamp)),
>             String.join(",", getParsed(message)),
>             String.join(",", getParsed(dns)),
>             String.join(",", getParsed(network)));
>     }).returns(rowTypeDNS);
>
> public static final RowTypeInfo rowTypeDNS = new RowTypeInfo(
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>     Types.STRING());
>

> private static List<String> getParsed(JsonNode node) {
>     List<String> list = new ArrayList<>();
>     Iterator<Entry<String, JsonNode>> it = node.fields();
>     iterateAndExtract(it, list);
>     return list;
> }
>
> private static void iterateAndExtract(Iterator<Entry<String, JsonNode>> it, List<String> list) {
>     while (it.hasNext()) {
>         Entry<String, JsonNode> e = it.next();
>         if (!e.getValue().isContainerNode()) {
>             list.add(e.getValue().asText());
>             continue;
>         }
>
>         iterateAndExtract(e.getValue().fields(), list);
>     }
> }
>
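To see how wide the emitted row actually is, here is a minimal, dependency-free model of what `getParsed`/`iterateAndExtract` followed by `String.join` do. It is a sketch under assumptions: nested `java.util.Map`s stand in for Jackson `JsonNode` objects so it runs on a plain JDK, and the class name `FlattenModel` and the sample fields are hypothetical. Joining a section's leaves yields exactly one column value per top-level section, so the `map` above emits one column per `Row.of(...)` argument (12), however many leaf values the sections contain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: nested Maps stand in for Jackson JsonNode objects.
public class FlattenModel {

    // Mirrors iterateAndExtract: depth-first walk that keeps only leaf values.
    static void leaves(Map<String, Object> node, List<String> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) v;
                leaves(child, out); // descend into the nested object
            } else {
                out.add(String.valueOf(v)); // leaf value, like asText()
            }
        }
    }

    // Mirrors getParsed: all leaf values of one top-level section.
    static List<String> getParsed(Map<String, Object> section) {
        List<String> list = new ArrayList<>();
        leaves(section, list);
        return list;
    }

    public static void main(String[] args) {
        Map<String, Object> question = new LinkedHashMap<>();
        question.put("name", "example.com");
        question.put("type", "A");

        Map<String, Object> dns = new LinkedHashMap<>();
        dns.put("id", "42");
        dns.put("question", question);

        Map<String, Object> source = new LinkedHashMap<>();
        source.put("ip", "10.0.0.1");
        source.put("port", "53");

        // String.join turns ALL leaves of a section into ONE column value.
        String dnsColumn = String.join(",", getParsed(dns));
        String sourceColumn = String.join(",", getParsed(source));
        List<String> row = Arrays.asList(dnsColumn, sourceColumn); // like Row.of(...)

        int leafCount = getParsed(dns).size() + getParsed(source).size();
        System.out.println(dnsColumn);    // 42,example.com,A
        System.out.println(sourceColumn); // 10.0.0.1,53
        System.out.println(row.size() + " columns, " + leafCount + " leaf values"); // 2 columns, 5 leaf values
    }
}
```

One related detail: in Jackson, `JsonNode.fields()` only yields entries for object nodes, so if `@timestamp` or `message` are plain strings in your events, `getParsed` returns an empty list for them and those columns come out as empty strings.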

It fails with the following error:
java.lang.RuntimeException: Row arity of from does not match serializers.
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:86)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
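The top frame is `RowSerializer.copy`. As far as I can tell from the Flink source of this era, a `RowSerializer` holds one field serializer per field of the `RowTypeInfo` it was created from (here `rowTypeDNS`), and `copy(Row from)` first compares `from.getArity()` with that count, throwing exactly this `RuntimeException` when they differ. The copy itself happens because `CopyingChainingOutput`, which Flink uses between chained operators when object reuse is disabled (the default), copies each record before passing it on. The sketch below is a dependency-free model of that check, not Flink's class: `ArityCheckingSerializer` is a hypothetical name and an `Object[]` stands in for `Row`. With the counts from the snippet (12 arguments to `Row.of(...)`, 73 `Types.STRING()` entries in `rowTypeDNS`) it fails the same way.

```java
import java.util.Arrays;

// Hypothetical, dependency-free model of the arity check in Flink's RowSerializer.copy.
public class ArityCheckingSerializer {

    private final int fieldCount; // one "field serializer" per declared field

    ArityCheckingSerializer(int fieldCount) {
        this.fieldCount = fieldCount;
    }

    // Object[] stands in for org.apache.flink.types.Row.
    Object[] copy(Object[] from) {
        if (from.length != fieldCount) {
            throw new RuntimeException("Row arity of from does not match serializers.");
        }
        // Flink copies each field with its own field serializer; a shallow copy suffices here.
        return Arrays.copyOf(from, from.length);
    }

    public static void main(String[] args) {
        Object[] row = new Object[12]; // Row.of(...) with 12 arguments
        ArityCheckingSerializer declared73 = new ArityCheckingSerializer(73); // like rowTypeDNS
        ArityCheckingSerializer declared12 = new ArityCheckingSerializer(12);

        System.out.println(declared12.copy(row).length); // 12
        try {
            declared73.copy(row);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Row arity of from does not match serializers.
        }
    }
}
```

Either side can be changed to restore agreement: declare `rowTypeDNS` with 12 `Types.STRING()` fields to match the 12 joined columns, or build the `Row` from the individual leaf values so that it has as many fields as the type declares. Which is right depends on whether downstream operators expect one column per section or one per leaf.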

Could you help me understand this error in detail?

Thanks
Srikanth
