Re: Cassandra and hadoop test broken on master and in previous releases

2019-05-10 Thread Jean-Baptiste Onofré
Hi,

let me try to reproduce on my box.

Regards
JB

On 11/05/2019 01:34, Ankur Goenka wrote:
> Hi,
> 
> The Cassandra and Hadoop tests for the targets :beam-sdks-java-io-cassandra:test
> and :beam-sdks-java-io-hadoop-format:test are failing on master and in the
> 2.12.0 release with a JVM crash.
> 
> Gradle Scan: https://gradle.com/s/rhseoqeouup6e
> 
> Any help debugging the failure would be useful.
> 
> Thanks,
> Ankur

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Do we maintain offline artifact version in javadocs sdks/java/javadoc/build.gradle

2019-05-10 Thread Ankur Goenka
Hi,

I see that sdks/java/javadoc/build.gradle is not in sync with
org/apache/beam/gradle/BeamModulePlugin.groovy.
I wanted to check whether we still maintain it. Depending on the answer, we
can either remove or update sdks/java/javadoc/build.gradle.

Thanks,
Ankur


Re: Problem with gzip

2019-05-10 Thread Michael Luckey
Maybe the solution implemented in JdbcIO [1], [2] could be helpful in this
case.

[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik  wrote:

> There is no such flag to turn off fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether the splitting gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax 
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen 
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * 
>>> *Cc: * 
>>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
 Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users are mostly gzip format, since
 uncompressed files would be too costly to store (It could be in hundreds of
 GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow 
>> fused
>> all transforms together.  There are a large amount of data in the file, 
>> and
>> I expect to see more workers spinning up after reading transforms. I 
>> tried
>> to use Reshuffle Transform
>> 
>> to prevent the fusion, but it is not scalable since it won’t proceed 
>> until
>> all data arrived at this point.
>>
>> Is there any other ways to allow more workers working on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Cassandra and hadoop test broken on master and in previous releases

2019-05-10 Thread Ankur Goenka
Hi,

The Cassandra and Hadoop tests for the targets :beam-sdks-java-io-cassandra:test
and :beam-sdks-java-io-hadoop-format:test are failing on master and in the
2.12.0 release with a JVM crash.

Gradle Scan: https://gradle.com/s/rhseoqeouup6e

Any help debugging the failure would be useful.

Thanks,
Ankur


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
There is no such flag to turn off fusion.

Writing 100s of GiBs of uncompressed data to reshuffle will take time when
it is limited to a small number of workers.

If you can split up your input into a lot of smaller files that are
compressed then you shouldn't need to use the reshuffle but still could if
you found it helped.

On Fri, May 10, 2019 at 2:24 PM Allie Chen  wrote:

> Re Lukasz: Thanks! I am not able to control the compression format but I
> will see whether the splitting gzip files will work. Is there a simple flag
> in Dataflow that could turn off the fusion?
>
> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
> is not parallel either.
>
> Thanks all,
>
> Allie
>
> *From: *Reuven Lax 
> *Date: *Fri, May 10, 2019 at 5:02 PM
> *To: *dev
> *Cc: *user
>
> It's unlikely that Reshuffle itself takes hours. It's more likely that
>> simply reading and decompressing all that data was very slow when there was
>> no parallelism.
>>
>> *From: *Allie Chen 
>> *Date: *Fri, May 10, 2019 at 1:17 PM
>> *To: * 
>> *Cc: * 
>>
>> Yes, I do see the data after reshuffle are processed in parallel. But
>>> Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users are mostly gzip format, since uncompressed
>>> files would be too costly to store (It could be in hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, 
>>>
>>> +u...@beam.apache.org 

 Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
 all the data has been read before the next transforms can run. After the
 reshuffle, the data should have been processed in parallel across the
 workers. Did you see this?

 Are you able to change the input of your pipeline to use an
 uncompressed file or many compressed files?

 On Fri, May 10, 2019 at 1:03 PM Allie Chen 
 wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow 
> fused
> all transforms together.  There are a large amount of data in the file, 
> and
> I expect to see more workers spinning up after reading transforms. I tried
> to use Reshuffle Transform
> 
> to prevent the fusion, but it is not scalable since it won’t proceed until
> all data arrived at this point.
>
> Is there any other ways to allow more workers working on all the other
> transforms after reading?
>
> Thanks,
>
> Allie
>
>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Re Lukasz: Thanks! I am not able to control the compression format, but I
will see whether splitting the gzip files will work. Is there a simple flag
in Dataflow that could turn off fusion?

Re Reuven: No, I checked the run time in the Dataflow UI; the GroupByKey and
FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
is not parallel either.

Thanks all,

Allie

*From: *Reuven Lax 
*Date: *Fri, May 10, 2019 at 5:02 PM
*To: *dev
*Cc: *user

It's unlikely that Reshuffle itself takes hours. It's more likely that
> simply reading and decompressing all that data was very slow when there was
> no parallelism.
>
> *From: *Allie Chen 
> *Date: *Fri, May 10, 2019 at 1:17 PM
> *To: * 
> *Cc: * 
>
> Yes, I do see the data after reshuffle are processed in parallel. But
>> Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users are mostly gzip format, since uncompressed
>> files would be too costly to store (It could be in hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, 
>>
>> +u...@beam.apache.org 
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>>> wrote:
>>>
 Hi,


 I am trying to load a gzip file to BigQuery using Dataflow. Since the
 compressed file is not splittable, one worker is allocated to read the
 file. The same worker will do all the other transforms since Dataflow fused
 all transforms together.  There are a large amount of data in the file, and
 I expect to see more workers spinning up after reading transforms. I tried
 to use Reshuffle Transform
 
 to prevent the fusion, but it is not scalable since it won’t proceed until
 all data arrived at this point.

 Is there any other ways to allow more workers working on all the other
 transforms after reading?

 Thanks,

 Allie




Re: Problem with gzip

2019-05-10 Thread Reuven Lax
It's unlikely that Reshuffle itself takes hours. It's more likely that
simply reading and decompressing all that data was very slow when there was
no parallelism.

*From: *Allie Chen 
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: * 
*Cc: * 

Yes, I do see the data after reshuffle are processed in parallel. But
> Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) I did.
>
> The file format for our users are mostly gzip format, since uncompressed
> files would be too costly to store (It could be in hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, 
>
> +u...@beam.apache.org 
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow fused
>>> all transforms together.  There are a large amount of data in the file, and
>>> I expect to see more workers spinning up after reading transforms. I tried
>>> to use Reshuffle Transform
>>> 
>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>> all data arrived at this point.
>>>
>>> Is there any other ways to allow more workers working on all the other
>>> transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>


Re: Fwd: Your application for Season of Docs 2019 was unsuccessful

2019-05-10 Thread Aizhamal Nurmamat kyzy
I think it is still a good idea to make those Jira issues easy to find
on the Beam website. Maybe on https://beam.apache.org/contribute/, next to
'improve the documentation', we could add a link to the documentation
component or label, or something similar. I presume that openly soliciting
docs contributors is more effective.

*From: *Maximilian Michels 
*Date: *Thu, May 2, 2019 at 6:47 AM
*To: * 

Aw, too bad. Next time. I hope we can extend the docs for portability
> before next year :)
>
> On 02.05.19 00:30, Pablo Estrada wrote:
> > Hello all,
> > as you may already know, unfortunately our application for Season of
> > Docs was not successful. That's too bad : ) - but it's good that we were
> > able to produce a couple work items that can still be picked up by the
> > community at some point. Thanks to everyone who helped here.
> > Best
> > -P.
> >
> > -- Forwarded message -
> > From: *Andrew Chen* <cheno...@google.com>
> > Date: Tue, Apr 30, 2019 at 5:31 AM
> > Subject: Your application for Season of Docs 2019 was unsuccessful
> > To:  > >
> >
> >
> > Thank you for your interest and enthusiasm for Season of Docs.
> >
> >
> > We’re sorry to say that your organization’s application for Season of
> > Docs was not successful this year. Because 2019 is the program’s pilot
> > year, we were only able to accept 50 organizations out of the almost 200
> > applications submitted. There were many high quality applications, so
> > the selection of organizations was not easy.
> >
> >
> > Please do stay in touch with the progress of Season of Docs, and let us
> > know if you have any questions or feedback by emailing
> > season-of-docs-supp...@googlegroups.com. We are hoping to grow
> > the program's capacity in subsequent years, so please apply again next
> > year.
> >
> >
> > Sincerely,
> >
> >
> > The Season of Docs team
> >
> >
>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
The best solution would be to find a compression format that is splittable,
add support for it to Apache Beam, and use it. The issue with gzip-style
compressed files is that you can't start reading from an arbitrary offset.
This Stack Overflow post [1] has some suggestions on seekable compression
libraries.

A much easier solution would be to split up your data into hundreds of gzip
files. This would give you most of the compression benefit and would also
give you a lot of parallelization benefit during reading.

1:
https://stackoverflow.com/questions/2046559/any-seekable-compression-library
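
As a rough illustration of the "split into many gzip files" suggestion, here
is a minimal sketch using only the Java standard library. The class name
GzipSharder, the shard file naming, and the round-robin-by-line strategy are
all assumptions made for this example; nothing here is part of Beam. Note
that the split itself still reads the single input file sequentially, so it
is a one-time preprocessing cost.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper: splits one gzip text file into several smaller gzip files. */
public class GzipSharder {

    /** Writes the lines of {@code input} round-robin into {@code numShards} gzip files. */
    public static List<Path> shard(Path input, Path outDir, int numShards) throws IOException {
        Files.createDirectories(outDir);
        List<Path> paths = new ArrayList<>();
        List<BufferedWriter> writers = new ArrayList<>();
        try {
            // Open one gzip writer per output shard (file names are illustrative).
            for (int i = 0; i < numShards; i++) {
                Path p = outDir.resolve(String.format("part-%05d-of-%05d.gz", i, numShards));
                paths.add(p);
                writers.add(new BufferedWriter(new OutputStreamWriter(
                        new GZIPOutputStream(Files.newOutputStream(p)), StandardCharsets.UTF_8)));
            }
            // Decompress the input once, sequentially, and distribute its lines.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(Files.newInputStream(input)), StandardCharsets.UTF_8))) {
                String line;
                long n = 0;
                while ((line = reader.readLine()) != null) {
                    BufferedWriter w = writers.get((int) (n % numShards));
                    w.write(line);
                    w.write('\n');
                    n++;
                }
            }
        } finally {
            // Closing each writer finishes its gzip stream.
            for (BufferedWriter w : writers) {
                w.close();
            }
        }
        return paths;
    }
}
```

Each resulting shard is an independent gzip file, so a pipeline that reads
them with a file pattern can hand different shards to different workers.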

On Fri, May 10, 2019 at 1:25 PM Allie Chen  wrote:

> Yes, that is correct.
>
> *From: *Allie Chen 
> *Date: *Fri, May 10, 2019 at 4:21 PM
> *To: * 
> *Cc: * 
>
> Yes.
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:19 PM
>> *To: *dev
>> *Cc: * 
>>
>> When you had X gzip files and were not using Reshuffle, did you see X
>>> workers read and process the files?
>>>
>>> On Fri, May 10, 2019 at 1:17 PM Allie Chen 
>>> wrote:
>>>
 Yes, I do see the data after reshuffle are processed in parallel. But
 Reshuffle transform itself takes hours or even days to run, according to
 one test (24 gzip files, 17 million lines in total) I did.

 The file format for our users are mostly gzip format, since
 uncompressed files would be too costly to store (It could be in hundreds of
 GB).

 Thanks,

 Allie


 *From: *Lukasz Cwik 
 *Date: *Fri, May 10, 2019 at 4:07 PM
 *To: *dev, 

 +u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
> all the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow 
>> fused
>> all transforms together.  There are a large amount of data in the file, 
>> and
>> I expect to see more workers spinning up after reading transforms. I 
>> tried
>> to use Reshuffle Transform
>> 
>> to prevent the fusion, but it is not scalable since it won’t proceed 
>> until
>> all data arrived at this point.
>>
>> Is there any other ways to allow more workers working on all the
>> other transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Jan Lukavský

Hi Lukasz,

I've created JIRA issue [1] and PR [2].

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7269

[2] https://github.com/apache/beam/pull/8555

On 5/10/19 7:39 PM, Lukasz Cwik wrote:
That seems like the correct fix as well. We could open up a PR and see 
what the tests catch as a first pass for understanding the implications.


On Fri, May 10, 2019 at 9:31 AM Jan Lukavský wrote:


Hm, yes, the fix might also be to change hashCode and equals of
SimpleStateTag, so that it doesn't hash and compare the StateSpec,
but only the StructureId. That looks like the best option to me. But
I'm not sure about other implications this might have.

Jan

On 5/10/19 5:43 PM, Reuven Lax wrote:

Ok so this sounds like a bug in the DirectRunner then?

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 8:38 AM
*To: *dev

StateSpec should not be used as a key within any maps. We
should use the logical name of the StateSpec relative to the
DoFn as its id and should only be using that id for
comparisons/lookups.

On Fri, May 10, 2019 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:

I'm not sure. Generally it affects any runner that uses
HashMap to store StateSpec.

Jan

On 5/9/19 6:32 PM, Reuven Lax wrote:

Is this specific to the DirectRunner, or does it affect
other runners?

On Thu, May 9, 2019 at 8:13 AM Jan Lukavský <je...@seznam.cz> wrote:

Because of the use of hashCode in StateSpecs, I'd say that it is. But it
is not obvious. That's why I'd suggest making it abstract on Coder, so that
all implementations have to override it. That's a simple solution, but the
question is: should hashCode of Coder be used that way? I think that a
StateSpec instance should be equal only to itself. Then the hashCode can be
stored in the instance, e.g.

  private final int hashCode = System.identityHashCode(this);

and returned in hashCode(). There would be no need for Coder to implement
hashCode anymore (unless there are other cases where it is needed, in which
case it would still be better to add abstract hashCode and equals methods
on Coder).

Jan

On 5/9/19 5:04 PM, Reuven Lax wrote:

Is a valid hashCode on Coder part of our contract
or not? If it is, then the lack of hashCode on
SchemaCoder is simply a bug.

On Thu, May 9, 2019 at 7:42 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

I have spent several hours digging into a strange issue with DirectRunner
that manifested as a non-deterministic run of a pipeline. The pipeline
basically contains only a single stateful ParDo, which adds elements into
state and flushes them to the output after some timeout. The issue was that
sometimes (very often) when the timer fired, the state appeared to be empty,
although I had actually added something to it. I will skip the details, but
the problem boils down to the fact that StateSpecs hash the Coder into
hashCode, e.g.

 @Override
 public int hashCode() {
   return Objects.hash(getClass(), coder);
 }

in ValueStateSpec. Now, when a Coder doesn't have hashCode and equals
implemented (and there are some of those in the codebase itself, e.g.
SchemaCoder), it all blows up in a very hard-to-debug manner. So the
proposal is either to add abstract hashCode and equals to Coder, or not to
hash the Coder into the hashCode of StateSpecs (we can generate a unique ID
for each StateSpec instance, for example).

Any thoughts about which path to follow? Or maybe both? :)

Jan
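
To make the failure mode in this thread concrete, here is a small standalone
sketch. It does not use Beam: IdentityCoder, CoderHashedSpec and IdTag are
hypothetical stand-ins for a Coder without equals/hashCode, a StateSpec that
hashes its coder, and a tag compared only by its logical id. How a second,
logically identical spec instance comes to exist inside a runner is not
modelled here; the sketch simply creates one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class StateSpecHashDemo {

    /** Stand-in for a coder that does NOT override equals/hashCode. */
    static final class IdentityCoder {}

    /** Stand-in for a spec that hashes and compares its coder, as in the quoted snippet. */
    static final class CoderHashedSpec {
        final IdentityCoder coder;

        CoderHashedSpec(IdentityCoder coder) {
            this.coder = coder;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CoderHashedSpec
                    && Objects.equals(coder, ((CoderHashedSpec) o).coder);
        }

        @Override
        public int hashCode() {
            return Objects.hash(getClass(), coder);
        }
    }

    /** Stand-in for a tag that is identified only by its logical id. */
    static final class IdTag {
        final String id;
        final CoderHashedSpec spec;

        IdTag(String id, CoderHashedSpec spec) {
            this.id = id;
            this.spec = spec;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof IdTag && id.equals(((IdTag) o).id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Looks up state with a second, logically identical spec used as the map key. */
    static boolean lookupBySpec() {
        Map<CoderHashedSpec, String> state = new HashMap<>();
        state.put(new CoderHashedSpec(new IdentityCoder()), "buffered");
        // The new coder instance is not equal to the first one, so the lookup misses.
        return state.containsKey(new CoderHashedSpec(new IdentityCoder()));
    }

    /** Same lookup, but the key only compares the logical id. */
    static boolean lookupById() {
        Map<IdTag, String> state = new HashMap<>();
        state.put(new IdTag("buffer", new CoderHashedSpec(new IdentityCoder())), "buffered");
        return state.containsKey(new IdTag("buffer", new CoderHashedSpec(new IdentityCoder())));
    }

    public static void main(String[] args) {
        System.out.println("lookup by spec finds state: " + lookupBySpec());
        System.out.println("lookup by id finds state:   " + lookupById());
    }
}
```

In this sketch the spec-keyed lookup misses because the two coder instances
are never equal, which matches the "state appeared to be empty" symptom,
while the id-keyed lookup succeeds regardless of how the coder behaves.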




Re: Python SDK timestamp precision

2019-05-10 Thread Robert Bradshaw
On Thu, May 9, 2019 at 9:32 AM PM Kenneth Knowles  wrote:

> From: Robert Bradshaw 
> Date: Wed, May 8, 2019 at 3:00 PM
> To: dev
>
>> From: Kenneth Knowles 
>> Date: Wed, May 8, 2019 at 6:50 PM
>> To: dev
>>
>> >> The end-of-window, for firing, can be approximate, but it seems it
>> >> should be exact for timestamp assignment of the result (and similarly
>> >> with the other timestamp combiners).
>> >
>> > I was thinking that the window itself should be stored as exact data, 
>> > while just the firing itself is approximated, since it already is, because 
>> > of watermarks and timers.
>>
>> I think this works where we can compare encoded windows, but some
>> portable interpretation of windows is required for runner-side
>> implementation of merging windows (for example).
>
> But in this case, you've recognized the URN of the WindowFn anyhow, so you 
> understand its windows. Remembering that IntervalWindow is just one choice, 
> and that windows themselves are totally user-defined and that merging logic 
> is completely arbitrary per WindowFn (we probably should have some 
> restrictions, but see https://issues.apache.org/jira/browse/BEAM-654). So I 
> file this use case in the "runner knows everything about the WindowFn and 
> Window type and window encoding anyhow".

Being able to merge common windows in the runner is just an
optimization, but an important one (especially for bootstrapping
SDKs). However, this is not just about runner to SDK, but SDK to SDK
as well (where a user from one SDK may want to inspect the windows
produced by another). Having MillisIntervalWindow,
MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
think is worth going down.

Yes, we need to solve the "extract the endpoint of an unknown encoded
window" problem as well, possibly similar to what we do with length
prefix coders, possibly a restriction on window encodings themselves.

>> There may also be issues if windows (or timestamps) are assigned to a
>> high precision in one SDK, then inspected/acted on in another SDK, and
>> then passed back to the original SDK where the truncation would be
>> visible.
>
> This is pretty interesting and complex. But again, a window is just data. An 
> SDK has to know how to deserialize it to operate on it. Unless we do actually 
> standardize some aspects of it. I don't believe BoundedWindow encoding has a 
> defined way to get the timestamp without decoding the window, does it? I 
> thought we had basically defaulted to all IntervalWindows. But I am not 
> following that closely.
>
>> > You raise a good point that min/max timestamp combiners require actually 
>> > understanding the higher-precision timestamp. I can think of a couple 
>> > things to do. One is the old "standardize all 3 or 4 precisions we need" 
>> > and the other is that combiners other than EOW exist primarily to hold the 
>> > watermark, and that hold does not require the original precision. Still, 
>> > neither of these is that satisfying.
>>
>> In the current model, the output timestamp is user-visible.
>
> But as long as the watermark hold is less, it is safe. It requires knowing 
> the coarse-precision lower bound of the timestamps of the input. And there 
> may be situations where you also want the coarse upper bound. But you do know 
> that these are at most one millisecond apart (assuming the runner is in 
> millis) so perhaps no storage overhead. But a lot of complexity and chances 
> for off by ones. And this is pretty hand-wavy.

Yeah. A different SDK may (implicitly or explicitly) ask for the
timestamp of the (transitive) output of a GBK, for which an
approximation (either way) is undesirable.
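
The truncation concern can be seen with plain java.time, without any Beam
classes. The sketch below is only an illustration under stated assumptions:
TimestampTruncationDemo and its method names are made up for this example,
and fixedWindowStartMillis is a simplified stand-in for fixed-window
assignment on millisecond timestamps, not Beam's FixedWindows code.

```java
import java.time.Instant;

public class TimestampTruncationDemo {

    /** Simulates passing a timestamp through a system that only keeps milliseconds. */
    static Instant roundTripThroughMillis(Instant t) {
        return Instant.ofEpochMilli(t.toEpochMilli());
    }

    /** Start of the fixed window of the given size (and offset) that contains the timestamp. */
    static long fixedWindowStartMillis(long timestampMillis, long sizeMillis, long offsetMillis) {
        return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
    }

    public static void main(String[] args) {
        // int64 seconds since epoch plus int32 nanos within the second.
        Instant precise = Instant.ofEpochSecond(1_557_500_000L, 123_456_789);
        Instant truncated = roundTripThroughMillis(precise);

        // The sub-millisecond part is gone after the round trip.
        System.out.println(precise.getNano() + " -> " + truncated.getNano());
        System.out.println("equal after round trip: " + precise.equals(truncated));

        // Window assignment is simple integer arithmetic on the millisecond value.
        System.out.println(fixedWindowStartMillis(truncated.toEpochMilli(), 60_000L, 0L));
    }
}
```

If the original SDK compares the returned timestamp with the one it emitted,
the two no longer match, which is the visible truncation discussed above.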

>> >> > A correction: Java *now* uses nanoseconds [1]. It uses the same 
>> >> > breakdown as proto (int64 seconds since epoch + int32 nanos within 
>> >> > second). It has legacy classes that use milliseconds, and Joda itself 
>> >> > now encourages moving back to Java's new Instant type. Nanoseconds 
>> >> > should complicate the arithmetic only for the one person authoring the 
>> >> > date library, which they have already done.
>> >>
>> >> The encoding and decoding need to be done in a language-consistent way
>> >> as well.
>> >
>> > I honestly am not sure what you mean by "language-consistent" here.
>>
>> If we want to make reading and writing of timestamps, windows
>> cross-language, we can't rely on language-specific libraries to do the
>> encoding.
>>
>> >> Also, most date libraries don't provide division, etc. operators, so
>> >> we have to do that as well. Not that it should be *that* hard.
>> >
>> > If the libraries dedicated to time handling haven't found it needful, is 
>> > there a specific reason you raise this? We do some simple math to find the 
>> > window things fall into; is that it?
>>
>> Yes. E.g.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
>>
>> would be a lot messier if there were no mapping 

Re: Beam's Conda package

2019-05-10 Thread Ahmet Altay
https://github.com/sodre seems to be the person behind it. Does anybody
know who that person is?

*From: *Charles Chen 
*Date: *Fri, May 10, 2019 at 1:13 PM
*To: *dev

Looks like this is where it's living:
> https://github.com/conda-forge/apache-beam-feedstock/tree/c96274713fcc5970c967c20e84859e73d0efa0d0
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 1:02 PM
> *To: *dev
>
> I'm not aware of who set up the conda package either. There seem to have been ~4500
>> downloads of the package so that is a good amount of users.
>>
>> On Fri, May 10, 2019 at 11:45 AM Ahmet Altay  wrote:
>>
>>> Hi all,
>>>
>>> There is a conda package for apache-beam [1]. As far as I know, we do not
>>> release this package. Does anyone know who owns this? It was last updated
>>> to use 2.9.0, at least it would be good to add a newer version there.
>>>
>>> We also don't test in that environment so I am not sure how well it
>>> works or who uses it.
>>>
>>> Thank you,
>>> Ahmet
>>>
>>> [1] https://anaconda.org/conda-forge/apache-beam
>>>
>>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, that is correct.

*From: *Allie Chen 
*Date: *Fri, May 10, 2019 at 4:21 PM
*To: * 
*Cc: * 

Yes.
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:19 PM
> *To: *dev
> *Cc: * 
>
> When you had X gzip files and were not using Reshuffle, did you see X
>> workers read and process the files?
>>
>> On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:
>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>> Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users are mostly gzip format, since uncompressed
>>> files would be too costly to store (It could be in hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik 
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, 
>>>
>>> +u...@beam.apache.org 

 Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
 all the data has been read before the next transforms can run. After the
 reshuffle, the data should have been processed in parallel across the
 workers. Did you see this?

 Are you able to change the input of your pipeline to use an
 uncompressed file or many compressed files?

 On Fri, May 10, 2019 at 1:03 PM Allie Chen 
 wrote:

> Hi,
>
>
> I am trying to load a gzip file to BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow 
> fused
> all transforms together.  There are a large amount of data in the file, 
> and
> I expect to see more workers spinning up after reading transforms. I tried
> to use Reshuffle Transform
> 
> to prevent the fusion, but it is not scalable since it won’t proceed until
> all data arrived at this point.
>
> Is there any other ways to allow more workers working on all the other
> transforms after reading?
>
> Thanks,
>
> Allie
>
>


Re: Beam's Conda package

2019-05-10 Thread Charles Chen
Looks like this is where it's living:
https://github.com/conda-forge/apache-beam-feedstock/tree/c96274713fcc5970c967c20e84859e73d0efa0d0

*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 1:02 PM
*To: *dev

I'm not aware of who set up the conda package either. There seem to have been ~4500
> downloads of the package so that is a good amount of users.
>
> On Fri, May 10, 2019 at 11:45 AM Ahmet Altay  wrote:
>
>> Hi all,
>>
>> There is a conda package for apache-beam [1]. As far as I know, we do not
>> release this package. Does anyone know who owns this? It was last updated
>> to use 2.9.0, at least it would be good to add a newer version there.
>>
>> We also don't test in that environment so I am not sure how well it works
>> or who uses it.
>>
>> Thank you,
>> Ahmet
>>
>> [1] https://anaconda.org/conda-forge/apache-beam
>>
>


Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes.

*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 4:19 PM
*To: *dev
*Cc: * 

When you had X gzip files and were not using Reshuffle, did you see X
> workers read and process the files?
>
> On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:
>
>> Yes, I do see the data after reshuffle are processed in parallel. But
>> Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users are mostly gzip format, since uncompressed
>> files would be too costly to store (It could be in hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik 
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, 
>>
>> +u...@beam.apache.org 
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen 
>>> wrote:
>>>
 Hi,


 I am trying to load a gzip file to BigQuery using Dataflow. Since the
 compressed file is not splittable, one worker is allocated to read the
 file. The same worker will do all the other transforms since Dataflow fused
 all transforms together.  There are a large amount of data in the file, and
 I expect to see more workers spinning up after reading transforms. I tried
 to use Reshuffle Transform
 
 to prevent the fusion, but it is not scalable since it won’t proceed until
 all data arrived at this point.

 Is there any other ways to allow more workers working on all the other
 transforms after reading?

 Thanks,

 Allie




Re: Problem with gzip

2019-05-10 Thread Allie Chen
Yes, I do see that the data after the reshuffle is processed in parallel. But
the Reshuffle transform itself takes hours or even days to run, according to
one test (24 gzip files, 17 million lines in total) I did.

Our users' files are mostly in gzip format, since uncompressed files would be
too costly to store (they could be hundreds of GB).

Thanks,

Allie


*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 4:07 PM
*To: *dev, 

+u...@beam.apache.org 
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
> the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an uncompressed
> file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file to BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow fused
>> all transforms together.  There are a large amount of data in the file, and
>> I expect to see more workers spinning up after reading transforms. I tried
>> to use Reshuffle Transform
>> 
>> to prevent the fusion, but it is not scalable since it won’t proceed until
>> all data arrived at this point.
>>
>> Is there any other ways to allow more workers working on all the other
>> transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
When you had X gzip files and were not using Reshuffle, did you see X
workers read and process the files?

On Fri, May 10, 2019 at 1:17 PM Allie Chen  wrote:

> Yes, I do see that the data after the reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) I did.
>
> Our users' files are mostly in gzip format, since uncompressed
> files would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, 
>
> +u...@beam.apache.org 
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow fused
>>> all transforms together. There is a large amount of data in the file, and
>>> I expect to see more workers spinning up after the read transforms. I tried
>>> to use the Reshuffle transform to prevent the fusion, but it is not scalable
>>> since it won’t proceed until all data has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the other
>>> transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>


Re: Beam's Conda package

2019-05-10 Thread Lukasz Cwik
I'm also not aware of who set up the conda package. There seem to have been
~4500 downloads of the package, so that is a good number of users.

On Fri, May 10, 2019 at 11:45 AM Ahmet Altay  wrote:

> Hi all,
>
> There is a conda package for apache-beam [1]. As far as I know, we do not
> release this package. Does anyone know who owns this? It was last updated
> to use 2.9.0; at the least, it would be good to add a newer version there.
>
> We also don't test in that environment so I am not sure how well it works
> or who uses it.
>
> Thank you,
> Ahmet
>
> [1] https://anaconda.org/conda-forge/apache-beam
>


Re: Problem with gzip

2019-05-10 Thread Lukasz Cwik
+u...@beam.apache.org 

Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
the data has been read before the next transforms can run. After the
reshuffle, the data should have been processed in parallel across the
workers. Did you see this?

Are you able to change the input of your pipeline to use an uncompressed
file or many compressed files?
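
To illustrate the "many compressed files" option, here is a minimal sketch in plain Java (no Beam dependency) that distributes lines round-robin over several independent gzip streams. The class and method names (GzipSharder, shard, readShard) are made up for this example, and it works on in-memory byte arrays only to stay self-contained; a real job would presumably write each shard to its own file so that each one can be read by a separate worker.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Beam or Dataflow.
class GzipSharder {

  /** Distributes lines round-robin over numShards independent gzip streams. */
  static List<byte[]> shard(List<String> lines, int numShards) {
    try {
      List<ByteArrayOutputStream> buffers = new ArrayList<>();
      List<GZIPOutputStream> gzips = new ArrayList<>();
      for (int i = 0; i < numShards; i++) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        buffers.add(buffer);
        gzips.add(new GZIPOutputStream(buffer));
      }
      int next = 0;
      for (String line : lines) {
        gzips.get(next).write((line + "\n").getBytes(StandardCharsets.UTF_8));
        next = (next + 1) % numShards;
      }
      List<byte[]> shards = new ArrayList<>();
      for (int i = 0; i < numShards; i++) {
        gzips.get(i).close(); // writes the gzip trailer for this shard
        shards.add(buffers.get(i).toByteArray());
      }
      return shards;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decompresses a single shard; no other shard is needed to read it. */
  static List<String> readShard(byte[] shard) {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(new ByteArrayInputStream(shard)), StandardCharsets.UTF_8))) {
      List<String> lines = new ArrayList<>();
      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        lines.add(line);
      }
      return lines;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    for (byte[] shard : shard(Arrays.asList("a", "b", "c", "d", "e"), 2)) {
      System.out.println(readShard(shard));
    }
  }
}
```

Each shard still has to be read sequentially, but the shards are independent of one another, which is what allows the reading to be spread over several workers.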

On Fri, May 10, 2019 at 1:03 PM Allie Chen  wrote:

> Hi,
>
>
> I am trying to load a gzip file into BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow fused
> all transforms together. There is a large amount of data in the file, and
> I expect to see more workers spinning up after the read transforms. I tried
> to use the Reshuffle transform to prevent the fusion, but it is not scalable
> since it won’t proceed until all data has arrived at this point.
>
> Are there any other ways to allow more workers to work on all the other
> transforms after reading?
>
> Thanks,
>
> Allie
>
>


Problem with gzip

2019-05-10 Thread Allie Chen
Hi,


I am trying to load a gzip file into BigQuery using Dataflow. Since the
compressed file is not splittable, one worker is allocated to read the
file. The same worker will do all the other transforms since Dataflow fused
all transforms together. There is a large amount of data in the file, and
I expect to see more workers spinning up after the read transforms. I tried
to use the Reshuffle transform to prevent the fusion, but it is not scalable
since it won’t proceed until all data has arrived at this point.

Are there any other ways to allow more workers to work on all the other
transforms after reading?

Thanks,

Allie


Beam's Conda package

2019-05-10 Thread Ahmet Altay
Hi all,

There is a conda package for apache-beam [1]. As far as I know, we do not
release this package. Does anyone know who owns this? It was last updated
to use 2.9.0; at the least, it would be good to add a newer version there.

We also don't test in that environment so I am not sure how well it works
or who uses it.

Thank you,
Ahmet

[1] https://anaconda.org/conda-forge/apache-beam


Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Lukasz Cwik
That seems like the correct fix as well. We could open up a PR and see what
the tests catch as a first pass for understanding the implications.

On Fri, May 10, 2019 at 9:31 AM Jan Lukavský  wrote:

> Hm, yes, the fix might be also in fixing hashCode and equals of
> SimpleStateTag, so that it doesn't hash and compare the StateSpec, but only
> the StructureId. That looks like best option to me. But I'm not sure about
> other implications this might have.
>
> Jan
> On 5/10/19 5:43 PM, Reuven Lax wrote:
>
> Ok so this sounds like a bug in the DirectRunner then?
>
> *From: *Lukasz Cwik 
> *Date: *Fri, May 10, 2019 at 8:38 AM
> *To: *dev
>
> StateSpec should not be used as a key within any maps. We should use the
>> logical name of the StateSpec relative to the DoFn as its id and should
>> only be using that id for comparisons/lookups.
>>
>> On Fri, May 10, 2019 at 1:07 AM Jan Lukavský  wrote:
>>
>>> I'm not sure. Generally it affects any runner that uses HashMap to store
>>> StateSpec.
>>>
>>> Jan
>>> On 5/9/19 6:32 PM, Reuven Lax wrote:
>>>
>>> Is this specific to the DirectRunner, or does it affect other runners?
>>>
>>> On Thu, May 9, 2019 at 8:13 AM Jan Lukavský  wrote:
>>>
 Because of the use of hashCode in StateSpecs, I'd say that it is. But
 it is not obvious. That's why I'd suggest to make it abstract on Coder, so
 that all implementations have to override it. That's a simple solution, but
 the question is - should hashCode of Coder be used that way? I think that
 StateSpec instances should be equal only to itself. Then the hashCode can
 be stored in the instance, e.g.

   private final int hashCode = System.identityHashCode(this)

 and returned in hashCode(). There would be no need for Coder to
 implement hashCode anymore (if there aren't any other cases, where it is
 needed, in which case it would still be better to add abstract hashCode and
 equals methods on Coder).

 Jan
 On 5/9/19 5:04 PM, Reuven Lax wrote:

 Is a valid hashCode on Coder part of our contract or not? If it is,
 then the lack of hashCode on SchemaCoder is simply a bug.

 On Thu, May 9, 2019 at 7:42 AM Jan Lukavský  wrote:

> Hi,
>
> I have spent several hours digging into a strange issue with
> DirectRunner,
> that manifested as non-deterministic run of pipeline. The pipeline
> contains basically only single stateful ParDo, which adds elements
> into
> state and after some timeout flushes these elements into output. The
> issue was that sometimes (very often) when the timer fired, the
> state
> appeared to be empty, although I actually added something into the
> state. I will skip details, but the problem boils down to the fact,
> that
> StateSpecs hash Coder into hashCode - e.g.
>
>  @Override
>  public int hashCode() {
>    return Objects.hash(getClass(), coder);
>  }
>
> in ValueStateSpec. Now, when Coder doesn't have hashCode and equals
> implemented (and there are some of those in the codebase itself - e.g.
> SchemaCoder), it all blows up in a very hard-to-debug manner. So the
> proposal is - either to add abstract hashCode and equals to Coder, or
> don't hash the Coder into hashCode of StateSpecs (we can generate
> unique
> ID for each StateSpec instance for example).
>
> Any thoughts about which path to follow? Or maybe both? :)
>
> Jan
>
>
>


Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Jan Lukavský
Hm, yes, the fix might be also in fixing hashCode and equals of 
SimpleStateTag, so that it doesn't hash and compare the StateSpec, but 
only the StructureId. That looks like best option to me. But I'm not 
sure about other implications this might have.
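
A minimal sketch of that idea in plain Java, assuming a simplified tag that carries only an id and a spec. The Tag class below is a stand-in written for this illustration, not the real SimpleStateTag, and its String id stands in for the StructureId.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class IdOnlyTagSketch {

  /** Simplified stand-in for a state tag; only the id takes part in equality. */
  static final class Tag {
    final String id;   // stands in for the StructureId / logical name
    final Object spec; // stands in for the StateSpec; ignored by equals and hashCode

    Tag(String id, Object spec) {
      this.id = id;
      this.spec = spec;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof Tag && id.equals(((Tag) other).id);
    }

    @Override
    public int hashCode() {
      return Objects.hash(getClass(), id);
    }
  }

  public static void main(String[] args) {
    Map<Tag, String> state = new HashMap<>();
    state.put(new Tag("buffer", new Object()), "buffered elements");
    // A second tag with the same id but a different spec instance finds the entry.
    System.out.println(state.get(new Tag("buffer", new Object())));
  }
}
```

With this shape, whether the Coder inside the spec implements hashCode and equals no longer affects lookups. The open question about other implications still stands, since two tags with the same id but genuinely different specs would now compare equal.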


Jan

On 5/10/19 5:43 PM, Reuven Lax wrote:

Ok so this sounds like a bug in the DirectRunner then?

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 8:38 AM
*To: *dev

StateSpec should not be used as a key within any maps. We should
use the logical name of the StateSpec relative to the DoFn as its
id and should only be using that id for comparisons/lookups.

On Fri, May 10, 2019 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:

I'm not sure. Generally it affects any runner that uses
HashMap to store StateSpec.

Jan

On 5/9/19 6:32 PM, Reuven Lax wrote:

Is this specific to the DirectRunner, or does it affect other
runners?

On Thu, May 9, 2019 at 8:13 AM Jan Lukavský <je...@seznam.cz> wrote:

Because of the use of hashCode in StateSpecs, I'd say
that it is. But it is not obvious. That's why I'd suggest
to make it abstract on Coder, so that all implementations
have to override it. That's a simple solution, but the
question is - should hashCode of Coder be used that way?
I think that StateSpec instances should be equal only to
itself. Then the hashCode can be stored in the instance, e.g.

  private final int hashCode = System.identityHashCode(this)

and returned in hashCode(). There would be no need for
Coder to implement hashCode anymore (if there aren't any
other cases, where it is needed, in which case it would
still be better to add abstract hashCode and equals
methods on Coder).
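
For reference, the identity-based variant could look roughly like the following. This is a sketch in plain Java; the Spec class is a hypothetical stand-in and not the actual StateSpec implementation.

```java
class IdentitySpecSketch {

  /** Hypothetical spec that is equal only to itself. */
  static final class Spec {
    // Captured once at construction, as proposed in the message above.
    private final int hashCode = System.identityHashCode(this);

    @Override
    public boolean equals(Object other) {
      return this == other;
    }

    @Override
    public int hashCode() {
      return hashCode;
    }
  }

  public static void main(String[] args) {
    Spec spec = new Spec();
    System.out.println(spec.equals(spec));       // same instance
    System.out.println(spec.equals(new Spec())); // different instance
  }
}
```

The trade-off of this design is that a re-created spec never matches the original one, so every lookup has to go through the very same instance.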

Jan

On 5/9/19 5:04 PM, Reuven Lax wrote:

Is a valid hashCode on Coder part of our contract or
not? If it is, then the lack of hashCode on SchemaCoder
is simply a bug.

On Thu, May 9, 2019 at 7:42 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

I have spent several hours digging into a strange issue
with DirectRunner,
that manifested as non-deterministic run of
pipeline. The pipeline
contains basically only single stateful ParDo, which
adds elements into
state and after some timeout flushes these elements
into output. The
issue was that sometimes (very often) when the
timer fired, the state
appeared to be empty, although I actually added
something into the
state. I will skip details, but the problem boils
down to the fact, that
StateSpecs hash Coder into hashCode - e.g.

 @Override
 public int hashCode() {
   return Objects.hash(getClass(), coder);
 }

in ValueStateSpec. Now, when Coder doesn't have
hashCode and equals
implemented (and there are some of those in the
codebase itself - e.g.
SchemaCoder), it all blows up in a very
hard-to-debug manner. So the
proposal is - either to add abstract hashCode and
equals to Coder, or
don't hash the Coder into hashCode of StateSpecs (we
can generate unique
ID for each StateSpec instance for example).

Any thoughts about which path to follow? Or maybe
both? :)

Jan




Re: [DISCUSS] Portability representation of schemas

2019-05-10 Thread Brian Hulette
Ah thanks! I added some language there.

*From: *Kenneth Knowles 
*Date: *Thu, May 9, 2019 at 5:31 PM
*To: *dev


> *From: *Brian Hulette 
> *Date: *Thu, May 9, 2019 at 2:02 PM
> *To: * 
>
> We briefly discussed using arrow schemas in place of beam schemas entirely
>> in an arrow thread [1]. The biggest reason not to do this was that we wanted
>> to have a type for large iterables in beam schemas. But given that large
>> iterables aren't currently implemented, beam schemas look very similar to
>> arrow schemas.
>>
>
>
>> I think it makes sense to take inspiration from arrow schemas where
>> possible, and maybe even copy them outright. Arrow already has a portable
>> (flatbuffers) schema representation [2], and implementations for it in many
>> languages that we may be able to re-use as we bring schemas to more SDKs
>> (the project has Python and Go implementations). There are a couple of
>> concepts in Arrow schemas that are specific for the format and wouldn't
>> make sense for us, (fields can indicate whether or not they are dictionary
>> encoded, and the schema has an endianness field), but if you drop those
>> concepts the arrow spec looks pretty similar to the beam proto spec.
>>
>
> FWIW I left a blank section in the doc for filling out what the
> differences are and why, and conversely what the interop opportunities may
> be. Such sections are some of my favorite sections of design docs.
>
> Kenn
>
>
> Brian
>>
>> [1]
>> https://lists.apache.org/thread.html/6be7715e13b71c2d161e4378c5ca1c76ac40cfc5988a03ba87f1c434@%3Cdev.beam.apache.org%3E
>> [2] https://github.com/apache/arrow/blob/master/format/Schema.fbs#L194
>>
>> *From: *Robert Bradshaw 
>> *Date: *Thu, May 9, 2019 at 1:38 PM
>> *To: *dev
>>
>> From: Reuven Lax 
>>> Date: Thu, May 9, 2019 at 7:29 PM
>>> To: dev
>>>
>>> > Also in the future we might be able to do optimizations at the runner
>>> level if at the portability layer we understood schemas instead of just raw
>>> coders. This could be things like only parsing a subset of a row (if we
>>> know only a few fields are accessed) or using a columnar data structure
>>> like Arrow to encode batches of rows across portability. This doesn't
>>> affect data semantics of course, but having a richer, more-expressive type
>>> system opens up other opportunities.
>>>
>>> But we could do all of that with a RowCoder we understood to designate
>>> the type(s), right?
>>>
>>> > On Thu, May 9, 2019 at 10:16 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> On the flip side, Schemas are equivalent to the space of Coders with
>>> >> the addition of a RowCoder and the ability to materialize to something
>>> >> other than bytes, right? (Perhaps I'm missing something big here...)
>>> >> This may make a backwards-compatible transition easier. (SDK-side, the
>>> >> ability to reason about and operate on such types is of course much
>>> >> richer than anything Coders offer right now.)
>>> >>
>>> >> From: Reuven Lax 
>>> >> Date: Thu, May 9, 2019 at 4:52 PM
>>> >> To: dev
>>> >>
>>> >> > FYI I can imagine a world in which we have no coders. We could
>>> define the entire model on top of schemas. Today's "Coder" is completely
>>> equivalent to a single-field schema with a logical-type field (actually the
>>> latter is slightly more expressive as you aren't forced to serialize into
>>> bytes).
>>> >> >
>>> >> > Due to compatibility constraints and the effort that would be
>>> involved in such a change, I think the practical decision should be for
>>> schemas and coders to coexist for the time being. However when we start
>>> planning Beam 3.0, deprecating coders is something I would like to suggest.
>>> >> >
>>> >> > On Thu, May 9, 2019 at 7:48 AM Robert Bradshaw 
>>> wrote:
>>> >> >>
>>> >> >> From: Kenneth Knowles 
>>> >> >> Date: Thu, May 9, 2019 at 10:05 AM
>>> >> >> To: dev
>>> >> >>
>>> >> >> > This is a huge development. Top posting because I can be more
>>> compact.
>>> >> >> >
>>> >> >> > I really think after the initial idea converges this needs a
>>> design doc with goals and alternatives. It is an extraordinarily
>>> consequential model change. So in the spirit of doing the work / bias
>>> towards action, I created a quick draft at
>>> https://s.apache.org/beam-schemas and added everyone on this thread as
>>> editors. I am still in the process of writing this to match the thread.
>>> >> >>
>>> >> >> Thanks! Added some comments there.
>>> >> >>
>>> >> >> > *Multiple timestamp resolutions*: you can use logical types to
>>> represent nanos the same way Java and proto do.
>>> >> >>
>>> >> >> As per the other discussion, I'm unsure the value in supporting
>>> >> >> multiple timestamp resolutions is high enough to outweigh the cost.
>>> >> >>
>>> >> >> > *Why multiple int types?* The domain of values for these types
>>> are different. For a language with one "int" or "number" type, that's
>>> another domain of values.
>>> >> >>
>>> >> >> What is the value in having different domains? If your data has a
>>> >> >> 

Re: Coder Evolution

2019-05-10 Thread Lukasz Cwik
Yes, having evolution actually work is quite difficult. For example, take
the case of a map-based side input where you try to look up some value by a
key. The runner will have stored a bunch of this data using the old format.
Would you ask that lookups be done using the old format or the new format,
or would you recode all the data from the old format to the new format
(assuming our SDKs/protocols could do all of these things)?

1) Lookups using either the old format or the new format have an issue where
you'll look up the wrong thing, because the old format may encode completely
differently than the new format.
2) Recoding all the data from the old format to the new format may reduce
the number of keys. For example, let's say we have a schema that has two
fields A and B that is used as the key, and that schema is now reduced to
just the first field A. How do you combine all the values that map onto the
same encoded key (this is where the discussion of only mergeable state vs
special SDK methods such as OnMerge is very relevant[1])?

I haven't seen any solution where you perform encoding lineage tracking
that works across multiple "evolutions" and recoding the data requires us
to have a solution for dealing with logically merging user data stored by
the runner.

1:
https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
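
The key-collapse problem from point 2 can be sketched in plain Java. The names here (KeyRecodeSketch, recode) are invented for the example, old keys are modelled as two-element lists [A, B], and the merge function is exactly the piece that the runner would somehow need to obtain from the user.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class KeyRecodeSketch {

  /**
   * Recodes state keyed by [A, B] into state keyed by A only. Several old keys
   * can land on the same new key, so a merge function is required.
   */
  static <V> Map<String, V> recode(Map<List<String>, V> oldState, BinaryOperator<V> merge) {
    Map<String, V> newState = new LinkedHashMap<>();
    for (Map.Entry<List<String>, V> entry : oldState.entrySet()) {
      String newKey = entry.getKey().get(0); // keep field A, drop field B
      newState.merge(newKey, entry.getValue(), merge);
    }
    return newState;
  }

  public static void main(String[] args) {
    Map<List<String>, Integer> oldState = new LinkedHashMap<>();
    oldState.put(Arrays.asList("a1", "b1"), 1);
    oldState.put(Arrays.asList("a1", "b2"), 2);
    oldState.put(Arrays.asList("a2", "b1"), 5);
    // Summation is an arbitrary choice here; nothing tells the runner it is the right one.
    System.out.println(recode(oldState, Integer::sum));
  }
}
```

Three old keys become two new ones, and the value stored under "a1" depends entirely on the merge function that was chosen.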

On Fri, May 10, 2019 at 6:30 AM Maximilian Michels  wrote:

> Thanks for the references Luke! I thought that there may have been prior
> discussions, so this thread could be a good place to consolidate.
>
> > Dataflow also has an update feature, but it's limited by the fact that
> Beam does not have a good concept of Coder evolution. As a result we try
> very hard never to change import Coders,
>
> Trying not to break Coders is a fair approach and could work fine for
> Beam itself, if the Coders were designed really carefully. But what
> about custom Coders users may have written? AvroCoder or ProtoCoder
> would be good candidates for forwards-compatibility, but even these do
> not have migration functionality built in.
>
> Is schema evolution already part of SchemaCoder? It's definitely a good
> candidate for evolution because a schema provides the insight-view for a
> Coder, but as for how to actually perform the evolution, it looks like
> this is still an open question.
>
> -Max
>
> On 09.05.19 18:56, Reuven Lax wrote:
> > Dataflow also has an update feature, but it's limited by the fact that
> > Beam does not have a good concept of Coder evolution. As a result we try
> > very hard never to change import Coders, which sometimes makes
> > development of parts of Beam much more difficult. I think Beam would
> > benefit greatly by having a first-class concept of Coder evolution.
> >
> > BTW for schemas there is a natural way of defining evolution that can be
> > handled by SchemaCoder.
> >
> > On Wed, May 8, 2019 at 12:58 PM Lukasz Cwik wrote:
> >
> > There was a thread about coder update in the past here[1]. Also,
> > Reuven sent out a doc[2] about pipeline drain and update which was
> > discussed in this thread[3]. I believe there have been more
> > references to pipeline update in other threads when people tried to
> > change coder encodings in the past as well.
> >
> > Reuven/Dan are the best contacts about this on how this works inside
> > of Google, the limitations and other ideas that had been proposed.
> >
> > 1:
> >
> https://lists.apache.org/thread.html/f3b2daa740075cc39dc04f08d1eaacfcc2d550cecc04e27be024cf52@%3Cdev.beam.apache.org%3E
> > 2:
> >
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#
> > 3:
> >
> https://lists.apache.org/thread.html/37c9aa4aa5011801a7f060bf3f53e687e539fa6154cc9f1c544d4f7a@%3Cdev.beam.apache.org%3E
> >
> > On Wed, May 8, 2019 at 11:45 AM Maximilian Michels wrote:
> >
> > Hi,
> >
> > I'm looking into updating the Flink Runner to Flink version 1.8.
> > Since version 1.7 Flink has a new optional interface for Coder
> > evolution*.
> >
> > When a Flink pipeline is checkpointed, CoderSnapshots are written out
> > alongside the checkpointed data. When the pipeline is restored from
> > that checkpoint, the CoderSnapshots are restored and used to
> > reinstantiate the Coders.
> >
> > Furthermore, there is a compatibility and migration check between the
> > old and the new Coder. This makes it possible to determine whether
> >
> >    - The serializer did not change or is compatible (ok)
> >    - The serialization format of the coder changed (ok after migration)
> >    - The coder needs to be reconfigured and we know how to do that based
> >      on the old version (ok after reconfiguration)
> >

Help reviewing DynamicMessage protobuf support PR

2019-05-10 Thread Alex Van Boxel
Hi, can someone help review my PR? As I needed to make some design
decisions, it would be great to have some feedback.

https://github.com/apache/beam/pull/8496

I'm currently working on the new schema support for protobuf. As this also
needs to support DynamicMessages, feedback would be helpful. Thanks.

 _/
_/ Alex Van Boxel


Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Reuven Lax
Ok so this sounds like a bug in the DirectRunner then?

*From: *Lukasz Cwik 
*Date: *Fri, May 10, 2019 at 8:38 AM
*To: *dev

StateSpec should not be used as a key within any maps. We should use the
> logical name of the StateSpec relative to the DoFn as its id and should
> only be using that id for comparisons/lookups.
>
> On Fri, May 10, 2019 at 1:07 AM Jan Lukavský  wrote:
>
>> I'm not sure. Generally it affects any runner that uses HashMap to store
>> StateSpec.
>>
>> Jan
>> On 5/9/19 6:32 PM, Reuven Lax wrote:
>>
>> Is this specific to the DirectRunner, or does it affect other runners?
>>
>> On Thu, May 9, 2019 at 8:13 AM Jan Lukavský  wrote:
>>
>>> Because of the use of hashCode in StateSpecs, I'd say that it is. But it
>>> is not obvious. That's why I'd suggest to make it abstract on Coder, so
>>> that all implementations have to override it. That's a simple solution, but
>>> the question is - should hashCode of Coder be used that way? I think that
>>> StateSpec instances should be equal only to itself. Then the hashCode can
>>> be stored in the instance, e.g.
>>>
>>>   private final int hashCode = System.identityHashCode(this)
>>>
>>> and returned in hashCode(). There would be no need for Coder to
>>> implement hashCode anymore (if there aren't any other cases, where it is
>>> needed, in which case it would still be better to add abstract hashCode and
>>> equals methods on Coder).
>>>
>>> Jan
>>> On 5/9/19 5:04 PM, Reuven Lax wrote:
>>>
>>> Is a valid hashCode on Coder part of our contract or not? If it is, then
>>> the lack of hashCode on SchemaCoder is simply a bug.
>>>
>>> On Thu, May 9, 2019 at 7:42 AM Jan Lukavský  wrote:
>>>
 Hi,

 I have spent several hours digging into a strange issue with DirectRunner,
 that manifested as non-deterministic run of pipeline. The pipeline
 contains basically only single stateful ParDo, which adds elements into
 state and after some timeout flushes these elements into output. The
 issue was that sometimes (very often) when the timer fired, the state
 appeared to be empty, although I actually added something into the
 state. I will skip details, but the problem boils down to the fact,
 that
 StateSpecs hash Coder into hashCode - e.g.

  @Override
  public int hashCode() {
    return Objects.hash(getClass(), coder);
  }

 in ValueStateSpec. Now, when Coder doesn't have hashCode and equals
 implemented (and there are some of those in the codebase itself - e.g.
 SchemaCoder), it all blows up in a very hard-to-debug manner. So the
 proposal is - either to add abstract hashCode and equals to Coder, or
 don't hash the Coder into hashCode of StateSpecs (we can generate
 unique
 ID for each StateSpec instance for example).

 Any thoughts about which path to follow? Or maybe both? :)

 Jan





Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Lukasz Cwik
StateSpec should not be used as a key within any maps. We should use the
logical name of the StateSpec relative to the DoFn as its id and should
only be using that id for comparisons/lookups.
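
A small plain-Java illustration of why keying on the spec is fragile and keying on the id is not. OpaqueCoder and ValueSpec are simplified stand-ins written for this example, not Beam classes, and the assumption that the spec is re-created between the write and the lookup is mine; the thread does not spell out how the second instance arises.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class StateSpecLookupSketch {

  /** Stand-in for a Coder that does not override equals/hashCode. */
  static final class OpaqueCoder {}

  /** Stand-in for a spec whose equals/hashCode depend on its coder. */
  static final class ValueSpec {
    final OpaqueCoder coder;

    ValueSpec(OpaqueCoder coder) {
      this.coder = coder;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof ValueSpec && Objects.equals(coder, ((ValueSpec) other).coder);
    }

    @Override
    public int hashCode() {
      return Objects.hash(getClass(), coder);
    }
  }

  /** Keyed by spec: a logically identical but re-created spec misses the entry. */
  static boolean foundBySpec() {
    Map<ValueSpec, String> state = new HashMap<>();
    state.put(new ValueSpec(new OpaqueCoder()), "buffered elements");
    // Assumption: the spec (and its coder) was re-created before the lookup.
    return state.containsKey(new ValueSpec(new OpaqueCoder()));
  }

  /** Keyed by the logical id: the lookup does not depend on the coder at all. */
  static boolean foundById() {
    Map<String, String> state = new HashMap<>();
    state.put("buffer", "buffered elements");
    return state.containsKey("buffer");
  }

  public static void main(String[] args) {
    System.out.println("by spec: " + foundBySpec());
    System.out.println("by id:   " + foundById());
  }
}
```

The spec-keyed lookup fails because the two coder instances compare by identity, which matches the "state appeared to be empty" symptom described earlier in the thread.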

On Fri, May 10, 2019 at 1:07 AM Jan Lukavský  wrote:

> I'm not sure. Generally it affects any runner that uses HashMap to store
> StateSpec.
>
> Jan
> On 5/9/19 6:32 PM, Reuven Lax wrote:
>
> Is this specific to the DirectRunner, or does it affect other runners?
>
> On Thu, May 9, 2019 at 8:13 AM Jan Lukavský  wrote:
>
>> Because of the use of hashCode in StateSpecs, I'd say that it is. But it
>> is not obvious. That's why I'd suggest to make it abstract on Coder, so
>> that all implementations have to override it. That's a simple solution, but
>> the question is - should hashCode of Coder be used that way? I think that
>> StateSpec instances should be equal only to itself. Then the hashCode can
>> be stored in the instance, e.g.
>>
>>   private final int hashCode = System.identityHashCode(this)
>>
>> and returned in hashCode(). There would be no need for Coder to implement
>> hashCode anymore (if there aren't any other cases, where it is needed, in
>> which case it would still be better to add abstract hashCode and equals
>> methods on Coder).
>>
>> Jan
>> On 5/9/19 5:04 PM, Reuven Lax wrote:
>>
>> Is a valid hashCode on Coder part of our contract or not? If it is, then
>> the lack of hashCode on SchemaCoder is simply a bug.
>>
>> On Thu, May 9, 2019 at 7:42 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> I have spent several hours digging into a strange issue with DirectRunner,
>>> that manifested as non-deterministic run of pipeline. The pipeline
>>> contains basically only single stateful ParDo, which adds elements into
>>> state and after some timeout flushes these elements into output. The
>>> issue was that sometimes (very often) when the timer fired, the state
>>> appeared to be empty, although I actually added something into the
>>> state. I will skip details, but the problem boils down to the fact, that
>>> StateSpecs hash Coder into hashCode - e.g.
>>>
>>>  @Override
>>>  public int hashCode() {
>>>    return Objects.hash(getClass(), coder);
>>>  }
>>>
>>> in ValueStateSpec. Now, when Coder doesn't have hashCode and equals
>>> implemented (and there are some of those in the codebase itself - e.g.
>>> SchemaCoder), it all blows up in a very hard-to-debug manner. So the
>>> proposal is - either to add abstract hashCode and equals to Coder, or
>>> don't hash the Coder into hashCode of StateSpecs (we can generate unique
>>> ID for each StateSpec instance for example).
>>>
>>> Any thoughts about which path to follow? Or maybe both? :)
>>>
>>> Jan
>>>
>>>
>>>


Re: Coder Evolution

2019-05-10 Thread Maximilian Michels
Thanks for the references Luke! I thought that there may have been prior 
discussions, so this thread could be a good place to consolidate.



> Dataflow also has an update feature, but it's limited by the fact that Beam
> does not have a good concept of Coder evolution. As a result we try very hard
> never to change import Coders,


Trying not to break Coders is a fair approach and could work fine for 
Beam itself, if the Coders were designed really carefully. But what 
about custom Coders users may have written? AvroCoder or ProtoCoder 
would be good candidates for forwards-compatibility, but even these do 
not have migration functionality built in.


Is schema evolution already part of SchemaCoder? It's definitely a good 
candidate for evolution because a schema provides the insight-view for a 
Coder, but as for how to actually perform the evolution, it looks like 
this is still an open question.


-Max

On 09.05.19 18:56, Reuven Lax wrote:
Dataflow also has an update feature, but it's limited by the fact that 
Beam does not have a good concept of Coder evolution. As a result we try 
very hard never to change import Coders, which sometimes makes
development of parts of Beam much more difficult. I think Beam would 
benefit greatly by having a first-class concept of Coder evolution.


BTW for schemas there is a natural way of defining evolution that can be 
handled by SchemaCoder.


On Wed, May 8, 2019 at 12:58 PM Lukasz Cwik wrote:


There was a thread about coder update in the past here[1]. Also,
Reuven sent out a doc[2] about pipeline drain and update which was
discussed in this thread[3]. I believe there have been more
references to pipeline update in other threads when people tried to
change coder encodings in the past as well.

Reuven/Dan are the best contacts about this on how this works inside
of Google, the limitations and other ideas that had been proposed.

1:

https://lists.apache.org/thread.html/f3b2daa740075cc39dc04f08d1eaacfcc2d550cecc04e27be024cf52@%3Cdev.beam.apache.org%3E
2:

https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#
3:

https://lists.apache.org/thread.html/37c9aa4aa5011801a7f060bf3f53e687e539fa6154cc9f1c544d4f7a@%3Cdev.beam.apache.org%3E

On Wed, May 8, 2019 at 11:45 AM Maximilian Michels <m...@apache.org> wrote:

Hi,

I'm looking into updating the Flink Runner to Flink version 1.8. Since
version 1.7 Flink has a new optional interface for Coder evolution*.

When a Flink pipeline is checkpointed, CoderSnapshots are written out
alongside the checkpointed data. When the pipeline is restored from
that checkpoint, the CoderSnapshots are restored and used to
reinstantiate the Coders.

Furthermore, there is a compatibility and migration check between the
old and the new Coder. This makes it possible to determine whether

   - The serializer did not change or is compatible (ok)
   - The serialization format of the coder changed (ok after migration)
   - The coder needs to be reconfigured and we know how to do that based
     on the old version (ok after reconfiguration)
   - The coder is incompatible (error)

I was wondering about the Coder evolution story in Beam. The current
state is that checkpointed Beam pipelines are only guaranteed to run
with the same Beam version and pipeline version. A newer version of
either might break the checkpoint format without any way to migrate the
state.

Should we start thinking about supporting Coder evolution in Beam?

Thanks,
Max


* Coders are called TypeSerializers in Flink land. The interface is
TypeSerializerSnapshot.
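
To make the shape of such a check concrete, here is a rough sketch in plain Java. Everything in it is hypothetical: the Snapshot class, the Compatibility enum and the version-based rules are invented for illustration and are neither Flink's nor Beam's API. The toy rules also never produce the "reconfiguration" outcome, which would need coder-specific knowledge.

```java
class CoderCompatibilitySketch {

  /** The four outcomes listed in the message above. */
  enum Compatibility {
    COMPATIBLE_AS_IS,
    COMPATIBLE_AFTER_MIGRATION,
    COMPATIBLE_AFTER_RECONFIGURATION,
    INCOMPATIBLE
  }

  /** Hypothetical snapshot that would be written next to the checkpointed data. */
  static final class Snapshot {
    final String coderName;
    final int formatVersion;

    Snapshot(String coderName, int formatVersion) {
      this.coderName = coderName;
      this.formatVersion = formatVersion;
    }
  }

  /** Toy rules, invented for this sketch: same coder and a version that never goes down. */
  static Compatibility check(Snapshot restored, Snapshot current) {
    if (!restored.coderName.equals(current.coderName)) {
      return Compatibility.INCOMPATIBLE;
    }
    if (restored.formatVersion == current.formatVersion) {
      return Compatibility.COMPATIBLE_AS_IS;
    }
    if (restored.formatVersion < current.formatVersion) {
      // Old bytes must be read with the old format and rewritten with the new one.
      return Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }
    return Compatibility.INCOMPATIBLE; // restoring newer data with an older coder
  }

  public static void main(String[] args) {
    System.out.println(check(new Snapshot("RowCoder", 1), new Snapshot("RowCoder", 2)));
  }
}
```

The useful part is that the decision is made from the stored snapshot alone, before any checkpointed bytes are decoded.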



Re: request for beam minor release

2019-05-10 Thread Maximilian Michels

> Assuming 2.13 will include or otherwise be supported by flink-runner-1.7 then
> this should not be an issue.


Yes, we will keep supporting Flink 1.7 for Beam 2.13.

-Max

On 08.05.19 19:54, Kenneth Knowles wrote:
For the benefit of the thread, I will also call out our incubating LTS 
(Long-Term Support) policy. For critical fixes, we will issue patch 
releases on the 2.7 branch. We are currently gathering proposals for 
cherry-picks to 2.7.1 to do that release. Other than that, releases are 
a lot of work so we focus on a steady rate of minor version releases 
instead of patching non-LTS versions.


You can read at https://beam.apache.org/community/policies/.

Kenn

On Wed, May 8, 2019 at 9:22 AM Moorhead,Richard <richard.moorhe...@cerner.com> wrote:


Assuming 2.13 will include or otherwise be supported by
flink-runner-1.7 then this should not be an issue.


*From:* Jean-Baptiste Onofré <j...@nanthrax.net>
*Sent:* Wednesday, May 8, 2019 10:09 AM
*To:* dev@beam.apache.org 
*Subject:* Re: request for beam minor release
I second Max here. If you are just looking for this specific commit, you
can take a next release that will include it.

Regards
JB

On 08/05/2019 16:27, Maximilian Michels wrote:
> Hi Richard,
> 
> Would it be an option to use the upcoming 2.13.0 release? The commit
> will be part of that release.
> 
> Thanks,
> Max
> 
> On 08.05.19 15:43, Jean-Baptiste Onofré wrote:
>> Hi,
>>
>> Every release is tagged. We create a branch based on a master commit.
>>
>> Are you requesting a 2.10.1 maintenance release?
>>
>> Regards
>> JB
>>
>> On 08/05/2019 15:10, Moorhead,Richard wrote:
>>> Is there a process for tagging a commit in master for a minor release?
>>>
>>> I am trying to get this commit released into 2.10.1
>>>  
>>> CONFIDENTIALITY NOTICE This message and any included attachments are
>>> from Cerner Corporation and are intended only for the addressee. The
>>> information contained in this message is confidential and may constitute
>>> inside or non-public information under international, federal, or state
>>> securities laws. Unauthorized forwarding, printing, copying,
>>> distribution, or use of such information is strictly prohibited and may
>>> be unlawful. If you are not the addressee, please promptly delete this
>>> message and notify the sender of the delivery error by e-mail or you may
>>> call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>> (816)221-1024.
>>>
>>

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: Streaming pipelines in all SDKs!

2019-05-10 Thread Maximilian Michels

> So, FlinkRunner has some sort of special support for executing UnboundedSource
> via the runner in the portable world? I see a transform override for bounded
> sources in PortableRunner [1] but nothing for unbounded sources.


It's in the translation code: 
https://github.com/apache/beam/blob/6679b00138a5b82a6a55e7bc94c453957cea501c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L216


For migration I think that's a valid path, especially because Runners
already have the translation code in place. We can later swap out the
UnboundedSource translation with the SDF wrapper.


-Max

On 09.05.19 22:46, Robert Bradshaw wrote:

From: Chamikara Jayalath 
Date: Thu, May 9, 2019 at 7:49 PM
To: dev


From: Maximilian Michels 
Date: Thu, May 9, 2019 at 9:21 AM
To: 


Thanks for sharing your ideas for load testing!


Drawing on other contributors' knowledge/experience: I noticed that streaming with
KafkaIO is currently supported by wrapping ExternalTransform in the Python SDK. Do you
think that streaming pipelines will "just work" with the current state of
portability if I do the same for UnboundedSyntheticSource, or is there something else
missing?


Basically yes, but it requires a bit more effort than just wrapping
ExternalTransform. You need to provide an ExternalTransformBuilder
for the transform to be configured externally.
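
As a rough sketch of what "configured externally" means, the pattern is a configuration object that is populated from outside the SDK, plus a builder that turns it into a transform. The code below does not use Beam's classes: ExternalBuilderSketch, SyntheticSourceConfig, and SyntheticSourceBuilder are hypothetical stand-ins, and a plain java.util.function.Function plays the role of the transform. Beam's actual ExternalTransformBuilder interface may differ in its type parameters and details.

```java
import java.util.function.Function;

/** Stand-in for a builder that turns an externally supplied configuration into a transform. */
interface ExternalBuilderSketch<ConfigT, InT, OutT> {
  Function<InT, OutT> buildExternal(ConfigT configuration);
}

/** Hypothetical configuration; an expansion service would populate it through the setters. */
final class SyntheticSourceConfig {
  private long numRecords;
  private int valueSizeBytes;

  void setNumRecords(long numRecords) {
    this.numRecords = numRecords;
  }

  void setValueSizeBytes(int valueSizeBytes) {
    this.valueSizeBytes = valueSizeBytes;
  }

  long getNumRecords() {
    return numRecords;
  }

  int getValueSizeBytes() {
    return valueSizeBytes;
  }
}

/** Builds a toy "source" that maps a record index to a synthetic payload. */
final class SyntheticSourceBuilder
    implements ExternalBuilderSketch<SyntheticSourceConfig, Long, byte[]> {

  @Override
  public Function<Long, byte[]> buildExternal(SyntheticSourceConfig configuration) {
    // Copy the settings so later changes to the config do not affect the built transform.
    final long limit = configuration.getNumRecords();
    final int size = configuration.getValueSizeBytes();
    return index -> {
      if (index < 0 || index >= limit) {
        throw new IllegalArgumentException("record index out of range: " + index);
      }
      return new byte[size];
    };
  }
}
```

The point of the split is that the calling SDK only needs to ship the configuration values; the Java side owns the logic that interprets them.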

In portability UnboundedSources can only be supported via SDF. To still
be able to use legacy IO which uses UnboundedSource the Runner has to
supply this capability (which the Flink Runner supports). This will
likely go away if we have an UnboundedSource SDF Wrapper :)



So, FlinkRunner has some sort of special support for executing UnboundedSource
via the runner in the portable world? I see a transform override for bounded
sources in PortableRunner [1] but nothing for unbounded sources.

Agreed that we cannot properly support cross-language unbounded sources until we
have SDF and a proper unbounded source to SDF wrapper.


That is correct. Go will need SDF support as well.

As for waiting on implementing the expansion service: except for the
vending of extra artifacts (which will be an extension), we discussed
this earlier and it's considered stable and ready to build on now.



Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Jan Lukavský
I'm not sure. Generally it affects any runner that uses HashMap to store 
StateSpec.


Jan

On 5/9/19 6:32 PM, Reuven Lax wrote:

Is this specific to the DirectRunner, or does it affect other runners?

On Thu, May 9, 2019 at 8:13 AM Jan Lukavský wrote:


Because of the use of hashCode in StateSpecs, I'd say that it is.
But it is not obvious. That's why I'd suggest making it abstract
on Coder, so that all implementations have to override it. That's
a simple solution, but the question is: should the hashCode of a Coder
be used that way? I think that a StateSpec instance should be equal
only to itself. Then the hashCode can be stored in the instance, e.g.

  private final int hashCode = System.identityHashCode(this);

and returned in hashCode(). There would be no need for Coder to
implement hashCode anymore (unless there are other cases where
it is needed, in which case it would still be better to add
abstract hashCode and equals methods on Coder).
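
A minimal sketch of that identity-based idea, assuming the same spec instance is used for both writing and reading state. IdentityStateSpec is a hypothetical stand-in, not Beam's StateSpec, and the coder is typed as Object only to keep the example self-contained.

```java
/** Hypothetical state spec that is equal only to itself, regardless of its coder. */
final class IdentityStateSpec {
  // Kept for encoding values; deliberately not part of equals or hashCode.
  private final Object coder;

  // Computed once per instance, as suggested in the message above.
  private final int hashCode = System.identityHashCode(this);

  IdentityStateSpec(Object coder) {
    this.coder = coder;
  }

  Object coder() {
    return coder;
  }

  @Override
  public int hashCode() {
    return hashCode;
  }

  @Override
  public boolean equals(Object other) {
    return this == other;
  }
}
```

With this shape, a coder that lacks equals and hashCode can no longer break map lookups. The trade-off is that two specs built from the same coder are always distinct keys, so a spec that is re-created (for example after deserialization) would not find state written under the earlier instance.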

Jan

On 5/9/19 5:04 PM, Reuven Lax wrote:

Is a valid hashCode on Coder part of our contract or not? If it
is, then the lack of hashCode on SchemaCoder is simply a bug.

On Thu, May 9, 2019 at 7:42 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

I have spent several hours digging into a strange issue with
DirectRunner that manifested as a non-deterministic pipeline run.
The pipeline contains basically only a single stateful ParDo, which
adds elements into state and, after some timeout, flushes these
elements to the output. The issue was that sometimes (very often)
when the timer fired, the state appeared to be empty, although I
had actually added something to the state. I will skip the details,
but the problem boils down to the fact that StateSpecs hash the
Coder into hashCode, e.g.

 @Override
 public int hashCode() {
   return Objects.hash(getClass(), coder);
 }

in ValueStateSpec. Now, when a Coder doesn't have hashCode and
equals implemented (and there are some of those in the codebase
itself, e.g. SchemaCoder), it all blows up in a very hard-to-debug
manner. So the proposal is either to add abstract hashCode and
equals to Coder, or not to hash the Coder into the hashCode of
StateSpecs (we can generate a unique ID for each StateSpec instance,
for example).
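
The failure mode can be reproduced in isolation. The sketch below uses hypothetical stand-ins (SpecSketch, CoderWithEquals, CoderWithoutEquals, StateLookupDemo), not Beam classes, and assumes that the spec used for the read is a different instance from the one used for the write, built from an equivalent but separately constructed coder. SpecSketch's equals is also an assumption; only the hashCode mirrors the snippet above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Stand-in coder that inherits Object's identity-based equals and hashCode. */
final class CoderWithoutEquals {
  final String schema;

  CoderWithoutEquals(String schema) {
    this.schema = schema;
  }
}

/** Stand-in coder with value-based equals and hashCode. */
final class CoderWithEquals {
  final String schema;

  CoderWithEquals(String schema) {
    this.schema = schema;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof CoderWithEquals && schema.equals(((CoderWithEquals) other).schema);
  }

  @Override
  public int hashCode() {
    return schema.hashCode();
  }
}

/** Stand-in state spec that hashes its coder, like the hashCode shown above. */
final class SpecSketch {
  private final Object coder;

  SpecSketch(Object coder) {
    this.coder = coder;
  }

  @Override
  public int hashCode() {
    return Objects.hash(getClass(), coder);
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof SpecSketch && Objects.equals(coder, ((SpecSketch) other).coder);
  }
}

final class StateLookupDemo {
  /** Writes under one spec instance, then looks the state up under a freshly built one. */
  static boolean foundAfterRebuild(Object writeCoder, Object readCoder) {
    Map<SpecSketch, String> state = new HashMap<>();
    state.put(new SpecSketch(writeCoder), "buffered element");
    return state.containsKey(new SpecSketch(readCoder));
  }
}
```

Calling foundAfterRebuild with two CoderWithEquals("row") instances finds the entry, while two CoderWithoutEquals("row") instances do not, which matches the "state appeared to be empty" symptom.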

Any thoughts about which path to follow? Or maybe both? :)

Jan




Re: Unexpected behavior of StateSpecs

2019-05-10 Thread Jan Lukavský

Hi Anton,

yes, if the keyCoder doesn't have proper hashCode and equals, then it 
would manifest exactly as described.


Jan

On 5/9/19 6:28 PM, Anton Kedin wrote:
Does it look similar to 
https://issues.apache.org/jira/browse/BEAM-6813 ? I also stumbled on a 
problem with a state in DirectRunner but wasn't able to figure it out 
yet: 
https://lists.apache.org/thread.html/dae8b605a218532c085a0eea4e71338eae51922c26820f37b24875c0@%3Cdev.beam.apache.org%3E


Regards,
Anton
