This is interesting. So if I understand your algorithm correctly, it would
be something like this (pseudocode):

A = ReadWithShardedLineNumbers(myFile)        : output K<ShardOffset+LocalLineNumber>, V<Data>
B = A.ExtractShardOffsetKeys()                : output K<ShardOffset>, V<LocalLineNumber>
C = B.PerKeyCount()                           : output K<ShardOffset>, V<ShardTotalLines>
D = C.GlobalSortAndPrefixSum()                : output K<ShardOffset>, V<ShardLineNumberOffset>
E = [A,D].JoinAndCalculateGlobalLineNumbers() : output V<GlobalLineNumber+Data>

This makes a couple of assumptions:
1. (ReadWithShardedLineNumbers) Sources can output their shard offset, and
the offsets are globally ordered
2. (GlobalSortAndPrefixSum) The totals for all read shards can fit in
memory to perform a total sort

Assumption #2 will not hold for all data sizes, and varies by runner
depending on how granular the read shards are. Still, the per-shard totals
are small: for example, a 1 TB file read in 64 MB shards yields only about
16,000 (offset, count) pairs, i.e. a few hundred kilobytes. So it seems
feasible for a practical range of file sizes.
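
To make step D concrete, here is a minimal sketch in Python of the
sort-and-prefix-sum over the collected (ShardOffset, ShardTotalLines)
pairs (the function name is mine, not an existing transform); assumption
#2 is exactly the requirement that this list fits in memory on one worker:

def global_sort_and_prefix_sum(shard_counts):
    # Step D: turn (shard_offset, total_lines) pairs into
    # (shard_offset, first_global_line_number) pairs. Requires all
    # pairs in memory at once -- this is assumption #2.
    running_total = 0
    for shard_offset, total_lines in sorted(shard_counts):
        yield shard_offset, running_total
        running_total += total_lines

# For shards at byte offsets 0/100/230 holding 3/2/4 lines:
#   list(global_sort_and_prefix_sum([(100, 2), (0, 3), (230, 4)]))
#   -> [(0, 0), (100, 3), (230, 5)]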

Also, I believe the pseudocode above is representable in Beam, and would
not require SDF (Splittable DoFn).
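
As a rough illustration of the wiring in the Python SDK (not a working
read: the Create step below just fakes ReadWithShardedLineNumbers, since
I'm not aware of a built-in source that exposes shard offsets this way):

import apache_beam as beam

def global_sort_and_prefix_sum(shard_counts):
    # Step D, as sketched above.
    running_total = 0
    for shard_offset, total_lines in sorted(shard_counts):
        yield shard_offset, running_total
        running_total += total_lines

with beam.Pipeline() as p:
    # A: elements are ((shard_offset, local_line_number), data).
    a = p | 'ReadWithShardedLineNumbers' >> beam.Create([
        ((0, 0), 'alpha'), ((0, 1), 'bravo'), ((0, 2), 'charlie'),
        ((100, 0), 'delta'), ((100, 1), 'echo'),
    ])

    # B + C: count lines per shard offset.
    c = (a
         | 'ExtractShardOffsetKeys' >> beam.Map(lambda kv: kv[0][0])
         | 'PerKeyCount' >> beam.combiners.Count.PerElement())

    # D: gather every (shard_offset, count) pair onto one worker
    # (assumption #2), then sort and prefix-sum.
    d = (c
         | 'CollectShardCounts' >> beam.combiners.ToList()
         | 'GlobalSortAndPrefixSum' >> beam.FlatMap(global_sort_and_prefix_sum))

    # E: join D back onto A as a dict side input to compute the
    # global line number for each element.
    def to_global_line_number(kv, shard_starts):
        (shard_offset, local_line), data = kv
        return shard_starts[shard_offset] + local_line, data

    e = a | 'CalculateGlobalLineNumbers' >> beam.Map(
        to_global_line_number, shard_starts=beam.pvalue.AsDict(d))

    e | 'Print' >> beam.Map(print)

Running this prints (0, 'alpha') through (4, 'echo'). The ToList/FlatMap
pair in step D is the single-worker bottleneck discussed above; everything
else stays parallel.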

On Wed, Dec 12, 2018 at 6:59 PM Chak-Pong Chung <[email protected]> wrote:

> Hello everyone!
>
> I asked the following question, and am hoping for suggestions on whether
> what I want to do is doable:
>
>
> https://stackoverflow.com/questions/53746046/how-can-i-implement-zipwithindex-like-spark-in-apache-beam/53747612#53747612
>
> If I can get the `PCollection` id and the number of (contiguous) lines in
> each `PCollection`, then I can calculate the row order within each
> partition/`PCollection` first, and then do a prefix sum to compute the
> offset for each partition. This is doable in MPI or OpenMP, since I can
> get the id/rank of each processor/thread.
>
> Anton pointed out that the current design aims to allow dynamic
> scheduling/allocation at run time, whereas my approach works for static
> allocation at compile time with a fixed number of hardware resources.
>
> There could be another way to look at this problem. The file can also sit
> in HDFS or Google Cloud Storage before being processed in Beam, so we
> might reduce the problem to uploading and splitting such a big file into
> chunks while preserving the row order within the file. In that case, by
> the time Beam processes the chunks of this file, there is no
> row-order-preserving work left to do.
>
> Best,
> Chak-Pong
>
>
>

-- 
Got feedback? tinyurl.com/swegner-feedback