Dear Till and others.

I solved the issue by using the strategy suggested by Till like this:

        List<String> fileListOfSpectra = ...
        SplittableList<String> fileListOfSpectraSplitable = new SplittableList<String>(fileListOfSpectra);
        DataSource<String> fileListOfSpectraDataSource =
                env.fromParallelCollection(fileListOfSpectraSplitable, String.class);

and then - as before -

        DataSet<Peaklist> peakLists = fileListOfSpectraDataSource
                .flatMap(new ReadDataFromFile())
                ...

(The source of the class "SplittableList" is included below.) Flink now distributes the tasks across all available Flink nodes.

Thanks for the help!

Cheers
Tim



On 24.02.2016 16:30, Till Rohrmann wrote:

If I’m not mistaken, then this shouldn’t solve the scheduling peculiarity of Flink. Flink will still deploy the tasks of the flat map operation to the machine where the source task is running. Only after this machine has no more slots left, other machines will be used as well.

I think that you don’t need an explicit |rebalance()| method here. Flink will automatically insert the |PartitionMethod.REBALANCE| strategy.

Cheers,
Till


        import org.apache.flink.util.SplittableIterator;

        import java.util.Iterator;
        import java.util.List;
        import java.util.NoSuchElementException;

        public class SplittableList<T> extends SplittableIterator<T> {

            private final List<T> list;
            private int cursor;

            public SplittableList(List<T> list) {
                this.cursor = 0;
                this.list = list;
            }

            @Override
            @SuppressWarnings("unchecked")
            public Iterator<T>[] split(int numPartitions) {
                if (numPartitions < 1) {
                    throw new IllegalArgumentException("The number of partitions must be at least 1.");
                }

                Iterator<T>[] iters = new Iterator[numPartitions];
                if (numPartitions == 1) {
                    iters[0] = new SplittableList<>(list);
                    return iters;
                }

                // The first (numPartitions - 1) partitions get floor(size / numPartitions)
                // elements each; the last partition also takes the remainder.
                int partSize = list.size() / numPartitions;
                for (int i = 0; i < (numPartitions - 1); i++) {
                    List<T> subFileList = list.subList(i * partSize, (i + 1) * partSize);
                    iters[i] = new SplittableList<>(subFileList);
                }
                List<T> subFileList = list.subList((numPartitions - 1) * partSize, list.size());
                iters[numPartitions - 1] = new SplittableList<>(subFileList);
                return iters;
            }

            @Override
            public int getMaximumNumberOfSplits() {
                return list.size();
            }

            @Override
            public boolean hasNext() {
                return cursor < list.size();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return list.get(cursor++);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove is not implemented.");
            }
        }
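To see how the split() arithmetic above distributes the entries, here is a minimal sketch in plain Java that needs no Flink on the classpath. The class name SplitSizesSketch, the helper split(), and the file names are made up for illustration; only the index calculation is taken from SplittableList.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: it repeats the index arithmetic
// of SplittableList.split() on plain lists so the partition sizes can be inspected.
class SplitSizesSketch {

    static <T> List<List<T>> split(List<T> list, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        List<List<T>> parts = new ArrayList<>();
        // Integer division equals floor() here because both values are non-negative.
        int partSize = list.size() / numPartitions;
        for (int i = 0; i < numPartitions - 1; i++) {
            parts.add(list.subList(i * partSize, (i + 1) * partSize));
        }
        // The last partition takes everything that is left, including the remainder.
        parts.add(list.subList((numPartitions - 1) * partSize, list.size()));
        return parts;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            files.add("spectrum-" + i + ".txt"); // made-up file names
        }
        // Prints 2, 2, 2, 4 (one number per line) for 10 files and 4 partitions.
        for (List<String> part : split(files, 4)) {
            System.out.println(part.size());
        }
    }
}
```

Note that the whole remainder goes to the last partition, and that with fewer entries than partitions every partition except the last stays empty. If the file list is short compared to the parallelism, a round-robin assignment may balance the load better.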

