My Flink job reads from a Kafka stream and does some processing on the records. Code snippet:
> DataStream<Row> flatternedDnsStream = filteredNonNullStream.rebalance() > .map(node -> { > JsonNode source = node.path("source"); > JsonNode destination = node.path("destination"); > JsonNode dns = node.path("dns"); > JsonNode event = node.path("event"); > JsonNode client = node.path("client"); > JsonNode organization = node.path("organization"); > JsonNode timestamp_received = node.path("timestamp_received"); > JsonNode transaction = node.path("transaction"); > JsonNode timestamp = node.path("@timestamp"); > JsonNode message = node.path("message"); > JsonNode network = node.path("network"); > JsonNode ecs = node.path("ecs"); > > return Row.of(String.join(",", getParsed(ecs)), String.join(",", > getParsed(source)), > String.join(",", getParsed(destination)), String.join(",", > getParsed(event)), > String.join(",", getParsed(organization)), String.join(",", > getParsed(timestamp_received)), > String.join(",", getParsed(client)), String.join(",", > getParsed(transaction)), > String.join(",", getParsed(timestamp)), String.join(",", > getParsed(message)), > String.join(",", getParsed(dns)), String.join(",", getParsed(network))); > }).returns(rowTypeDNS); > > public static final RowTypeInfo rowTypeDNS = new > RowTypeInfo(Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > 
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()); > > private static List<String> getParsed(JsonNode node) { > > List<String> list = new ArrayList<>(); > Iterator<Entry<String, JsonNode>> it = node.fields(); > iterateAndExtract(it, list); > return list; > } > > private static void iterateAndExtract(Iterator<Entry<String, JsonNode>> > it, List<String> list) { > > while (it.hasNext()) { > Entry<String, JsonNode> e = it.next(); > if (!e.getValue().isContainerNode()) { > list.add(e.getValue().asText()); > continue; > } > > iterateAndExtract(e.getValue().fields(), list); > } > } > failing with the following error: java.lang.RuntimeException: Row arity of from does not match serializers. at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy( RowSerializer.java:86) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy( RowSerializer.java:44) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.runtime.tasks. OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707) at org.apache.flink.streaming.runtime.tasks. OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660) at org.apache.flink.streaming.api.operators. 
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 727) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 705) at org.apache.flink.streaming.api.operators.StreamMap.processElement( StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 727) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 705) at org.apache.flink.streaming.api.operators.StreamFilter.processElement( StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks. OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators. AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: 727) at org.apache.flink.streaming.api.operators. 
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143) at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Please help me understand this error in detail. Thanks, Srikanth