This is interesting. So if I understand your algorithm, it would be something like (pseudocode):
A = ReadWithShardedLineNumbers(myFile)        : output K<ShardOffset+LocalLineNumber>, V<Data>
B = A.ExtractShardOffsetKeys()                : output K<ShardOffset>, V<LocalLineNumber>
C = B.PerKeySum()                             : output K<ShardOffset>, V<ShardTotalLines>
D = C.GlobalSortAndPrefixSum()                : output K<ShardOffset>, V<ShardLineNumberOffset>
E = [A,D].JoinAndCalculateGlobalLineNumbers() : output V<GlobalLineNumber+Data>

This makes a couple of assumptions:

1. (ReadWithShardedLineNumbers) Sources can output their shard offset, and the offsets are globally ordered
2. (GlobalSortAndPrefixSum) The totals for all read shards can fit in memory to perform a total sort

Assumption #2 will not hold true for all data sizes, and it varies by runner depending on how granular the read shards are. But it seems feasible for some practical subset of file sizes. Also, I believe the pseudocode above is representable in Beam and would not require SDF; a rough sketch in the Python SDK follows the quoted message below.

On Wed, Dec 12, 2018 at 6:59 PM Chak-Pong Chung <[email protected]> wrote:

> Hello everyone!
>
> I asked the following question and am hoping to get some suggestions on
> whether what I want is doable or not.
>
> https://stackoverflow.com/questions/53746046/how-can-i-implement-zipwithindex-like-spark-in-apache-beam/53747612#53747612
>
> If I can get the `PCollection` id and the number of (contiguous) lines in
> each `PCollection`, then I can calculate the row order within each
> partition/`PCollection` first and then do a prefix sum to compute the
> offset for each partition. This is doable in MPI or OpenMP since I can get
> the id/rank of each processor/thread.
>
> Anton pointed out that the current design wants to allow dynamic
> scheduling/allocation at run-time. My approach works for static allocation
> at compile-time with a fixed number of hardware resources.
>
> There could be another way to look at this problem. The file can also sit
> in HDFS or Google Cloud Storage before processing in Beam. So we might
> also reduce the problem to uploading and splitting such a big file into
> chunks while preserving the row order within the file. In that case, by
> the time Beam processes chunks of this file, there is no need for
> row-order-preservation work.
>
> Best,
> Chak-Pong

--
Got feedback? tinyurl.com/swegner-feedback
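
As promised above, here is a minimal sketch of how steps A-E might map onto the Beam Python SDK. This is not a definitive implementation: the ReadWithShardedLineNumbers source from step A is hypothetical (a beam.Create of pre-keyed elements stands in for it here), and step D's prefix sum is done with combiners.ToList plus a side input, which again assumes the per-shard totals fit in memory (assumption #2).

import apache_beam as beam


def shard_offsets(shard_totals):
    # Step D: sort (shard_offset, line_count) pairs by shard offset and
    # prefix-sum the counts. Assumes all shard totals fit in memory.
    offsets, running = {}, 0
    for shard, count in sorted(shard_totals):
        offsets[shard] = running
        running += count
    return offsets


with beam.Pipeline() as p:
    # Step A (stand-in): a real pipeline would use a source that emits
    # ((shard_offset, local_line_number), data); here we fake two shards.
    a = p | 'Read' >> beam.Create([
        ((0, 0), 'line-a'), ((0, 1), 'line-b'),      # shard at offset 0
        ((100, 0), 'line-c'), ((100, 1), 'line-d'),  # shard at offset 100
    ])

    # Steps B + C: count lines per shard.
    c = (a
         | 'ShardKey' >> beam.Map(lambda kv: (kv[0][0], 1))
         | 'LinesPerShard' >> beam.CombinePerKey(sum))

    # Step D: gather all shard totals onto one worker and prefix-sum them.
    d = (c
         | 'ToList' >> beam.combiners.ToList()
         | 'PrefixSum' >> beam.Map(shard_offsets))

    # Step E: join A against D (as a side input) to get global line numbers.
    e = (a
         | 'GlobalLineNumbers' >> beam.Map(
             lambda kv, offsets: (offsets[kv[0][0]] + kv[0][1], kv[1]),
             offsets=beam.pvalue.AsSingleton(d)))

Using a singleton side input for step E avoids a full shuffle-based join, but it only works while the shard-offset map stays small; for very finely sharded reads a CoGroupByKey-style join would be the safer choice.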
