Re: [VOTE] Release 2.26.0, release candidate #1

2020-12-08 Thread Robert Burke
I'm +1 on RC1 based on the 7 tests I know I can check successfully.  I'll
be trying more tomorrow, but remember that release validation requires the
community to validate it meets our standards, and I can't do it alone.

Remember you can participate in the release validation by reviewing parts
of the documentation being published as well, not just by running the
Python and Java artifacts.

If you have contributed new Python or Java docs to this release, they'll
appear in the to-be-published docs.

Cheers,
Robert Burke
2.26.0 release manager

On Mon, Dec 7, 2020, 6:25 PM Robert Burke  wrote:

> Turns out no changes affecting the Dataflow artifacts were required this time
> around, so Dataflow is cleared for testing.
>
> Cheers.
> Robert Burke
> 2.26.0 Release Manager
>
> On Mon, Dec 7, 2020, 6:03 PM Robert Burke  wrote:
>
>>
>> On Thu, Dec 3, 2020 at 8:01 PM, Robert Burke wrote:
>> Hi everyone,
>> Please review and vote on the release candidate #1 for the version
>> 2.26.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1 if no issues are found.
>>
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org [2],
>> which is signed with the key with fingerprint
>> A52F5C83BAE26160120EC25F3D56ACFBFB2975E1 [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "v2.26.0-RC1" [5],
>> * website pull request listing the release [6], publishing the API
>> reference manual [7], and the blog post [8].
>> * Java artifacts were built with Maven 3.6.0 and OpenJDK 1.8.0_275.
>> * Python artifacts are deployed along with the source release to the
>> dist.apache.org [2].
>> * Validation sheet with a tab for 2.26.0 release to help with validation
>> [9].
>> * Docker images published to Docker Hub [10].
>>
>> The vote will be open for at least 72 hours (10th ~6pm PST). It is
>> adopted by majority approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> Robert Burke
>> 2.26.0 Release Manager
>>
>> [1]
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12348833
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.26.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapachebeam-1144/
>> [5] https://github.com/apache/beam/tree/v2.26.0-RC1
>> [6] https://github.com/apache/beam/pull/13481
>> [7] https://github.com/apache/beam-site/pull/609
>> [8] https://github.com/apache/beam/pull/13482
>> [9]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=475997301
>> [10] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>>
>> PS. New Dataflow artifacts likely need to be built and published but this
>> doesn't block vetting the remainder of the RC at this time. Thank you for
>> your patience.
>>
>>


Re: Proposal: Scheduled tasks

2020-12-08 Thread Chad Dombrova
Thanks!

On Tue, Dec 8, 2020 at 6:54 AM Pablo Estrada  wrote:

> Hi Chad!
> I've been meaning to review this, I've just not carved out the time. I'll
> try to get back to you this week with some thoughts!
> Thanks!
> -P.
>
> On Wed, Dec 2, 2020 at 10:31 AM Chad Dombrova  wrote:
>
>> Hi everyone,
>> Beam's niche is low-latency, high-throughput workloads, but Beam has
>> incredible promise as an orchestrator of long running work that gets sent
>> to a scheduler.  We've created a modified version of Beam that allows the
>> python SDK worker to outsource tasks to a scheduler, like Kubernetes
>> batch jobs[1], Argo[2], or Google's own OpenCue[3].
>>
>> The basic idea is that any element in a stream can be tagged to be
>> executed outside of the normal SdkWorker as an atomic "task".  A task is
>> one invocation of a stage, composed of one or more DoFns, against one a
>> slice of the data stream, composed of one or more tagged elements.   The
>> upshot is that we're able to slice up the processing of a stream across
>> potentially *many* workers, with the trade-off being the added overhead
>> of starting up a worker process for each task.
>>
>> For more info on how we use our modified version of Beam to make visual
>> effects for feature films, check out the talk[4] I gave at the Beam Summit.
>>
>> Here's our design doc:
>>
>> https://docs.google.com/document/d/1GrAvDWwnR1QAmFX7lnNA7I_mQBC2G1V2jE2CZOc6rlw/edit?usp=sharing
>>
>> And here's the github branch:
>> https://github.com/LumaPictures/beam/tree/taskworker_public
>>
>> Looking forward to your feedback!
>> -chad
>>
>>
>> [1] https://kubernetes.io/docs/concepts/workloads/controllers/job/
>> [2] https://argoproj.github.io/
>> [3] https://cloud.google.com/opencue
>> [4] https://www.youtube.com/watch?v=gvbQI3I03a8&ab_channel=ApacheBeam
>>
>>
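
To make the idea concrete: the per-element pattern described above behaves
like a DoFn that blocks on an external scheduler, though the proposal does
this at the SdkWorker level instead. A heavily simplified sketch follows;
SchedulerClient, TaskSpec, and TaskResult are hypothetical names, not part
of Beam or the branch above.

import org.apache.beam.sdk.transforms.DoFn;

class SubmitTaskFn extends DoFn<TaskSpec, TaskResult> {
  @ProcessElement
  public void process(@Element TaskSpec spec, OutputReceiver<TaskResult> out) {
    // Hand the tagged element to the external scheduler as one atomic task,
    // e.g. a Kubernetes batch job, and emit the result once it completes.
    String jobId = SchedulerClient.submit(spec);         // hypothetical client
    out.output(SchedulerClient.awaitCompletion(jobId));  // blocks until done
  }
}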


Re: Use case for reading to dynamic Pub/Sub subscriptions?

2020-12-08 Thread Vincent Marquez
KafkaIO has a readAll method that returns a
PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>>,
so it could read a 'dynamic' number of topics generated
from somewhere else. Is that what you mean?

*~Vincent*
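
For reference, a minimal sketch of the shape Vincent describes, assuming
Beam 2.25+'s KafkaIO.readSourceDescriptors() and a descriptor collection
produced upstream (the broker address and key/value types here are
assumptions):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// `topics` is a PCollection<KafkaSourceDescriptor> built elsewhere in the
// pipeline, e.g. by a transform that periodically discovers new topics.
static PCollection<KafkaRecord<Long, String>> readDynamicTopics(
    PCollection<KafkaSourceDescriptor> topics) {
  return topics.apply(
      KafkaIO.<Long, String>readSourceDescriptors()
          .withBootstrapServers("broker-1:9092")  // assumed broker address
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializer(StringDeserializer.class));
}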


On Tue, Dec 8, 2020 at 5:15 PM Daniel Collins  wrote:

> /s/Combine/Flatten
>
> On Tue, Dec 8, 2020 at 8:06 PM Daniel Collins 
> wrote:
>
>> Hi all,
>>
>> I'm trying to figure out if there's any possible use for reading from a
>> dynamic set of Pub/Sub [Lite] subscriptions in a beam pipeline, although
>> the same logic would apply to kafka topics. Does anyone know of a use case
>> where you'd want to apply the same set of processing logic to all messages
>> on a set of topics, but, you wouldn't know that set of topics when the
>> pipeline is started? (otherwise you could just use Combine).
>>
>> -Dan
>>
>


Re: Use case for reading to dynamic Pub/Sub subscriptions?

2020-12-08 Thread Daniel Collins
/s/Combine/Flatten

On Tue, Dec 8, 2020 at 8:06 PM Daniel Collins  wrote:

> Hi all,
>
> I'm trying to figure out if there's any possible use for reading from a
> dynamic set of Pub/Sub [Lite] subscriptions in a beam pipeline, although
> the same logic would apply to kafka topics. Does anyone know of a use case
> where you'd want to apply the same set of processing logic to all messages
> on a set of topics, but, you wouldn't know that set of topics when the
> pipeline is started? (otherwise you could just use Combine).
>
> -Dan
>


Use case for reading to dynamic Pub/Sub subscriptions?

2020-12-08 Thread Daniel Collins
Hi all,

I'm trying to figure out if there's any possible use for reading from a
dynamic set of Pub/Sub [Lite] subscriptions in a beam pipeline, although
the same logic would apply to kafka topics. Does anyone know of a use case
where you'd want to apply the same set of processing logic to all messages
on a set of topics, but, you wouldn't know that set of topics when the
pipeline is started? (otherwise you could just use Combine).

-Dan
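
For the static case in the parenthetical (with the Combine -> Flatten
correction from the follow-up applied), a minimal sketch with two known
subscriptions; the project and subscription names are assumptions:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Each known subscription gets its own read; the results are flattened into
// one PCollection that the shared processing logic then consumes.
static PCollection<PubsubMessage> readKnownSubscriptions(Pipeline p) {
  PCollection<PubsubMessage> a = p.apply("ReadA",
      PubsubIO.readMessages().fromSubscription(
          "projects/my-project/subscriptions/sub-a"));
  PCollection<PubsubMessage> b = p.apply("ReadB",
      PubsubIO.readMessages().fromSubscription(
          "projects/my-project/subscriptions/sub-b"));
  return PCollectionList.of(a).and(b).apply(Flatten.pCollections());
}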


Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
I think your understanding is correct. Does the CommitOffset transform have
side-effects on your pipeline?

On Tue, Dec 8, 2020 at 3:35 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 3:13 PM Boyuan Zhang  wrote:
>
>> Please note that each record output from ReadFromKafkaDoFn is in a
>> GlobalWindow. The workflow is:
>> ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) -> Max.longsPerKey -> CommitDoFn
>>                          |
>>                          ---> downstream consumers
>>
>> but won't there still be 5 commits that happen as fast as possible for
>>> each of the windows that were constructed from the initial fetch?
>>
>> I'm not sure what you mean here. Would you like to elaborate more on your
>> questions?
>>
>
> Sure, I'll try to explain, it's very possible I just am misunderstanding
> Windowing here.
>
> Assumption 1:  Windowing works on the output timestamp.
> Assumption 2:  Max.longsPerKey will fire as fast as it can, in other
> words, there is no throttling.
>
> So, if we have a topic that has the following msgs:
> msg | timestamp (mm,ss)
> ----|-------------------
>  A  |  01:00
>  B  |  01:01
>  D  |  06:00
>  E  |  06:04
>  F  |  12:02
>
> and we read them all at once, we will have one window that contains [A,B]
> and another one that has [D,E], and a third that has [F].  Once we get the
> max offset for all three, won't they fire back to back without delay? So F
> will fire as soon as E is finished committing, which fires immediately
> after B is committed?
>
>


Re: Throttling stream outputs per trigger?

2020-12-08 Thread Vincent Marquez
*~Vincent*


On Tue, Dec 8, 2020 at 3:13 PM Boyuan Zhang  wrote:

> Please note that each record output from ReadFromKafkaDoFn is in a
> GlobalWindow. The workflow is:
> ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) -> Max.longsPerKey -> CommitDoFn
>                          |
>                          ---> downstream consumers
>
> but won't there still be 5 commits that happen as fast as possible for
>> each of the windows that were constructed from the initial fetch?
>
> I'm not sure what you mean here. Would you like to elaborate more on your
> questions?
>

Sure, I'll try to explain, it's very possible I just am misunderstanding
Windowing here.

Assumption 1:  Windowing works on the output timestamp.
Assumption 2:  Max.longsPerKey will fire as fast as it can, in other words,
there is no throttling.

So, if we have a topic that has the following msgs:
msg | timestamp (mm,ss)
----|-------------------
 A  |  01:00
 B  |  01:01
 D  |  06:00
 E  |  06:04
 F  |  12:02

and we read them all at once, we will have one window that contains [A,B]
and another one that has [D,E], and a third that has [F].  Once we get the
max offset for all three, won't they fire back to back without delay? So F
will fire as soon as E is finished committing, which fires immediately
after B is committed?


Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
Please note that each record output from ReadFromKafkaDoFn is in a
GlobalWindow. The workflow is:
ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) -> Max.longsPerKey -> CommitDoFn
                         |
                         ---> downstream consumers

but won't there still be 5 commits that happen as fast as possible for each
> of the windows that were constructed from the initial fetch?

I'm not sure what you mean here. Would you like to elaborate more on your
questions?
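
For concreteness, the commit leg of the workflow sketched above looks roughly
like this (modeled on the KafkaCommitOffset code linked later in the thread;
the method name and surrounding plumbing are illustrative):

import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// One maximum offset per key per 5-minute window; a CommitDoFn (not shown)
// would then commit each per-window maximum back to Kafka.
static PCollection<KV<String, Long>> maxOffsetsPerWindow(
    PCollection<KV<String, Long>> offsets) {
  return offsets
      // Assign each (partition key, offset) pair to a 5-minute fixed window.
      .apply(Window.<KV<String, Long>>into(
          FixedWindows.of(Duration.standardMinutes(5))))
      // Combine: keep only the maximum offset seen per key in each window.
      .apply(Max.longsPerKey());
}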

On Tue, Dec 8, 2020 at 1:46 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang  wrote:
>
>> Hi Vincent,
>>
>> The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation just
>> applies the window information to each element; it does not really do the
>> grouping operation. And in the commit transform, there is a combine
>> transform applied (Max.longsPerKey()).
>> Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
>> means outputting 1 element per key per 5 minutes. This is different from your
>> case, since the trigger in the CommitTransform is for the combine purpose.
>> And in order to prevent the data-loss error you mentioned, there is a
>> persistence layer (Reshuffle) between the Kafka read and any downstream
>> transform.
>>
>>
> Apologies, I don't understand how the delay would work here though.  If we
> have a kafka topic that has 100 messages in it, each with a timestamp one
> minute apart, that means we have 20 windows that will be generated from one
> possible fetch, outputted by the ReadFromKafkaDoFn.   I understand the
> Max.longsPerKey() will take the max per window, but won't there still be 5
> commits that happen as fast as possible for each of the windows that were
> constructed from the initial fetch?
>
>
>
>
>
>
>
>
>> For your case, will the pipeline like KafkaRead -> Reshuffle ->
>> GroupIntoBatches -> downstream help you?
>>
>> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez 
>> wrote:
>>
>>> If this is the case that the pipeline has no way of enforcing fixed time
>>> windows, how does this work:
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>>
>>> Isn't this supposed to only trigger every five minutes, regardless of
>>> how much data can immediately be grouped together in five minute windows?
>>> If there is a way to mark that the fixed window should only trigger every
>>> so many minutes, that would solve my use case.  If there isn't a way to do
>>> this, the Kafka offset code seems broken and could result in 'data loss' by
>>> improperly committing offsets before they are run through the rest of the
>>> pipeline?
>>>
>>> *~Vincent*
>>>
>>>
>>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels 
>>> wrote:
>>>
 > the downstream consumer has these requirements.

 Blocking should normally be avoided at all cost, but if the downstream
 operator has the requirement to only emit a fixed number of messages
 per
 second, it should enforce this, i.e. block once the maximum number of
 messages for a time period have been reached. This will automatically
 lead to backpressure in Runners like Flink or Dataflow.

 -Max

 On 07.10.20 18:30, Luke Cwik wrote:
 > SplittableDoFns apply to both batch and streaming pipelines. They are
 > allowed to produce an unbounded amount of data and can either self
 > checkpoint saying they want to resume later or the runner will ask
 them
 > to checkpoint via a split call.
 >
 > There hasn't been anything concrete on backpressure, there has been
 work
 > done about exposing signals[1] related to IO that a runner can then
 use
 > intelligently but throttling isn't one of them yet.
 >
 > 1:
 >
 https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
 > <
 https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
 >
 >
 > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
 > <vincent.marq...@gmail.com> wrote:
 >
 > Thanks for the response.  Is my understanding correct that
 > SplittableDoFns are only applicable to Batch pipelines?  I'm
 > wondering if there's any proposals to address backpressure needs?
 > /~Vincent/
 >
 >
 > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik wrote:
 >
 > There is no general back pressure mechanism within Apache Beam
 > (runners should be intelligent about this but there is
 currently
 > no way to say I'm being throttled so runners don't know that
 > throwing more CPUs at a problem won't make it go faster).
 >
 > You can control how quickly 

Re: Caching issue in BigQueryIO

2020-12-08 Thread Reuven Lax
How long does it take to rebuild? Even for thousands of tables I would not
expect it to take very long, unless you are hitting quota rate limits with
BigQuery. If that's the case, maybe a better solution is to see if those
quotas could be raised?

On Fri, Dec 4, 2020 at 9:57 AM Vasu Gupta  wrote:

> Hey Reuven, yes you are correct that BigQueryIO is working as intended, but
> the issue is that since it's a local cache, this cache will be rebuilt from
> scratch whenever the pipeline is redeployed, which is very time consuming for
> thousands of tables.
>
> On 2020/12/03 17:58:04, Reuven Lax  wrote:
> > What exactly is the issue? If the cache is empty, then BigQueryIO will
> try
> > and create the table again, and the creation will fail since the table
> > exists. This is working as intended.
> >
> > The only reason for the cache is so that BigQueryIO doesn't continuously
> > hammer BigQuery with creation requests every second.
> >
> > On Wed, Dec 2, 2020 at 3:20 PM Vasu Gupta 
> wrote:
> >
> > > Hey folks,
> > >
> > > While using BigQueryIO to insert into 10k tables, I found that it has an
> > > issue in its local caching technique for table creation. Tables are first
> > > looked up in BigQueryIO's local cache, which determines whether to create a
> > > table or not. The main issue is when inserting into thousands of tables:
> > > let's suppose we have 10k tables to insert into in real time, and since we
> > > deploy a fresh Dataflow pipeline once a week, the local cache will be empty
> > > and it will take a long time just to build that cache for 10k tables, even
> > > though these 10k tables were already created in BigQuery.
> > >
> > > The solution I could propose for this is to provide an option for using
> > > external caching services like Redis/Memcached, so that we don't have to
> > > rebuild the cache again and again after a fresh deployment of the pipeline.
> > >
> >
>
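
For readers following along, the cache-then-create pattern under discussion
is roughly the following simplified sketch - not BigQueryIO's actual code;
the Bigquery client and 409 handling are assumptions based on the standard
com.google.api.services.bigquery API:

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;

class TableCreator {
  // Process-local cache of tables known to exist. Because it lives in worker
  // memory, a fresh deployment starts empty and must re-verify every table -
  // the rebuild cost described above for 10k tables.
  private static final Set<String> knownTables = ConcurrentHashMap.newKeySet();

  static void ensureTableExists(Bigquery bigquery, TableReference ref, Table table)
      throws IOException {
    String key = ref.getProjectId() + ":" + ref.getDatasetId() + "." + ref.getTableId();
    if (knownTables.contains(key)) {
      return; // cache hit: skip the creation request entirely
    }
    try {
      bigquery.tables().insert(ref.getProjectId(), ref.getDatasetId(), table).execute();
    } catch (GoogleJsonResponseException e) {
      if (e.getStatusCode() != 409) { // 409 = table already exists
        throw e;
      }
    }
    knownTables.add(key);
  }
}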


[PROPOSAL] Preparing for Beam release 2.27.0

2020-12-08 Thread Pablo Estrada
Hello everyone!

The next Beam release (2.27.0) is scheduled to be cut on December 16th
according to the release calendar [1].

I'd like to volunteer to handle this release. I plan on cutting the branch
on December 16th as scheduled. I'll keep updates on this thread.

Any comments or objections?

Thanks!
-P.
[1] https://calendar.google.com/calendar/u/0/embed?src=0p73sl034k80oob7seouani...@group.calendar.google.com


Re: Implementing ARR_AGG

2020-12-08 Thread Robin Qiu
Hi Sonam, I replied directly to your draft PR. Please see my comments there
and let me know if they are helpful.

On Mon, Dec 7, 2020 at 4:37 AM Sonam Ramchand <
sonam.ramch...@venturedive.com> wrote:

> Hi Devs,
> I have tried to implement the ARRAY_AGG function for the ZetaSQL dialect by
> following the STRING_AGG implementation
> (https://github.com/apache/beam/pull/11895).
> The draft PR for ARRAY_AGG is https://github.com/apache/beam/pull/13483. When
> I try to run the test,
>
> @Test
> public void testArrayAggregation() {
>   String sql =
>       "SELECT ARRAY_AGG(x) AS array_agg\n"
>           + "FROM UNNEST([2, 1, -2, 3, -2, 1, 2]) AS x";
>   ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
>   BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
>   PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
>   Schema schema =
>       Schema.builder()
>           .addArrayField("array_field", FieldType.of(Schema.TypeName.ARRAY))
>           .build();
>   PAssert.that(stream)
>       .containsInAnyOrder(
>           Row.withSchema(schema).addArray(2, 1, -2, 3, -2, 1, 2).build());
>   pipeline.run()
>       .waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
> }
>
> I am getting an error,
> type mismatch:
> aggCall type:
> BIGINT NOT NULL ARRAY NOT NULL
> inferred type:
> ARRAY NOT NULL
> java.lang.AssertionError: type mismatch:
> aggCall type:
> BIGINT NOT NULL ARRAY NOT NULL
> inferred type:
> ARRAY NOT NULL
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Litmus$1.fail(Litmus.java:31)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptUtil.eq(RelOptUtil.java:1958)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.typeMatchesInferred(Aggregate.java:434)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Aggregate.<init>(Aggregate.java:159)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalAggregate.<init>(LogicalAggregate.java:65)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:113)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.AggregateScanConverter.convert(AggregateScanConverter.java:50)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:102)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Collections$2.tryAdvance(Collections.java:4719)
> at java.util.Collections$2.forEachRemaining(Collections.java:4727)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:101)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:89)
> at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:55)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:141)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:152)
> at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testArrayAggregation(ZetaSqlDialectSpecTest.java:4071)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
> at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:266)
> at 

Re: Throttling stream outputs per trigger?

2020-12-08 Thread Vincent Marquez
*~Vincent*


On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang  wrote:

> Hi Vincent,
>
> The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation just
> applies the window information to each element; it does not really do the
> grouping operation. And in the commit transform, there is a combine
> transform applied (Max.longsPerKey()).
> Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
> means outputting 1 element per key per 5 minutes. This is different from your
> case, since the trigger in the CommitTransform is for the combine purpose.
> And in order to prevent the data-loss error you mentioned, there is a
> persistence layer (Reshuffle) between the Kafka read and any downstream
> transform.
>
>
Apologies, I don't understand how the delay would work here though.  If we
have a kafka topic that has 100 messages in it, each with a timestamp one
minute apart, that means we have 20 windows that will be generated from one
possible fetch, outputted by the ReadFromKafkaDoFn.   I understand the
Max.longsPerKey() will take the max per window, but won't there still be 5
commits that happen as fast as possible for each of the windows that were
constructed from the initial fetch?








> For your case, will the pipeline like KafkaRead -> Reshuffle ->
> GroupIntoBatches -> downstream help you?
>
> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez 
> wrote:
>
>> If this is the case that the pipeline has no way of enforcing fixed time
>> windows, how does this work:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>
>> Isn't this supposed to only trigger every five minutes, regardless of how
>> much data can immediately be grouped together in five minute windows?  If
>> there is a way to mark that the fixed window should only trigger every so
>> many minutes, that would solve my use case.  If there isn't a way to do
>> this, the Kafka offset code seems broken and could result in 'data loss' by
>> improperly committing offsets before they are run through the rest of the
>> pipeline?
>>
>> *~Vincent*
>>
>>
>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels 
>> wrote:
>>
>>> > the downstream consumer has these requirements.
>>>
>>> Blocking should normally be avoided at all cost, but if the downstream
>>> operator has the requirement to only emit a fixed number of messages per
>>> second, it should enforce this, i.e. block once the maximum number of
>>> messages for a time period have been reached. This will automatically
>>> lead to backpressure in Runners like Flink or Dataflow.
>>>
>>> -Max
>>>
>>> On 07.10.20 18:30, Luke Cwik wrote:
>>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>>> > allowed to produce an unbounded amount of data and can either self
>>> > checkpoint saying they want to resume later or the runner will ask
>>> them
>>> > to checkpoint via a split call.
>>> >
>>> > There hasn't been anything concrete on backpressure, there has been
>>> work
>>> > done about exposing signals[1] related to IO that a runner can then
>>> use
>>> > intelligently but throttling isn't one of them yet.
>>> >
>>> > 1:
>>> >
>>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>> > <
>>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>> >
>>> >
>>> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
>>> > <vincent.marq...@gmail.com> wrote:
>>> >
>>> > Thanks for the response.  Is my understanding correct that
>>> > SplittableDoFns are only applicable to Batch pipelines?  I'm
>>> > wondering if there's any proposals to address backpressure needs?
>>> > /~Vincent/
>>> >
>>> >
>>> > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik wrote:
>>> >
>>> > There is no general back pressure mechanism within Apache Beam
>>> > (runners should be intelligent about this but there is
>>> currently
>>> > no way to say I'm being throttled so runners don't know that
>>> > throwing more CPUs at a problem won't make it go faster).
>>> >
>>> > You can control how quickly you ingest data for runners that
>>> > support splittable DoFns with SDK initiated checkpoints with
>>> > resume delays. A splittable DoFn is able to return
>>> > resume().withDelay(Duration.seconds(10)) from
>>> > the @ProcessElement method. See Watch[1] for an example.
>>> >
>>> > The 2.25.0 release enables more splittable DoFn features on
>>> more
>>> > runners. I'm working on a blog (initial draft[2], still mostly
>>> > empty) to update the old blog from 2017.
>>> >
>>> > 1:
>>> >
>>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>>> > 

Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
Hi Vincent,

The Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation just
applies the window information to each element; it does not really do the
grouping operation. And in the commit transform, there is a combine
transform applied (Max.longsPerKey()).
Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
means outputting 1 element per key per 5 minutes. This is different from your
case, since the trigger in the CommitTransform is for the combine purpose.
And in order to prevent the data-loss error you mentioned, there is a
persistence layer (Reshuffle) between the Kafka read and any downstream
transform.

For your case, will the pipeline like KafkaRead -> Reshuffle ->
GroupIntoBatches -> downstream help you?
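
A minimal sketch of that suggestion for the 500-records-per-RPC case
described earlier in the thread (assumes the records are already keyed; the
downstream RPC DoFn is not shown):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Buffer up to 500 elements per key before emitting a batch, so whatever
// DoFn follows sees at most 500 records per output element.
static PCollection<KV<String, Iterable<String>>> batchForRpc(
    PCollection<KV<String, String>> keyedRecords) {
  return keyedRecords.apply(GroupIntoBatches.<String, String>ofSize(500));
}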

On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez 
wrote:

> If this is the case that the pipeline has no way of enforcing fixed time
> windows, how does this work:
>
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>
> Isn't this supposed to only trigger every five minutes, regardless of how
> much data can immediately be grouped together in five minute windows?  If
> there is a way to mark that the fixed window should only trigger every so
> many minutes, that would solve my use case.  If there isn't a way to do
> this, the Kafka offset code seems broken and could result in 'data loss' by
> improperly committing offsets before they are run through the rest of the
> pipeline?
>
> *~Vincent*
>
>
> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels  wrote:
>
>> > the downstream consumer has these requirements.
>>
>> Blocking should normally be avoided at all cost, but if the downstream
>> operator has the requirement to only emit a fixed number of messages per
>> second, it should enforce this, i.e. block once the maximum number of
>> messages for a time period have been reached. This will automatically
>> lead to backpressure in Runners like Flink or Dataflow.
>>
>> -Max
>>
>> On 07.10.20 18:30, Luke Cwik wrote:
>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>> > allowed to produce an unbounded amount of data and can either self
>> > checkpoint saying they want to resume later or the runner will ask them
>> > to checkpoint via a split call.
>> >
>> > There hasn't been anything concrete on backpressure, there has been
>> work
>> > done about exposing signals[1] related to IO that a runner can then use
>> > intelligently but throttling isn't one of them yet.
>> >
>> > 1:
>> >
>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>> > <
>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>> >
>> >
>> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
>> > <vincent.marq...@gmail.com> wrote:
>> >
>> > Thanks for the response.  Is my understanding correct that
>> > SplittableDoFns are only applicable to Batch pipelines?  I'm
>> > wondering if there's any proposals to address backpressure needs?
>> > /~Vincent/
>> >
>> >
>> > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik wrote:
>> >
>> > There is no general back pressure mechanism within Apache Beam
>> > (runners should be intelligent about this but there is currently
>> > no way to say I'm being throttled so runners don't know that
>> > throwing more CPUs at a problem won't make it go faster).
>> >
>> > You can control how quickly you ingest data for runners that
>> > support splittable DoFns with SDK initiated checkpoints with
>> > resume delays. A splittable DoFn is able to return
>> > resume().withDelay(Duration.seconds(10)) from
>> > the @ProcessElement method. See Watch[1] for an example.
>> >
>> > The 2.25.0 release enables more splittable DoFn features on more
>> > runners. I'm working on a blog (initial draft[2], still mostly
>> > empty) to update the old blog from 2017.
>> >
>> > 1:
>> >
>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>> > <
>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>> >
>> > 2:
>> >
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>> > <
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>> >
>> >
>> >
>> > On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
>> > <vincent.marq...@gmail.com> wrote:
>> >
>> > Hmm, I'm not sure how that will help, I understand how to
>> > batch up the data, but it is the triggering part that I
>> >  

Re: Throttling stream outputs per trigger?

2020-12-08 Thread Vincent Marquez
If this is the case that the pipeline has no way of enforcing fixed time
windows, how does this work:

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126

Isn't this supposed to only trigger every five minutes, regardless of how
much data can immediately be grouped together in five minute windows?  If
there is a way to mark that the fixed window should only trigger every so
many minutes, that would solve my use case.  If there isn't a way to do
this, the Kafka offset code seems broken and could result in 'data loss' by
improperly committing offsets before they are run through the rest of the
pipeline?

*~Vincent*


On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels  wrote:

> > the downstream consumer has these requirements.
>
> Blocking should normally be avoided at all cost, but if the downstream
> operator has the requirement to only emit a fixed number of messages per
> second, it should enforce this, i.e. block once the maximum number of
> messages for a time period have been reached. This will automatically
> lead to backpressure in Runners like Flink or Dataflow.
>
> -Max
>
> On 07.10.20 18:30, Luke Cwik wrote:
> > SplittableDoFns apply to both batch and streaming pipelines. They are
> > allowed to produce an unbounded amount of data and can either self
> > checkpoint saying they want to resume later or the runner will ask them
> > to checkpoint via a split call.
> >
> > There hasn't been anything concrete on backpressure, there has been work
> > done about exposing signals[1] related to IO that a runner can then use
> > intelligently but throttling isn't one of them yet.
> >
> > 1:
> >
> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
> > <
> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
> >
> >
> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > Thanks for the response.  Is my understanding correct that
> > SplittableDoFns are only applicable to Batch pipelines?  I'm
> > wondering if there's any proposals to address backpressure needs?
> > /~Vincent/
> >
> >
> > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik wrote:
> >
> > There is no general back pressure mechanism within Apache Beam
> > (runners should be intelligent about this but there is currently
> > no way to say I'm being throttled so runners don't know that
> > throwing more CPUs at a problem won't make it go faster).
> >
> > You can control how quickly you ingest data for runners that
> > support splittable DoFns with SDK initiated checkpoints with
> > resume delays. A splittable DoFn is able to return
> > resume().withDelay(Duration.seconds(10)) from
> > the @ProcessElement method. See Watch[1] for an example.
> >
> > The 2.25.0 release enables more splittable DoFn features on more
> > runners. I'm working on a blog (initial draft[2], still mostly
> > empty) to update the old blog from 2017.
> >
> > 1:
> >
> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
> > <
> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
> >
> > 2:
> >
> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
> > <
> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
> >
> >
> >
> > On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
> > <vincent.marq...@gmail.com> wrote:
> >
> > Hmm, I'm not sure how that will help, I understand how to
> > batch up the data, but it is the triggering part that I
> > don't see how to do.  For example, in Spark Structured
> > Streaming, you can set a time trigger which happens at a
> > fixed interval all the way up to the source, so the source
> > can throttle how much data to read even.
> >
> > Here is my use case more thoroughly explained:
> >
> > I have a Kafka topic (with multiple partitions) that I'm
> > reading from, and I need to aggregate batches of up to 500
> > before sending a single batch off in an RPC call.  However,
> > the vendor specified a rate limit, so if there are more than
> > 500 unread messages in the topic, I must wait 1 second
> > before issuing another RPC call. When searching on Stack
> > Overflow I found this answer:
> > https://stackoverflow.com/a/57275557/25658
> >  

Re: Add emilymye to JIRA contributers

2020-12-08 Thread Pablo Estrada
Hi Emily,
I've checked and someone has added you.
Best
-P.

On Tue, Dec 8, 2020 at 10:18 AM Griselda Cuevas  wrote:

> Hi Emily, did you get access?
>
> On Mon, 7 Dec 2020 at 13:33, Emily Ye  wrote:
>
>> Hi dev@
>>
>> Just realized I didn't request contributor access to JIRA a while back -
>> would someone be able to add me so I can self-assign my issues?
>>
>> Thank you!
>> Emily
>>
>


Re: Docker Development Environment

2020-12-08 Thread Alex Kosolapov
Hi!

Thank you for creating the Docker build environment - it makes build
environment setup so much easier!

I ran start-build-env.sh on macOS and ran into some items that I wanted to
share, plus proposed fixes to improve the Docker build environment's macOS
support:

  1.  ./start-build-env.sh: line 75: getent: command not found, followed by a
script build error downstream:



Step 26/26 : RUN echo '. /scripts/beam_env_checks.sh' >> /root/.bash_aliases
 ---> Using cache
 ---> fe48b8b26e91
Successfully built fe48b8b26e91
Successfully tagged beam-build:latest
./start-build-env.sh: line 75: getent: command not found
Sending build context to Docker daemon   2.56kB
Step 1/10 : FROM beam-build
 ---> fe48b8b26e91
Step 2/10 : RUN rm -f /var/log/faillog /var/log/lastlog
 ---> Using cache
 ---> 11ea31f2099e
Step 3/10 : RUN groupadd --non-unique -g 100 alex
 ---> Using cache
 ---> cf3f56f51d9f
Step 4/10 : RUN groupmod -g  docker
 ---> Running in 5d5bc473be3b
groupmod: invalid group ID 'docker'
The command '/bin/bash -o pipefail -c groupmod -g  docker' returned a non-zero code: 3



I investigated the issues and found that start-build-env.sh expects a group
named "docker" to exist in the system groups. Creating a docker group is an
optional post-install step on Linux
(https://docs.docker.com/engine/install/linux-postinstall/); another relevant
forum reference hints at using the staff group on macOS. The solution that
worked for me was to pick another GID for DOCKER_GROUP_ID on Mac, e.g.

#DOCKER_GROUP_ID=$(getent group docker | cut -d':' -f3)

if [ "$(uname -s)" = "Linux" ]; then
  DOCKER_GROUP_ID=$(getent group docker | cut -d':' -f3)
fi
if [ "$(uname -s)" = "Darwin" ]; then
  DOCKER_GROUP_ID=1000
fi





  2.  Step 10/10 : RUN chown -R alex: /Users/alex/.cache
 ---> Running in f09e2bb0e045
chown: cannot access '/Users/alex/.cache': No such file or directory
The command '/bin/bash -o pipefail -c chown -R alex: /Users/alex/.cache'
returned a non-zero code: 1



The fix for this was to change from HOME to DOCKER_HOME_DIR:

RUN chown -R ${USER_NAME}:${GROUP_ID} ${DOCKER_HOME_DIR}/.cache
ENV GOPATH ${DOCKER_HOME_DIR}/beam/sdks/go/examples/.gogradle/project_gopath



If these proposed solutions make sense, I will create a JIRA ticket and submit
these improvements for Docker build support on macOS.



Thank you,

Alex Kosolapov


From: Brian Hulette 
Reply-To: "dev@beam.apache.org" 
Date: Friday, December 4, 2020 at 1:15 PM
To: dev 
Cc: Omar Ismail 
Subject: Re: Docker Development Environment

I think https://github.com/apache/beam/pull/13308 is about ready to merge. One 
question was whether or not to install pyenv in the container - I think we 
should try to do without it. Users of this environment will already be 
operating within a container, so they shouldn't need pyenv to create isolated 
python environments, they could just use the container's system python.

The two issues Alex mentioned are still outstanding, but they seem to be issues 
with ./gradlew check unrelated to the container. I filed BEAM-11402 [1] to 
track this separately.

[1] https://issues.apache.org/jira/browse/BEAM-11402

On Mon, Nov 30, 2020 at 11:43 AM Alex Amato <ajam...@google.com> wrote:
If any of these are suitable for at least some development, I propose we merge
them and update them with fixes later, rather than trying to get things 100%
working in the first PR.

Looks like this one was opened in early Sept and never got merged, which is a
pretty long time. Perhaps abandoned for the latter?
https://github.com/apache/beam/pull/12837

This one looks like it's failing on just a few tests (which may be addressed
soon, but the PR was opened 19 days ago).
https://github.com/apache/beam/pull/13308
(Can we set a deadline for this? And just say merge it by the end of the week,
regardless of whether the last two tests can be fixed or not?)

(Would like to nudge this along, as it's been a pain point for many for a while 
now).

Thanks for the work here Niels, Omar and Sam.
Looking forward to giving this a try :)


On Mon, Nov 30, 2020 at 11:32 AM Brian Hulette <bhule...@google.com> wrote:
I agree this is a good idea. I remember my first experience with Beam 
development - I ran through the steps at [1] and had `./gradlew check` fail. I 
don't think I ever got it working before moving on and just running more 
specific tasks.
It would be great if we had a reliable way for new contributors to establish an 
environment that can successfully run `./gradlew check`.

Niels Basjes' PR (https://github.com/apache/beam/pull/13308) seems to be close 
to that, so I think we should focus on getting that working and iterate from 
there. Omar concurred with that in https://github.com/apache/beam/pull/12837.

[1] https://beam.apache.org/contribute/#development-setup


On Wed, Nov 25, 2020 at 3:39 PM Ahmet Altay <al...@google.com> wrote:

Getting Sprint Management rights on Jira

2020-12-08 Thread Griselda Cuevas
Hi folks, I'd like to request Sprint management rights on Jira to organize
the Website redesign work around the content creation part.

Could someone in the PMC help me with this?


Re: Add emilymye to JIRA contributers

2020-12-08 Thread Griselda Cuevas
Hi Emily, did you get access?

On Mon, 7 Dec 2020 at 13:33, Emily Ye  wrote:

> Hi dev@
>
> Just realized I didn't request contributor access to JIRA a while back -
> would someone be able to add me so I can self-assign my issues?
>
> Thank you!
> Emily
>


Re: Proposal: Scheduled tasks

2020-12-08 Thread Pablo Estrada
Hi Chad!
> I've been meaning to review this, I've just not carved out the time. I'll
try to get back to you this week with some thoughts!
Thanks!
-P.

On Wed, Dec 2, 2020 at 10:31 AM Chad Dombrova  wrote:

> Hi everyone,
> Beam's niche is low-latency, high-throughput workloads, but Beam has
> incredible promise as an orchestrator of long running work that gets sent
> to a scheduler.  We've created a modified version of Beam that allows the
> python SDK worker to outsource tasks to a scheduler, like Kubernetes
> batch jobs[1], Argo[2], or Google's own OpenCue[3].
>
> The basic idea is that any element in a stream can be tagged to be
> executed outside of the normal SdkWorker as an atomic "task".  A task is
> one invocation of a stage, composed of one or more DoFns, against a
> slice of the data stream, composed of one or more tagged elements.   The
> upshot is that we're able to slice up the processing of a stream across
> potentially *many* workers, with the trade-off being the added overhead
> of starting up a worker process for each task.
>
> For more info on how we use our modified version of Beam to make visual
> effects for feature films, check out the talk[4] I gave at the Beam Summit.
>
> Here's our design doc:
>
> https://docs.google.com/document/d/1GrAvDWwnR1QAmFX7lnNA7I_mQBC2G1V2jE2CZOc6rlw/edit?usp=sharing
>
> And here's the github branch:
> https://github.com/LumaPictures/beam/tree/taskworker_public
>
> Looking forward to your feedback!
> -chad
>
>
> [1] https://kubernetes.io/docs/concepts/workloads/controllers/job/
> [2] https://argoproj.github.io/
> [3] https://cloud.google.com/opencue
> [4] https://www.youtube.com/watch?v=gvbQI3I03a8&ab_channel=ApacheBeam
>
>