Hi,

The inline lambda MapFunction produces a Row with 12 String fields (12
calls to String.join()).
You use the RowTypeInfo rowTypeDNS to declare the return type of the lambda
MapFunction. However, rowTypeDNS is defined with many more String fields.

The exception tells you that the number of fields returned by the function
is not equal to the number of fields that were declared by rowTypeDNS.
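
To make the mismatch concrete, here is a minimal plain-Java sketch of the kind of check that fails. It is only a simplified model and not Flink's actual code: the class RowArityCheck, the methods checkArity and stringTypes, and the count of 20 declared fields are all made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RowArityCheck {

    // Hypothetical stand-in for the arity comparison; not Flink's actual code.
    // Fails when the number of declared field types differs from the number
    // of fields the function actually put into the row.
    public static void checkArity(List<String> declaredTypes, Object[] rowFields) {
        if (declaredTypes.size() != rowFields.length) {
            throw new IllegalStateException(
                "Row has " + rowFields.length + " fields but "
                + declaredTypes.size() + " were declared");
        }
    }

    // Builds a declaration with exactly n "STRING" entries, so the count is
    // stated once instead of being spelled out by hand.
    public static List<String> stringTypes(int n) {
        return Collections.nCopies(n, "STRING");
    }

    public static void main(String[] args) {
        // The map function produces 12 values, one per String.join() call.
        Object[] row = new Object[12];
        Arrays.fill(row, "a,b,c");

        // Declaration and produced row agree: no exception.
        checkArity(stringTypes(12), row);

        // Declaration is longer than the produced row (20 is an arbitrary
        // illustrative number): the check fails.
        try {
            checkArity(stringTypes(20), row);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In your job, the equivalent fix is to make the two sides agree: either define rowTypeDNS with exactly 12 String fields, or have the lambda return as many fields as rowTypeDNS declares.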

Hope this helps,
Fabian

On Thu, Dec 5, 2019 at 8:35 PM, srikanth flink <
flink.d...@gmail.com> wrote:

> My Flink job reads from a Kafka stream and does some processing.
>
> Code snippet:
>
>> DataStream<Row> flatternedDnsStream = filteredNonNullStream.rebalance()
>> .map(node -> {
>> JsonNode source = node.path("source");
>> JsonNode destination = node.path("destination");
>> JsonNode dns = node.path("dns");
>> JsonNode event = node.path("event");
>> JsonNode client = node.path("client");
>> JsonNode organization = node.path("organization");
>> JsonNode timestamp_received = node.path("timestamp_received");
>> JsonNode transaction = node.path("transaction");
>> JsonNode timestamp = node.path("@timestamp");
>> JsonNode message = node.path("message");
>> JsonNode network = node.path("network");
>> JsonNode ecs = node.path("ecs");
>>
>> return Row.of(String.join(",", getParsed(ecs)), String.join(",",
>> getParsed(source)),
>> String.join(",", getParsed(destination)), String.join(",",
>> getParsed(event)),
>> String.join(",", getParsed(organization)), String.join(",",
>> getParsed(timestamp_received)),
>> String.join(",", getParsed(client)), String.join(",",
>> getParsed(transaction)),
>> String.join(",", getParsed(timestamp)), String.join(",",
>> getParsed(message)),
>> String.join(",", getParsed(dns)), String.join(",", getParsed(network)));
>> }).returns(rowTypeDNS);
>>
>> public static final RowTypeInfo rowTypeDNS = new
>> RowTypeInfo(Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING());
>>
>
>> private static List<String> getParsed(JsonNode node) {
>>
>> List<String> list = new ArrayList<>();
>> Iterator<Entry<String, JsonNode>> it = node.fields();
>> iterateAndExtract(it, list);
>> return list;
>> }
>>
>> private static void iterateAndExtract(Iterator<Entry<String, JsonNode>>
>> it, List<String> list) {
>>
>> while (it.hasNext()) {
>> Entry<String, JsonNode> e = it.next();
>> if (!e.getValue().isContainerNode()) {
>> list.add(e.getValue().asText());
>> continue;
>> }
>>
>> iterateAndExtract(e.getValue().fields(), list);
>> }
>> }
>>
>
> It fails with the following error:
> java.lang.RuntimeException: Row arity of from does not match serializers.
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(
> RowSerializer.java:86)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(
> RowSerializer.java:44)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(
> StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.StreamFilter
> .processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.StreamFilter
> .processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:164)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask
> .performDefaultAction(StreamTask.java:276)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
>
> Help me understand the error in detail.
>
> Thanks
> Srikanth
>
>
