Re: Jira components for cross-language transforms

2020-05-31 Thread Chamikara Jayalath
On Fri, May 29, 2020 at 8:53 PM Kenneth Knowles  wrote:

> -0, sorry I'm late. But I guess you are not sorry since everyone else
> agrees :-)
>
> I just don't know what this accomplishes. Most components correspond to a
> software component/codebase. The only components that do not correspond to
> a codebase are things that are sort of independent, like "test-failures"
> and "gcp-quota".
>
> This seems like a label does all the same things, since you can tag with
> multiple components. AFAIK the only difference between a component and a
> label is that Jira requires >= 1 component (actually I bet we control this).
> Does it make analytics dashboards more useful?
>
> A hypothetical test is: when will I tag with this component? That is
> actually not so clear. If I am a Python user in the future, and xlang is
> done very well, then I may not even know that I am using a Java-based
> component. So what I want to find is either "io-python-kafka" (for the
> wrapper) or "io-kafka" (if the wrapper is so good I don't even know). I
> think the future of portable Beam is probably to go with "io-kafka".
>

> The reason I am -0 is that it is harmless to have extra components and use
> them when a label would suffice.
>

I get your point, and in the distant future, where the cross-language
transforms framework works seamlessly for all major SDK/runner combinations,
we might reach a state where we do not need this Jira component. But we are
not there yet :). Given that various folks are working on and/or interested
in picking up tasks related to cross-language transforms, and given that many
users may start using cross-language transforms in the near future, I think
the Jira component will make things easier to manage. Also please note that
we already have at least one cross-language-specific component,
expansion-service.

Labels might achieve the same thing, but it seems harder to maintain a
consistent label across folks working on various runners/SDKs.
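
For reference, filtering works the same way for either in JQL, e.g.
"project = BEAM AND component = cross-language" vs.
"project = BEAM AND labels = cross-language" (assuming that name); the
practical difference is mostly that a component is a curated, admin-defined
name while a label is free-form.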

Thanks,
Cham


>
> Kenn
>
> On Fri, May 29, 2020 at 10:38 AM Chamikara Jayalath 
> wrote:
>
>> Thanks. I'll go ahead and add related Jiras to it.
>>
>> - Cham
>>
>> On Fri, May 29, 2020 at 9:49 AM Luke Cwik  wrote:
>>
>>> I have added the new component:
>>>
>>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20cross-language
>>>
>>> I didn't set a component owner for it and made the default assignee be
>>> the project default (unassigned). If you would like any of these to be
>>> changed, please let me know.
>>>
>>> On Fri, May 29, 2020 at 9:18 AM Chamikara Jayalath 
>>> wrote:
>>>
 Good point. "cross-language" sgtm.

 On Fri, May 29, 2020, 8:57 AM Kyle Weaver  wrote:

> Nit: can we name it `cross-language` instead of `xlang`?
> Component names auto-complete, so there's no reason to abbreviate.
>
> On Fri, May 29, 2020 at 11:54 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Thanks for the comments.
>>
>> Can someone please create a single Jira component named "xlang" ?
>> Looks like I don't have access to create components.
>>
>> Thanks,
>> Cham
>>
>> On Fri, May 29, 2020 at 7:24 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> +1 to new non split component.
>>>
>>> On 29 May 2020, at 07:19, Heejong Lee  wrote:
>>>
>>> If we use one meta component tag for all xlang related issues, I
>>> would prefer just "xlang". Then we could attach the "xlang" tag to not 
>>> only
>>> language specific sdk tags but also other runner tags e.g. ['xlang',
>>> 'io-java-kafka'], ['xlang', 'runner-dataflow'].
>>>
>>> On Thu, May 28, 2020 at 7:49 PM Robert Burke 
>>> wrote:
>>>
 +1 to new component not split. The language concerns can be
 represented and filtered with the existing sdk tags. I know I'm 
 interested
 in all sdk-go issues, and would prefer not to have to union tags when
 searching for Go related issues.

 On Thu, 28 May 2020 at 15:48, Ismaël Mejía 
 wrote:

> +1 to a new component, not split
>
> Another use case is using libraries not available in your language,
> e.g. using some Python transform that relies on a Python-only API in
> the middle of a Java pipeline.
>
>
> On Thu, May 28, 2020 at 11:12 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> I proposed three components since the audience might be
>> different. Also, we can use the same component to track issues related to
>> all cross-language wrappers available in a given SDK. If this is too much,
>> a single component is fine as well.
>>
>> Ashwin, as others pointed out, the cross-language transforms
>> framework is primarily for giving SDKs 

Re: JdbcIO for writing to Dynamic Schemas in Postgres

2020-05-31 Thread Jean-Baptiste Onofré
Did you create a Jira about that already? I will do the improvement on JdbcIO.

Regards
JB

On Sun, May 31, 2020 at 11:25, Willem Pienaar wrote:

> Hi Reuven,
>
> To be clear, we already have this solved for BigQueryIO. I am hoping there
> is a similar solution for JdbcIO.
>
> Regards,
> Willem

Re: JdbcIO for writing to Dynamic Schemas in Postgres

2020-05-31 Thread Reuven Lax
It doesn't look to me like JdbcIO currently supports dynamic destinations.
I think it wouldn't be too hard to add this functionality. If you wanted to
help contribute this change to JdbcIO.java, I'm sure that we would be happy
to help guide you.
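
In the meantime, here is a rough sketch of the kind of workaround that is
possible today: a plain DoFn that opens its own JDBC connection and builds
the insert statement per element. The element type, connection string and
table naming below are illustrative assumptions, not existing JdbcIO API,
and the sketch ignores the batching and retry logic that JdbcIO normally
provides:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Routes each KV to the table named by its key; assumes the tables already
// exist and that the key is a trusted, pre-validated table name.
class DynamicJdbcWriteFn extends DoFn<KV<String, String>, Void> {
  private transient Connection connection;

  @Setup
  public void setup() throws Exception {
    connection =
        DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password");
  }

  @ProcessElement
  public void processElement(@Element KV<String, String> element) throws Exception {
    String table = element.getKey();
    try (PreparedStatement statement =
        connection.prepareStatement("INSERT INTO " + table + " (value) VALUES (?)")) {
      statement.setString(1, element.getValue());
      statement.executeUpdate();
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}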

Reuven

On Sun, May 31, 2020 at 2:26 AM Willem Pienaar  wrote:

> Hi Reuven,
>
> To be clear, we already have this solved for BigQueryIO. I am hoping there
> is a similar solution for JdbcIO.
>
> Regards,
> Willem
>
> On Sun, May 31, 2020, at 12:42 PM, Reuven Lax wrote:
>
> This should be possible using the Beam programmatic API. You can pass
> BigQueryIO a function that determines the BigQuery table based on the input
> element.
>
> On Sat, May 30, 2020 at 9:20 PM Willem Pienaar  wrote:
>
> Hi JB,
>
> Apologies for resurrecting this thread, but I have a related question.
>
> We've built a feature store Feast (https://github.com/feast-dev/feast)
> primarily on Beam. We have been very happy with our decision to use Beam
> thus far. Beam is mostly used as the ingestion layer that writes data into
> stores (BigQuery, Redis). I am currently implementing JdbcIO (for
> PostgreSQL) and it's working fine so far. I set up all the tables when the
> job is launched, and I write into different tables depending on the input
> elements.
>
> However, a problem we are facing is that schema changes are happening very
> rapidly based on our users' activity. Every time the user changes a
> collection of features/fields, we have to launch a new Dataflow job in
> order to support the new database schema. This can take 3-4 minutes. Every
> time the jobs are in an updating state we have to block all user activity,
> which is quite disruptive.
>
> What we want to do is dynamically configure the SQL insert statement based
> on the input elements. This would allow us to keep the same job running
> indefinitely, dramatically improving the user experience. We have found
> solutions for BigQueryIO and our other IO, but not yet for JdbcIO. As far
> as I can tell it isn't possible to modify the SQL insert statement to write
> to a new table or to the same table with new columns, without restarting
> the job.
>
> Do you have any suggestions on how we can achieve the above? If it can't
> be done with the current implementation, would it be reasonable to
> contribute this functionality back to Beam?
>
> Regards,
> Willem
>
> On Tue, Mar 3, 2020, at 1:30 AM, Jean-Baptiste Onofre wrote:
> > Hi
> >
> > You have the setPrepareStatement() method where you define the target
> tables.
> > However, it’s in the same database (datasource) per pipeline.
> >
> > You can define several datasources and use a different datasource in
> > each JdbcIO write. Meaning that you can divide in sub pipelines.
> >
> > Regards
> > JB
> >
> > > On 29 Feb 2020 at 17:52, Vasu Gupta wrote:
> > >
> > > Hey folks,
> > >
> > > Can we use JdbcIO for writing data to multiple schemas (for a Postgres
> database) dynamically using the Apache Beam Java framework? Currently, I can't
> find any property that I could set on the JdbcIO transform for providing a
> schema, or maybe I am missing something.
> > >
> > > Thanks
> >
> >
>
>
>


Re: DoFnSignature#isStateful deprecated

2020-05-31 Thread Jan Lukavský

On 5/30/20 5:39 AM, Kenneth Knowles wrote:
Agree to delete them, though for different reasons. I think this code 
comes from a desire to have methods that can be called on a DoFn 
directly. And from reviewing the code history I think they are copied 
in from another class. So that's why they are the way they are. 
Accepting a DoFnSignature would be more appropriate to the 
"plural-class-name companion class" pattern. But I doubt the perf 
impact of this is ever measurable, and of course not relative to a big 
data processing job. If we really wanted the current API, a cache is 
trivial, but also not important so we shouldn't add one.


Reasons I think they should be deleted:
1. They seem to exist as a shortcut so people don't forget to call 
both DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But 
if another relevant method is added later, the shortcut won't 
include it, so the problem of not forgetting to call all relevant 
methods is not solved.


There are multiple ways runners test for "statefulness" of a DoFn. Some 
use DoFnSignature#usesState(), some DoFnSignatures#usesState(), some 
DoFnSignatures#isStateful() and some even check whether 
DoFnSignature#stateDeclarations() is non-empty. Having so many ways to 
express a simple check (that a DoFn needs to be executed as stateful) 
seems suboptimal.
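
As a concrete illustration (using the method names referenced in this
thread), the four variants answer the same question but are not guaranteed
to agree, e.g. for a DoFn that only uses timers or only requires
time-sorted input:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

static void describeStatefulness(DoFn<?, ?> fn) {
  DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
  // Four variants of the same "is this stateful?" question found across runners:
  boolean viaInstanceMethod = signature.usesState();
  boolean viaStaticHelper = DoFnSignatures.usesState(fn);
  boolean viaIsStateful = DoFnSignatures.isStateful(fn);
  boolean viaDeclarations = !signature.stateDeclarations().isEmpty();
  System.out.printf(
      "%s %s %s %s%n", viaInstanceMethod, viaStaticHelper, viaIsStateful, viaDeclarations);
}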


I don't see anything weird in the definition of a "stateful DoFn", which is 
any DoFn that has the following requirements:


 a) is keyed

 b) requires shuffling same keys to same workers

 c) requires support for both state and timers

2. They seem incorrect [2]. Just because something requires time 
sorted input *does not* mean it uses bag state.


Yes, this is unfortunate. What makes the DoFn use bag state is "when the 
runner executes the DoFn using the default expansion". I agree this is not 
the same thing, but the correct solution again seems to lead back to the 
discussion about pipeline requirements vs. runner capabilities vs. default 
and overridden expansions. It would be better to use the standard expansion 
mechanism, but AFAIK it is not currently possible, because it is not 
possible to simply wrap two stateful DoFns one inside another (that 
would require dynamic state).


Jan



Kenn

[1] 
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
[2] 
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449


On Fri, May 29, 2020 at 8:46 AM Luke Cwik wrote:


To go back to your original question.

I would remove the static convenience methods in DoFnSignatures
since they construct a DoFnSignature and then throw it away. This
construction is pretty involved, nothing as large as an IO call
but it would become noticeable if it was abused. We can already
see that it is being used multiple times in a row [1, 2].

Runners should create their own derived properties based upon
knowledge of how they are implemented and we shouldn't create
derived properties for different concepts (e.g. merging isStateful
and @RequiresTimeSortedInput). If there is a common implementation
that is shared across multiple runners, it could "translate" a
DoFnSignature based upon how it is implemented and/or define its
own thing.

1:

https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
2:

https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73

On Wed, May 27, 2020 at 3:16 AM Jan Lukavský <je...@seznam.cz> wrote:

Right, this might be about a definition of what these methods
really should return. Currently, the most visible issue is
[1]. When a DoFn has no state or timer, but is annotated with
@RequiresTimeSortedInput this annotation is silently ignored,
because DoFnSignature#usesState returns false and the ParDo is
executed as stateless.

I agree that there are two points - what user declares and
what runner effectively needs to execute a DoFn. Another
complication to this is that what runner needs might depend
not only on the DoFn itself, but on other conditions - e.g.
RequiresTimeSortedInput does not require any state or timer in
bounded case, when runner can presort the data. There might be
additional inputs to this decision as well.

I don't quite agree that DoFnSignature#isStateful is a bad
name - when a DoFn has only timer and no state, it is still
stateful, although usesState should return false. Or we would
have to declare 

Re: DoFnSignature#isStateful deprecated

2020-05-31 Thread Jan Lukavský

Answers inline.

On 5/29/20 5:46 PM, Luke Cwik wrote:

To go back to your original question.

I would remove the static convenience methods in DoFnSignatures since 
they construct a DoFnSignature and then throw it away. This 
construction is pretty involved, nothing as large as an IO call but it 
would become noticeable if it was abused. We can already see that it 
is being used multiple times in a row [1, 2].
There should be no performance implications of this, as there is a cache 
involved [1].
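
Roughly, the memoization referenced in [1] has the following shape
(simplified; the actual field and parsing method in DoFnSignatures.java may
be named differently):

private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>();

public static synchronized DoFnSignature getSignature(Class<? extends DoFn<?, ?>> fnClass) {
  // Parse the DoFn's annotations only once per class and reuse the result afterwards.
  return signatureCache.computeIfAbsent(fnClass, clazz -> parseSignature(clazz));
}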


Runners should create their own derived properties based upon 
knowledge of how they are implemented and we shouldn't create derived 
properties for different concepts (e.g. merging isStateful and 
@RequiresTimeSortedInput). If there is a common implementation that is 
shared across multiple runners, it could "translate" a DoFnSignature 
based upon how it is implemented and/or define its own thing.
The problem here is that in order to use a common implementation a 
runner must know that it should use it (in this specific case, to use 
StatefulDoFnRunner instead of a plain SimpleDoFnRunner). This might 
relate slightly to the discussion about pipeline requirements vs. runner 
capabilities, although from a different perspective.
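
For illustration, a minimal sketch of the check that each runner currently
has to get right on its own; needsStatefulExecution is a hypothetical
helper, not existing Beam API, and the last clause is exactly what
BEAM-10072 is about:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

static boolean needsStatefulExecution(DoFn<?, ?> fn) {
  DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
  // A runner that forgets the last clause will silently run a DoFn that has no
  // state or timers, but is annotated with @RequiresTimeSortedInput, as stateless.
  return signature.usesState()
      || signature.usesTimers()
      || signature.processElement().requiresTimeSortedInput();
}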


[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L262


Jan



On Wed, May 27, 2020 at 3:16 AM Jan Lukavský wrote:


Right, this might be about a definition of what these methods
really should return. Currently, the most visible issue is [1].
When a DoFn has no state or timer, but is annotated with
@RequiresTimeSortedInput this annotation is silently ignored,
because DoFnSignature#usesState returns false and the ParDo is
executed as stateless.

I agree that there are two points - what user declares and what
runner effectively needs to execute a DoFn. Another complication
to this is that what runner needs might depend not only on the
DoFn itself, but on other conditions - e.g.
RequiresTimeSortedInput does not require any state or timer in
bounded case, when runner can presort the data. There might be
additional inputs to this decision as well.

I don't quite agree that DoFnSignature#isStateful is a bad name -
when a DoFn has only timer and no state, it is still stateful,
although usesState should return false. Or we would have to
declare timer a state, which would be even more confusing
(although it might be technically correct).

[1] https://issues.apache.org/jira/browse/BEAM-10072

On 5/27/20 1:21 AM, Luke Cwik wrote:

I believe DoFnSignature#isStateful is remnant of a bad API name
choice and was renamed to usesState. I would remove
DoFnSignature#isStateful as it does not seem to be used anywhere.

Does DoFnSignatures#usesValueState return true if the DoFn says
it needs @RequiresTimeSortedInput because of how a DoFn is being
"wrapped" with a stateful DoFn that provides the time sorting
functionality?

That doesn't seem right since I would have always expected that
DoFnSignature(s) should be about the DoFn passed in and not about
the implementation details that a runner might be using in how it
provides @RequiresTimeSortedInput.

(similarly for
DoFnSignatures#usesBagState, DoFnSignatures#usesWatermarkHold, 
DoFnSignatures#usesTimers, DoFnSignatures#usesState)




On Mon, May 25, 2020 at 2:31 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

I have come across an issue with multiple ways of getting meaningful
flags for DoFns. We have

  a) DoFnSignature#{usesState,usesTimers,isStateful,...}, and

  b) DoFnSignatures#{usesState,usesTimers,isStateful,...}

These two might not be (and actually are not) aligned with each other.
That can be solved quite easily (removing any logic from DoFnSignatures
and putting it into DoFnSignature), but what I'm not sure about is why
DoFnSignature#isStateful is deprecated in favor of
DoFnSignature#usesState. In my understanding, it should hold that
`isStateful() iff usesState() || usesTimers()`, which means these two
should not be used interchangeably. I'd suggest undeprecating
`DoFnSignature#isStateful` and aligning the various (static and
non-static) versions of these calls.

Thoughts?

  Jan



Re: dealing with late data output timestamps

2020-05-31 Thread Jan Lukavský
Minor self-correction - in property c) it MIGHT be possible to 
update the output watermark to a time greater than the input watermark, _as 
long as no future element can be assigned a timestamp that is less than the 
output watermark_. That seems to be the case only for 
TimestampCombiner.END_OF_WINDOW, as that one does not actually depend on 
the timestamps of the elements. This doesn't really change the 
reasoning below; it might just make it possible to not store the input 
watermark in the watermark hold for this combiner, although that would 
probably have negligible practical impact.
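
For concreteness, a minimal sketch of why END_OF_WINDOW is
element-independent (this only restates the definition of that combiner):

import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class EndOfWindowTimestamp {
  public static void main(String[] args) {
    IntervalWindow window = new IntervalWindow(new Instant(0), Duration.standardMinutes(1));
    // Under TimestampCombiner.END_OF_WINDOW every pane fired from this window is
    // assigned the same output timestamp, no matter which (late or on-time)
    // elements it contains - the window's maximum timestamp.
    Instant paneTimestamp = window.maxTimestamp();
    System.out.println(paneTimestamp);
  }
}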


On 5/31/20 12:17 PM, Jan Lukavský wrote:


Hi Reuven,

the asynchronicity of watermark updates is what I was missing - it is 
what relates the watermark hold to the element output timestamp. On the 
other hand, we have some invariants that have to hold, namely:


 a) an element arriving as non-late MUST NOT be changed to late

 b) an element arriving as late MIGHT be changed to non-late

 c) an operator's output watermark MUST be less than its input watermark at 
all times


Properties a) and b) are somewhat natural requirements, and property c) 
follows from the fact that it is impossible to exactly predict the future.


Now, having these three properties, would it be possible to:

 a) when a pane contains both late and on-time elements, split the pane 
into two, one containing only the late and one only the on-time elements


 b) calculate the output timestamp of all panes using the timestamp combiner 
(now that a pane contains only late or only on-time elements, no timestamp 
combiner should be able to violate either of properties a) or b))


 c) when there is a pane that contains only late elements, update the 
watermark hold to min(current input watermark, window GC time) 
- so that the output watermark can progress up to the input watermark (and 
not violate property c) above)


It seems to me that what currently stands in the way is that

 a) panes are not split into late-only and non-late-only (and this might be 
tricky, mostly for combining transforms)


 b) the watermark hold for a late-only pane is set to the window GC time 
(instead of also taking the input watermark into account) - [1]


With TimestampCombiner.LATEST and END_OF_WINDOW it seems that 
splitting the panes would not be required, as the timestamp combiner 
can only shift late elements forward (make use of property b)). 
TimestampCombiner.EARLIEST would probably require splitting the panes, 
which seems to solve the mentioned [BEAM-2262].


WDYT?

[1] 
https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f


On 5/29/20 5:01 PM, Reuven Lax wrote:
This does seem non-intuitive, though I'm not sure what the best 
approach is.


The problem with using currentOutputWatermark as the output timestamp 
is that Beam does not define watermark advancement to be synchronous, 
and at least the Dataflow runner calculates watermarks completely 
independent of bundle processing. This means that the output 
watermark could advance immediately after being checked, which would 
cause the records output to be arbitrarily late. So for example, if 
allowedLateness is 10 seconds, then this trigger will accept a record 
that is 5 seconds late. However if currentOutputWatermark advances by 
15 seconds after checking it, then you would end up outputting a 
result that is 15 seconds late and therefore would be dropped.


IMO it's most important that on-time elements are never turned into 
late elements. However, the above behavior also seems confusing to users.


Worth noting that I don't think that the current behavior is that 
much better. If the output watermark is close to the end of the 
window, then I think the existing model can also cause this scenario 
to happen.


Reuven

On Fri, May 29, 2020 at 12:54 AM Jan Lukavský wrote:


Hi,

what seems the most "surprising" to me is that we are using
TimestampCombiners to actually do two (orthogonal) things:

 a) calculate a watermark hold for a window, so on-time elements
emitted from a pane are not late in downstream processing

 b) calculate timestamp of elements in output pane

These two follow slightly different constraints - while in case
a) it is not allowed to shift the watermark "back in time", in case b)
it seems OK to output data with a timestamp lower than the output
watermark (what comes late, might leave late). So, while it seems
OK to discard late elements for the sake of calculating the output
watermark, it seems wrong to discard them when calculating the output
timestamp. Maybe these two timestamps might be held in different
states (the state would be held until GC time for accumulating
panes and reset on discarding panes)?

Jan

On 5/28/20 5:02 PM, David Morávek wrote:

Hi,

I've come across "unexpected" model behaviour when dealing with
late data and custom timestamp combiners. Let's take the following
pipeline as an example:

final PCollection input = ...;
input.apply(
      "GlobalWindows",
   

Re: dealing with late data output timestamps

2020-05-31 Thread Jan Lukavský

Hi Reuven,

the asynchronicity of watermark updates is what I was missing - it is 
what relates the watermark hold to the element output timestamp. On the 
other hand, we have some invariants that have to hold, namely:


 a) an element arriving as non-late MUST NOT be changed to late

 b) an element arriving as late MIGHT be changed to non-late

 c) an operator's output watermark MUST be less than its input watermark at 
all times


Properties a) and b) are somewhat natural requirements, and property c) 
follows from the fact that it is impossible to exactly predict the future.


Now, having these three properties, would it be possible to:

 a) when a pane contains both late and on-time elements, split the pane 
into two, one containing only the late and one only the on-time elements


 b) calculate the output timestamp of all panes using the timestamp combiner 
(now that a pane contains only late or only on-time elements, no timestamp 
combiner should be able to violate either of properties a) or b))


 c) when there is a pane that contains only late elements, update the 
watermark hold to min(current input watermark, window GC time) - 
so that the output watermark can progress up to the input watermark (and not 
violate property c) above)


It seems to me that what currently stands in the way is that

 a) panes are not split into late-only and non-late-only (and this might be 
tricky, mostly for combining transforms)


 b) the watermark hold for a late-only pane is set to the window GC time 
(instead of also taking the input watermark into account) - [1]


With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting 
the panes would not be required, as the timestamp combiner can only 
shift late elements forward (make use of property b)). 
TimestampCombiner.EARLIEST would probably require splitting the panes, 
which seems to solve the mentioned [BEAM-2262].


WDYT?

[1] 
https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f


On 5/29/20 5:01 PM, Reuven Lax wrote:
This does seem non-intuitive, though I'm not sure what the best 
approach is.


The problem with using currentOutputWatermark as the output timestamp 
is that Beam does not define watermark advancement to be synchronous, 
and at least the Dataflow runner calculates watermarks completely 
independent of bundle processing. This means that the output watermark 
could advance immediately after being checked, which would cause the 
records output to be arbitrarily late. So for example, if 
allowedLateness is 10 seconds, then this trigger will accept a record 
that is 5 seconds late. However if currentOutputWaternark advances by 
15 seconds after checking it, then you would end up outputting a 
result that is 15 seconds late and therefore would be dropped.


IMO it's most important that on-time elements are never turned into 
late elements. However, the above behavior also seems confusing to users.


Worth noting that I don't think that the current behavior is that much 
better. If the output watermark is close to the end of the window, 
then I think the existing model can also cause this scenario to happen.


Reuven

On Fri, May 29, 2020 at 12:54 AM Jan Lukavský wrote:


Hi,

what seems the most "surprising" to me is that we are using
TimestampCombiners to actually do two (orthogonal) things:

 a) calculate a watermark hold for a window, so on-time elements
emitted from a pane are not late in downstream processing

 b) calculate timestamp of elements in output pane

These two follow slightly different constraints - while in case a)
it is not allowed to shift the watermark "back in time", in case b) it
seems OK to output data with a timestamp lower than the output watermark
(what comes late, might leave late). So, while it seems OK to
discard late elements for the sake of calculating the output
watermark, it seems wrong to discard them when calculating the output
timestamp. Maybe these two timestamps might be held in different
states (the state would be held until GC time for accumulating
panes and reset on discarding panes)?

Jan

On 5/28/20 5:02 PM, David Morávek wrote:

Hi,

I've come across "unexpected" model behaviour when dealing with
late data and custom timestamp combiners. Let's take the following
pipeline as an example:

final PCollection input = ...;
input
    .apply(
        "GlobalWindows",
        Window.into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .accumulatingFiredPanes())
    .apply("Aggregate", Count.perElement());

The above pipeline emits updates with the latest input timestamp
it has seen so far (from non-late elements). We write the output
from this 

Re: JdbcIO for writing to Dynamic Schemas in Postgres

2020-05-31 Thread Willem Pienaar
Hi Reuven,

To be clear, we already have this solved for BigQueryIO. I am hoping there is a 
similar solution for JdbcIO.

Regards,
Willem

On Sun, May 31, 2020, at 12:42 PM, Reuven Lax wrote:
> This should be possible using the Beam programmatic API. You can pass 
> BigQueryIO a function that determines the BigQuery table based on the input 
> element.
> 
> On Sat, May 30, 2020 at 9:20 PM Willem Pienaar  wrote:
>> Hi JB,
>> 
>>  Apologies for resurrecting this thread, but I have a related question.
>> 
>>  We've built a feature store Feast (https://github.com/feast-dev/feast) 
>> primarily on Beam. We have been very happy with our decision to use Beam 
>> thus far. Beam is mostly used as the ingestion layer that writes data into 
>> stores (BigQuery, Redis). I am currently implementing JdbcIO (for 
>> PostgreSQL) and it's working fine so far. I set up all the tables when the 
>> job is launched, and I write into different tables depending on the input 
>> elements.
>> 
>>  However, a problem we are facing is that schema changes are happening very 
>> rapidly based on our users' activity. Every time the user changes a 
>> collection of features/fields, we have to launch a new Dataflow job in order 
>> to support the new database schema. This can take 3-4 minutes. Every time 
>> the jobs are in an updating state we have to block all user activity, which 
>> is quite disruptive.
>> 
>>  What we want to do is dynamically configure the SQL insert statement based 
>> on the input elements. This would allow us to keep the same job running 
>> indefinitely, dramatically improving the user experience. We have found 
>> solutions for BigQueryIO and our other IO, but not yet for JdbcIO. As far as 
>> I can tell it isn't possible to modify the SQL insert statement to write to 
>> a new table or to the same table with new columns, without restarting the 
>> job.
>> 
>>  Do you have any suggestions on how we can achieve the above? If it can't 
>> be done with the current implementation, would it be reasonable to 
>> contribute this functionality back to Beam?
>> 
>>  Regards,
>>  Willem
>> 
>>  On Tue, Mar 3, 2020, at 1:30 AM, Jean-Baptiste Onofre wrote:
>>  > Hi
>>  > 
>>  > You have the setPrepareStatement() method where you define the target 
>> tables.
>>  > However, it’s in the same database (datasource) per pipeline.
>>  > 
>>  > You can define several datasources and use a different datasource in 
>>  > each JdbcIO write. Meaning that you can divide in sub pipelines.
>>  > 
>>  > Regards
>>  > JB
>>  > 
>>  > > On 29 Feb 2020 at 17:52, Vasu Gupta wrote:
>>  > > 
>>  > > Hey folks,
>>  > > 
>>  > > Can we use JdbcIO for writing data to multiple schemas (for a Postgres 
>> database) dynamically using the Apache Beam Java framework? Currently, I can't 
>> find any property that I could set on the JdbcIO transform for providing a 
>> schema, or maybe I am missing something.
>>  > > 
>>  > > Thanks
>>  > 
>>  >