Dear Till and others,
I solved the issue by using the strategy suggested by Till like this:
List<String> fileListOfSpectra = ...
SplittableList<String> fileListOfSpectraSplitable =
    new SplittableList<String>(fileListOfSpectra);
DataSource<String> fileListOfSpectraDataSource =
    env.fromParallelCollection(fileListOfSpectraSplitable, String.class);
and then - as before -
DataSet<Peaklist> peakLists = fileListOfSpectraDataSource
    .flatMap(new ReadDataFromFile())
    ...
(The source for the class "SplittableList" is below.) Flink now
distributes the tasks to all available Flink nodes.
Thanks for the help!
Cheers
Tim
On 24.02.2016 16:30, Till Rohrmann wrote:
If I’m not mistaken, then this shouldn’t solve the scheduling
peculiarity of Flink. Flink will still deploy the tasks of the flat
map operation to the machine where the source task is running. Only
after this machine has no more slots left, other machines will be used
as well.
I think that you don’t need an explicit |rebalance()| method here.
Flink will automatically insert the |PartitionMethod.REBALANCE| strategy.
Cheers,
Till
import org.apache.flink.util.SplittableIterator;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SplittableList<T> extends SplittableIterator<T> {

    private List<T> list;
    private int cursor;

    public SplittableList(List<T> list) {
        this.cursor = 0;
        this.list = list;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Iterator<T>[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        Iterator<T>[] iters = new Iterator[numPartitions];
        if (numPartitions == 1) {
            iters[0] = new SplittableList<T>(list);
            return iters;
        }

        // Every partition except the last gets partSize elements.
        int partSize = (int) Math.floor((double) list.size() / numPartitions);
        for (int i = 0; i < (numPartitions - 1); i++) {
            List<T> subFileList = list.subList(i * partSize, (i + 1) * partSize);
            iters[i] = new SplittableList<T>(subFileList);
        }

        // The last partition takes whatever is left, including the remainder.
        List<T> subFileList = list.subList((numPartitions - 1) * partSize, list.size());
        iters[numPartitions - 1] = new SplittableList<T>(subFileList);
        return iters;
    }

    @Override
    public int getMaximumNumberOfSplits() {
        return list.size();
    }

    @Override
    public boolean hasNext() {
        return (cursor < list.size());
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = list.get(cursor);
        cursor++;
        return item;
    }

    @Override
    public void remove() {
        // Removal is not supported by this iterator.
        throw new UnsupportedOperationException("Remove not implemented yet.");
    }
}
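
For anyone who wants to check how split() cuts up the list without starting a Flink job, here is a minimal sketch that reproduces only the index arithmetic. The class SplitSizes and its method sizes() are hypothetical helpers made up for this illustration; they are not part of Flink or of SplittableList.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink or SplittableList): it mirrors only
// the sub-list boundaries that split() computes.
final class SplitSizes {

    // Returns how many elements each partition receives when a list with
    // listSize elements is cut the way split() cuts it.
    static List<Integer> sizes(int listSize, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        // Integer division equals Math.floor for non-negative operands.
        int partSize = listSize / numPartitions;

        List<Integer> sizes = new ArrayList<>();
        // All partitions except the last cover [i * partSize, (i + 1) * partSize).
        for (int i = 0; i < numPartitions - 1; i++) {
            sizes.add((i + 1) * partSize - i * partSize);
        }
        // The last partition covers [(numPartitions - 1) * partSize, listSize).
        sizes.add(listSize - (numPartitions - 1) * partSize);
        return sizes;
    }
}
```

SplitSizes.sizes(10, 4) gives [2, 2, 2, 4], so the last partition also carries the remainder. SplitSizes.sizes(3, 4) gives [0, 0, 0, 3]: if split() is ever called with more partitions than there are list entries, everything ends up in the last partition. If that matters for your job, handing out the remainder one element at a time to the first partitions would even this out.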