Hi b0c1,

This is a limitation in Flink's optimizer.
Internally, all binary unions are merged into a single n-ary union. The
optimizer restricts the number of inputs for an operator to 64.

You can work around this limitation with an identity mapper which prevents
the union operators from merging:

in1  ----\
in2  -----+--- Id-Map ---\
...      /                \
in14 ---/                  +--- NextOp
in15 ---------------------/
...                      /
in74 -------------------/
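
A minimal sketch of that idea with the Scala DataSet API, in case it helps. The helper name unionInGroups and the group size of 60 are my own choices for illustration, not anything Flink provides, and I have not verified that this avoids the 64 limit in your particular job:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object GroupedUnion {

  // Hypothetical helper (not part of Flink): unions the inputs in groups of
  // at most `groupSize` and puts an identity map after each group, so the
  // per-group unions are not merged into one big n-ary union.
  def unionInGroups[T: TypeInformation: ClassTag](
      inputs: Seq[DataSet[T]],
      groupSize: Int): DataSet[T] = {
    require(inputs.nonEmpty, "need at least one input DataSet")
    require(groupSize > 0, "groupSize must be positive")

    inputs
      .grouped(groupSize)                     // Iterator[Seq[DataSet[T]]]
      .map { group =>
        group
          .reduce(_ union _)                  // union within the group
          .map(x => x)                        // identity mapper as a barrier
      }
      .reduce(_ union _)                      // union of the per-group results
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 74 tiny stand-in inputs; in the real job these are the mapper results.
    val inputs: Seq[DataSet[Int]] = (1 to 74).map(i => env.fromElements(i))

    val merged = unionInGroups(inputs, 60)
    println(merged.count())
  }
}
```

With 74 inputs and a group size of 60 this builds one union of 60 inputs and one of 14, each followed by an identity map, and then a final union of the two results.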

This is not a super nice solution, but it is the only way that comes to my mind.

Cheers, Fabian

2017-08-28 23:29 GMT+02:00 boci <boci.b...@gmail.com>:

> Hi guys!
>
> I have one input (from mongo) and I split the incoming data into multiple
> datasets (each created dynamically from configuration). Before I write
> back the result, I want to merge them into one dataset (there is some common
> transformation).
> So the flow is:
>
> DataSet from Mongod =>
> Create Mappers dynamically (currently 74) so I have 74 DataSets =>
> Custom filter and mapping on each dataset =>
> Union dynamically into one (every mapper result has the same type) =>
> Another common transformation =>
> Count the result
>
> But when I try to union more than 64 datasets, I get this exception:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Cannot currently handle nodes with more than 64 outputs.
> at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(
> OptimizerNode.java:348)
> at org.apache.flink.optimizer.dag.SingleInputNode.setInput(
> SingleInputNode.java:202)
> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
> GraphCreatingVisitor.java:268)
> at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(
> GraphCreatingVisitor.java:82)
>
> I tried to split the incoming list of 74 datasets into 60 + 14 datasets,
> apply an id mapper to the union of each group, and then union the
> resulting datasets, but without success:
>
> val listOfDataSet: List[DataSet[...]] = ....
>
> listOfDataSet
> .sliding(60,60)
> .map(dsg => dsg.reduce((ds1,ds2) => ds1.union(ds2)).map(new IdMapper()))
> // At this point we have an iterator of DataSet
> .reduce((dsg1,dsg2) => dsg1.union(dsg2)) // Here I got the exception
> .map(finalDataSet => ... some transformation ...)
> .count()
>
> Is there any way to solve this?
>
> Thanks
> b0c1
>
