Re: Custom shardingFn for FileIO

2019-05-08 Thread Jozef Vilcek
Yes, I was able to use it in Flink and I do see a performance gain. I also see, which is important for me, more predictable and uniform memory usage among workers. On Wed, May 8, 2019 at 7:19 AM Reuven Lax wrote: > So you were able to use this in Flink? Did you see performance gains? > > On Sun, Ma

Re: Custom shardingFn for FileIO

2019-05-07 Thread Reuven Lax
So you were able to use this in Flink? Did you see performance gains? On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek wrote: > Sorry, it took a while. I wanted to actually use this extension for > WriteFiles in Flink and see that it works, and that proved to be a bit bumpy. > The PR is at https://github.com/

Re: Custom shardingFn for FileIO

2019-05-05 Thread Jozef Vilcek
Sorry, it took a while. I wanted to actually use this extension for WriteFiles in Flink and see that it works, and that proved to be a bit bumpy. The PR is at https://github.com/apache/beam/pull/8499. On Thu, May 2, 2019 at 3:22 PM Reuven Lax wrote: > Great, let me know when to take another look at the PR

Re: Custom shardingFn for FileIO

2019-05-02 Thread Reuven Lax
Great, let me know when to take another look at the PR! Reuven. On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek wrote: > That coder is added as an extra re-map stage from the "original" key to the new > ShardAwareKey ... But the pipeline might get broken, I guess. > Very fair point. I am taking a second p

Re: Custom shardingFn for FileIO

2019-05-01 Thread Jozef Vilcek
That coder is added as an extra re-map stage from the "original" key to the new ShardAwareKey ... But the pipeline might get broken, I guess. Very fair point. I am taking a second pass over this and will try to simplify it much more. On Wed, May 1, 2019 at 2:12 PM Reuven Lax wrote: > I haven't looked a
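
To visualise the re-map stage being discussed, here is a minimal, hedged sketch. ShardAwareKey is a stand-in for the hypothetical key type from the PR (not a Beam class), the custom key part is a placeholder, and ShardedKey's getShardNumber() accessor is assumed:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;

/** Stand-in for the hypothetical key type discussed in this thread. */
class ShardAwareKey implements Serializable {
  final int shard;
  final String customKey;

  ShardAwareKey(int shard, String customKey) {
    this.shard = shard;
    this.customKey = customKey;
  }
}

/** Illustrative only: the extra re-map stage from the "original" sharded key to the new key type. */
class RemapToShardAwareKeyFn<UserT> extends DoFn<KV<ShardedKey<Integer>, UserT>, KV<ShardAwareKey, UserT>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    ShardedKey<Integer> original = c.element().getKey();
    // Carry the shard number over; the custom key part is a placeholder here.
    c.output(KV.of(new ShardAwareKey(original.getShardNumber(), "placeholder"), c.element().getValue()));
  }
}

Note that such a new key type would also need its own coder for the GBK, which is exactly the update-compatibility concern raised in this thread.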

Re: Custom shardingFn for FileIO

2019-05-01 Thread Reuven Lax
I haven't looked at the PR in depth yet, but it appears that someone running a pipeline today who then tries to update post this PR will have the coder change to DefaultShardKeyCoder, even if they haven't picked any custom function. Is that correct, or am I misreading things? Reuven. On Tue, Apr 3

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
Hm, what would be the scenario? Have version A running with the original random sharding and then start version B where I change sharding to some custom function? So I would have to enable the pipeline to digest old keys from the restored GBK state and also work with new keys produced to the GBK going forward? On T

Re: Custom shardingFn for FileIO

2019-04-30 Thread Reuven Lax
Initial thought on the PR: we usually try to limit changing coders in these types of transforms to better support runners that allow in-place updates of pipelines. Can this be done without changing the coder? On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek wrote: > I have created a PR for enhancing Wri

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
I have created a PR for enhancing WriteFiles with a custom sharding function: https://github.com/apache/beam/pull/8438. If this sort of change looks good, then the next step would be to use it in a Flink runner transform override. Let me know what you think. On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek

Re: Custom shardingFn for FileIO

2019-04-26 Thread Jozef Vilcek
I guess it is fine to enable shardingFn control only at the WriteFiles level rather than on FileIO. On WriteFiles it can be manipulated in a PTransformOverride by the runner. On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax wrote: > Yes, a hook would have to be added to allow specifying a different > function for

Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
Yes, a hook would have to be added to allow specifying a different function for choosing the shard number (I assume the problem is that there are cases where the current random assignment is not good?). However, since this can be set using a PTransformOverride, we ideally shouldn't force the user to know de

Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels
Reuven is talking about PTransformOverride, e.g. FlinkTransformOverrides. We already use this to determine the number of shards in case of Runner-determined sharding. Not sure if that would work for Jozef's case because setting the number of shards is not enough. We want to set the shard key d

Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
Actually, the runner is free to perform surgery on the graph. The FlinkRunner can insert a custom function to determine the sharding keys. On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek wrote: > Right now, sharding can be specified only via a target `shardCount`, be it > by the user or the runner. Next to confi
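
As an illustration of what such an inserted function could do differently from the random assignment, a runner might derive the shard deterministically from the element itself. This is a hedged sketch only; the FlinkRunner's actual override mechanics (PTransformOverride) are not shown, and the hash-based policy below is just one possibility:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Illustrative only: derive the shard from the element, so the assignment is deterministic. */
class HashShardAssignFn extends DoFn<String, KV<Integer, String>> {
  private final int shardCount;

  HashShardAssignFn(int shardCount) {
    this.shardCount = shardCount;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Identical elements always land in the same shard, which can make load and
    // memory usage across workers more predictable than per-element randomness.
    int shard = Math.floorMod(c.element().hashCode(), shardCount);
    c.output(KV.of(shard, c.element()));
  }
}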

Re: Custom shardingFn for FileIO

2019-04-25 Thread Jozef Vilcek
Right now, sharding can be specified only via a target `shardCount`, be it by the user or the runner. Next to the configurable shardCount, I am proposing to also be able to pass a function which will allow the user (or runner) to control how a shard is determined and what key will be used to represent it: interface S
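
The message is truncated right at the interface definition. As a purely hypothetical illustration of the shape being proposed (names and signature here are assumptions, not the PR's actual API), such a function could look roughly like:

import java.io.Serializable;

/**
 * Hypothetical sketch only: a user- or runner-supplied function that decides
 * which shard an element belongs to and which key represents that shard.
 */
public interface ShardingFn<UserT, DestinationT, ShardKeyT> extends Serializable {
  /** Assigns the element to one of shardCount shards and returns the key representing it. */
  ShardKeyT assignShardKey(DestinationT destination, UserT element, int shardCount) throws Exception;
}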

Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
If sharding is not specified, then the semantics are "runner-determined sharding." The DataflowRunner already takes advantage of this to impose its own sharding if the user hasn't specified an explicit one. Could the Flink runner do the same instead of pushing this to the users? On Thu, Apr 25, 20

Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels
Hi Jozef, For sharding in FileIO there are basically two options: (1) num_shards ~= num_workers => bad spread of the load across workers; (2) num_shards >> num_workers => good spread of the load across workers, but a huge number of files. Your approach would give users control over the sharding k
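
For reference, the shard count in options (1) and (2) is currently the only sharding knob exposed to users. A hedged usage sketch (output path and shard count are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;

public class NumShardsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("a", "b", "c"))
        // Only the number of shards can be configured; how elements are assigned
        // to those shards (the shard key) is not controllable here.
        .apply(FileIO.<String>write()
            .via(TextIO.sink())
            .to("/tmp/sharding-example")  // placeholder output directory
            .withNumShards(32));          // placeholder shard count
    p.run().waitUntilFinish();
  }
}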

Custom shardingFn for FileIO

2019-04-25 Thread Jozef Vilcek
Hello, Right now, if someone needs sharded files via FileIO, there is only one option, which is random (round-robin) shard assignment per element, and it always uses ShardedKey as the key for the GBK which follows. I would like to generalize this and have the possibility to provide some ShardingFn[UserT
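
To make the current behaviour concrete, here is a minimal, hedged sketch of per-element round-robin shard assignment keyed for a following GroupByKey. It assumes Beam's ShardedKey.of(key, shardNumber) factory and uses a fixed placeholder key; the real WriteFiles internals differ in detail:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;

/** Illustrative only: assigns each element a shard in round-robin fashion, starting at a random offset. */
class RandomShardAssignFn<UserT> extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
  private final int shardCount;
  private transient int nextShard;

  RandomShardAssignFn(int shardCount) {
    this.shardCount = shardCount;
  }

  @Setup
  public void setup() {
    // Start at a random offset per instance, then go round-robin.
    nextShard = ThreadLocalRandom.current().nextInt(shardCount);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    int shard = nextShard;
    nextShard = (nextShard + 1) % shardCount;
    // Placeholder fixed key (0); the shard number is what spreads elements across groups.
    c.output(KV.of(ShardedKey.of(0, shard), c.element()));
  }
}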