Re: Connection leaks with PostgreSQL instance

2019-01-17 Thread Kenneth Knowles
My mistake - using @Teardown in this way is a good approach. It may sometimes
not be executed, but as Reuven says, that means the process died.

Kenn

On Thu, Jan 17, 2019 at 9:31 AM Jean-Baptiste Onofré 
wrote:

> Hi,
>
> I don't think we have connection leak in normal behavior.
>
> The actual SQL statement is executed in @FinishBundle, where the
> connection is closed.
>
> The processElement adds record to process.
>
> Does it mean that an Exception occurs in the batch addition ?
>
> Regards
> JB
>
> On 17/01/2019 12:41, Alexey Romanenko wrote:
> > Kenn,
> >
> > I’m not sure that we have a connection leak in JdbcIO since new
> > connection is being obtained from an instance of /javax.sql.DataSource/
> > (created in @Setup) and which
> > is /org.apache.commons.dbcp2.BasicDataSource/ by default.
> > /BasicDataSource/ uses connection pool and closes all idle connections
> > in "close()”.
> >
> > In its turn, JdbcIO calls /DataSource.close()/ in @Teardown, so all idle
> > connections should be closed and released there in case of failures.
> > Though, potentially some connections that were handed to the client
> > before and not properly returned to the pool could be leaked…
> > Anyway, I think it could be a good idea to call "/connection.close()/”
> > (return to connection pool) explicitly in case of any exception happened
> > during bundle processing.
> >
> > Probably JB may provide more details as original author of JdbcIO.
> >
> >> On 14 Jan 2019, at 21:37, Kenneth Knowles  >> > wrote:
> >>
> >> Hi Jonathan,
> >>
> >> JdbcIO.write() just invokes this
> >> DoFn:
> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
> >>
> >> It establishes a connection in @StartBundle and then in @FinishBundle
> >> it commits a batch and closes the connection. If an error happens
> >> in @StartBundle or @ProcessElement there will be a retry with a fresh
> >> instance of the DoFn, which will establish a new connection. It looks
> >> like neither @StartBundle nor @ProcessElement closes the connection,
> >> so I'm guessing that the old connection sticks around because the
> >> worker process was not terminated. So the Beam runner and Dataflow
> >> service are working as intended and this is an issue with JdbcIO,
> >> unless I've made a mistake in my reading or analysis.
> >>
> >> Would you mind reporting these details
> >> to https://issues.apache.org/jira/projects/BEAM/ ?
> >>
> >> Kenn
> >>
> >> On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron
> >> mailto:jonathan.per...@lumapps.com>>
> wrote:
> >>
> >> Hello !
> >>
> >> My question is maybe mainly GCP-oriented, so I apologize if it is
> >> not fully related to the Beam community.
> >>
> >> We have a streaming pipeline running on Dataflow which writes data
> >> to a PostgreSQL instance hosted on Cloud SQL. This database is
> >> suffering from connection leak spikes on a regular basis:
> >>
> >> 
> >>
> >> The connections are kept alive until the pipeline is
> canceled/drained:
> >>
> >> 
> >>
> >> We are writing to the database with:
> >>
> >> - individual DoFn where we open/close the connection using the
> >> standard JDBC try/catch (SQLException ex)/finally statements;
> >>
> >> - a Pipeline.apply(JdbcIO.write()) operations.
> >>
> >> I observed that these spikes happen most of the time after I/O
> >> errors with the database. Has anyone observed the same situation ?
> >>
> >> I have several questions/observations, please correct me if I am
> >> wrong (I am not from the java environment, so some can seem pretty
> >> trivial) :
> >>
> >> - Does the apply method handles SQLException or I/O errors ?
> >>
> >> - Would the use of a connection pool prevent such behaviours ? If
> >> so, how would one implement it to allow all workers to use it ?
> >> Could it be implemented with JDBC Connection pooling ?
> >>
> >> I am worrying about the serialization if one would pass a
> >> Connection item as an argument of a DoFn.
> >>
> >> Thank you in advance for your comments and reactions.
> >>
> >> Best regards,
> >>
> >> Jonathan
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
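
For illustration, here is a minimal sketch of the pattern discussed in this
thread (not the actual JdbcIO implementation; the statement, table name, and
provider helper are assumptions): buffer records in @ProcessElement, execute
the batch in @FinishBundle, and return the connection to the pool even when the
batch execution throws, with @Teardown as a best-effort backstop.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;

class JdbcWriteFn extends DoFn<String, Void> {
  private final SerializableFunction<Void, DataSource> dataSourceProvider; // assumed helper
  private transient DataSource dataSource;
  private transient Connection connection;
  private transient List<String> batch;

  JdbcWriteFn(SerializableFunction<Void, DataSource> dataSourceProvider) {
    this.dataSourceProvider = dataSourceProvider;
  }

  @Setup
  public void setup() {
    dataSource = dataSourceProvider.apply(null); // e.g. a pooled BasicDataSource
  }

  @StartBundle
  public void startBundle() throws SQLException {
    connection = dataSource.getConnection();
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element()); // only buffers; the SQL runs in finishBundle
  }

  @FinishBundle
  public void finishBundle() throws SQLException {
    try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO t VALUES (?)")) {
      for (String row : batch) {
        stmt.setString(1, row);
        stmt.addBatch();
      }
      stmt.executeBatch();
    } finally {
      connection.close(); // return to the pool even if executeBatch() threw
      connection = null;
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close(); // backstop if finishBundle never ran
    }
    if (dataSource instanceof AutoCloseable) {
      ((AutoCloseable) dataSource).close(); // releases idle pooled connections
    }
  }
}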


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Kenneth Knowles
Actually, Reuven, that's no longer the case.

It used to be that incoming data was compared to the watermark but it is
not today. Instead, Jeff's first phrasing is perfect.

One way to see it is to think about the consequences of late
data: if there is a grouping/aggregation by key+window, the window
determines when the grouping is complete. We go ahead and include any data
that shows up before the window is complete. And if you set up allowed
lateness it matches exactly: any data that arrives before the ON_TIME
output gets to be in that output.
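
For a concrete illustration, a minimal sketch (the window size, allowed
lateness, and element type are illustrative assumptions, not from this thread):

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class LatenessExample {
  // Elements whose timestamps fall in a one-minute window are still grouped into
  // that window as long as they arrive before the watermark passes the end of the
  // window plus the two minutes of allowed lateness; only after that are they dropped.
  static PCollection<KV<String, Long>> windowedCounts(PCollection<String> input) {
    return input
        .apply(
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .withAllowedLateness(Duration.standardMinutes(2)))
        .apply(Count.perElement());
  }
}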

Previously, when we compared incoming elements to the watermark directly,
you could have a window that was still being aggregated but the elements
that fell in the window were dropped. There was no technical benefit to
losing this data, so we stopped dropping it. We also had lots of tricky
bugs and hard-to-manage code related to what we do if an element arrives
after the watermark. And you could have an ON_TIME firing that included a
bunch of "late" data, which is confusing.

Now it is simple: if the window is still alive, the element goes into it.

I very rarely use the term "late data" when describing Beam's semantics
anyhow. I always found the term / definition a bit arbitrary.

Kenn

On Thu, Jan 17, 2019 at 8:13 PM Rui Wang  wrote:

> I created this PR: https://github.com/apache/beam/pull/7556
>
> Feel free to review/comment it.
>
> -Rui
>
> On Thu, Jan 17, 2019 at 2:37 PM Rui Wang  wrote:
>
>> It might be better to keep something like "watermark usually consistently
>> moves forward". But "Elements that arrive with a smaller timestamp than the
>> current watermark are considered late data." has already given the order of
>> late data ts and watermark.
>>
>>
>> -Rui
>>
>> On Thu, Jan 17, 2019 at 1:39 PM Jeff Klukas  wrote:
>>
>>> Reuven - I don't think I realized it was possible to have late data with
>>> the global window, so I'm definitely learning things through this
>>> discussion.
>>>
>>> New suggested wording, then:
>>>
>>> Elements that arrive with a smaller timestamp than the current
>>> watermark are considered late data.
>>>
>>> That says basically the same thing as the wording currently in the
>>> guide, but uses "smaller" (which implies a less-than-watermark comparison)
>>> rather than "later" (which folks have interpreted as a
>>> greater-than-watermark comparison).
>>>
>>> On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:
>>>
 Though it's not tied to window. You could be in the global window, so
 the watermark never advances past the end of the window, yet still get late
 data.

 On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas >>>
> How about: "Once the watermark progresses past the end of a window,
> any further elements that arrive with a timestamp in that window are
> considered late data."
>
> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>
>> Hi Community,
>>
>> In Beam programming guide [1], there is a sentence: "Data that
>> arrives with a timestamp after the watermark is considered *late
>> data*"
>>
>> Seems like people get confused by it. For example, see Stackoverflow
>> comment [2]. Basically it makes people think that a event timestamp that 
>> is
>> bigger than watermark is considered late (due to that "after").
>>
>> Although there is a example right after this sentence to explain late
>> data, seems to me that this sentence is incomplete. The complete sentence
>> to me can be: "The watermark consistently advances from -inf to +inf. 
>> Data
>> that arrives with a timestamp after the watermark is considered late 
>> data."
>>
>> Am I understand correctly? Is there better description for the order
>> of late data and watermark? I would happy to send PR to update Beam
>> documentation.
>>
>> -Rui
>>
>> [1]:
>> https://beam.apache.org/documentation/programming-guide/#windowing
>> [2]:
>> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>>
>>
>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Rui Wang
I created this PR: https://github.com/apache/beam/pull/7556

Feel free to review/comment it.

-Rui

On Thu, Jan 17, 2019 at 2:37 PM Rui Wang  wrote:

> It might be better to keep something like "watermark usually consistently
> moves forward". But "Elements that arrive with a smaller timestamp than the
> current watermark are considered late data." has already given the order of
> late data ts and watermark.
>
>
> -Rui
>
> On Thu, Jan 17, 2019 at 1:39 PM Jeff Klukas  wrote:
>
>> Reuven - I don't think I realized it was possible to have late data with
>> the global window, so I'm definitely learning things through this
>> discussion.
>>
>> New suggested wording, then:
>>
>> Elements that arrive with a smaller timestamp than the current
>> watermark are considered late data.
>>
>> That says basically the same thing as the wording currently in the guide,
>> but uses "smaller" (which implies a less-than-watermark comparison) rather
>> than "later" (which folks have interpreted as a greater-than-watermark
>> comparison).
>>
>> On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:
>>
>>> Though it's not tied to window. You could be in the global window, so
>>> the watermark never advances past the end of the window, yet still get late
>>> data.
>>>
>>> On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas >>
 How about: "Once the watermark progresses past the end of a window, any
 further elements that arrive with a timestamp in that window are considered
 late data."

 On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:

> Hi Community,
>
> In Beam programming guide [1], there is a sentence: "Data that
> arrives with a timestamp after the watermark is considered *late data*
> "
>
> Seems like people get confused by it. For example, see Stackoverflow
> comment [2]. Basically it makes people think that a event timestamp that 
> is
> bigger than watermark is considered late (due to that "after").
>
> Although there is a example right after this sentence to explain late
> data, seems to me that this sentence is incomplete. The complete sentence
> to me can be: "The watermark consistently advances from -inf to +inf. Data
> that arrives with a timestamp after the watermark is considered late 
> data."
>
> Am I understand correctly? Is there better description for the order
> of late data and watermark? I would happy to send PR to update Beam
> documentation.
>
> -Rui
>
> [1]:
> https://beam.apache.org/documentation/programming-guide/#windowing
> [2]:
> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>
>
>


Re: Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Alex Amato
Cool, thanks! Sounds like a good place to put it. Appreciate the pointer :)

On Thu, Jan 17, 2019 at 3:47 PM Kenneth Knowles  wrote:

> I think runners/core-java fits in this case. It started life as a place
> for shared functionality to all JVM-based runners, but much of that also
> turns out to apply to the Java SDK harness. I see it is already used in the
> harness.
>
> Kenn
>
> On Thu, Jan 17, 2019 at 1:55 PM Alex Amato  wrote:
>
>> any Java SDK Harness coult use*
>>
>> On Thu, Jan 17, 2019 at 1:22 PM Alex Amato  wrote:
>>
>>> Just the SDK harness, it doesn't need to be part of the user APIs or
>>> anything. But I want to offer it as a library that any Java SDK could use
>>> to enable state sampling as a low weight estimation of execution times,
>>> which is useful to implement the execution time metrics. So I don't see a
>>> need to put it on the core SDK.
>>>
>>> Then I think that our options would be:
>>>
>>>- create a new: beam/java/*extensions/state-sampling*
>>>- reuse the existing: beam/*sdks*/java/*extensions/state-sampling*
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:59 AM Kenneth Knowles  wrote:
>>>
 Do you want to use it in the core SDK or the SDK harness? Does the
 state sampling code depend on the core SDK?

 I'd really like to avoid layers with names like "common". Just delete
 that layer of the directory and the path hierarchy has exactly the same
 meaning, but is more concise and clear.

 Kenn

 On Thu, Jan 17, 2019 at 10:49 AM Alex Amato  wrote:

> I would like to reuse the State Sampling classes from the Dataflow
> Runner Harness in the Beam Java SDK. I created a Refactoring plan
> and
> removed the Dataflow references from the classes in PR/7507
> .
>
> SInce the Java SDK cannot depend on the Dataflow Runner Harness, then
> I can do one of these options
>
>- Create a new gradle project/folder and locate the code in a
>shared location?
>   - i.e. create a new: beam/*common*/java/
>   *extensions/state-sampling*
>- Move the code in a Java SDK projects, which will work since the
>runner harness can depend on the SDK?
>   - i.e.reuse the existing: beam/*sdks*/java/
>   *extensions/state-sampling*
>
>


Adding KMS support to generic filesystem interface

2019-01-17 Thread Udi Meiri
Hi,
I'd like to add support for creating files using a cloud Key Management
System.
A KMS allows you to audit, create, rotate, and disable encryption keys.
Both AWS and GCP have such a service.

I wanted to show the community what I've been working on and see if there
are any comments or objections before submitting a PR.
https://github.com/udim/beam/commit/d29f1ef26c58489416a2d413eb029596d96e1f25

Reference docs:
AWS S3:
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
GCP GCS:
https://cloud.google.com/storage/docs/encryption/using-customer-managed-keys#add-object-key
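
Not part of the commit above, but for readers unfamiliar with how such a
setting could be surfaced: a purely hypothetical sketch of exposing a KMS key
name through a custom PipelineOptions interface (the option name and interface
do not exist in Beam; they are assumptions for illustration).

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical only: illustrates the standard PipelineOptions pattern a
// FileSystem implementation could read a key from; it is not the API in the
// linked commit.
public interface KmsOptions extends PipelineOptions {
  @Description(
      "Cloud KMS key used to encrypt newly created objects, e.g. "
          + "projects/<project>/locations/<location>/keyRings/<ring>/cryptoKeys/<key>")
  @Default.String("")
  String getKmsKeyName();

  void setKmsKeyName(String value);
}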




Re: Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Kenneth Knowles
I think runners/core-java fits in this case. It started life as a place for
shared functionality for all JVM-based runners, but much of that also turns
out to apply to the Java SDK harness. I see it is already used in the
harness.

Kenn

On Thu, Jan 17, 2019 at 1:55 PM Alex Amato  wrote:

> any Java SDK Harness coult use*
>
> On Thu, Jan 17, 2019 at 1:22 PM Alex Amato  wrote:
>
>> Just the SDK harness, it doesn't need to be part of the user APIs or
>> anything. But I want to offer it as a library that any Java SDK could use
>> to enable state sampling as a low weight estimation of execution times,
>> which is useful to implement the execution time metrics. So I don't see a
>> need to put it on the core SDK.
>>
>> Then I think that our options would be:
>>
>>- create a new: beam/java/*extensions/state-sampling*
>>- reuse the existing: beam/*sdks*/java/*extensions/state-sampling*
>>
>>
>> On Thu, Jan 17, 2019 at 10:59 AM Kenneth Knowles  wrote:
>>
>>> Do you want to use it in the core SDK or the SDK harness? Does the state
>>> sampling code depend on the core SDK?
>>>
>>> I'd really like to avoid layers with names like "common". Just delete
>>> that layer of the directory and the path hierarchy has exactly the same
>>> meaning, but is more concise and clear.
>>>
>>> Kenn
>>>
>>> On Thu, Jan 17, 2019 at 10:49 AM Alex Amato  wrote:
>>>
 I would like to reuse the State Sampling classes from the Dataflow
 Runner Harness in the Beam Java SDK. I created a Refactoring plan
 and
 removed the Dataflow references from the classes in PR/7507
 .

 SInce the Java SDK cannot depend on the Dataflow Runner Harness, then I
 can do one of these options

- Create a new gradle project/folder and locate the code in a
shared location?
   - i.e. create a new: beam/*common*/java/
   *extensions/state-sampling*
- Move the code in a Java SDK projects, which will work since the
runner harness can depend on the SDK?
   - i.e.reuse the existing: beam/*sdks*/java/
   *extensions/state-sampling*




[Proposal] Requesting PMC approval to start planning for Beam Summits 2019

2019-01-17 Thread joanafilipa
Dear Project Management Committee, 


The Beam Summits are community events funded by a Sponsoring Committee and 
organized by an Organizing Committee. I’d like to get the following approvals: 

To organize and host the Summits under the name of Beam Summit  , 
i.e. Beam Summit North America 2019, Beam Summit Europe 2019 and Beam Summit 
Asia 2019. 

To form organizing committees for each edition

Approval to host each edition on the following dates and locations:

- Beam Summit North America, on April 3rd, San Francisco, CA. (150 attendees)

- Beam Summit Europe, on June 19th, Berlin, Germany. (150 attendees)

- Beam Summit Asia, October (exact date tbc), Tokyo, Japan. (150 attendees)

The events will provide educational content selected by the Organizing 
Committee and will be not-for-profit; however, we might charge a fee 
to cover organization and logistics costs. This matter will be decided by 
the Organizing Committee and will be brought back to the PMC if needed. The 
events will be advertised on various channels, including the Apache Beam’s and 
Summit sponsor’s social media accounts.


The Organizing Committee will acknowledge the Apache Software Foundation's 
ownership of the Apache Beam trademark and will add attribution required by the 
foundation’s policy on all marketing channels. The Apache Beam branding will be 
used in accordance with the foundation’s trademark and events policies 
specifically as outlined in Third Party Event Branding Policy. The Organizing 
Committee does not request the ASF to become a Community Partner of the event.


Please feel free to request further information if needed. 


Kind Regards, 

Joana Carrasqueira


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Rui Wang
It might be better to keep something like "watermark usually consistently
moves forward". But "Elements that arrive with a smaller timestamp than the
current watermark are considered late data." has already given the order of
late data ts and watermark.


-Rui

On Thu, Jan 17, 2019 at 1:39 PM Jeff Klukas  wrote:

> Reuven - I don't think I realized it was possible to have late data with
> the global window, so I'm definitely learning things through this
> discussion.
>
> New suggested wording, then:
>
> Elements that arrive with a smaller timestamp than the current
> watermark are considered late data.
>
> That says basically the same thing as the wording currently in the guide,
> but uses "smaller" (which implies a less-than-watermark comparison) rather
> than "later" (which folks have interpreted as a greater-than-watermark
> comparison).
>
> On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:
>
>> Though it's not tied to window. You could be in the global window, so the
>> watermark never advances past the end of the window, yet still get late
>> data.
>>
>> On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas >
>>> How about: "Once the watermark progresses past the end of a window, any
>>> further elements that arrive with a timestamp in that window are considered
>>> late data."
>>>
>>> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>>>
 Hi Community,

 In Beam programming guide [1], there is a sentence: "Data that arrives
 with a timestamp after the watermark is considered *late data*"

 Seems like people get confused by it. For example, see Stackoverflow
 comment [2]. Basically it makes people think that a event timestamp that is
 bigger than watermark is considered late (due to that "after").

 Although there is a example right after this sentence to explain late
 data, seems to me that this sentence is incomplete. The complete sentence
 to me can be: "The watermark consistently advances from -inf to +inf. Data
 that arrives with a timestamp after the watermark is considered late data."

 Am I understand correctly? Is there better description for the order of
 late data and watermark? I would happy to send PR to update Beam
 documentation.

 -Rui

 [1]: https://beam.apache.org/documentation/programming-guide/#windowing
 [2]:
 https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971





Re: Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Alex Amato
any Java SDK Harness could use*

On Thu, Jan 17, 2019 at 1:22 PM Alex Amato  wrote:

> Just the SDK harness, it doesn't need to be part of the user APIs or
> anything. But I want to offer it as a library that any Java SDK could use
> to enable state sampling as a low weight estimation of execution times,
> which is useful to implement the execution time metrics. So I don't see a
> need to put it on the core SDK.
>
> Then I think that our options would be:
>
>- create a new: beam/java/*extensions/state-sampling*
>- reuse the existing: beam/*sdks*/java/*extensions/state-sampling*
>
>
> On Thu, Jan 17, 2019 at 10:59 AM Kenneth Knowles  wrote:
>
>> Do you want to use it in the core SDK or the SDK harness? Does the state
>> sampling code depend on the core SDK?
>>
>> I'd really like to avoid layers with names like "common". Just delete
>> that layer of the directory and the path hierarchy has exactly the same
>> meaning, but is more concise and clear.
>>
>> Kenn
>>
>> On Thu, Jan 17, 2019 at 10:49 AM Alex Amato  wrote:
>>
>>> I would like to reuse the State Sampling classes from the Dataflow
>>> Runner Harness in the Beam Java SDK. I created a Refactoring plan
>>> and
>>> removed the Dataflow references from the classes in PR/7507
>>> .
>>>
>>> SInce the Java SDK cannot depend on the Dataflow Runner Harness, then I
>>> can do one of these options
>>>
>>>- Create a new gradle project/folder and locate the code in a shared
>>>location?
>>>   - i.e. create a new: beam/*common*/java/
>>>   *extensions/state-sampling*
>>>- Move the code in a Java SDK projects, which will work since the
>>>runner harness can depend on the SDK?
>>>   - i.e.reuse the existing: beam/*sdks*/java/
>>>   *extensions/state-sampling*
>>>
>>>


Re: Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Alex Amato
Just the SDK harness; it doesn't need to be part of the user APIs or
anything. But I want to offer it as a library that any Java SDK could use
to enable state sampling as a lightweight estimation of execution times,
which is useful for implementing the execution time metrics. So I don't see a
need to put it in the core SDK.

Then I think that our options would be:

   - create a new: beam/java/*extensions/state-sampling*
   - reuse the existing: beam/*sdks*/java/*extensions/state-sampling*


On Thu, Jan 17, 2019 at 10:59 AM Kenneth Knowles  wrote:

> Do you want to use it in the core SDK or the SDK harness? Does the state
> sampling code depend on the core SDK?
>
> I'd really like to avoid layers with names like "common". Just delete that
> layer of the directory and the path hierarchy has exactly the same meaning,
> but is more concise and clear.
>
> Kenn
>
> On Thu, Jan 17, 2019 at 10:49 AM Alex Amato  wrote:
>
>> I would like to reuse the State Sampling classes from the Dataflow Runner
>> Harness in the Beam Java SDK. I created a Refactoring plan
>> and
>> removed the Dataflow references from the classes in PR/7507
>> .
>>
>> SInce the Java SDK cannot depend on the Dataflow Runner Harness, then I
>> can do one of these options
>>
>>- Create a new gradle project/folder and locate the code in a shared
>>location?
>>   - i.e. create a new: beam/*common*/java/*extensions/state-sampling*
>>- Move the code in a Java SDK projects, which will work since the
>>runner harness can depend on the SDK?
>>   - i.e.reuse the existing: beam/*sdks*/java/
>>   *extensions/state-sampling*
>>
>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Jeff Klukas
Reuven - I don't think I realized it was possible to have late data with
the global window, so I'm definitely learning things through this
discussion.

New suggested wording, then:

Elements that arrive with a smaller timestamp than the current
watermark are considered late data.

That says basically the same thing as the wording currently in the guide,
but uses "smaller" (which implies a less-than-watermark comparison) rather
than "later" (which folks have interpreted as a greater-than-watermark
comparison).

On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:

> Though it's not tied to window. You could be in the global window, so the
> watermark never advances past the end of the window, yet still get late
> data.
>
> On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas 
>> How about: "Once the watermark progresses past the end of a window, any
>> further elements that arrive with a timestamp in that window are considered
>> late data."
>>
>> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>>
>>> Hi Community,
>>>
>>> In Beam programming guide [1], there is a sentence: "Data that arrives
>>> with a timestamp after the watermark is considered *late data*"
>>>
>>> Seems like people get confused by it. For example, see Stackoverflow
>>> comment [2]. Basically it makes people think that a event timestamp that is
>>> bigger than watermark is considered late (due to that "after").
>>>
>>> Although there is a example right after this sentence to explain late
>>> data, seems to me that this sentence is incomplete. The complete sentence
>>> to me can be: "The watermark consistently advances from -inf to +inf. Data
>>> that arrives with a timestamp after the watermark is considered late data."
>>>
>>> Am I understand correctly? Is there better description for the order of
>>> late data and watermark? I would happy to send PR to update Beam
>>> documentation.
>>>
>>> -Rui
>>>
>>> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
>>> [2]:
>>> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>>>
>>>
>>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Reuven Lax
Though it's not tied to window. You could be in the global window, so the
watermark never advances past the end of the window, yet still get late
data.

On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas  wrote:
> How about: "Once the watermark progresses past the end of a window, any
> further elements that arrive with a timestamp in that window are considered
> late data."
>
> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>
>> Hi Community,
>>
>> In Beam programming guide [1], there is a sentence: "Data that arrives
>> with a timestamp after the watermark is considered *late data*"
>>
>> Seems like people get confused by it. For example, see Stackoverflow
>> comment [2]. Basically it makes people think that a event timestamp that is
>> bigger than watermark is considered late (due to that "after").
>>
>> Although there is a example right after this sentence to explain late
>> data, seems to me that this sentence is incomplete. The complete sentence
>> to me can be: "The watermark consistently advances from -inf to +inf. Data
>> that arrives with a timestamp after the watermark is considered late data."
>>
>> Am I understand correctly? Is there better description for the order of
>> late data and watermark? I would happy to send PR to update Beam
>> documentation.
>>
>> -Rui
>>
>> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
>> [2]:
>> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>>
>>
>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Rui Wang
It is better, and it also fits the following example.

-Rui

On Thu, Jan 17, 2019 at 11:14 AM Jeff Klukas  wrote:

> How about: "Once the watermark progresses past the end of a window, any
> further elements that arrive with a timestamp in that window are considered
> late data."
>
> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>
>> Hi Community,
>>
>> In Beam programming guide [1], there is a sentence: "Data that arrives
>> with a timestamp after the watermark is considered *late data*"
>>
>> Seems like people get confused by it. For example, see Stackoverflow
>> comment [2]. Basically it makes people think that a event timestamp that is
>> bigger than watermark is considered late (due to that "after").
>>
>> Although there is a example right after this sentence to explain late
>> data, seems to me that this sentence is incomplete. The complete sentence
>> to me can be: "The watermark consistently advances from -inf to +inf. Data
>> that arrives with a timestamp after the watermark is considered late data."
>>
>> Am I understand correctly? Is there better description for the order of
>> late data and watermark? I would happy to send PR to update Beam
>> documentation.
>>
>> -Rui
>>
>> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
>> [2]:
>> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>>
>>
>>


Re: [PROPOSAL] Prepare Beam 2.10.0 release

2019-01-17 Thread Maximilian Michels

An issue with the Flink Runner when restarting streaming pipelines:
https://jira.apache.org/jira/browse/BEAM-6460

Looks like it will be easy to fix by invalidating the Jackson cache.

-Max

On 16.01.19 23:00, Kenneth Knowles wrote:

Quick update on this. There are three remaining issues:

  - https://issues.apache.org/jira/browse/BEAM-6407: A DirectRunner self-check 
was broken from 2.8.0 to 2.9.0 - PR looks good modulo our infra flakes
  - https://issues.apache.org/jira/browse/BEAM-6354: PAssert + DirectRunner + 
Unbounded data busted? Investigation not started
  - https://issues.apache.org/jira/browse/BEAM-6352: Watch was broken from 2.8.0 
to 2.9.0 - will rollback if no forward fix by the time everything else is resolved


Kenn

On Wed, Jan 16, 2019 at 6:00 AM Kenneth Knowles > wrote:


Thanks, Ismaël!

On Wed, Jan 16, 2019 at 2:13 AM Ismaël Mejía mailto:ieme...@gmail.com>> wrote:

Ok since there were not many issues I did the 'update' for the
misplaced issues to version 2.10. We are good to go. New resolved
issues in master must now go into 2.11.0

On Wed, Jan 16, 2019 at 10:38 AM Ismaël Mejía mailto:ieme...@gmail.com>> wrote:
 >
 > This means that the tickets resolved and marked for 2.11 since 
January
 > 2 should be reviewed and retargeted to version 2.10.
 > So this is a call for action for committers who have merged fixes
 > after the cut to update the tickets if required.
 >
 > Ismaël
 >
 > On Tue, Jan 15, 2019 at 9:22 PM Kenneth Knowles mailto:k...@apache.org>> wrote:
 > >
 > > As a heads up, I did not realize that the release guide specified a
custom process for starting a release branch. It makes sense;
cut_release_branch.sh consolidates knowledge about all the places the
version is hardcoded in the codebase. To keep the history simple, I will
re-cut the release branch at the point where master moved from
2.10.0-SNAPSHOT to 2.11.0-SNAPSHOT. All PRs to the branch have been
cherry-picked from master, so they will all be incorporated without any
action by their authors.
 > >
 > > Kenn
 > >
 > > On Tue, Jan 15, 2019 at 10:31 AM Kenneth Knowles mailto:k...@google.com>> wrote:
 > >>
 > >> I'm on it.
 > >>
 > >> On Tue, Jan 15, 2019 at 8:10 AM Ismaël Mejía mailto:ieme...@gmail.com>> wrote:
 > >>>
 > >>> There is also another issue, after the 2.10.0 branch cut some
 > >>> identifier in the build was not changed and the Apache Beam 
Snapshots
 > >>> keep generating SNAPSHOTS for 2.10.0 instead of the now current
 > >>> 2.11.0-SNAPSHOT. Can somebody PTAL?
 > >>>
 > >>> On Thu, Jan 3, 2019 at 6:17 PM Maximilian Michels 
mailto:m...@apache.org>> wrote:
 > >>> >
 > >>> > Thanks for driving this Kenn! I'm in favor of a strict cut off,
but I'd like to
 > >>> > propose a week for cherry-picking relevant changes to the
release branch. It
 > >>> > looks like many people are returning from holidays or are still
off.
 > >>> >
 > >>> > Cheers,
 > >>> > Max
 > >>> >
 > >>> > On 02.01.19 17:20, Kenneth Knowles wrote:
 > >>> > > Done. I've created the Jira tag for 2.11.0.
 > >>> > >
 > >>> > > Previously, there was a few days warning to get things in
before the branch is
 > >>> > > cut. You can just cherry-pick them. This is a bit better for
release stability
 > >>> > > by avoiding all the other changes on master. The timing of
the cut is always
 > >>> > > going to include older and newer changes anyhow.
 > >>> > >
 > >>> > > Kenn
 > >>> > >
 > >>> > > On Wed, Jan 2, 2019 at 1:08 PM Ismaël Mejía
mailto:ieme...@gmail.com>
 > >>> > > >> wrote:
 > >>> > >
 > >>> > >     Can you please create 2.11 tag in JIRA so we can move the
JIRAs that
 > >>> > >     are not blocking. I have quite a bunch of pending code
reviews that
 > >>> > >     hoped to get into this one but well now probably they
shall wait. (My
 > >>> > >     excuses for the people who may be impacted, I had not
checked that the
 > >>> > >     date was in the first week).
 > >>> > >
 > >>> > >     On Wed, Jan 2, 2019 at 4:45 PM Jean-Baptiste Onofré
mailto:j...@nanthrax.net>
 > >>> > >     >> 
wrote:
 > >>> > >      >
 > >>> > >      > It sounds good to me.
 > >>> > >      >
 > >>> > >      > Regards
 > >>> > >      > JB
 > >>> > >      >
 > >>> > >      > On 0

Re: [PROPOSAL] decrease the number of threads for BigQuery streaming insertAll

2019-01-17 Thread Chamikara Jayalath
Thanks Heejong.

I agree that writing to a service using 50 unlimited threadpools sounds
excessive and can result in flooding that service (BigQuery in this case)
in error scenarios where we should back off. Determining a suitable and
limited amount of parallelization (50 in this case) sounds good to me.

Thanks,
Cham

On Wed, Jan 16, 2019 at 6:53 PM Heejong Lee  wrote:

> Hi,
>
> I want to suggest the change[1] of the thread pool type in BigQuery
> streaming insert for Java SDK (BEAM-6443). When we insert small data into
> BigQuery very fast by using BigQueryIO.write, it generates lots of rate
> limit exceeded errors in a log file. It's mainly because the number of
> threads to be used for the inserting job is just too large (50 shards *
> dozens of futures executed by unlimited thread pool per each bundle). I've
> conducted some benchmarks[2] and could see that the change from unlimited
> thread pool to single thread pool reduces the number of (same repeated,
> possibly meaningless) error messages by 1/4 while retaining the same
> performance. I think that this change will not break any important
> performance measure but if anybody has any concerns about this change
> please let me know.
>
> Thanks,
>
> [1] https://github.com/apache/beam/pull/7547
> [2]
> https://docs.google.com/document/d/1EhRNWLevm86GD_QtvlrTauHITVMwQBzuemyp-w4Z_ck/edit#heading=h.c0angyd9tn21
>
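
For illustration only (the names and loop are assumptions; this is not the
actual BigQueryIO code), a JDK-level contrast between the unbounded pool
described above and a bounded one that caps the number of concurrent insert
calls per shard:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolContrast {
  public static void main(String[] args) throws InterruptedException {
    // Previous behaviour described above: one thread per pending insert, unbounded.
    ExecutorService unbounded = Executors.newCachedThreadPool();
    // Proposed behaviour: at most one in-flight insert per shard.
    ExecutorService bounded = Executors.newFixedThreadPool(1);

    for (int i = 0; i < 100; i++) {
      final int batch = i;
      bounded.submit(() -> System.out.println("inserting batch " + batch)); // queued, run one at a time
    }
    bounded.shutdown();
    bounded.awaitTermination(1, TimeUnit.MINUTES);
    unbounded.shutdown();
  }
}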


flink portable runner usage

2019-01-17 Thread Hai Lu
Hi Thomas,

This is Hai, who works on the portable runner for Samza. I have a few minor
questions that I would like to get clarification on from you.

We chatted briefly at the last Beam meetup and you mentioned that your Flink
portable runner (Python) is going into production. So today, are you using Beam
Python on Flink in streaming mode or batch mode? And what are your input
sources (Kafka? Kinesis?)

Also, we talked about how bundling would improve performance by a lot, but it
seems like the Flink runner today only does bundling in batch mode, not in
streaming mode. Am I missing something?

BTW, looking forward to the Beam @Lyft meetup in February!

Thanks,
Hai (LinkedIn)


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Jeff Klukas
How about: "Once the watermark progresses past the end of a window, any
further elements that arrive with a timestamp in that window are considered
late data."

On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:

> Hi Community,
>
> In Beam programming guide [1], there is a sentence: "Data that arrives
> with a timestamp after the watermark is considered *late data*"
>
> Seems like people get confused by it. For example, see Stackoverflow
> comment [2]. Basically it makes people think that a event timestamp that is
> bigger than watermark is considered late (due to that "after").
>
> Although there is a example right after this sentence to explain late
> data, seems to me that this sentence is incomplete. The complete sentence
> to me can be: "The watermark consistently advances from -inf to +inf. Data
> that arrives with a timestamp after the watermark is considered late data."
>
> Am I understand correctly? Is there better description for the order of
> late data and watermark? I would happy to send PR to update Beam
> documentation.
>
> -Rui
>
> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
> [2]:
> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>
>
>


Re: Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Kenneth Knowles
Do you want to use it in the core SDK or the SDK harness? Does the state
sampling code depend on the core SDK?

I'd really like to avoid layers with names like "common". Just delete that
layer of the directory and the path hierarchy has exactly the same meaning,
but is more concise and clear.

Kenn

On Thu, Jan 17, 2019 at 10:49 AM Alex Amato  wrote:

> I would like to reuse the State Sampling classes from the Dataflow Runner
> Harness in the Beam Java SDK. I created a Refactoring plan
> and
> removed the Dataflow references from the classes in PR/7507
> .
>
> SInce the Java SDK cannot depend on the Dataflow Runner Harness, then I
> can do one of these options
>
>- Create a new gradle project/folder and locate the code in a shared
>location?
>   - i.e. create a new: beam/*common*/java/*extensions/state-sampling*
>- Move the code in a Java SDK projects, which will work since the
>runner harness can depend on the SDK?
>   - i.e.reuse the existing: beam/*sdks*/java/
>   *extensions/state-sampling*
>
>


Where should I locate shared/common Java Code (Java SDK and Java RH both use)?

2019-01-17 Thread Alex Amato
I would like to reuse the State Sampling classes from the Dataflow Runner
Harness in the Beam Java SDK. I created a Refactoring plan and removed the
Dataflow references from the classes in PR/7507.

Since the Java SDK cannot depend on the Dataflow Runner Harness, I can
do one of these options:

   - Create a new gradle project/folder and locate the code in a shared
   location?
  - i.e. create a new: beam/*common*/java/*extensions/state-sampling*
   - Move the code into a Java SDK project, which will work since the runner
   harness can depend on the SDK?
  - i.e. reuse the existing: beam/*sdks*/java/*extensions/state-sampling*


Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Rui Wang
Hi Community,

In Beam programming guide [1], there is a sentence: "Data that arrives with
a timestamp after the watermark is considered *late data*"

It seems like people get confused by it. For example, see the Stack Overflow
comment [2]. Basically, it makes people think that an event timestamp that is
bigger than the watermark is considered late (due to that "after").

Although there is an example right after this sentence to explain late data,
it seems to me that this sentence is incomplete. The complete sentence to me
could be: "The watermark consistently advances from -inf to +inf. Data that
arrives with a timestamp after the watermark is considered late data."

Do I understand correctly? Is there a better description for the order of
late data and the watermark? I would be happy to send a PR to update the Beam
documentation.

-Rui

[1]: https://beam.apache.org/documentation/programming-guide/#windowing
[2]:
https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971


Re: Connection leaks with PostgreSQL instance

2019-01-17 Thread Jean-Baptiste Onofré
Hi,

I don't think we have a connection leak in normal behavior.

The actual SQL statement is executed in @FinishBundle, where the
connection is closed.

The processElement just adds the record to the batch to process.

Does it mean that an Exception occurs in the batch addition?

Regards
JB

On 17/01/2019 12:41, Alexey Romanenko wrote:
> Kenn,
> 
> I’m not sure that we have a connection leak in JdbcIO since new
> connection is being obtained from an instance of /javax.sql.DataSource/
> (created in @Setup) and which
> is /org.apache.commons.dbcp2.BasicDataSource/ by default.
> /BasicDataSource/ uses connection pool and closes all idle connections
> in "close()”. 
> 
> In its turn, JdbcIO calls/DataSource.close()/ in @Teardown, so all idle
> connections should be closed and released there in case of fails.
> Though, potentially some connections, that has been delegated to client
> before and were not not properly returned to pool, could be leaked…
> Anyway, I think it could be a good idea to call "/connection.close()/”
> (return to connection pool) explicitly in case of any exception happened
> during bundle processing.
> 
> Probably JB may provide more details as original author of JdbcIO.
> 
>> On 14 Jan 2019, at 21:37, Kenneth Knowles > > wrote:
>>
>> Hi Jonathan,
>>
>> JdbcIO.write() just invokes this
>> DoFn: 
>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
>>
>> It establishes a connection in @StartBundle and then in @FinishBundle
>> it commits a batch and closes the connection. If an error happens
>> in @StartBundle or @ProcessElement there will be a retry with a fresh
>> instance of the DoFn, which will establish a new connection. It looks
>> like neither @StartBundle nor @ProcessElement closes the connection,
>> so I'm guessing that the old connection sticks around because the
>> worker process was not terminated. So the Beam runner and Dataflow
>> service are working as intended and this is an issue with JdbcIO,
>> unless I've made a mistake in my reading or analysis.
>>
>> Would you mind reporting these details
>> to https://issues.apache.org/jira/projects/BEAM/ ?
>>
>> Kenn
>>
>> On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron
>> mailto:jonathan.per...@lumapps.com>> wrote:
>>
>> Hello !
>>
>> My question is maybe mainly GCP-oriented, so I apologize if it is
>> not fully related to the Beam community.
>>
>> We have a streaming pipeline running on Dataflow which writes data
>> to a PostgreSQL instance hosted on Cloud SQL. This database is
>> suffering from connection leak spikes on a regular basis:
>>
>> 
>>
>> The connections are kept alive until the pipeline is canceled/drained:
>>
>> 
>>
>> We are writing to the database with:
>>
>> - individual DoFn where we open/close the connection using the
>> standard JDBC try/catch (SQLException ex)/finally statements;
>>
>> - a Pipeline.apply(JdbcIO.write()) operations.
>>
>> I observed that these spikes happens most of the time after I/O
>> errors with the database. Has anyone observed the same situation ?
>>
>> I have several questions/observations, please correct me if I am
>> wrong (I am not from the java environment, so some can seem pretty
>> trivial) :
>>
>> - Does the apply method handles SQLException or I/O errors ?
>>
>> - Would the use of a connection pool prevents such behaviours ? If
>> so, how would one implement it to allow all workers to use it ?
>> Could it be implemented with JDBC Connection pooling ?
>>
>> I am worrying about the serialization if one would pass a
>> Connection item as an argument of a DoFn.
>>
>> Thank you in advance for your comments and reactions.
>>
>> Best regards,
>>
>> Jonathan
>>
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Connection leaks with PostgreSQL instance

2019-01-17 Thread Reuven Lax
I think it will be called unless the process crashes, though.

On Thu, Jan 17, 2019, 7:05 AM Ismaël Mejía  wrote:
> Couldn't this be related also to the fact that @Teardown is
> best-effort in Dataflow?
>
> On Thu, Jan 17, 2019 at 12:41 PM Alexey Romanenko
>  wrote:
> >
> > Kenn,
> >
> > I’m not sure that we have a connection leak in JdbcIO since new
> connection is being obtained from an instance of javax.sql.DataSource
> (created in @Setup) and which is org.apache.commons.dbcp2.BasicDataSource
> by default. BasicDataSource uses connection pool and closes all idle
> connections in "close()”.
> >
> > In its turn, JdbcIO calls DataSource.close() in @Teardown, so all idle
> connections should be closed and released there in case of fails. Though,
> potentially some connections, that has been delegated to client before and
> were not not properly returned to pool, could be leaked… Anyway, I think it
> could be a good idea to call "connection.close()” (return to connection
> pool) explicitly in case of any exception happened during bundle processing.
> >
> > Probably JB may provide more details as original author of JdbcIO.
> >
> > On 14 Jan 2019, at 21:37, Kenneth Knowles  wrote:
> >
> > Hi Jonathan,
> >
> > JdbcIO.write() just invokes this DoFn:
> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
> >
> > It establishes a connection in @StartBundle and then in @FinishBundle it
> commits a batch and closes the connection. If an error happens in
> @StartBundle or @ProcessElement there will be a retry with a fresh instance
> of the DoFn, which will establish a new connection. It looks like neither
> @StartBundle nor @ProcessElement closes the connection, so I'm guessing
> that the old connection sticks around because the worker process was not
> terminated. So the Beam runner and Dataflow service are working as intended
> and this is an issue with JdbcIO, unless I've made a mistake in my reading
> or analysis.
> >
> > Would you mind reporting these details to
> https://issues.apache.org/jira/projects/BEAM/ ?
> >
> > Kenn
> >
> > On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron <
> jonathan.per...@lumapps.com> wrote:
> >>
> >> Hello !
> >>
> >> My question is maybe mainly GCP-oriented, so I apologize if it is not
> fully related to the Beam community.
> >>
> >> We have a streaming pipeline running on Dataflow which writes data to a
> PostgreSQL instance hosted on Cloud SQL. This database is suffering from
> connection leak spikes on a regular basis:
> >>
> >> 
> >>
> >> The connections are kept alive until the pipeline is canceled/drained:
> >>
> >> 
> >>
> >> We are writing to the database with:
> >>
> >> - individual DoFn where we open/close the connection using the standard
> JDBC try/catch (SQLException ex)/finally statements;
> >>
> >> - a Pipeline.apply(JdbcIO.write()) operations.
> >>
> >> I observed that these spikes happens most of the time after I/O errors
> with the database. Has anyone observed the same situation ?
> >>
> >> I have several questions/observations, please correct me if I am wrong
> (I am not from the java environment, so some can seem pretty trivial) :
> >>
> >> - Does the apply method handles SQLException or I/O errors ?
> >>
> >> - Would the use of a connection pool prevents such behaviours ? If so,
> how would one implement it to allow all workers to use it ? Could it be
> implemented with JDBC Connection pooling ?
> >>
> >> I am worrying about the serialization if one would pass a Connection
> item as an argument of a DoFn.
> >>
> >> Thank you in advance for your comments and reactions.
> >>
> >> Best regards,
> >>
> >> Jonathan
> >
> >
>


Re: Connection leaks with PostgreSQL instance

2019-01-17 Thread Ismaël Mejía
Couldn't this be related also to the fact that @Teardown is
best-effort in Dataflow?

On Thu, Jan 17, 2019 at 12:41 PM Alexey Romanenko
 wrote:
>
> Kenn,
>
> I’m not sure that we have a connection leak in JdbcIO since new connection is 
> being obtained from an instance of javax.sql.DataSource (created in @Setup) 
> and which is org.apache.commons.dbcp2.BasicDataSource by default. 
> BasicDataSource uses connection pool and closes all idle connections in 
> "close()”.
>
> In its turn, JdbcIO calls DataSource.close() in @Teardown, so all idle 
> connections should be closed and released there in case of fails. Though, 
> potentially some connections, that has been delegated to client before and 
> were not not properly returned to pool, could be leaked… Anyway, I think it 
> could be a good idea to call "connection.close()” (return to connection pool) 
> explicitly in case of any exception happened during bundle processing.
>
> Probably JB may provide more details as original author of JdbcIO.
>
> On 14 Jan 2019, at 21:37, Kenneth Knowles  wrote:
>
> Hi Jonathan,
>
> JdbcIO.write() just invokes this DoFn: 
> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
>
> It establishes a connection in @StartBundle and then in @FinishBundle it 
> commits a batch and closes the connection. If an error happens in 
> @StartBundle or @ProcessElement there will be a retry with a fresh instance 
> of the DoFn, which will establish a new connection. It looks like neither 
> @StartBundle nor @ProcessElement closes the connection, so I'm guessing that 
> the old connection sticks around because the worker process was not 
> terminated. So the Beam runner and Dataflow service are working as intended 
> and this is an issue with JdbcIO, unless I've made a mistake in my reading or 
> analysis.
>
> Would you mind reporting these details to 
> https://issues.apache.org/jira/projects/BEAM/ ?
>
> Kenn
>
> On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron 
>  wrote:
>>
>> Hello !
>>
>> My question is maybe mainly GCP-oriented, so I apologize if it is not fully 
>> related to the Beam community.
>>
>> We have a streaming pipeline running on Dataflow which writes data to a 
>> PostgreSQL instance hosted on Cloud SQL. This database is suffering from 
>> connection leak spikes on a regular basis:
>>
>> 
>>
>> The connections are kept alive until the pipeline is canceled/drained:
>>
>> 
>>
>> We are writing to the database with:
>>
>> - individual DoFn where we open/close the connection using the standard JDBC 
>> try/catch (SQLException ex)/finally statements;
>>
>> - a Pipeline.apply(JdbcIO.write()) operations.
>>
>> I observed that these spikes happens most of the time after I/O errors with 
>> the database. Has anyone observed the same situation ?
>>
>> I have several questions/observations, please correct me if I am wrong (I am 
>> not from the java environment, so some can seem pretty trivial) :
>>
>> - Does the apply method handles SQLException or I/O errors ?
>>
>> - Would the use of a connection pool prevents such behaviours ? If so, how 
>> would one implement it to allow all workers to use it ? Could it be 
>> implemented with JDBC Connection pooling ?
>>
>> I am worrying about the serialization if one would pass a Connection item as 
>> an argument of a DoFn.
>>
>> Thank you in advance for your comments and reactions.
>>
>> Best regards,
>>
>> Jonathan
>
>


Re: Our jenkins beam1 server is down

2019-01-17 Thread Ismaël Mejía
Thanks Yifan for taking care.

On Thu, Jan 17, 2019 at 1:24 AM Yifan Zou  wrote:
>
> Yes, beam14 is offline as well. We're on it.
>
> On Wed, Jan 16, 2019 at 4:11 PM Ruoyun Huang  wrote:
>>
>> With another try, succeeding on beam10.
>>
>> Thanks for the fix.
>>
>> On Wed, Jan 16, 2019 at 3:53 PM Ruoyun Huang  wrote:
>>>
>>> Just did a rerun, got error saying "10:12:21 ERROR: beam14 is offline; 
>>> cannot locate JDK 1.8 (latest)".
>>>
>>> Beam1 is not the only one broken?
>>>
>>> On Wed, Jan 16, 2019 at 3:45 PM Yifan Zou  wrote:

 The beam1 was still accepting jobs and breaking them after reset this 
 morning. We temporarily disconnected it so that jobs could be scheduled on 
 healthy nodes. Infra is making efforts to fix beam1.

 On Wed, Jan 16, 2019 at 11:15 AM Yifan Zou  wrote:
>
> The VM instance was reset and Infra is trying to repuppetize it. 
> https://issues.apache.org/jira/browse/INFRA-17672 is created to track 
> this issue.
>
> On Wed, Jan 16, 2019 at 10:51 AM Mark Liu  wrote:
>>
>> Thanks you Yifan!
>>
>> Looks like following precommits are affected according to my PR:
>>
>> Java_Examples_Dataflow,
>> Portable_Python,
>> Website_Stage_GCS
>>
>> On Wed, Jan 16, 2019 at 9:25 AM Yifan Zou  wrote:
>>>
>>> I am looking on it.
>>>
>>> On Wed, Jan 16, 2019 at 8:18 AM Ismaël Mejía  wrote:

 Can somebody PTAL. Sadly the poor jenkins shuffling algorithm is
 sending most builds to it so there are issues to validate some PRs.
>>>
>>>
>>>
>>> --
>>> 
>>> Ruoyun  Huang
>>>
>>
>>
>> --
>> 
>> Ruoyun  Huang
>>


Re: [spark runner based on dataset POC] your opinion

2019-01-17 Thread Etienne Chauchot
Hi Manu,

Yes, a JSON schema can make its way to the Spark source with no difficulty, but
we still need to store the whole windowedValue, not only the elements that would
comply with this schema. The problem is that Spark will try to match the element
(the windowedValue) to the schema of the source at any element-wise processing
step (and downstream it will auto-guess the schema from the content of the
dataset; for example, if I extract the timestamp in a ParDo, I get a Long schema
in the output dataset). The problem is that windowedValue is complex and has
many subclasses. Maybe bytes serialization is still the best way to go, but then
we don't leverage schema PCollections.

Best,
Etienne
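
For reference, a minimal sketch of the bytes round trip described above (the
element type and coders are illustrative assumptions): the windowedValue is
encoded to a byte[] so it fits a single binary column in the Dataset, and
decoded back before elements are handed to downstream transforms.

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;

public class WindowedValueBytesRoundTrip {
  public static void main(String[] args) throws Exception {
    FullWindowedValueCoder<String> coder =
        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

    WindowedValue<String> original = WindowedValue.valueInGlobalWindow("element");

    // What the source would store in the binary Dataset column.
    byte[] row = CoderUtils.encodeToByteArray(coder, original);
    // What the map after the source would restore before downstream transforms.
    WindowedValue<String> decoded = CoderUtils.decodeFromByteArray(coder, row);

    System.out.println(decoded.getValue() + " @ " + decoded.getTimestamp());
  }
}
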
Le jeudi 17 janvier 2019 à 21:52 +0800, Manu Zhang a écrit :
> Nice Try, Etienne ! Is it possible to pass in the schema through pipeline 
> options ?
> Manu 
> On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot  wrote:
> > Hi Kenn,
> > Sure, in spark DataSourceV2 providing a schema is mandatory:- if I set it 
> > to null, I obviously get a NPE- if I set
> > it empty: I get an array out of bounds exception- if I set it to 
> > Datatype.Null, null is stored as actual elements =>
> > Consequently I set it to binary.
> > As the beam reader is responsible for reading both the element and the 
> > timestamp, the source outputs a
> > Dataset. So, the solution I found, for which I asked your 
> > opinion, is to serialize windowedValue to
> > bytes using beam FullWindowedValueCoder in reader.get() and deserialize the 
> > whole dataset once the source is done
> > using a map to get the windowedValue back and give it to the transforms 
> > downstream.
> > I am aware that this is not optimal because of the bytes serialization 
> > roundtrip, and I wanted your suggestions
> > around that.
> > ThanksEtienne
> > 
> > Le mercredi 16 janvier 2019 à 19:04 -0800, Kenneth Knowles a écrit :
> > > Cool!
> > > I don't quite understand the issue in "bytes serialization to comply to 
> > > spark dataset schemas to store
> > > windowedValues". Can you say a little more?
> > > 
> > > Kenn
> > > On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot  
> > > wrote:
> > > > Hi guys,
> > > > regarding the new (made from scratch) spark runner POC based on the 
> > > > dataset API, I was able to make a big step
> > > > forward: it can now run a first batch pipeline with a source !
> > > > 
> > > > See 
> > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
> > > > 
> > > > there is no test facilities for now, testmode is enabled and it just 
> > > > prints the output PCollection .
> > > > 
> > > > I made some workarounds especially String serialization to pass beam 
> > > > objects (was forced to) and also bytes
> > > > serialization to comply to spark dataset schemas to store 
> > > > windowedValues.
> > > > 
> > > > Can you give me your thoughts especially regarding these last 2 matters?
> > > > 
> > > > The other parts are not ready for showing yet
> > > > 
> > > > Here is the whole branch:
> > > > 
> > > > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
> > > > 
> > > > Thanks,
> > > > 
> > > > Etienne
> > > > 
> > > > 


Re: [spark runner based on dataset POC] your opinion

2019-01-17 Thread Manu Zhang
Nice Try, Etienne ! Is it possible to pass in the schema through pipeline
options ?

Manu

On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot 
wrote:

> Hi Kenn,
>
> Sure, in spark DataSourceV2 providing a schema is mandatory:
> - if I set it to null, I obviously get an NPE
> - if I set it empty, I get an array out of bounds exception
> - if I set it to Datatype.Null, null is stored as actual elements
> => Consequently I set it to binary.
>
> As the beam reader is responsible for reading both the element and the
> timestamp, the source outputs a Dataset. So, the solution I
> found, for which I asked your opinion, is to serialize windowedValue to
> bytes using beam FullWindowedValueCoder in reader.get() and deserialize the
> whole dataset once the source is done using a map to get the windowedValue
> back and give it to the transforms downstream.
>
> I am aware that this is not optimal because of the bytes serialization
> roundtrip, and I wanted your suggestions around that.
>
> Thanks
> Etienne
>
>
> Le mercredi 16 janvier 2019 à 19:04 -0800, Kenneth Knowles a écrit :
>
> Cool!
>
> I don't quite understand the issue in "bytes serialization to comply to
> spark dataset schemas to store windowedValues". Can you say a little more?
>
> Kenn
>
> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot 
> wrote:
>
> Hi guys,
> regarding the new (made from scratch) spark runner POC based on the
> dataset API, I was able to make a big step forward: it can now run a first
> batch pipeline with a source !
>
> See
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
>
> there are no test facilities for now; test mode is enabled and it just
> prints the output PCollection.
>
> I made some workarounds, especially String serialization to pass beam
> objects (was forced to) and also bytes serialization to comply with spark
> dataset schemas to store windowedValues.
>
> Can you give me your thoughts especially regarding these last 2 matters?
>
> The other parts are not ready for showing yet
>
> Here is the whole branch:
>
>
> https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
>
> Thanks,
>
> Etienne
>
>


Re: Proposal: Portability SDKHarness Docker Image Release with Beam Version Release.

2019-01-17 Thread Alan Myrvold
+1 This would be great. gcr.io seems like a good option for snapshots due
to the permissions from jenkins to upload and ability to keep snapshots
around.

On Wed, Jan 16, 2019 at 6:51 PM Ruoyun Huang  wrote:

> +1 This would be a great thing to have.
>
> On Wed, Jan 16, 2019 at 6:11 PM Ankur Goenka  wrote:
>
>> gcr.io seems to be a good option. Given that we don't need the hosting
>> server name in the image name, it is easily changeable later.
>>
>> The Docker container for Apache Flink is named "flink", and they have
>> different tags for different releases and configurations:
>> https://hub.docker.com/_/flink . We can follow a similar model and name the
>> image "beam" (beam doesn't seem to be taken on Docker Hub) and use tags to
>> distinguish Java/Python/Go and versions, etc.
>>
>> Tags will look like:
>> java-SNAPSHOT
>> java-2.10.1
>> python2-SNAPSHOT
>> python2-2.10.1
>> go-SNAPSHOT
>> go-2.10.1
>>
>>
>> On Wed, Jan 16, 2019 at 5:56 PM Ahmet Altay  wrote:
>>
>>> For snapshots, we could use gcr.io. Permission would not be a problem
>>> since Jenkins is already correctly set up. The cost will be covered under
>>> the apache-beam-testing project. And since this is only for snapshots, it
>>> will only be for temporary artifacts, not for release artifacts.
>>>
>>> On Wed, Jan 16, 2019 at 5:50 PM Valentyn Tymofieiev 
>>> wrote:
>>>
 +1, releasing containers is a useful process that we need to build in
 Beam, and it is required for FnApi users. Among other reasons, having
 officially released Beam SDK harness container images will make it easier
 for users to do simple customizations to container images, as they will be
 able to use the container image released by Beam as a base image.

 Good point about potential storage limitations on Bintray. With Beam's
 release cadence we may quickly exceed the 10 GB quota. It may also affect
 our decisions as to which images we want to release, for example: do we
 want to release only one container image with a Python 3 interpreter, or do
 we want to release a container image for each Python 3 minor version that
 Beam is compatible with.

>>>
>>> Probably worth a separate discussion. I would favor first releasing a
>>> python 3 compatible version before figuring out how we would target
>>> multiple python 3 versions.
>>>
>>
>>>

 On Wed, Jan 16, 2019 at 5:48 PM Ankur Goenka  wrote:

>
>
> On Wed, Jan 16, 2019 at 5:37 PM Ahmet Altay  wrote:
>
>>
>>
>> On Wed, Jan 16, 2019 at 5:28 PM Ankur Goenka 
>> wrote:
>>
>>> - Could we start from snapshots first and then do it for releases?
>>> +1, releasing snapshots first makes sense to me.
>>> - For snapshots, do we need to clean old containers after a while?
>>> Otherwise I guess we will accumulate lots of containers.
>>> For snapshots we can maintain a single snapshot image from git HEAD
>>> daily. Docker has an internal image container id which changes every time
>>> an image is changed, and pulls new images as needed.
>>>
>>
>> There is a potential use case this may not work with: if a user picks up a
>> snapshot build and wants to use it until the next release arrives. I guess
>> in that case the user can copy the snapshotted container image and rely on
>> that.
>>
>>
> Yes, that should be reasonable.
>
>>> - Do we also need additional code changes for snapshots and releases
>>> to default to these specific containers? There could be a version-based
>>> mechanism to resolve the correct container to use.
>>> The current image defaults have the username in them. We should be OK by
>>> just updating the default image URL to the published image URL.
>>>
>>> We should also check for pricing and details about Apache-Bintray
>>> agreement before pushing images and changing defaults.
>>>
>>
>> There is information on Bintray's pricing page about open source
>> projects [1]. I do not know if there is a special Apache-Bintray agreement
>> or not. If there is no special agreement, there is a 10 GB storage limit
>> for using Bintray.
>>
> As each image can easily run into gigabytes, 10 GB might not be sufficient
> for future-proofing.
> We can also register the Docker image in a Docker image registry without
> "bintray" in the name, so that we can later host images with a different
> vendor.
>
>
>> [1] https://bintray.com/account/pricing?tab=account&type=pricing
>>
>>
>>>
>>> On Wed, Jan 16, 2019 at 5:11 PM Ahmet Altay 
>>> wrote:
>>>
 This sounds like a good idea. Some questions:

 - Could we start from snapshots first and then do it for releases?
 - For snapshots, do we need to clean old containers after a while?
 Otherwise I guess we will accumulate lots of containers.
 - Do we also need additional code changes for snapshots and releases to
 default to these specific containers?

Re: Connection leaks with PostgreSQL instance

2019-01-17 Thread Alexey Romanenko
Kenn,

I’m not sure that we have a connection leak in JdbcIO, since a new connection is
obtained from an instance of javax.sql.DataSource (created in @Setup), which is
org.apache.commons.dbcp2.BasicDataSource by default. BasicDataSource uses a
connection pool and closes all idle connections in "close()".

In turn, JdbcIO calls DataSource.close() in @Teardown, so all idle connections
should be closed and released there in case of failures. Though, potentially
some connections that had been handed out to a client before and were not
properly returned to the pool could be leaked… Anyway, I think it could be a
good idea to call "connection.close()" (return to the connection pool)
explicitly in case any exception happens during bundle processing.

Probably JB may provide more details as original author of JdbcIO.
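
For reference, a rough sketch of that idea: a hypothetical write DoFn (not the
actual JdbcIO.WriteFn, which Kenn's message below links to) that builds a
BasicDataSource in @Setup and returns the pooled connection as soon as bundle
processing fails. The JDBC URL, table and column names are only illustrative:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.dbcp2.BasicDataSource;

class PoolFriendlyJdbcWriteFn extends DoFn<String, Void> {

  private final String jdbcUrl; // e.g. "jdbc:postgresql://host/db" (illustrative)

  private transient BasicDataSource dataSource;
  private transient Connection connection;
  private transient PreparedStatement statement;

  PoolFriendlyJdbcWriteFn(String jdbcUrl) {
    this.jdbcUrl = jdbcUrl;
  }

  @Setup
  public void setup() {
    dataSource = new BasicDataSource();
    dataSource.setUrl(jdbcUrl);
  }

  @StartBundle
  public void startBundle() throws SQLException {
    connection = dataSource.getConnection();
    statement = connection.prepareStatement("INSERT INTO my_table (value) VALUES (?)");
  }

  @ProcessElement
  public void processElement(ProcessContext context) throws SQLException {
    try {
      statement.setString(1, context.element());
      statement.addBatch();
    } catch (SQLException e) {
      // Return the connection to the pool right away instead of leaking it.
      releaseConnection();
      throw e;
    }
  }

  @FinishBundle
  public void finishBundle() throws SQLException {
    try {
      statement.executeBatch();
    } finally {
      releaseConnection();
    }
  }

  @Teardown
  public void teardown() throws SQLException {
    if (dataSource != null) {
      dataSource.close(); // closes the idle connections held by the pool
    }
  }

  private void releaseConnection() throws SQLException {
    if (statement != null) {
      statement.close();
      statement = null;
    }
    if (connection != null) {
      connection.close(); // with BasicDataSource this returns it to the pool
      connection = null;
    }
  }
}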

> On 14 Jan 2019, at 21:37, Kenneth Knowles  wrote:
> 
> Hi Jonathan,
> 
> JdbcIO.write() just invokes this DoFn: 
> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
>  
> 
> 
> It establishes a connection in @StartBundle and then in @FinishBundle it 
> commits a batch and closes the connection. If an error happens in 
> @StartBundle or @ProcessElement there will be a retry with a fresh instance 
> of the DoFn, which will establish a new connection. It looks like neither 
> @StartBundle nor @ProcessElement closes the connection, so I'm guessing that 
> the old connection sticks around because the worker process was not 
> terminated. So the Beam runner and Dataflow service are working as intended 
> and this is an issue with JdbcIO, unless I've made a mistake in my reading or 
> analysis.
> 
> Would you mind reporting these details to 
> https://issues.apache.org/jira/projects/BEAM/ ?
> 
> Kenn
> 
> On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron wrote:
> Hello !
> 
> My question is maybe mainly GCP-oriented, so I apologize if it is not fully 
> related to the Beam community.
> 
> We have a streaming pipeline running on Dataflow which writes data to a 
> PostgreSQL instance hosted on Cloud SQL. This database is suffering from 
> connection leak spikes on a regular basis:
> 
> 
> 
> The connections are kept alive until the pipeline is canceled/drained:
> 
> 
> 
> We are writing to the database with:
> 
> - individual DoFn where we open/close the connection using the standard JDBC 
> try/catch (SQLException ex)/finally statements;
> 
> - a Pipeline.apply(JdbcIO.write()) operation.
> 
> I observed that these spikes happen most of the time after I/O errors with
> the database. Has anyone observed the same situation?
> 
> I have several questions/observations, please correct me if I am wrong (I am 
> not from the java environment, so some can seem pretty trivial) :
> 
> - Does the apply method handle SQLException or I/O errors?
> 
> - Would the use of a connection pool prevent such behaviours? If so, how
> would one implement it to allow all workers to use it? Could it be
> implemented with JDBC Connection pooling?
> 
> I am worrying about the serialization if one would pass a Connection item as 
> an argument of a DoFn.
> 
> Thank you in advance for your comments and reactions.
> 
> Best regards,
> 
> Jonathan



Re: [spark runner based on dataset POC] your opinion

2019-01-17 Thread Etienne Chauchot
Hi Kenn,
Sure, in spark DataSourceV2 providing a schema is mandatory:
- if I set it to null, I obviously get an NPE
- if I set it empty, I get an array out of bounds exception
- if I set it to Datatype.Null, null is stored as actual elements
=> Consequently I set it to binary.
As the beam reader is responsible for reading both the element and the 
timestamp, the source outputs a
Dataset. So, the solution I found, for which I asked your 
opinion, is to serialize windowedValue to
bytes using beam FullWindowedValueCoder in reader.get() and deserialize the 
whole dataset once the source is done using
a map to get the windowedValue back and give it to the transforms downstream.
I am aware that this is not optimal because of the bytes serialization 
roundtrip, and I wanted your suggestions around
that.
Thanks
Etienne

Le mercredi 16 janvier 2019 à 19:04 -0800, Kenneth Knowles a écrit :
> Cool!
> I don't quite understand the issue in "bytes serialization to comply to spark 
> dataset schemas to store
> windowedValues". Can you say a little more?
> 
> Kenn
> On Tue, Jan 15, 2019 at 8:54 AM Etienne Chauchot  wrote:
> > Hi guys,
> > regarding the new (made from scratch) spark runner POC based on the dataset 
> > API, I was able to make a big step
> > forward: it can now run a first batch pipeline with a source !
> > 
> > See 
> > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
> > 
> > there are no test facilities for now; test mode is enabled and it just
> > prints the output PCollection.
> > 
> > I made some workarounds, especially String serialization to pass beam
> > objects (was forced to) and also bytes serialization to comply with spark
> > dataset schemas to store windowedValues.
> > 
> > Can you give me your thoughts especially regarding these last 2 matters?
> > 
> > The other parts are not ready for showing yet
> > 
> > Here is the whole branch:
> > 
> > https://github.com/apache/beam/blob/spark-runner_structured-streaming/runners/spark-structured-streaming
> > 
> > Thanks,
> > 
> > Etienne
> > 
> >