Re: Problem with gzip

2019-05-15 Thread Allie Chen
Thanks Robert. Yes, reading is the bottleneck, and we cannot do much better
for gzip files; that's why we would like to at least parallelize the other
transforms with the reading.

I tried the side input you suggested earlier to break the fusion, and
it does a much better job than using Reshuffle! Here are the running times
from one test, in case anyone is interested:

without any fusion break: 6 hours
with Reshuffle: never finished; cancelled after running 6 hours, with about
half the elements processed at the Reshuffle step
with side input (not using --experiment=use_fastavro yet; I will try it
later): 2 hours

Thanks all for your help!
Allie


*From: *Robert Bradshaw 
*Date: *Wed, May 15, 2019 at 3:34 PM
*To: *dev
*Cc: *user

On Wed, May 15, 2019 at 8:43 PM Allie Chen  wrote:
>
>> Thanks all for your reply. I will try each of them and see how it goes.
>>
>> The experiment I am working now is similar to
>> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
>> which tries to get early results from GroupByKey with windowing. I have
>> some code like:
>>
>> Reading
>> | beam.WindowInto(beam.window.GlobalWindows(),
>>                   trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>>                   accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
>> | MapWithAKey
>> | GroupByKey
>> | RemoveKey
>> | OtherTransforms
>>
>>
>> I don't see the window and trigger taking effect; GroupByKey still waits for
>> all elements. I also tried adding a timestamp to each element and using a
>> fixed-size window, with no apparent impact.
>>
>>
>> Does anyone know how to get early results from GroupByKey for a bounded
>> source?
>>
>
> Note that this is essentially how Reshuffle() is implemented. However,
> batch never gives early results from a GroupByKey; each stage is executed
> sequentially.
>
> Is the goal here to be able to parallelize the Read with other operations?
> If the Read (and limited-parallelism write) is still the bottleneck, that
> might not help much.
>
>


Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
On Wed, May 15, 2019 at 8:43 PM Allie Chen  wrote:

> Thanks all for your reply. I will try each of them and see how it goes.
>
> The experiment I am working now is similar to
> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
> which tries to get early results from GroupByKey with windowing. I have
> some code like:
>
> Reading
> | beam.WindowInto(beam.window.GlobalWindows(),
>                   trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>                   accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
> | MapWithAKey
> | GroupByKey
> | RemoveKey
> | OtherTransforms
>
>
> I don't see the window and trigger taking effect; GroupByKey still waits for
> all elements. I also tried adding a timestamp to each element and using a
> fixed-size window, with no apparent impact.
>
>
> Does anyone know how to get early results from GroupByKey for a bounded
> source?
>

Note that this is essentially how Reshuffle() is implemented. However,
batch never gives early results from a GroupByKey; each stage is executed
sequentially.

Is the goal here to be able to parallelize the Read with other operations?
If the Read (and limited-parallelism write) is still the bottleneck, that
might not help much.
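For readers trying the pattern quoted above, a self-contained version might
look like this (the input/output paths and the keying function are
placeholders; as noted, a batch runner executes stages sequentially, so the
early trigger firings never actually materialize there):

import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline() as p:
    (p
     # Placeholder input; ReadFromText decompresses .gz by file extension.
     | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input.gz')
     # Global window with an eager, repeatedly firing trigger.
     | 'Window' >> beam.WindowInto(
         window.GlobalWindows(),
         trigger=trigger.Repeatedly(trigger.AfterCount(1)),
         accumulation_mode=trigger.AccumulationMode.DISCARDING)
     # Stand-ins for MapWithAKey / GroupByKey / RemoveKey above.
     | 'MapWithAKey' >> beam.Map(lambda line: (len(line) % 16, line))
     | 'GroupByKey' >> beam.GroupByKey()
     | 'RemoveKey' >> beam.FlatMap(lambda kv: kv[1])
     | 'Write' >> beam.io.WriteToText('gs://my-bucket/output'))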


Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Interesting thread. Thanks for digging that up.

I would try the shuffle_mode=service experiment (I forgot that it isn't
yet the default). If that doesn't do the trick: although avro as a
materialization format does not provide perfect parallelism, it should
be significantly better than what you have now (large gzip files) and
may be good enough.
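For anyone trying this: experiments such as shuffle_mode=service and
use_fastavro are passed as pipeline options; a minimal sketch (the project,
region, and bucket values are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; --experiments may be repeated once per experiment.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--experiments=shuffle_mode=service',
    '--experiments=use_fastavro',
])
p = beam.Pipeline(options=options)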

On Wed, May 15, 2019 at 2:34 PM Michael Luckey  wrote:
>
> @Robert
>
> Does your suggestion imply that the points made by Eugene on BEAM-2803 no 
> longer apply and the combined reshuffle could just be omitted?
>
> On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw  wrote:
>>
>> Unfortunately the "write" portion of the reshuffle cannot be
>> parallelized more than the source that it's reading from. In my
>> experience, generally the read is the bottleneck in this case, but
>> it's possible (e.g. if the input compresses extremely well) that it is
>> the write that is slow (which you seem to indicate based on your
>> observation of the UI, right?).
>>
>> It could be that materializing to temporary files is cheaper than
>> materializing randomly to shuffle (especially on pre-portable Python).
>> In that case you could force a fusion break with a side input instead.
>> E.g.
>>
>> class FusionBreak(beam.PTransform):
>>     def expand(self, pcoll):
>>         # Create an empty PCollection that depends on pcoll.
>>         empty = pcoll | beam.FlatMap(lambda x: ())
>>         # Use this empty PCollection as a side input, which will
>>         # force a fusion break.
>>         return pcoll | beam.Map(lambda x, unused: x,
>>                                 beam.pvalue.AsIterable(empty))
>>
>> which could be used in place of Reshard like
>>
>> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>
>> You'll probably want to be sure to pass the use_fastavro experiment as well.
>>
>> On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
>> >
>> > Hi
>> >
>> > This project is a completely different solution to this problem, but 
>> > in the Hadoop MapReduce context.
>> >
>> > https://github.com/nielsbasjes/splittablegzip
>> >
>> >
>> > I have used this a lot in the past.
>> > Perhaps porting this project to Beam is an option?
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
>> >>
>> >> Sorry I couldn't be more helpful.
>> >>
>> >> From: Allie Chen 
>> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> To: 
>> >> Cc: user
>> >>
>> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option 
>> >>> for us.
>> >>>
>> >>>
>> >>> I am trying to speed up the Reshuffle step, since it waits for all data. 
>> >>> Here are two ways I have tried:
>> >>>
>> >>> 1.  add timestamps to the PCollection's elements after reading (since it 
>> >>> is a bounded source), then apply windowing before Reshuffle, but it still 
>> >>> waits for all data.
>> >>>
>> >>>
>> >>> 2.  run the pipeline with --streaming flag, but it leads to an error: 
>> >>> Workflow failed. Causes: Expected custom source to have non-zero number 
>> >>> of splits. Also, I found in 
>> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >>>
>> >>> DataflowRunner does not currently support the following Cloud Dataflow 
>> >>> specific features with Python streaming execution.
>> >>>
>> >>> Streaming autoscaling
>> >>>
>> >>> I doubt whether this approach can solve my issue.
>> >>>
>> >>>
>> >>> Thanks so much!
>> >>>
>> >>> Allie
>> >>>
>> >>>
>> >>> From: Lukasz Cwik 
>> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >>> To: dev
>> >>> Cc: user
>> >>>
>>  Do you need to perform any joins across the files (e.g. 
>>  Combine.perKey/GroupByKey/...)?
>>  If not, you could structure your pipeline
>>  ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>  ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>  ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>  and then run it as a batch pipeline.
>> 
>>  You can set --streaming=true on the pipeline and then it will run in a 
>>  streaming mode but streaming prioritizes low latency and correctness on 
>>  Google Cloud Dataflow so it will cost more to run your pipeline than in 
>>  batch mode. It may make more sense to store the data uncompressed as it 
>>  may be less expensive than paying the additional compute cost for 
>>  streaming.
>> 
>>  From: Allie Chen 
>>  Date: Tue, May 14, 2019 at 7:38 AM
>>  To: 
>>  Cc: user
>> 
>> > Is it possible to use windowing or somehow pretend it is streaming so 
>> > Reshuffle or GroupByKey won't wait until all data has been read?
>> >
>> > Thanks!
>> > Allie
>> >
>> > From: Lukasz Cwik 
>> > Date: Fri, May 10, 2019 at 5:36 PM
>> > To: dev
>> > Cc: user
>> >
>> >> There is no such flag to turn off fusion.
>> >>
>> >> Writing 100s of GiBs of uncompressed data to reshuffle will take time 
>> >> when it is limited to a small number of workers.

Re: Problem with gzip

2019-05-15 Thread Michael Luckey
@Robert

Does your suggestion imply that the points made by Eugene on BEAM-2803 no
longer apply and the combined reshuffle could just be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw  wrote:

> Unfortunately the "write" portion of the reshuffle cannot be
> parallelized more than the source that it's reading from. In my
> experience, generally the read is the bottleneck in this case, but
> it's possible (e.g. if the input compresses extremely well) that it is
> the write that is slow (which you seem to indicate based on your
> observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will
>         # force a fusion break.
>         return pcoll | beam.Map(lambda x, unused: x,
>                                 beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshard like
>
> p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
> >
> > Hi
> >
> > This project is a completely different solution to this problem,
> but in the Hadoop MapReduce context.
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> >
> > I have used this a lot in the past.
> > Perhaps porting this project to Beam is an option?
> >
> > Niels Basjes
> >
> >
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen 
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: 
> >> Cc: user
> >>
> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option
> for us.
> >>>
> >>>
> >>> I am trying to speed up the Reshuffle step, since it waits for all data.
> Here are two ways I have tried:
> >>>
> >>> 1.  add timestamps to the PCollection's elements after reading (since
> it is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all data.
> >>>
> >>>
> >>> 2.  run the pipeline with --streaming flag, but it leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>>
> >>> From: Lukasz Cwik 
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
>  Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
>  If not, you could structure your pipeline
>  ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>  ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>  ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>  and then run it as a batch pipeline.
> 
>  You can set --streaming=true on the pipeline and then it will run in
> a streaming mode but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed as it may
> be less expensive than paying the additional compute cost for streaming.
> 
>  From: Allie Chen 
>  Date: Tue, May 14, 2019 at 7:38 AM
>  To: 
>  Cc: user
> 
> > Is it possible to use windowing or somehow pretend it is streaming
> so Reshuffle or GroupByKey won't wait until all data has been read?
> >
> > Thanks!
> > Allie
> >
> > From: Lukasz Cwik 
> > Date: Fri, May 10, 2019 at 5:36 PM
> > To: dev
> > Cc: user
> >
> >> There is no such flag to turn off fusion.
> >>
> >> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >>
> >> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
> >>
> >> On Fri, May 10, 2019 at 2:24 PM Allie Chen 
> wrote:
> >>>
> >>> Re Lukasz: Thanks! I am not able to control the compression format
> but I will see whether splitting the gzip files will work. Is there a
> simple flag in Dataflow that could turn off the fusion?
> >>>
> >>> Re Reuven: No, I checked the run time on Dataflow UI, the
> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow (which you seem to indicate based on your
observation of the UI, right?).

It could be that materializing to temporary files is cheaper than
materializing randomly to shuffle (especially on pre-portable Python).
In that case you could force a fusion break with a side input instead.
E.g.

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will
        # force a fusion break.
        return pcoll | beam.Map(lambda x, unused: x,
                                beam.pvalue.AsIterable(empty))

which could be used in place of Reshard like

p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes  wrote:
>
> Hi
>
> This project is a completely different solution to this problem, but in 
> the Hadoop MapReduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
>
> I have used this a lot in the past.
> Perhaps porting this project to Beam is an option?
>
> Niels Basjes
>
>
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik  wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen 
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To: 
>> Cc: user
>>
>>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for 
>>> us.
>>>
>>>
>>> I am trying to speed up the Reshuffle step, since it waits for all data. Here 
>>> are two ways I have tried:
>>>
>>> 1.  add timestamps to the PCollection's elements after reading (since it is 
>>> a bounded source), then apply windowing before Reshuffle, but it still waits 
>>> for all data.
>>>
>>>
>>> 2.  run the pipeline with --streaming flag, but it leads to an error: 
>>> Workflow failed. Causes: Expected custom source to have non-zero number of 
>>> splits. Also, I found in 
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow 
>>> specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>>
>>> From: Lukasz Cwik 
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
 Do you need to perform any joins across the files (e.g. 
 Combine.perKey/GroupByKey/...)?
 If not, you could structure your pipeline
 ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
 ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
 ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
 and then run it as a batch pipeline.

 You can set --streaming=true on the pipeline and then it will run in a 
 streaming mode but streaming prioritizes low latency and correctness on 
 Google Cloud Dataflow so it will cost more to run your pipeline than in 
 batch mode. It may make more sense to store the data uncompressed as it 
 may be less expensive than paying the additional compute cost for 
 streaming.

 From: Allie Chen 
 Date: Tue, May 14, 2019 at 7:38 AM
 To: 
 Cc: user

> Is it possible to use windowing or somehow pretend it is streaming so 
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> From: Lukasz Cwik 
> Date: Fri, May 10, 2019 at 5:36 PM
> To: dev
> Cc: user
>
>> There is no such flag to turn off fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time 
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are 
>> compressed then you shouldn't need to use the reshuffle but still could 
>> if you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>>>
>>> Re Lukasz: Thanks! I am not able to control the compression format but 
>>> I will see whether splitting the gzip files will work. Is there a 
>>> simple flag in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey 
>>> and FlatMap in Reshuffle are very slow when the data is large. 
>>> Reshuffle itself is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> From: Reuven Lax 
>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> To: dev
>>> Cc: user
>>>
 It's unlikely that Reshuffle itself takes hours. It's more likely that 
 simply reading and decompressing all that data was very slow when there was no parallelism.

Re: Problem with gzip

2019-05-14 Thread Lukasz Cwik
Sorry I couldn't be more helpful.

*From: *Allie Chen 
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: * 
*Cc: *user

Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
> us.
>
>
> I am trying to speed up the Reshuffle step, since it waits for all data. Here
> are two ways I have tried:
>
> 1.  add timestamps to the PCollection's elements after reading (since it
> is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all data.
>
>
> 2.  run the pipeline with --streaming flag, but it leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
>
> *DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.*
>
>    - *Streaming autoscaling*
>
> I doubt whether this approach can solve my issue.
>
>
> Thanks so much!
>
> Allie
>
> *From: *Lukasz Cwik 
> *Date: *Tue, May 14, 2019 at 11:16 AM
> *To: *dev
> *Cc: *user
>
> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> If not, you could structure your pipeline
>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> and then run it as a batch pipeline.
>>
>> You can set --streaming=true on the pipeline and then it will run in a
>> streaming mode but streaming prioritizes low latency and correctness on
>> Google Cloud Dataflow so it will cost more to run your pipeline than in
>> batch mode. It may make more sense to store the data uncompressed as it may
>> be less expensive than paying the additional compute cost for streaming.
>>
>> *From: *Allie Chen 
>> *Date: *Tue, May 14, 2019 at 7:38 AM
>> *To: * 
>> *Cc: *user
>>
>> Is it possible to use windowing or somehow pretend it is streaming so
>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>
>>> Thanks!
>>> Allie
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 5:36 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>> There is no such flag to turn off fusion.

 Writing 100s of GiBs of uncompressed data to reshuffle will take time
 when it is limited to a small number of workers.

 If you can split up your input into a lot of smaller files that are
 compressed then you shouldn't need to use the reshuffle but still could if
 you found it helped.

 On Fri, May 10, 2019 at 2:24 PM Allie Chen 
 wrote:

> Re Lukasz: Thanks! I am not able to control the compression format but
> I will see whether splitting the gzip files will work. Is there a simple
> flag in Dataflow that could turn off the fusion?
>
> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
> itself is not parallel either.
>
> Thanks all,
>
> Allie
>
> *From: *Reuven Lax 
> *Date: *Fri, May 10, 2019 at 5:02 PM
> *To: *dev
> *Cc: *user
>
> It's unlikely that Reshuffle itself takes hours. It's more likely that
>> simply reading and decompressing all that data was very slow when there 
>> was
>> no parallelism.
>>
>> *From: *Allie Chen 
>> *Date: *Fri, May 10, 2019 at 1:17 PM
>> *To: * 
>> *Cc: * 
>>
>>> Yes, I do see that the data after reshuffle is processed in parallel. But
>>> the Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users is mostly gzip, since uncompressed files
>>> would be too costly to store (they could be hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, 
>>>
>>> +u...@beam.apache.org 

 Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
 till all the data has been read before the next transforms can run. 
 After
 the reshuffle, the data should have been processed in parallel across 
 the
 workers. Did you see this?

 Are you able to change the input of your pipeline to use an
 uncompressed file or many compressed files?

 On Fri, May 10, 2019 at 1:03 PM Allie Chen 
 wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since
> the compressed file is not splittable, one worker is allocated to
> read the file. The same worker will do all the other transforms since
> Dataflow fused all transforms together. There is a large amount of
> data in the file.

Re: Problem with gzip

2019-05-14 Thread Allie Chen
Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
us.


I am trying to speed up the Reshuffle step, since it waits for all data. Here
are two ways I have tried:

1.  add timestamps to the PCollection's elements after reading (since it is
a bounded source), then apply windowing before Reshuffle, but it still waits
for all data.


2.  run the pipeline with --streaming flag, but it leads to an error:
Workflow failed. Causes: Expected custom source to have non-zero number of
splits. Also, I found in
https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
:

*DataflowRunner does not currently support the following Cloud Dataflow
specific features with Python streaming execution.*

   - *Streaming autoscaling*

I doubt whether this approach can solve my issue.


Thanks so much!

Allie

*From: *Lukasz Cwik 
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> If not, you could structure your pipeline
> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> and then run it as a batch pipeline.
>
> You can set --streaming=true on the pipeline and then it will run in a
> streaming mode but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed as it may
> be less expensive than paying the additional compute cost for streaming.
>
> *From: *Allie Chen 
> *Date: *Tue, May 14, 2019 at 7:38 AM
> *To: * 
> *Cc: *user
>
> Is it possible to use windowing or somehow pretend it is streaming so
>> Reshuffle or GroupByKey won't wait until all data has been read?
>>
>> Thanks!
>> Allie
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 5:36 PM
>> *To: *dev
>> *Cc: *user
>>
>>> There is no such flag to turn off fusion.
>>>
>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>> when it is limited to a small number of workers.
>>>
>>> If you can split up your input into a lot of smaller files that are
>>> compressed then you shouldn't need to use the reshuffle but still could if
>>> you found it helped.
>>>
>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen 
>>> wrote:
>>>
 Re Lukasz: Thanks! I am not able to control the compression format but
 I will see whether splitting the gzip files will work. Is there a simple
 flag in Dataflow that could turn off the fusion?

 Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
 and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
 itself is not parallel either.

 Thanks all,

 Allie

 *From: *Reuven Lax 
 *Date: *Fri, May 10, 2019 at 5:02 PM
 *To: *dev
 *Cc: *user

 It's unlikely that Reshuffle itself takes hours. It's more likely that
> simply reading and decompressing all that data was very slow when there 
> was
> no parallelism.
>
> *From: *Allie Chen 
> *Date: *Fri, May 10, 2019 at 1:17 PM
> *To: * 
> *Cc: * 
>
>> Yes, I do see that the data after reshuffle is processed in parallel. But
>> the Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users is mostly gzip, since uncompressed files
>> would be too costly to store (they could be hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, 
>>
>> +u...@beam.apache.org 
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>> all the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an
>>> uncompressed file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>>> wrote:
>>>
 Hi,


 I am trying to load a gzip file to BigQuery using Dataflow. Since
 the compressed file is not splittable, one worker is allocated to read
 the file. The same worker will do all the other transforms since
 Dataflow fused all transforms together. There is a large amount of
 data in the file, and I expect to see more workers spinning up after
 the reading transforms. I tried to use the Reshuffle transform
 to prevent the fusion, but it is not scalable since it won’t proceed
 until all data has arrived at this point.

Re: Problem with gzip

2019-05-14 Thread Lukasz Cwik
Do you need to perform any joins across the files (e.g.
Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
and then run it as a batch pipeline.
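A sketch of that shape in the Python SDK, assuming three input files and a
shared per-line function (the paths and process_line are placeholders):

import apache_beam as beam

def process_line(line):
    return line  # placeholder for the real per-element work

with beam.Pipeline() as p:
    for label, path in [('A', 'gs://my-bucket/a.gz'),
                        ('B', 'gs://my-bucket/b.gz'),
                        ('C', 'gs://my-bucket/c.gz')]:
        # Each branch reads one file and runs its own copy of the downstream
        # transforms, so the single-threaded gzip reads at least proceed in
        # parallel with one another.
        (p
         | f'ReadFromFile{label}' >> beam.io.ReadFromText(path)
         | f'CopyOfPipeline{label}' >> beam.Map(process_line))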

You can set --streaming=true on the pipeline and then it will run in a
streaming mode but streaming prioritizes low latency and correctness on
Google Cloud Dataflow so it will cost more to run your pipeline than in
batch mode. It may make more sense to store the data uncompressed as it may
be less expensive than paying the additional compute cost for streaming.

*From: *Allie Chen 
*Date: *Tue, May 14, 2019 at 7:38 AM
*To: * 
*Cc: *user

Is it possible to use windowing or somehow pretend it is streaming so
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 5:36 PM
> *To: *dev
> *Cc: *user
>
>> There is no such flag to turn off fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are
>> compressed then you shouldn't need to use the reshuffle but still could if
>> you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>>
>>> Re Lukasz: Thanks! I am not able to control the compression format but I
>>> will see whether splitting the gzip files will work. Is there a simple flag
>>> in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>>> is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> *From: *Reuven Lax 
>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
 simply reading and decompressing all that data was very slow when there was
 no parallelism.

 *From: *Allie Chen 
 *Date: *Fri, May 10, 2019 at 1:17 PM
 *To: * 
 *Cc: * 

 Yes, I do see that the data after reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) I did.
>
> The file format for our users is mostly gzip, since uncompressed files
> would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, 
>
> +u...@beam.apache.org 
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>> all the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an
>> uncompressed file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow
>>> fused all transforms together. There is a large amount of data in the
>>> file, and I expect to see more workers spinning up after the reading
>>> transforms. I tried to use the Reshuffle transform to prevent the
>>> fusion, but it is not scalable since it won’t proceed until all data
>>> has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the
>>> other transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>


Re: Problem with gzip

2019-05-14 Thread Allie Chen
Is it possible to use windowing or somehow pretend it is streaming so
Reshuffle or GroupByKey won't wait until all data has been read?

Thanks!
Allie

*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 5:36 PM
*To: *dev
*Cc: *user

There is no such flag to turn off fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether splitting the gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax 
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen 
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * 
>>> *Cc: * 
>>>
>>> Yes, I do see that the data after reshuffle is processed in parallel. But
 the Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users is mostly gzip, since uncompressed files
 would be too costly to store (they could be hundreds of GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow
>> fused all transforms together. There is a large amount of data in the
>> file, and I expect to see more workers spinning up after the reading
>> transforms. I tried to use the Reshuffle transform to prevent the
>> fusion, but it is not scalable since it won’t proceed until all data
>> has arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Problem with gzip

2019-05-10 Thread Michael Luckey
Maybe the solution implemented in JdbcIO [1], [2] could be helpful in this
case.

[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118
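For readers who don't want to chase the Java source: the JdbcIO approach
linked above amounts to a reshuffle built from a random key plus an eagerly
triggered GroupByKey. A rough Python sketch of the same idea (simplified and
hypothetical; the real Reshuffle also restores the original timestamps and
windowing):

import random
import apache_beam as beam
from apache_beam.transforms import trigger, window

class NaiveReshuffle(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                # Pair each element with a random key to spread the work.
                | beam.Map(lambda x: (random.randint(0, 255), x))
                # Fire eagerly so the GroupByKey acts only as a fusion break.
                | beam.WindowInto(
                    window.GlobalWindows(),
                    trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                    accumulation_mode=trigger.AccumulationMode.DISCARDING)
                | beam.GroupByKey()
                # Drop the key again.
                | beam.FlatMap(lambda kv: kv[1]))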

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik  wrote:

> There is no such flag to turn off fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether splitting the gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax 
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen 
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * 
>>> *Cc: * 
>>>
>>> Yes, I do see that the data after reshuffle is processed in parallel. But
 the Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users is mostly gzip, since uncompressed files
 would be too costly to store (they could be hundreds of GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow
>> fused all transforms together. There is a large amount of data in the
>> file, and I expect to see more workers spinning up after the reading
>> transforms. I tried to use the Reshuffle transform to prevent the
>> fusion, but it is not scalable since it won’t proceed until all data
>> has arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
There is no such flag to turn off fusion.

Writing 100s of GiBs of uncompressed data to reshuffle will take time when
it is limited to a small number of workers.

If you can split up your input into a lot of smaller files that are
compressed then you shouldn't need to use the reshuffle but still could if
you found it helped.

On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:

> Re Lukasz: Thanks! I am not able to control the compression format but I
> will see whether splitting the gzip files will work. Is there a simple flag
> in Dataflow that could turn off the fusion?
>
> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
> is not parallel either.
>
> Thanks all,
>
> Allie
>
> *From: *Reuven Lax 
> *Date: *Fri, May 10, 2019 at 5:02 PM
> *To: *dev
> *Cc: *user
>
> It's unlikely that Reshuffle itself takes hours. It's more likely that
>> simply reading and decompressing all that data was very slow when there was
>> no parallelism.
>>
>> *From: *Allie Chen 
>> *Date: *Fri, May 10, 2019 at 1:17 PM
>> *To: * 
>> *Cc: * 
>>
>>> Yes, I do see that the data after reshuffle is processed in parallel. But
>>> the Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users is mostly gzip, since uncompressed files
>>> would be too costly to store (they could be hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, 
>>>
>>> +u...@beam.apache.org 

 Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
 all the data has been read before the next transforms can run. After the
 reshuffle, the data should have been processed in parallel across the
 workers. Did you see this?

 Are you able to change the input of your pipeline to use an
 uncompressed file or many compressed files?

 On Fri, May 10, 2019 at 1:03 PM Allie Chen 
 wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow
> fused all transforms together. There is a large amount of data in the
> file, and I expect to see more workers spinning up after the reading
> transforms. I tried to use the Reshuffle transform to prevent the
> fusion, but it is not scalable since it won’t proceed until all data
> has arrived at this point.
>
> Are there any other ways to allow more workers to work on all the
> other transforms after reading?
>
> Thanks,
>
> Allie
>
>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Re Lukasz: Thanks! I am not able to control the compression format but I
will see whether splitting the gzip files will work. Is there a simple flag
in Dataflow that could turn off the fusion?

Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
is not parallel either.

Thanks all,

Allie

*From: *Reuven Lax 
*Date: *Fri, May 10, 2019 at 5:02 PM
*To: *dev
*Cc: *user

It's unlikely that Reshuffle itself takes hours. It's more likely that
> simply reading and decompressing all that data was very slow when there was
> no parallelism.
>
> *From: *Allie Chen 
> *Date: *Fri, May 10, 2019 at 1:17 PM
> *To: * 
> *Cc: * 
>
> Yes, I do see that the data after reshuffle is processed in parallel. But
>> the Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users is mostly gzip, since uncompressed files
>> would be too costly to store (they could be hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, 
>>
>> +u...@beam.apache.org 
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>>> wrote:
>>>
 Hi,


 I am trying to load a gzip file to BigQuery using Dataflow. Since the
 compressed file is not splittable, one worker is allocated to read the
 file. The same worker will do all the other transforms since Dataflow
 fused all transforms together. There is a large amount of data in the
 file, and I expect to see more workers spinning up after the reading
 transforms. I tried to use the Reshuffle transform to prevent the
 fusion, but it is not scalable since it won’t proceed until all data
 has arrived at this point.

 Are there any other ways to allow more workers to work on all the
 other transforms after reading?

 Thanks,

 Allie




Re: Problem with gzip

2019-05-10 Thread Reuven Lax
It's unlikely that Reshuffle itself takes hours. It's more likely that
simply reading and decompressing all that data was very slow when there was
no parallelism.

*From: *Allie Chen 
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: * 
*Cc: * 

Yes, I do see that the data after reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) I did.
>
> The file format for our users is mostly gzip, since uncompressed files
> would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, 
>
> +u...@beam.apache.org 
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow
>>> fused all transforms together. There is a large amount of data in the
>>> file, and I expect to see more workers spinning up after the reading
>>> transforms. I tried to use the Reshuffle transform to prevent the
>>> fusion, but it is not scalable since it won’t proceed until all data
>>> has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the
>>> other transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
The best solution would be to find a compression format that is splittable
and add support for that to Apache Beam and use it. The issue with
compressed files is that you can't read from an arbitrary offset. This
stack overflow post[1] has some suggestions on seekable compression
libraries.

A much easier solution would be to split up your data into 100s of gzip
files. This would give you most of the compression benefit and would also
give you a lot of parallelization benefit during reading.

1:
https://stackoverflow.com/questions/2046559/any-seekable-compression-library
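To illustrate the second suggestion, a small stand-alone script that
re-shards one large text file into many gzipped pieces (the shard size and
file names are placeholders):

import gzip
import itertools

def split_into_gzip_shards(src_path, lines_per_shard=100000):
    # Stream the large source file once, writing fixed-size gzip shards.
    with open(src_path, 'rt') as src:
        for i in itertools.count():
            lines = list(itertools.islice(src, lines_per_shard))
            if not lines:
                break
            with gzip.open(f'shard-{i:05d}.gz', 'wt') as out:
                out.writelines(lines)

split_into_gzip_shards('big_input.txt')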

On Fri, May 10, 2019 at 1:25 PM Allie Chen  wrote:

> Yes, that is correct.
>
> *From: *Allie Chen 
> *Date: *Fri, May 10, 2019 at 4:21 PM
> *To: * 
> *Cc: * 
>
> Yes.
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:19 PM
>> *To: *dev
>> *Cc: * 
>>
>> When you had X gzip files and were not using Reshuffle, did you see X
>>> workers read and process the files?
>>>
>>> On Fri, May 10, 2019 at 1:17 PM Allie Chen 
>>> wrote:
>>>
 Yes, I do see that the data after reshuffle is processed in parallel. But
 the Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users is mostly gzip, since uncompressed files
 would be too costly to store (they could be hundreds of GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow
>> fused all transforms together. There is a large amount of data in the
>> file, and I expect to see more workers spinning up after the reading
>> transforms. I tried to use the Reshuffle transform to prevent the
>> fusion, but it is not scalable since it won’t proceed until all data
>> has arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, that is correct.

*From: *Allie Chen 
*Date: *Fri, May 10, 2019 at 4:21 PM
*To: * 
*Cc: * 

Yes.
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:19 PM
> *To: *dev
> *Cc: * 
>
> When you had X gzip files and were not using Reshuffle, did you see X
>> workers read and process the files?
>>
>> On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:
>>
>>> Yes, I do see that the data after reshuffle is processed in parallel. But
>>> the Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users is mostly gzip, since uncompressed files
>>> would be too costly to store (they could be hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, 
>>>
>>> +u...@beam.apache.org 

 Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
 all the data has been read before the next transforms can run. After the
 reshuffle, the data should have been processed in parallel across the
 workers. Did you see this?

 Are you able to change the input of your pipeline to use an
 uncompressed file or many compressed files?

 On Fri, May 10, 2019 at 1:03 PM Allie Chen 
 wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow
> fused all transforms together. There is a large amount of data in the
> file, and I expect to see more workers spinning up after the reading
> transforms. I tried to use the Reshuffle transform to prevent the
> fusion, but it is not scalable since it won’t proceed until all data
> has arrived at this point.
>
> Are there any other ways to allow more workers to work on all the
> other transforms after reading?
>
> Thanks,
>
> Allie
>
>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes.

*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 4:19 PM
*To: *dev
*Cc: * 

When you had X gzip files and were not using Reshuffle, did you see X
> workers read and process the files?
>
> On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:
>
>> Yes, I do see that the data after reshuffle is processed in parallel. But
>> the Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users is mostly gzip, since uncompressed files
>> would be too costly to store (they could be hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, 
>>
>> +u...@beam.apache.org 
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>>> wrote:
>>>
 Hi,


 I am trying to load a gzip file to BigQuery using Dataflow. Since the
 compressed file is not splittable, one worker is allocated to read the
 file. The same worker will do all the other transforms since Dataflow
 fused all transforms together. There is a large amount of data in the
 file, and I expect to see more workers spinning up after the reading
 transforms. I tried to use the Reshuffle transform to prevent the
 fusion, but it is not scalable since it won’t proceed until all data
 has arrived at this point.

 Are there any other ways to allow more workers to work on all the
 other transforms after reading?

 Thanks,

 Allie




Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, I do see that the data after reshuffle is processed in parallel. But
the Reshuffle transform itself takes hours or even days to run, according to
one test (24 gzip files, 17 million lines in total) I did.

The file format for our users is mostly gzip, since uncompressed files
would be too costly to store (they could be hundreds of GB).

Thanks,

Allie


*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 4:07 PM
*To: *dev, 

+u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
> the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an uncompressed
> file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow
>> fused all transforms together. There is a large amount of data in the
>> file, and I expect to see more workers spinning up after the reading
>> transforms. I tried to use the Reshuffle transform to prevent the
>> fusion, but it is not scalable since it won’t proceed until all data
>> has arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
When you had X gzip files and were not using Reshuffle, did you see X
workers read and process the files?

On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:

> Yes, I do see that the data after reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) I did.
>
> The file format for our users is mostly gzip, since uncompressed files
> would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, 
>
> +u...@beam.apache.org 
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow
>>> fused all transforms together. There is a large amount of data in the
>>> file, and I expect to see more workers spinning up after the reading
>>> transforms. I tried to use the Reshuffle transform to prevent the
>>> fusion, but it is not scalable since it won’t proceed until all data
>>> has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the
>>> other transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
+u...@beam.apache.org 

Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
the data has been read before the next transforms can run. After the
reshuffle, the data should have been processed in parallel across the
workers. Did you see this?

Are you able to change the input of your pipeline to use an uncompressed
file or many compressed files?
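With many smaller compressed files, a single file pattern already gives the
read step one unit of work per file; a minimal sketch (the bucket path and
downstream transform are placeholders):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     # One gzip shard per read task; decompression is inferred from '.gz'.
     | beam.io.ReadFromText('gs://my-bucket/input/shard-*.gz')
     | beam.Map(lambda line: line))  # placeholder downstream transform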

On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow
> fused all transforms together. There is a large amount of data in the
> file, and I expect to see more workers spinning up after the reading
> transforms. I tried to use the Reshuffle transform to prevent the
> fusion, but it is not scalable since it won’t proceed until all data
> has arrived at this point.
>
> Are there any other ways to allow more workers to work on all the
> other transforms after reading?
>
> Thanks,
>
> Allie
>
>