Re: AvroIO.to(DynamicAvroDestinations) deprecated?

2022-09-13 Thread Steve Niemitz
Ah this is super useful context, thank you!  I can submit a couple PRs to
get AvroIO.sink up to parity if that's the way forward.

On Tue, Sep 13, 2022 at 2:53 PM John Casey via user 
wrote:

> Hi Steve,
>
> I've asked around, and it looks like this confusing state is due to a
> migration that isn't complete (and likely won't be until Beam 3.0).
>
> Here is the doc that explains some of the history:
> https://docs.google.com/document/d/1zcF4ZGtq8pxzLZxgD_JMWAouSszIf9LnFANWHKBsZlg/edit
> And a PR that implements some of the changes:
> https://github.com/apache/beam/pull/3817
>
> Based on this, AvroIO.sink is what we recommend. Please feel free to raise
> issues on Github to account for features you're missing. In addition, if
> you think they are straightforward changes, I'd be happy to discuss
> designs, or look at proposed changes to make these features available.
>
> I hope this helps,
> John
>
> On Mon, Sep 12, 2022 at 3:38 PM Steve Niemitz  wrote:
>
>> We're trying to do some semi-advanced custom logic (custom writers and
>> schemas per destination) with AvroIO, and want to use
>> DynamicAvroDestinations to accomplish this.
>>
>> However, AvroIO.to(DynamicAvroDestinations) is deprecated, but there
>> doesn't seem to be any other way to accomplish what we want here.
>> AvroIO.sink is much less sophisticated than the non-sink options, missing
>> much of the configurability that the non-sink version has.  For example,
>> there's no way to project from the UserT -> OutputT with the sink version,
>> only from UserT -> GenericRecord, which isn't what we want.
>>
>> It seems like most things would be trivial to fix or add on the
>> AvroIO.sink implementation, is that the intended way that people would be
>> consuming AvroIO?  I'm a little confused with FileIO.write/writeDynamic vs
>> WriteFiles vs AvroIO.write, some seem deprecated, and some seem
>> not-deprecated-but-not-recommended.  To add to the confusion AvroIO.write
>> uses WriteFiles, but the documentation for the deprecated
>> AvroIO.to(DynamicAvroDestinations) points to FileIO.write.  Which is the
>> "right" one to use?
>>
>
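
For reference, a minimal sketch of the FileIO.writeDynamic + AvroIO.sink
combination recommended above. Event, MyFormat.toGenericRecord, and
MyFormat.schemaFor are hypothetical stand-ins for a pipeline's own element
type, its UserT -> GenericRecord projection, and its per-destination schema
lookup:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.PCollection;

PCollection<Event> events = ...;

events.apply(
    "WriteAvroPerDestination",
    FileIO.<String, Event>writeDynamic()
        // choose a destination per element
        .by(Event::getDestination)
        .withDestinationCoder(StringUtf8Coder.of())
        // project UserT -> GenericRecord, with a schema chosen per destination
        .via(
            Contextful.fn(MyFormat::toGenericRecord),
            Contextful.fn(dest -> AvroIO.sink(MyFormat.schemaFor(dest))))
        .to("gs://my-bucket/output")
        .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".avro")));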


AvroIO.to(DynamicAvroDestinations) deprecated?

2022-09-12 Thread Steve Niemitz
We're trying to do some semi-advanced custom logic (custom writers and
schemas per destination) with AvroIO, and want to use
DynamicAvroDestinations to accomplish this.

However, AvroIO.to(DynamicAvroDestinations) is deprecated, but there
doesn't seem to be any other way to accomplish what we want here.
AvroIO.sink is much less sophisticated than the non-sink options, missing
much of the configurability that the non-sink version has.  For example,
there's no way to project from the UserT -> OutputT with the sink version,
only from UserT -> GenericRecord, which isn't what we want.

It seems like most things would be trivial to fix or add on the AvroIO.sink
implementation; is that the intended way that people would be consuming
AvroIO?  I'm a little confused with FileIO.write/writeDynamic vs WriteFiles
vs AvroIO.write, some seem deprecated, and some seem
not-deprecated-but-not-recommended.  To add to the confusion AvroIO.write
uses WriteFiles, but the documentation for the deprecated
AvroIO.to(DynamicAvroDestinations) points to FileIO.write.  Which is the
"right" one to use?


Re: Breaking change for FileIO WriteDynamic in Beam 2.34?

2022-04-06 Thread Steve Niemitz
Without the full logs it's hard to say, but I've definitely seen that error
in the past when the worker disks are full.  ApplianceShuffleWriter needs
to extract a native library to a temp location, and if the disk is full
that'll fail, resulting in the NoClassDefFoundError.

On Wed, Apr 6, 2022 at 12:46 PM Chamikara Jayalath 
wrote:

> I'm not aware of a breaking change along these lines off the top of my
> head. Sounds like the classes required for Dataflow shuffle are missing
> somehow. Unless someone here can point to something, you might have to
> contact Google Cloud support so they can look at your job.
>
> Thanks,
> Cham
>
> On Wed, Apr 6, 2022 at 9:39 AM Ahmet Altay  wrote:
>
>> /cc @Chamikara Jayalath 
>>
>> On Tue, Apr 5, 2022 at 11:29 PM Siyu Lin  wrote:
>>
>>> Hi Beam community,
>>>
>>> We have a batch pipeline which does not run regularly. Recently we
>>> have upgraded to Beam 2.36 and this broke the FileIO WriteDynamic
>>> process.
>>>
>>> We are using Dataflow Runner, and the errors are like this when there
>>> are multiple workers:
>>>
>>> Error message from worker: java.lang.NoClassDefFoundError: Could not
>>> initialize class
>>> org.apache.beam.runners.dataflow.worker.ApplianceShuffleWriter
>>>
>>> org.apache.beam.runners.dataflow.worker.ShuffleSink.writer(ShuffleSink.java:348)
>>>
>>> org.apache.beam.runners.dataflow.worker.SizeReportingSinkWrapper.writer(SizeReportingSinkWrapper.java:46)
>>>
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.initializeWriter(WriteOperation.java:71)
>>>
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.start(WriteOperation.java:78)
>>>
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
>>>
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
>>>
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
>>>
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>>
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> However when there is only a single worker, the error is like this:
>>>
>>> The job failed because a work item has failed 4 times. Look in
>>> previous log entries for the cause of each one of the 4 failures. For
>>> more information, see
>>> https://cloud.google.com/dataflow/docs/guides/common-errors. The work
>>> item was attempted on these workers: xxx Root cause: The worker lost
>>> contact with the service.,
>>>
>>> The error guidance suggested upgrading the machine type.
>>>
>>> Those errors happen when using SDK 2.34+. When I switched to SDK 2.33,
>>> everything worked well without any issues. Tried SDK 2.34, 2.35 and
>>> 2.36, and found all of them got the same issue.
>>>
>>> Context: The code simply just reads from BigQuery with a fixed table
>>> of 4,034 records, does some transform, and outputs to GCS with
>>> FileIO.WriteDynamic. All tests were performed using the same machine
>>> type with the same worker number.
>>>
>>> Does anyone know if there are any breaking changes in this SDK /
>>> Dataflow runner?
>>>
>>> Thanks so much!
>>> Siyu
>>>
>>


Re: "Slowly updating global window side inputs" example buggy?

2022-02-22 Thread Steve Niemitz
Does "Repeatedly.forever(AfterPane.elementCountAtLeast(1))" solve this?  At
least in my tests it seems like this correctly only emits a single element
per pane, but I'm not sure how much of a guarantee there actually is that
there will never be more than N elements in a pane when
elementCountAtLeast(N) is set.

On Tue, Feb 22, 2022 at 2:06 PM Luke Cwik  wrote:

> I'm not certain that Latest would work since the processing time trigger
> would still cause multiple firings to occur each producing the "latest" at
> that point in time. All these firings would effectively be output to the
> PCollection that the view is over. The PCollection would effectively be a
> concatenation of all these firings.
>
>
>
> On Tue, Feb 22, 2022 at 10:57 AM Pavel Solomin 
> wrote:
>
>> I also did not succeed in making this pattern work some time ago. In the
>> link below there's my mail thread with code example - do you have a similar
>> use-case?
>>
>> https://lists.apache.org/thread/9l74o4vqbtfgc5vkj9qq0xofffmtxswc
>>
>> Will keep watching this thread for insights.
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin |
>> Linkedin <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Tue, 22 Feb 2022 at 18:46, Steve Niemitz  wrote:
>>
>>> We had a team try to use the "slowly updating global window side inputs"
>>> pattern (on dataflow) to update some metadata in their pipeline every
>>> minute, but surprisingly ran into errors that the side input PCollection
>>> contained more than one element, [1] although this only manifested
>>> intermittently.
>>>
>>> My theory on why this breaks is as follows, can someone check my logic?
>>>
>>> Given that GenerateSequence operates on processing time, (although this
>>> might not actually matter) it's possible that if processing the source is
>>> delayed for whatever reason, the source may emit multiple elements at once
>>> in a single bundle.  For example, if I configure the source to generate an
>>> element every 10 seconds, and the evaluation of the source is delayed for
>>> 30 seconds, I'd get a bundle with 3 elements in it. (or so it seems)  All
>>> elements are then windowed into the global window, so they all end up in
>>> the same window.
>>>
>>> If a bundle with 3 elements enters
>>> the AfterProcessingTime.pastFirstElementInPane() state machine, all 3
>>> elements will be emitted in that pane.  This will then propagate down and
>>> break on the singleton view combiner.
>>>
>>> Is my thought process here correct?  Is the example here just buggy?
>>>
>>> [1] "pcollection view being accessed as a singleton despite having more
>>> than one input."
>>>
>>
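
For reference, a sketch of the side-input pattern under discussion, with the
per-element trigger proposed above in place of
AfterProcessingTime.pastFirstElementInPane(). loadMetadata() is a hypothetical
metadata lookup, and whether elementCountAtLeast(1) guarantees exactly one
element per pane is exactly the open question in this thread:

import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

PCollectionView<Map<String, String>> metadataView =
    pipeline
        // emit one tick per minute (processing time)
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
        .apply("LoadMetadata", ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(loadMetadata());  // hypothetical metadata re-read
          }
        }))
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            // fire per element instead of AfterProcessingTime.pastFirstElementInPane(),
            // so a multi-element bundle can't land several elements in one pane
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(View.asSingleton());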


"Slowly updating global window side inputs" example buggy?

2022-02-22 Thread Steve Niemitz
We had a team try to use the "slowly updating global window side inputs"
pattern (on dataflow) to update some metadata in their pipeline every
minute, but surprisingly ran into errors that the side input PCollection
contained more than one element, [1] although this only manifested
intermittently.

My theory on why this breaks is as follows, can someone check my logic?

Given that GenerateSequence operates on processing time, (although this
might not actually matter) it's possible that if processing the source is
delayed for whatever reason, the source may emit multiple elements at once
in a single bundle.  For example, if I configure the source to generate an
element every 10 seconds, and the evaluation of the source is delayed for
30 seconds, I'd get a bundle with 3 elements in it. (or so it seems)  All
elements are then windowed into the global window, so they all end up in
the same window.

If a bundle with 3 elements enters
the AfterProcessingTime.pastFirstElementInPane() state machine, all 3
elements will be emitted in that pane.  This will then propagate down and
break on the singleton view combiner.

Is my thought process here correct?  Is the example here just buggy?

[1] "pcollection view being accessed as a singleton despite having more
than one input."


Testing a jvm pipeline on the portability framework locally

2022-01-18 Thread Steve Niemitz
If I have a (jvm) pipeline, is there a simple way (ie DirectRunner) to run
it locally but using the portability framework?  I'm running into a lot of
weird bugs running it on dataflow (v2) and want to be able to run it
locally for a faster debug loop.
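
One route that may work (a sketch, not verified here): start a portable job
server locally, for example the Flink job server, and point the Java pipeline
at it through PortablePipelineOptions. This assumes the
beam-runners-portability-java artifact is on the classpath and a job server is
listening on localhost:8099:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

PortablePipelineOptions options =
    PipelineOptionsFactory.fromArgs(
            "--runner=PortableRunner",
            "--jobEndpoint=localhost:8099",
            // run the Java SDK harness in-process rather than in Docker
            "--defaultEnvironmentType=EMBEDDED")
        .as(PortablePipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);
// build the pipeline as usual; pipeline.run() submits it through the portability APIs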


Re: Some questions about external tables in BeamSQL

2022-01-13 Thread Steve Niemitz
Thanks for the quick responses! Mine are inline as well.

On Thu, Jan 13, 2022 at 9:01 PM Brian Hulette  wrote:

> I added some responses inline. Also adding dev@ since this is getting
> into SQL internals.
>
> On Thu, Jan 13, 2022 at 10:29 AM Steve Niemitz 
> wrote:
>
>> I've been playing around with CREATE EXTERNAL TABLE (using a custom
>> TableProvider as well) w/ BeamSQL and really love it.  I have a few
>> questions though that I've accumulated as I've been using it I wanted to
>> ask.
>>
>> - I'm a little confused about the need to define columns in the CREATE
>> EXTERNAL TABLE statement.  If I have a BeamSqlTable implementation that can
>> provide the schema on its own, it seems like the columns supplied to the
>> CREATE statement are ignored.  This is ideal anyways, since it's infeasible
>> for users to provide the entire schema up-front, especially for more
>> complicated sources.  Should the column list be optional here instead?
>>
>
> Our documentation would seem to indicate that defining columns is optional
> - looking at the example for BigQuery here [1] the schema is not provided.
> Those docs must be aspirational though, I just checked and the
> BigQueryTableProvider definitely expects the schema to be defined and uses
> it [2].
>
> I think it would make sense to make the column list optional - that way we
> can actually fulfill our BigQuery documentation.
>

Big +1 to that.


> Note if you're building your own custom TableProvider, you might not need
> to use CREATE EXTERNAL TABLE. You could add an implementation for
> TableProvider.getTable that retrieves the metadata for a given table name
> and returns a Table instance that can build the necessary IOs. This is only
> possible if you can retrieve all the metadata you need to construct the
> IOs though. If you want users to be able to configure it further (one
> example might be specifying the read mode for BigQuery), this won't work.
>
> [1]
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#bigquery
> [2]
> https://github.com/apache/beam/blob/872455570ae7f3e2e35360bccf93b503ae9fdb5c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java#L82
>

Maybe I'm looking at the wrong thing?  Both those examples show a column
list, and the BNF (or whatever) syntax implies at least one "table element"
must be present.

But yeah, this is basically what I'm doing right now.  I just return the
"real" schema in BeamSqlTable.getSchema and ignore whatever was passed in.
It seems to work correctly.  Ideally the column list would be optional
here, as you alluded to above.  It'll be clunky explaining to users
something like "just include any random column list, we'll ignore it".
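
For what it's worth, a rough sketch of the TableProvider.getTable route
described above; MyCatalog and MyBeamSqlTable are hypothetical stand-ins for
the metadata lookup and the BeamSqlTable implementation:

import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.schemas.Schema;

public class MyTableProvider extends InMemoryMetaTableProvider {

  @Override
  public String getTableType() {
    return "my-source";
  }

  @Override
  public Table getTable(String tableName) {
    // Resolve the table's metadata, including its schema, from your own catalog,
    // so users never have to spell out the columns in CREATE EXTERNAL TABLE.
    Schema schema = MyCatalog.schemaFor(tableName);  // hypothetical lookup
    if (schema == null) {
      return null;
    }
    return Table.builder()
        .name(tableName)
        .type(getTableType())
        .schema(schema)
        .location(MyCatalog.locationFor(tableName))  // hypothetical lookup
        .build();
  }

  @Override
  public BeamSqlTable buildBeamSqlTable(Table table) {
    // Builds the IOs that back reads/writes against this table.
    return new MyBeamSqlTable(table);  // hypothetical implementation
  }
}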


> - It seems like predicate pushdown only works if the schema is "flat" (has
>> no nested rows).  I understand the complication in pushing down more
>> complicated nested predicates, however, assuming the table implementation
>> doesn't actually attempt to push them down, it seems like it would be fine
>> to allow?
>>
>
> Do we have this limitation? I think predicate pushdown will work
> with predicates on nested fields. The table is presented with a list of
> RexNodes representing separable predicates, an individual predicate could
> add a filter on a nested column IIUC.
>
> We may have the limitation that project pushdown won't work on nested rows
> though, since the API just takes a list of field names. It's possible we
> handle this by passing a joined name (e.g. foo.bar.baz), but I bet not. The
> design doc [3] does have a note saying "no nested tables for now".
>
> [3]
> https://docs.google.com/document/d/1-ysD7U7qF3MAmSfkbXZO_5PLJBevAL9bktlLCerd_jE/edit
>
>

BeamIOPushDownRule short-circuits on nested fields [1]; I can also verify
this just by the fact that my constructFilter method isn't called when my
schema contains a nested row.

[1]
https://github.com/apache/beam/blob/v2.35.0/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java#L89


>
>> - As a follow up on the above, I'd like to expose a "virtual" field in my
>> schema that represents the partition the data has come from.  For example
>> BigQuery has a similar concept called _PARTITIONTIME.  This would be picked
>> up by the predicate pushdown and used to filter the partitions being read.
>> I can't really figure out how I'd construct something similar here, even if
>> pushdown worked in all cases.  For example, for this query:
>>
>> SELECT * from table
>> where _PARTITIONTIME between X and Y
>>
>> I'd want 

Some questions about external tables in BeamSQL

2022-01-13 Thread Steve Niemitz
I've been playing around with CREATE EXTERNAL TABLE (using a custom
TableProvider as well) w/ BeamSQL and really love it.  I have a few
questions, though, that I've accumulated as I've been using it and wanted to
ask.

- I'm a little confused about the need to define columns in the CREATE
EXTERNAL TABLE statement.  If I have a BeamSqlTable implementation that can
provide the schema on its own, it seems like the columns supplied to the
CREATE statement are ignored.  This is ideal anyways, since it's infeasible
for users to provide the entire schema up-front, especially for more
complicated sources.  Should the column list be optional here instead?

- It seems like predicate pushdown only works if the schema is "flat" (has
no nested rows).  I understand the complication in pushing down more
complicated nested predicates; however, assuming the table implementation
doesn't actually attempt to push them down, it seems like it would be fine
to allow?

- As a follow up on the above, I'd like to expose a "virtual" field in my
schema that represents the partition the data has come from.  For example
BigQuery has a similar concept called _PARTITIONTIME.  This would be picked
up by the predicate pushdown and used to filter the partitions being read.
I can't really figure out how I'd construct something similar here, even if
pushdown worked in all cases.  For example, for this query:

SELECT * from table
where _PARTITIONTIME between X and Y

I'd want that filter to be pushed down to my IO, but also the
_PARTITIONTIME column wouldn't be returned in the select list.  I was
hoping to use BigQueryIO as an example of how to do this, but it doesn't
seem like it exposes the virtual _PARTITIONTIME column either.


Re: Using CREATE EXTERNAL TABLE in a pipeline?

2022-01-12 Thread Steve Niemitz
Oh that's awesome, I didn't notice the extra method on the result of
"query" and was trying to just add the CREATE statement in there.  Thanks!

On Wed, Jan 12, 2022 at 5:00 PM Brian Hulette  wrote:

> Hi Steve,
>
> This functionality was added relatively recently, via
> SqlTransform(..).withDdlString(). There are some usage examples here:
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
>
> Brian
>
> On Wed, Jan 12, 2022 at 1:54 PM Steve Niemitz 
> wrote:
>
>> Is it possible to use CREATE EXTERNAL TABLE + a select statement in a
>> SqlTransform to act as a source for a pipeline?  The functionality seems
>> really useful, but it seems like it only works from the JDBC context?  Am I
>> missing anything?
>>
>
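
For reference, a minimal sketch of the withDdlString usage linked above; the
table definition and query here are illustrative only:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

PCollection<Row> rows =
    pipeline.apply(
        SqlTransform.query("SELECT f_string, f_long FROM my_table")
            .withDdlString(
                "CREATE EXTERNAL TABLE my_table (f_string VARCHAR, f_long BIGINT) "
                    + "TYPE text LOCATION 'gs://my-bucket/input/*.csv'"));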


Using CREATE EXTERNAL TABLE in a pipeline?

2022-01-12 Thread Steve Niemitz
Is it possible to use CREATE EXTERNAL TABLE + a select statement in a
SqlTransform to act as a source for a pipeline?  The functionality seems
really useful, but it seems like it only works from the JDBC context?  Am I
missing anything?


Re: ImportError: __import__ not found on python job

2021-12-08 Thread Steve Niemitz
hm, interesting, are we using pybind anywhere?  I didn't see any references
to it.  I can give it a try on python 3.8 too though.

On Wed, Dec 8, 2021 at 9:19 AM Brian Hulette  wrote:

> A google search for "__import__ not found" turned up an issue filed with
> pybind [1]. I can't deduce a root cause from the discussion there, but it
> looks like they didn't experience the issue in Python 3.8 - it could be
> interesting to see if your problem goes away there.
>
> It looks like +Charles Chen  added the __import__('re')
> workaround in [2], maybe he remembers what was going on?
>
> [1] https://github.com/pybind/pybind11/issues/2557
> [2] https://github.com/apache/beam/pull/5071
>
> On Wed, Dec 8, 2021 at 5:30 AM Steve Niemitz  wrote:
>
>> Yeah, I can't imagine this is a "normal" problem.
>>
>> I'm on linux w/ py 3.7.  My script does have a __name__ == '__main__'
>> block.
>>
>> On Wed, Dec 8, 2021 at 12:38 AM Ning Kang  wrote:
>>
>>> I tried a pipeline:
>>>
>>> p = beam.Pipeline(DataflowRunner(), options=options)
>>> text = p | beam.Create(['Hello World, Hello You'])
>>>
>>>
>>> def tokenize(x):
>>>     import re
>>>     return re.findall('Hello', x)
>>>
>>>
>>> flatten = text | 'Split' >>
>>> (beam.FlatMap(tokenize).with_output_types(str))
>>> pipeline_result = p.run()
>>>
>>>
>>> Didn't run into the issue.
>>>
>>> What OS and Python version are you using? Does your script come with a
>>> `if __name__ == '__main__': `?
>>>
>>> On Tue, Dec 7, 2021 at 6:58 PM Steve Niemitz 
>>> wrote:
>>>
>>>> I have a fairly simple python word count job (although the packaging is
>>>> a little more complicated) that I'm trying to run.  (note: I'm explicitly
>>>> NOT using save_main_session.)
>>>>
>>>> In it is a method to tokenize the incoming text to words, and I used
>>>> something similar to how the wordcount example worked.
>>>>
>>>> def tokenize(row):
>>>>   import re
>>>>   return re.findall(r'[A-Za-z\']+', row.text)
>>>>
>>>> which is then used as the function for a FlatMap:
>>>> | 'Split' >> (
>>>> beam.FlatMap(tokenize).with_output_types(str))
>>>>
>>>> However, if I run this job on dataflow (2.33), the python runner fails
>>>> with a bizarre error:
>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2021-12-07T20:59:59.704Z:
>>>> JOB_MESSAGE_ERROR: Traceback (most recent call last):
>>>>   File "apache_beam/runners/common.py", line 1232, in
>>>> apache_beam.runners.common.DoFnRunner.process
>>>>   File "apache_beam/runners/common.py", line 572, in
>>>> apache_beam.runners.common.SimpleInvoker.invoke_process
>>>>   File "/tmp/tmpq_8l154y/wordcount_test.py", line 75, in tokenize
>>>> ImportError: __import__ not found
>>>>
>>>> I was able to find an example in the streaming wordcount snippet that
>>>> did something similar, but very strange [1]:
>>>> | 'ExtractWords' >>
>>>> beam.FlatMap(lambda x: __import__('re').findall(r'[A-Za-z\']+',
>>>> x))
>>>>
>>>> For whatever reason this actually fixed the issue in my job as well.  I
>>>> can't for the life of me understand why this works, or why the normal
>>>> import fails.  Someone else must have run into this same issue though for
>>>> that streaming wordcount example to be like that.  Any ideas what's going
>>>> on here?
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L692
>>>>
>>>


Re: ImportError: __import__ not found on python job

2021-12-08 Thread Steve Niemitz
Yeah, I can't imagine this is a "normal" problem.

I'm on linux w/ py 3.7.  My script does have a __name__ == '__main__' block.

On Wed, Dec 8, 2021 at 12:38 AM Ning Kang  wrote:

> I tried a pipeline:
>
> p = beam.Pipeline(DataflowRunner(), options=options)
> text = p | beam.Create(['Hello World, Hello You'])
>
>
> def tokenize(x):
>     import re
>     return re.findall('Hello', x)
>
>
> flatten = text | 'Split' >> (beam.FlatMap(tokenize).with_output_types(str))
> pipeline_result = p.run()
>
>
> Didn't run into the issue.
>
> What OS and Python version are you using? Does your script come with a `if
> __name__ == '__main__': `?
>
> On Tue, Dec 7, 2021 at 6:58 PM Steve Niemitz  wrote:
>
>> I have a fairly simple python word count job (although the packaging is a
>> little more complicated) that I'm trying to run.  (note: I'm explicitly NOT
>> using save_main_session.)
>>
>> In it is a method to tokenize the incoming text to words, and I used
>> something similar to how the wordcount example worked.
>>
>> def tokenize(row):
>>   import re
>>   return re.findall(r'[A-Za-z\']+', row.text)
>>
>> which is then used as the function for a FlatMap:
>> | 'Split' >> (
>> beam.FlatMap(tokenize).with_output_types(str))
>>
>> However, if I run this job on dataflow (2.33), the python runner fails
>> with a bizarre error:
>> INFO:apache_beam.runners.dataflow.dataflow_runner:2021-12-07T20:59:59.704Z:
>> JOB_MESSAGE_ERROR: Traceback (most recent call last):
>>   File "apache_beam/runners/common.py", line 1232, in
>> apache_beam.runners.common.DoFnRunner.process
>>   File "apache_beam/runners/common.py", line 572, in
>> apache_beam.runners.common.SimpleInvoker.invoke_process
>>   File "/tmp/tmpq_8l154y/wordcount_test.py", line 75, in tokenize
>> ImportError: __import__ not found
>>
>> I was able to find an example in the streaming wordcount snippet that did
>> something similar, but very strange [1]:
>> | 'ExtractWords' >>
>> beam.FlatMap(lambda x: __import__('re').findall(r'[A-Za-z\']+',
>> x))
>>
>> For whatever reason this actually fixed the issue in my job as well.  I
>> can't for the life of me understand why this works, or why the normal
>> import fails.  Someone else must have run into this same issue though for
>> that streaming wordcount example to be like that.  Any ideas what's going
>> on here?
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L692
>>
>


ImportError: __import__ not found on python job

2021-12-07 Thread Steve Niemitz
I have a fairly simple python word count job (although the packaging is a
little more complicated) that I'm trying to run.  (note: I'm explicitly NOT
using save_main_session.)

In it is a method to tokenize the incoming text to words, and I used
something similar to how the wordcount example worked.

def tokenize(row):
  import re
  return re.findall(r'[A-Za-z\']+', row.text)

which is then used as the function for a FlatMap:
| 'Split' >> (
beam.FlatMap(tokenize).with_output_types(str))

However, if I run this job on dataflow (2.33), the python runner fails with
a bizarre error:
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-12-07T20:59:59.704Z:
JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1232, in
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/tmp/tmpq_8l154y/wordcount_test.py", line 75, in tokenize
ImportError: __import__ not found

I was able to find an example in the streaming wordcount snippet that did
something similar, but very strange [1]:
| 'ExtractWords' >>
beam.FlatMap(lambda x: __import__('re').findall(r'[A-Za-z\']+', x))

For whatever reason this actually fixed the issue in my job as well.  I
can't for the life of me understand why this works, or why the normal
import fails.  Someone else must have run into this same issue though for
that streaming wordcount example to be like that.  Any ideas what's going
on here?

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L692


Re: ExpansionService...as a service?

2021-10-06 Thread Steve Niemitz
> I was imagining a long running expansion service would just serve up a
fixed number of transforms, and changing this set would be done via a new
release of the service.

This is how I was thinking about it as well, I don't think we'd have any
requirement to dynamically add/remove transforms at runtime.

On Wed, Oct 6, 2021 at 1:27 PM Robert Bradshaw  wrote:

> On Wed, Oct 6, 2021 at 10:24 AM Chamikara Jayalath 
> wrote:
> >
> > Regarding the memory leak, I'm not sure if a function registration is
> needed here since it seems like that function is doing something pretty
> generic. +Heejong Lee should know more.
>
> +1
>
> > Regarding a long running expansion service, in addition to what others
> mentioned, I think we also need protocols for dynamically
> registering/unregistering transforms and a protocol for discovering
> transforms already registered in an expansion service.
>
> I was imagining a long running expansion service would just serve up a
> fixed number of transforms, and changing this set would be done via a
> new release of the service.
>
> Having a dynamic set of transforms served (e.g. expansion services as
> a service) is another level that could be interesting to explore
> (though more complicated, e.g. now you're hosting third party jars and
> running arbitrary code).
>
> > On Wed, Oct 6, 2021 at 10:02 AM Steve Niemitz 
> wrote:
> >>
> >> I noticed that Environments.getDeferredArtifacts will add a function to
> DefaultArtifactResolver.INSTANCE which is never removed, so those will
> accumulate forever.  That's the only one I noticed so far from code review,
> but it's possible there's more.
> >>
> >> On Wed, Oct 6, 2021 at 12:38 PM Robert Bradshaw 
> wrote:
> >>>
> >>> IIRC, the folks at Lyft offered this as a service (internally, not
> >>> publicly). We've toyed with the idea of offering it more generally.
> >>> You're right that there would be security implications (though some
> >>> generic sandboxing could go a long way). What were the memory leaks
> >>> that you were noticing?
> >>>
> >>> On Wed, Oct 6, 2021 at 9:13 AM Steve Niemitz 
> wrote:
> >>> >
> >>> > cool, thanks for the info.  I might be the first to try then :)
> >>> >
> >>> > On Wed, Oct 6, 2021 at 12:00 PM Luke Cwik  wrote:
> >>> >>
> >>> >> I believe that was one of the ideas behind an expansion service but
> to my knowledge I don't know of anyone who has a long running expansion
> service. I was thinking that the Apache Beam community at some point could
> host one so that users don't need to spin up Java/Python/Go expansion
> services to run a subset of XLang pipelines.
> >>> >>
> >>> >> The XLang effort has been focusing about getting more transforms
> working and solving ease of use issues for users so I don't believe what
> you describe has been done.
> >>> >>
> >>> >> On Wed, Oct 6, 2021 at 8:37 AM Ahmet Altay 
> wrote:
> >>> >>>
> >>> >>> /cc @Chamikara Jayalath @Robert Bradshaw
> >>> >>>
> >>> >>> On Wed, Oct 6, 2021 at 6:36 AM Steve Niemitz 
> wrote:
> >>> >>>>
> >>> >>>> Has anyone ever tried hosting a long-running expansion service as
> a real "service", the intent being that users don't need to run it locally,
> and can instead connect to the shared one when expanding pipelines?
> >>> >>>>
> >>> >>>> Looking around the code I already see a few assumptions that it
> will only live for a short while (memory leaks, etc), as well as some
> pretty large security concerns (eg, the artifact retrieval service can be
> used to download any file on the host) curious if anyone has put any work
> into this already?
>


Re: ExpansionService...as a service?

2021-10-06 Thread Steve Niemitz
I noticed that Environments.getDeferredArtifacts will add a function
to DefaultArtifactResolver.INSTANCE which is never removed, so those will
accumulate forever.  That's the only one I noticed so far from code review,
but it's possible there's more.

On Wed, Oct 6, 2021 at 12:38 PM Robert Bradshaw  wrote:

> IIRC, the folks at Lyft offered this as a service (internally, not
> publicly). We've toyed with the idea of offering it more generally.
> You're right that there would be security implications (though some
> generic sandboxing could go a long way). What were the memory leaks
> that you were noticing?
>
> On Wed, Oct 6, 2021 at 9:13 AM Steve Niemitz  wrote:
> >
> > cool, thanks for the info.  I might be the first to try then :)
> >
> > On Wed, Oct 6, 2021 at 12:00 PM Luke Cwik  wrote:
> >>
> >> I believe that was one of the ideas behind an expansion service but to
> my knowledge I don't know of anyone who has a long running expansion
> service. I was thinking that the Apache Beam community at some point could
> host one so that users don't need to spin up Java/Python/Go expansion
> services to run a subset of XLang pipelines.
> >>
> >> The XLang effort has been focusing about getting more transforms
> working and solving ease of use issues for users so I don't believe what
> you describe has been done.
> >>
> >> On Wed, Oct 6, 2021 at 8:37 AM Ahmet Altay  wrote:
> >>>
> >>> /cc @Chamikara Jayalath @Robert Bradshaw
> >>>
> >>> On Wed, Oct 6, 2021 at 6:36 AM Steve Niemitz 
> wrote:
> >>>>
> >>>> Has anyone ever tried hosting a long-running expansion service as a
> real "service", the intent being that users don't need to run it locally,
> and can instead connect to the shared one when expanding pipelines?
> >>>>
> >>>> Looking around the code I already see a few assumptions that it will
> only live for a short while (memory leaks, etc), as well as some pretty
> large security concerns (eg, the artifact retrieval service can be used to
> download any file on the host) curious if anyone has put any work into this
> already?
>


Re: ExpansionService...as a service?

2021-10-06 Thread Steve Niemitz
cool, thanks for the info.  I might be the first to try then :)

On Wed, Oct 6, 2021 at 12:00 PM Luke Cwik  wrote:

> I believe that was one of the ideas behind an expansion service but to my
> knowledge I don't know of anyone who has a long running expansion service.
> I was thinking that the Apache Beam community at some point could host one
> so that users don't need to spin up Java/Python/Go expansion services to
> run a subset of XLang pipelines.
>
> The XLang effort has been focusing about getting more transforms working
> and solving ease of use issues for users so I don't believe what you
> describe has been done.
>
> On Wed, Oct 6, 2021 at 8:37 AM Ahmet Altay  wrote:
>
>> /cc @Chamikara Jayalath  @Robert Bradshaw
>> 
>>
>> On Wed, Oct 6, 2021 at 6:36 AM Steve Niemitz  wrote:
>>
>>> Has anyone ever tried hosting a long-running expansion service as a real
>>> "service", the intent being that users don't need to run it locally, and
>>> can instead connect to the shared one when expanding pipelines?
>>>
>>> Looking around the code I already see a few assumptions that it will
>>> only live for a short while (memory leaks, etc), as well as some pretty
>>> large security concerns (eg, the artifact retrieval service can be used to
>>> download any file on the host) curious if anyone has put any work into this
>>> already?
>>>
>>


ExpansionService...as a service?

2021-10-06 Thread Steve Niemitz
Has anyone ever tried hosting a long-running expansion service as a real
"service", the intent being that users don't need to run it locally, and
can instead connect to the shared one when expanding pipelines?

Looking around the code I already see a few assumptions that it will only
live for a short while (memory leaks, etc), as well as some pretty large
security concerns (eg, the artifact retrieval service can be used to
download any file on the host). Curious if anyone has put any work into this
already?
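
For context, a minimal sketch of one way to keep a Java expansion service
running as a standalone process: reuse the existing entry point on a fixed
port. Port 8097 is an arbitrary choice here, and the transforms served are
whatever happens to be on this JVM's classpath:

import org.apache.beam.sdk.expansion.service.ExpansionService;

public class SharedExpansionService {
  public static void main(String[] args) throws Exception {
    // Delegates to ExpansionService.main, which starts a gRPC expansion service
    // on the given port and serves the transforms found on the classpath.
    ExpansionService.main(new String[] {"8097"});
  }
}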


Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-24 Thread Steve Niemitz
Out of curiosity, does adding the "--experiments=use_deprecated_read"
argument fix things? (note, this flag was broken in beam 2.29 on the direct
runner and didn't do anything, so you'd need to test on 2.28 or 2.30)
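
For reference, the experiment can be passed as a pipeline argument
(--experiments=use_deprecated_read) or set programmatically; a minimal sketch
of the latter:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
ExperimentalOptions.addExperiment(
    options.as(ExperimentalOptions.class), "use_deprecated_read");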

On Mon, May 24, 2021 at 4:44 AM Sozonoff Serge  wrote:

> Hi.
>
> OK thanks.  Just to clarify, in my case the message throughput is zero
> when I start the Beam pipeline up and it will still crash once all file
> handles are consumed even if I don't send a single message to the Kafka
> topic.
>
> Thanks,
> Serge
>
> On 24 May 2021 at 10:14:33, Jan Lukavský (je...@seznam.cz) wrote:
>
> It is not 100 consumers, the checkpoint is created every 100 records. So,
> if your message throughput is high enough, the consumers might be created
> really often. But most importantly - DirectRunner is really not intended
> for performance sensitive applications. You should use a different runner
> for that.
>
> Best,
>
>  Jan
> On 5/24/21 10:03 AM, Sozonoff Serge wrote:
>
> Hi Jan,
>
> So if I read your SO answer correctly and looking at the Github link you
> provided we are talking about ~100 consumers ? Since I am developing
> locally with a dockerized minimal Kafka broker it is possible that this is
> enough to hit the max open files limit.
>
> Depending on your definition of “limited” I would say there are more than
> a limited number present at the same time. If you look at the below log
> extract everyone of those “Kafka version: 2.5.0” lines corresponds to a
> Kafka consumer instantiation and that’s within a very short period of time
> !!
>
> Thanks,
> Serge
>
>
>
> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 09:53:49.668 [direct-runner-worker]
> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
> [INFO ] 2021-05-24 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
I think it was only broken in 2.29.

On Wed, May 12, 2021 at 5:53 PM Evan Galpin  wrote:

> Ah ok thanks for that. Do you mean use_deprecated_reads is broken
> specifically in 2.29.0 (regression) or broken in all versions up to and
> including 2.29.0 (ie never worked)?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:
>
>> Yeah, sorry my email was confusing.  use_deprecated_read is broken on
>> the DirectRunner in 2.29.
>>
>> The behavior you describe is exactly the behavior I ran into as well when
>> reading from pubsub with the new read method.  I believe that soon the
>> default is being reverted back to the old read method, not using SDFs,
>> which will fix your performance issue.
>>
>> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>>
>>> Hi Evan,
>>>
>>> It seems like the slow step is not the read that use_deprecated_read
>>> targets for. Would you like to share your pipeline code if possible?
>>>
>>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>>> wrote:
>>>
>>>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>>>> observed slow behavior again. Is it possible that use_deprecated_read is
>>>> broken in 2.29.0 as well?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>>>> wrote:
>>>>
>>>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>>>
>>>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the
>>>>>> "faster" behavior, as did v2.23.0. But that "fast" behavior stopped at
>>>>>> v2.25.0 (for my use case at least) regardless of use_deprecated_read
>>>>>> setting.
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>>
>>>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>>>> wrote:
>>>>>>
>>>>>>> use_deprecated_read was broken in 2.19 on the direct runner and
>>>>>>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>>>>>>> will
>>>>>>> be in 2.21.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>>>
>>>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I forgot to also mention that in all tests I was setting
>>>>>>>> --experiments=use_deprecated_read
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Evan
>>>>>>>>
>>>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of
>>>>>>>>> overall DirectRunner slowness, not just pubsub. I have a pipeline 
>>>>>>>>> like so:
>>>>>>>>>
>>>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()
>>>>>>>>> |  FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>>>>
>>>>>>>>> I have temporarily set up a transform between each step to log
>>>>>>>>> what's going on and illustrate timing issues.  I ran a series of tests
>>>>>>>>> changing only the SDK version each time since I hadn't noticed this
>>>>>>>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>>>>>>>> test, I
>>>>>>>>> seeded the pubsub subscription with the exact same contents.
>>>>>>>>>
>>>>>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I
>>>>>>>>> couldn't seem to resolve) and onward show a significant slowdown.
>>>>>>>>>
>>>>>>>>> Here is a snippet of logging from v2.25.0:
>>>>>>>>>
>>>>>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>>>>> processElement

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
Yeah, sorry my email was confusing.  use_deprecated_read is broken on the
DirectRunner in 2.29.

The behavior you describe is exactly the behavior I ran into as well when
reading from pubsub with the new read method.  I believe that soon the
default is being reverted back to the old read method, not using SDFs,
which will fix your performance issue.

On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> It seems like the slow step is not the read that use_deprecated_read
> targets for. Would you like to share your pipeline code if possible?
>
> On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:
>
>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>> observed slow behavior again. Is it possible that use_deprecated_read is
>> broken in 2.29.0 as well?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>> wrote:
>>
>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>
>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>> wrote:
>>>
>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>>>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>>>> my use case at least) regardless of use_deprecated_read setting.
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>>
>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>> wrote:
>>>>
>>>>> use_deprecated_read was broken in 2.19 on the direct runner and didn't
>>>>> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
>>>>> 2.21.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>
>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> I forgot to also mention that in all tests I was setting
>>>>>> --experiments=use_deprecated_read
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>>> wrote:
>>>>>>
>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>>>>
>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>>
>>>>>>> I have temporarily set up a transform between each step to log
>>>>>>> what's going on and illustrate timing issues.  I ran a series of tests
>>>>>>> changing only the SDK version each time since I hadn't noticed this
>>>>>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>>>>>> test, I
>>>>>>> seeded the pubsub subscription with the exact same contents.
>>>>>>>
>>>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>>>>> seem to resolve) and onward show a significant slowdown.
>>>>>>>
>>>>>>> Here is a snippet of logging from v2.25.0:
>>>>>>>
>>>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>>> processElement
>>>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>>>> May 12, 2021 11:16:59 A.M.
>>>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>>>> INFO: Matched 2 files for pattern
>>>>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>>>> processElement
>>>>>>> INFO: Got ReadableFile: my-file1.json
>>>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>>>> processElement
>>>>>>> INFO: Got ReadableFile: my-file2.json
>>>>>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>>>>> processElement
>>>>>>> INFO: Got file contents for document_id my-file1.json
>>>>>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>>>>>> processElement
>

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
oops sorry I was off by 10...I meant 2.29 not 2.19.

On Wed, May 12, 2021 at 2:55 PM Evan Galpin  wrote:

> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
> my use case at least) regardless of use_deprecated_read setting.
>
> Thanks,
> Evan
>
>
> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz  wrote:
>
>> use_deprecated_read was broken in 2.19 on the direct runner and didn't do
>> anything. [1]  I don't think the fix is in 2.20 either, but will be in 2.21.
>>
>> [1] https://github.com/apache/beam/pull/14469
>>
>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>> wrote:
>>
>>> I forgot to also mention that in all tests I was setting
>>> --experiments=use_deprecated_read
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>> wrote:
>>>
>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>
>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>
>>>> I have temporarily set up a transform between each step to log what's
>>>> going on and illustrate timing issues.  I ran a series of tests changing
>>>> only the SDK version each time since I hadn't noticed this performance
>>>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>>>> pubsub subscription with the exact same contents.
>>>>
>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>> seem to resolve) and onward show a significant slowdown.
>>>>
>>>> Here is a snippet of logging from v2.25.0:
>>>>
>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>> processElement
>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>> May 12, 2021 11:16:59 A.M.
>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>> INFO: Matched 2 files for pattern
>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>> processElement
>>>> INFO: Got ReadableFile: my-file1.json
>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>> processElement
>>>> INFO: Got ReadableFile: my-file2.json
>>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>> processElement
>>>> INFO: Got file contents for document_id my-file1.json
>>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>>> processElement
>>>> INFO: Got file contents for document_id my-file2.json
>>>>
>>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>>> seconds*:
>>>>
>>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>> processElement
>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>> May 12, 2021 11:03:40 A.M.
>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>> INFO: Matched 2 files for pattern
>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>> processElement
>>>> INFO: Got ReadableFile: my-file1.json
>>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>> processElement
>>>> INFO: Got ReadableFile: my-file2.json
>>>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>> processElement
>>>> INFO: Got file contents for document_id my-file1.json
>>>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>>>> processElement
>>>> INFO: Got file contents for document_id my-file2.json
>>>>
>>>> Any thoughts on what could be causing this?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
>>>>> wrote:
>>>>>
>

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
use_deprecated_read was broken in 2.19 on the direct runner and didn't do
anything. [1]  I don't think the fix is in 2.20 either, but will be in 2.21.

[1] https://github.com/apache/beam/pull/14469

On Wed, May 12, 2021 at 1:41 PM Evan Galpin  wrote:

> I forgot to also mention that in all tests I was setting
> --experiments=use_deprecated_read
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 1:39 PM Evan Galpin  wrote:
>
>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>
>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>> FileIO.readMatches()  |  Read file contents  |  etc
>>
>> I have temporarily set up a transform between each step to log what's
>> going on and illustrate timing issues.  I ran a series of tests changing
>> only the SDK version each time since I hadn't noticed this performance
>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>> pubsub subscription with the exact same contents.
>>
>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
>> to resolve) and onward show a significant slowdown.
>>
>> Here is a snippet of logging from v2.25.0:
>>
>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
>> process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>> 2.23.0 and identical user code, the same section of the pipeline took *2
>> seconds*:
>>
>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
>> process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Any thoughts on what could be causing this?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>> wrote:
>>
>>>
>>>
>>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>>>
 Hi Evan,

 What do you mean startup delay? Is it the time that from you start the
 pipeline to the time that you notice the first output record from PubSub?

>>>
>>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>>> output despite data being in the subscription at pipeline start time.
>>>
>>> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:

> Can you try running direct runner with the option
> `--experiments=use_deprecated_read`
>

>>> This seems to work for me, thanks for this! 
>>>
>>>
> Seems like an instance of
> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
> also reported in
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>
> We should rollback using the SDF wrapper by default because of the
> usability and performance issues reported.
>
>
> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
> wrote:
>
>> Hi all,
>>
>> I’m experiencing very slow performance and startup delay when testing
>> a pipeline locally. I’m reading data from a Google PubSub subscription as
>> the data source, and before each pipeline execution I ensure that data is
>> present in the subscription (readable from GCP console).
>>
>> I’m seeing startup delay on the order of minutes with DirectRunner
>> (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
>> seemed related, but I think it has more to do with BQ than 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Steve Niemitz
This has been a common problem I've run into with lots of built-in IOs;
I've generally submitted PRs for them to add support for emitting something
once writes are completed.

On Wed, Mar 24, 2021 at 1:04 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Wed, Mar 24, 2021 at 10:01 AM Reuven Lax  wrote:
>
>> Does that work if cassandra returns a PDone?
>>
>
> No, it doesn't work.  I wrote my own CassandraIO.Write that is a
> PTransform returning a PCollection instead.
>
> I'm just asking if there's a better way of doing this because I'm having
> to do this with multiple types of Writers, and don't want to have to hand
> roll my own Write for each IO type I need this pattern for.
>
>
>>
>> On Wed, Mar 24, 2021 at 10:00 AM Chamikara Jayalath 
>> wrote:
>>
>>> If you want to wait until all records are written (per window) to
>>> Cassandra before writing that window to PubSub, you should be able to use
>>> the Wait transform:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 Do you want to wait until ALL records are written to Cassandra and then
 write all successfully written records to PubSub, or should it be performed
 "record by record"?

 On 24 Mar 2021, at 04:58, Vincent Marquez 
 wrote:

 I have a common use case where my pipeline looks like this:
 CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write

 I do NOT want my pipeline to look like the following:

 CassandraIO.readAll -> Aggregate -> CassandraIO.write
                                  |
                                  -> PubsubIO.write

 Because I need to ensure that only items written to Pubsub have
 successfully finished a (quorum) write.

 Since CassandraIO.write is a PTransform<PCollection<T>, PDone> I can't actually use
 it here so I often roll my own 'writer', but maybe there is a recommended
 way of doing this?

 Thanks in advance for any help.

 *~Vincent*





Re: Querying Dataflow job status via Java SDK

2020-10-12 Thread Steve Niemitz
This is what I was referencing:
https://github.com/googleapis/google-api-java-client-services/tree/master/clients/google-api-services-dataflow/v1b3
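
A rough sketch of looking a job up with that client, given only the project,
region and job id (the transport and auth classes shown come from the Google
API/auth client libraries and may differ slightly between versions; the ids are
placeholders):

// Sketch: fetch a Dataflow job's current state via the v1b3 client.
Dataflow dataflow =
    new Dataflow.Builder(
            GoogleNetHttpTransport.newTrustedTransport(),
            JacksonFactory.getDefaultInstance(),
            new HttpCredentialsAdapter(GoogleCredentials.getApplicationDefault()))
        .setApplicationName("dataflow-job-monitor")
        .build();

// Jobs are uniquely identified by (project, region, job id).
Job job = dataflow.projects().locations().jobs()
    .get("my-project", "us-central1", "my-job-id")
    .execute();

System.out.println(job.getCurrentState());  // e.g. "JOB_STATE_RUNNING"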




On Mon, Oct 12, 2020 at 2:23 PM Peter Littig 
wrote:

> Thanks for the replies, Lukasz and Steve!
>
> Steve: do you have a link to the google client api wrappers (I'm not sure
> if I know what they are.)
>
> Thank you!
>
> On Mon, Oct 12, 2020 at 11:04 AM Steve Niemitz 
> wrote:
>
>> We use the Dataflow API [1] directly, via the google api client wrappers
>> (both python and java), pretty extensively.  It works well and doesn't
>> require a dependency on beam.
>>
>> [1] https://cloud.google.com/dataflow/docs/reference/rest
>>
>> On Mon, Oct 12, 2020 at 1:56 PM Luke Cwik  wrote:
>>
>>> It is your best way to do this right now and this hasn't changed in a
>>> while (region was added to project and job ids in the past 6 years).
>>>
>>> On Mon, Oct 12, 2020 at 10:53 AM Peter Littig 
>>> wrote:
>>>
>>>> Thanks for the reply, Kyle.
>>>>
>>>> The DataflowClient::getJob method uses a Dataflow instance that's
>>>> provided at construction time (via
>>>> DataflowPipelineOptions::getDataflowClient). If that Dataflow instance can
>>>> be obtained from a minimal instance of the options (i.e., containing only
>>>> the project ID and region) then it looks like everything should work.
>>>>
>>>> I suppose a secondary question here is whether or not this approach is
>>>> the recommended way to solve my problem (but I don't know of any
>>>> alternatives).
>>>>
>>>> On Mon, Oct 12, 2020 at 9:55 AM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> > I think the answer is to use a DataflowClient in the second service,
>>>>> but creating one requires DataflowPipelineOptions. Are these options
>>>>> supposed to be exactly the same as those used by the first service? Or do
>>>>> only some of the fields have to be the same?
>>>>>
>>>>> Most options are not necessary for retrieving a job. In general,
>>>>> Dataflow jobs can always be uniquely identified by the project, region and
>>>>> job ID.
>>>>> https://github.com/apache/beam/blob/ecedd3e654352f1b51ab2caae0fd4665403bd0eb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java#L100
>>>>>
>>>>> On Mon, Oct 12, 2020 at 9:31 AM Peter Littig 
>>>>> wrote:
>>>>>
>>>>>> Hello, Beam users!
>>>>>>
>>>>>> Suppose I want to build two (Java) services, one that launches
>>>>>> (long-running) dataflow jobs, and the other that monitors the status of
>>>>>> dataflow jobs. Within a single service, I could simply track a
>>>>>> PipelineResult for each dataflow run and periodically call getState. How
>>>>>> can I monitor job status like this from a second, independent service?
>>>>>>
>>>>>> I think the answer is to use a DataflowClient in the second service,
>>>>>> but creating one requires DataflowPipelineOptions. Are these options
>>>>>> supposed to be exactly the same as those used by the first service? Or do
>>>>>> only some of the fields have to be the same?
>>>>>>
>>>>>> Or maybe there's a better alternative than DataflowClient?
>>>>>>
>>>>>> Thanks in advance!
>>>>>>
>>>>>> Peter
>>>>>>
>>>>>


Re: Querying Dataflow job status via Java SDK

2020-10-12 Thread Steve Niemitz
We use the Dataflow API [1] directly, via the google api client wrappers
(both python and java), pretty extensively.  It works well and doesn't
require a dependency on beam.

[1] https://cloud.google.com/dataflow/docs/reference/rest

On Mon, Oct 12, 2020 at 1:56 PM Luke Cwik  wrote:

> It is your best way to do this right now and this hasn't changed in a
> while (region was added to project and job ids in the past 6 years).
>
> On Mon, Oct 12, 2020 at 10:53 AM Peter Littig 
> wrote:
>
>> Thanks for the reply, Kyle.
>>
>> The DataflowClient::getJob method uses a Dataflow instance that's
>> provided at construction time (via
>> DataflowPipelineOptions::getDataflowClient). If that Dataflow instance can
>> be obtained from a minimal instance of the options (i.e., containing only
>> the project ID and region) then it looks like everything should work.
>>
>> I suppose a secondary question here is whether or not this approach is
>> the recommended way to solve my problem (but I don't know of any
>> alternatives).
>>
>> On Mon, Oct 12, 2020 at 9:55 AM Kyle Weaver  wrote:
>>
>>> > I think the answer is to use a DataflowClient in the second service,
>>> but creating one requires DataflowPipelineOptions. Are these options
>>> supposed to be exactly the same as those used by the first service? Or do
>>> only some of the fields have to be the same?
>>>
>>> Most options are not necessary for retrieving a job. In general,
>>> Dataflow jobs can always be uniquely identified by the project, region and
>>> job ID.
>>> https://github.com/apache/beam/blob/ecedd3e654352f1b51ab2caae0fd4665403bd0eb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java#L100
>>>
>>> On Mon, Oct 12, 2020 at 9:31 AM Peter Littig 
>>> wrote:
>>>
 Hello, Beam users!

 Suppose I want to build two (Java) services, one that launches
 (long-running) dataflow jobs, and the other that monitors the status of
 dataflow jobs. Within a single service, I could simply track a
 PipelineResult for each dataflow run and periodically call getState. How
 can I monitor job status like this from a second, independent service?

 I think the answer is to use a DataflowClient in the second service,
 but creating one requires DataflowPipelineOptions. Are these options
 supposed to be exactly the same as those used by the first service? Or do
 only some of the fields have to be the same?

 Or maybe there's a better alternative than DataflowClient?

 Thanks in advance!

 Peter

>>>


Re: Building Dataflow Worker

2020-06-15 Thread Steve Niemitz
I think you want the "legacy-worker" target instead:

./gradlew -Ppublishing -PnoSigning
:runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar

That's what I've always used at least.

On Mon, Jun 15, 2020 at 4:57 PM Luke Cwik  wrote:

> I noticed that you are not using the gradle wrapper but your own installed
> version. Apache Beam 2.19 is using gradle 5.2.1, is the installed version
> compatible with that?
>
> Try
> ./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
> in a clean workspace.
>
> On Fri, Jun 12, 2020 at 4:30 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I want to build the dataflow worker on apache beam 2.19. However I faced
>> a grpc issue. I did not change anything, just checked out the release-2.19.0 branch
>> and ran the build command. Could you help me understand why it does not build.
>> [1]
>>
>> Additional information: based on my limited knowledge, it looks like it is
>> looking for a class which comes from grpc version 1.26, but beam 2.19
>> is using grpc 1.21.
>>
>> You can find build output below.
>>
>> Thanks
>> [1] http://dpaste.com/15X7429
>>
>


Re: daily dataflow job failing today

2020-02-12 Thread Steve Niemitz
avro-python3 1.9.2 was released on pypi 4 hours ago, and added pycodestyle
as a dependency, probably related?

On Wed, Feb 12, 2020 at 1:03 PM Luke Cwik  wrote:

> +dev 
>
> There was recently an update to add autoformatting to the Python SDK[1].
>
> I'm seeing this during testing of a PR as well.
>
> 1:
> https://lists.apache.org/thread.html/448bb5c2d73fbd74eec7aacb5f28fa2f9d791784c2e53a2e3325627a%40%3Cdev.beam.apache.org%3E
>
> On Wed, Feb 12, 2020 at 9:57 AM Alan Krumholz 
> wrote:
>
>> Some more information on this, as I still can't get it fixed
>>
>> This job is triggered using the beam[gcp] python sdk from a KubeFlow
>> Pipelines component which runs on top of docker image:
>> tensorflow/tensorflow:1.13.1-py3
>>
>> I just checked and that image hasn't been updated recently. I also
>> redeployed my pipeline to another (older) deployment of KFP and it gives me
>> the same error (which tells me this isn't an internal KFP problem)
>>
>> The exact same pipeline/code running on the exact same image has been
>> running fine for days. Did anything changed on the beam/dataflow side since
>> yesterday morning?
>>
>> Thanks for your help! This is a production pipeline that is not running
>> for us :(
>>
>>
>>
>> On Wed, Feb 12, 2020 at 7:21 AM Alan Krumholz 
>> wrote:
>>
>>> Hi, I have a scheduled daily job that I have been running fine in
>>> dataflow for days now.
>>> We haven't changed anything in this code but this morning's run failed
>>> (it couldn't even spin up the job).
>>> The job submits a setup.py file (that also hasn't changed), but maybe that is
>>> causing the problem? (based on the error I'm getting)
>>>
>>> Anyone else having the same issue? or know how to fix it?
>>> Thanks!
>>>
>>> ERROR: Complete output from command python setup.py egg_info:
>>> 2 ERROR: Traceback (most recent call last):
>>> 3 File "<string>", line 1, in <module>
>>> 4 File "/tmp/pip-install-42zyi89t/avro-python3/setup.py", line 41, in
>>> <module>
>>> 5 import pycodestyle
>>> 6 ImportError: No module named 'pycodestyle'
>>> 7 
>>> 8ERROR: Command "python setup.py egg_info" failed with error code 1 in
>>> /tmp/pip-install-42zyi89t/avro-python3/
>>> 9 ERROR: Complete output from command python setup.py egg_info:
>>> 10 ERROR: Traceback (most recent call last):
>>> 11 File "<string>", line 1, in <module>
>>> 12 File "/tmp/pip-install-wrqytf9a/avro-python3/setup.py", line 41, in
>>> <module>
>>> 13 import pycodestyle
>>> 14 ImportError: No module named 'pycodestyle'
>>> 15 
>>> 16ERROR: Command "python setup.py egg_info" failed with error code 1 in
>>> /tmp/pip-install-wrqytf9a/avro-python3/
>>>
>>


Re: Stability of Timer.withOutputTimestamp

2020-02-06 Thread Steve Niemitz
cool, thank you.  I meant stable as in "my pipeline will produce correct
results", API changes are fine with me.

Still curious too on the second question wrt firing time vs output time
validation.
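
For anyone finding this thread later, a minimal sketch of the pattern under
discussion (an event-time timer in a keyed DoFn; the class name, timer id and
element types are illustrative):

// Sketch: fire at the end of the window while holding the output watermark
// at the element's timestamp, so output from @OnTimer isn't considered late.
class EndOfWindowFlushFn extends DoFn<KV<String, Long>, String> {

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext context, BoundedWindow window, @TimerId("flush") Timer flush) {
    // The output timestamp must not be later than the firing time.
    flush.withOutputTimestamp(context.timestamp()).set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext context) {
    // Anything emitted here carries the held output timestamp.
    context.output("window flushed at " + context.timestamp());
  }
}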

On Wed, Feb 5, 2020 at 11:20 PM Kenneth Knowles  wrote:

> It is definitely too new to be stable in the sense of not even tiny
> changes to the API / runtime compatibility.
>
> However, in my opinion it is so fundamental (and overdue) it will
> certainly exist in some form.
>
> Feel free to use it if you are OK with the possibility of minor
> compile-time adjustments and you do not require Dataflow pipeline update
> compatibility.
>
> Kenn
>
> On Wed, Feb 5, 2020 at 10:31 AM Luke Cwik  wrote:
>
>> +Reuven Lax 
>>
>> On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz  wrote:
>>
>>> Also, as a follow up, I'm curious about this commit:
>>>
>>> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>>>
>>> My use case is that I want to set a timer to fire after the max
>>> timestamp of a window, but hold the watermark to the max timestamp until it
>>> fires, essentially delaying the window closing by some amount of event
>>> time.  Previous to that revert commit it seems like that would have been
>>> possible, but now it would fail (since the target is after the window's
>>> maxTimestamp).
>>>
>>> What was the reason this was reverted, and are there plans to un-revert
>>> it?
>>>
>>> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz 
>>> wrote:
>>>
>>>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I
>>>> didn't see any mention of it in the release notes.
>>>>
>>>> Is this feature considered stable (specifically on dataflow)?
>>>>
>>>


Re: Stability of Timer.withOutputTimestamp

2020-02-05 Thread Steve Niemitz
Also, as a follow up, I'm curious about this commit:
https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3

My use case is that I want to set a timer to fire after the max timestamp
of a window, but hold the watermark to the max timestamp until it fires,
essentially delaying the window closing by some amount of event time.
Previous to that revert commit it seems like that would have been possible,
but now it would fail (since the target is after the window's maxTimestamp).

What was the reason this was reverted, and are there plans to un-revert it?

On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz  wrote:

> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't
> see any mention of it in the release notes.
>
> Is this feature considered stable (specifically on dataflow)?
>


Stability of Timer.withOutputTimestamp

2020-02-05 Thread Steve Niemitz
I noticed that Timer.withOutputTimestamp has landed in 2.19, but I didn't
see any mention of it in the release notes.

Is this feature considered stable (specifically on dataflow)?


Re: [FYI] Rephrasing the 'lull'/processing stuck logs

2020-01-09 Thread Steve Niemitz
One other nice enhancement around this would be if a transform could
indicate that it was executing a "slow" operation.

A good example is writing in BigQueryIO, it's very reasonable/normal for a
load job to run for more than 5 minutes, and the "stuck" message can be
confusing to users.  The rewording to "operation ongoing" in the PR seems
like a good improvement here as well though.

On Thu, Jan 9, 2020 at 8:26 PM Pablo Estrada  wrote:

> Hello Beam users and community,
>
> The Beam Python SDK, and Java workers have a utility where they will print
> a log message whenever there's an execution thread where no state
> transitions happen for over five minutes.
>
> These messages are common in two scenarios:
> 1. A deadlock happening in the worker (very uncommon, but possible)
> 2. An operation simply takes over 5 minutes (e.g. a slow RPC, waiting for
> an external event, etc).
>
> The old phrasing of these logs has often been a bit confusing, and
> led users to think that there was actual stuckness in the pipeline, when
> reality was more harmless: an operation was just slow.
>
> I am introducing a change[1] for the Apache Beam SDK to rephrase these
> logs, and make them less confusing.
>
> If you ever used these logs for your debugging, know that the string will
> change, but the logs will remain : ).
> If you didn't know about these, now you do, and hopefully they will be
> useful to you! : )
>
> Thanks!
> -P.
>
> [1] https://github.com/apache/beam/pull/10446/files
>


Re: Memory profiling on Dataflow with java

2019-11-18 Thread Steve Niemitz
If you go the port forwarding route, you need to use a SOCKS proxy as well
as forwarding the JMX port because of how JMX works.

For example, I SSH into a worker with:
ssh <worker-name> *-D <socks-port> -L <jmx-port>:127.0.0.1:<jmx-port>*

and then launch eg, jvisualvm with:
jvisualvm -J-DsocksProxyHost=localhost -J-DsocksProxyPort=<socks-port>

Then, set up a connection to the worker using its private IP address
(probably 10.something) on port <jmx-port> (make sure to allow non-SSL
connections as well).

On Mon, Nov 18, 2019 at 12:54 PM Luke Cwik  wrote:

> What Jeff mentioned is the easiest way to get heap dumps on OOM.
>
> If you want to connect to JMX, try using an SSH tunnel and forward the
> ports.
>
> On Mon, Nov 18, 2019 at 8:59 AM Jeff Klukas  wrote:
>
>> Using default Dataflow workers, this is the set of options I passed:
>>
>> --dumpHeapOnOOM --saveHeapDumpsToGcsPath=$MYBUCKET/heapdump
>> --diskSizeGb=100
>>
>>
>> On Mon, Nov 18, 2019 at 11:57 AM Jeff Klukas  wrote:
>>
>>> It sounds like you're generally doing the right thing. I've successfully
>>> used --saveHeapDumpsToGcsPath in a Java pipeline running on Dataflow and
>>> inspected the results in Eclipse MAT.
>>>
>>> I think that --saveHeapDumpsToGcsPath will automatically turn on
>>> --dumpHeapOnOOM but worth setting that explicitly too.
>>>
>>> Are your boot disks large enough to store the heap dumps? The docs for
>>> getSaveHeapDumpsToGcsPath [0] mention "CAUTION: This option implies
>>> dumpHeapOnOOM, and has similar caveats. Specifically, heap dumps can of
>>> comparable size to the default boot disk. Consider increasing the boot disk
>>> size before setting this flag to true."
>>>
>>> When I've done this in the past, I definitely had to increase boot disk
>>> size (though I forget now what the relevant Dataflow option was).
>>>
>>> [0]
>>> https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.html
>>>
>>> On Mon, Nov 18, 2019 at 11:35 AM Reynaldo Baquerizo <
>>> reynaldo.michel...@bairesdev.com> wrote:
>>>
 Hi all,

 We are running into OOM issues with one of our pipelines. They are not
 reproducible with DirectRunner, only with Dataflow.
 I tried --saveHeapDumpsToGcsPath, but it does not save any heap dump
 (MyOptions extends DataflowPipelineDebugOptions)
 I looked at the java process inside the docker container and it has
 remote jmx enabled through port , but outside traffic is firewalled.

 Beam SDK: 2.15.0

 Any ideas?

 Cheers,
 --
 Reynaldo

>>>


Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Steve Niemitz
Oh, one other important thing I forgot to mention is that I can't reproduce
(the empty message issue at least) locally on the DirectRunner.

On Wed, Jul 10, 2019 at 6:04 PM Steve Niemitz  wrote:

> Thanks for making JIRAs for these, I was going to, I just wanted to do a
> sanity check first. :)
>
> I reproduced them all with the stock PubsubIO first, and then again with
> the gRPC client.  I can try to throw together a much more minimal repro
> case too.
>
> On Wed, Jul 10, 2019 at 4:21 PM Kenneth Knowles  wrote:
>
>> This is pretty surprising. Seems valuable to file separate Jiras so we
>> can track investigation and resolution.
>>
>>  - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
>>  - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
>>  - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717
>>
>> You reproduced these with the original PubsubIO?
>>
>> Kenn
>>
>> On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz 
>> wrote:
>>
>>> I was trying to use the bundled PubsubIO.Read implementation in beam on
>>> dataflow (using --experiments=enable_custom_pubsub_source to prevent
>>> dataflow from overriding it with its own implementation) and ran into some
>>> interesting issues.  I was curious if people have any experience with
>>> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
>>> would have run into the same things.
>>>
>>> - The default implementation uses the HTTP REST API, which seems to be
>>> much less performant than the gRPC implementation.  Is there a reason that
>>> the gRPC implementation is essentially unavailable from the public API?
>>> PubsubIO.Read.withClientFactory is package private.  I worked around this
>>> by making it public and rebuilding, which led me to...
>>>
>>> - Both the JSON and gRPC implementation return empty message bodies for
>>> all messages read (using readMessages).  When running with the
>>> dataflow-specific reader, this doesn't happen and the message bodies have
>>> the content as expected.  I took a pipeline that works as expected on
>>> dataflow using PubsubIO.Read, added the experiment flag, and then my
>>> pipeline broke from empty message bodies.  This obviously blocked me from
>>> really experimenting much more.
>>>
>>> - The watermark tracking seems off.  The dataflow UI was reporting my
>>> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
>>> makes me wonder if seconds/milliseconds got confused somewhere (ie, if you
>>> take the time since epoch in milliseconds now and interpret it as seconds,
>>> you'll get somewhere around 1970-01-18).
>>>
>>


Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Steve Niemitz
Thanks for making JIRAs for these, I was going to, I just wanted to do a
sanity check first. :)

I reproduced them all with the stock PubsubIO first, and then again with
the gRPC client.  I can try to throw together a much more minimal repro
case too.

On Wed, Jul 10, 2019 at 4:21 PM Kenneth Knowles  wrote:

> This is pretty surprising. Seems valuable to file separate Jiras so we can
> track investigation and resolution.
>
>  - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
>  - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
>  - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717
>
> You reproduced these with the original PubsubIO?
>
> Kenn
>
> On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz  wrote:
>
>> I was trying to use the bundled PubsubIO.Read implementation in beam on
>> dataflow (using --experiments=enable_custom_pubsub_source to prevent
>> dataflow from overriding it with its own implementation) and ran into some
>> interesting issues.  I was curious if people have any experience with
>> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
>> would have run into the same things.
>>
>> - The default implementation uses the HTTP REST API, which seems to be
>> much less performant than the gRPC implementation.  Is there a reason that
>> the gRPC implementation is essentially unavailable from the public API?
>> PubsubIO.Read.withClientFactory is package private.  I worked around this
>> by making it public and rebuilding, which led me to...
>>
>> - Both the JSON and gRPC implementation return empty message bodies for
>> all messages read (using readMessages).  When running with the
>> dataflow-specific reader, this doesn't happen and the message bodies have
>> the content as expected.  I took a pipeline that works as expected on
>> dataflow using PubsubIO.Read, added the experiment flag, and then my
>> pipeline broke from empty message bodies.  This obviously blocked me from
>> really experimenting much more.
>>
>> - The watermark tracking seems off.  The dataflow UI was reporting my
>> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
>> makes me wonder if seconds/milliseconds got confused somewhere (ie, if you
>> take the time since epoch in milliseconds now and interpret it as seconds,
>> you'll get somewhere around 1970-01-18).
>>
>


Questions about the bundled PubsubIO read implementation

2019-07-08 Thread Steve Niemitz
I was trying to use the bundled PubsubIO.Read implementation in beam on
dataflow (using --experiments=enable_custom_pubsub_source to prevent
dataflow from overriding it with its own implementation) and ran into some
interesting issues.  I was curious if people have any experience with
these.  I'd assume anyone using PubsubIO on a runner other than dataflow
would have run into the same things.

- The default implementation uses the HTTP REST API, which seems to be much
less performant than the gRPC implementation.  Is there a reason that the
gRPC implementation is essentially unavailable from the public API?
PubsubIO.Read.withClientFactory is package private.  I worked around this
by making it public and rebuilding, which led me to...

- Both the JSON and gRPC implementation return empty message bodies for all
messages read (using readMessages).  When running with the
dataflow-specific reader, this doesn't happen and the message bodies have
the content as expected.  I took a pipeline that works as expected on
dataflow using PubsubIO.Read, added the experiment flag, and then my
pipeline broke from empty message bodies.  This obviously blocked me from
really experimenting much more.

- The watermark tracking seems off.  The dataflow UI was reporting my
watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
makes me wonder if seconds/milliseconds got confused somewhere (ie, if you
take the time since epoch in milliseconds now and interpret it as seconds,
you'll get somewhere around 1970-01-18).
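
For reference, a minimal sketch of the setup being described, with the
experiment flag mentioned above (project and subscription names are
placeholders):

// Sketch: disable Dataflow's native Pub/Sub override so the SDK's own
// PubsubIO.Read implementation is what actually runs.
PipelineOptions options = PipelineOptionsFactory.fromArgs(
        "--runner=DataflowRunner",
        "--experiments=enable_custom_pubsub_source")
    .create();

Pipeline p = Pipeline.create(options);
PCollection<PubsubMessage> messages = p.apply(
    PubsubIO.readMessages()
        .fromSubscription("projects/my-project/subscriptions/my-subscription"));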


Performance of Wait.on (and side inputs in general) in dataflow

2019-05-24 Thread Steve Niemitz
Hi everyone.

I've been debugging a streaming job (on dataflow) for a little while now,
and seem to have tracked it down to the Wait.on transform that I'm using.

Some background: our pipeline takes in ~250,000 messages/sec from pubsub,
aggregates them in an hourly window, and then emits the results.  The final
output from the combiner is ~20,000,000 keys (emitted at the end of the
hour).

These results are tee-d, they get written somewhere, and the keys are also
sent into a Wait.on transform, where they wait for a signal, before being
written to pubsub.

If I let this pipeline run for a couple of hours, the input processing rate
eventually drops down below the rate of messages going into the queue, and
I'll get a bunch of deadline_exceeded errors from windmill for one specific
worker.  At this point the pipeline is basically unrecoverable and needs to
be restarted.

If I remove the wait transform, everything works great.

My back-of-the-envelope calculations are that the elements going into the
wait transform are ~1 GB total, so it's not a huge input either.  My guess
is there's some kind of O(n^2) operation happening here, because this same
pipeline does work fairly reliably with a lower key space (~100,000-1
million).

The other interesting thing I've noticed is that the stage is constantly
processing operations even with no messages coming into it. (eg, in my
lower scale case, in the windmill status page, the stage has ~100,000
active operations / second "succeeding", but no messages are going into the
stage (since its not the end of the hour)).  It's also written 10 GB of
data, although the dataflow UI says that only 500 MB of data has gone into
the wait transform.

It seems like there might just be a bug here, but in the interim, is there
any way to construct something similar to the Wait transform but without
using side-inputs?  My first guess was possibly a CoGroupByKey?

Thanks!


Is it safe to cache the value of a singleton view (with a global window) in a DoFn?

2019-05-04 Thread Steve Niemitz
I have a singleton view in a global window that is read from a DoFn.  I'm
curious if it's "correct" to cache that value from the view, or if I need to
read it every time.

As a (simplified) example, if I were to generate the view as such:

input.getPipeline
  .apply(Create.of(Collections.singleton[Void](null)))
  .apply(MapElements.via(new SimpleFunction[Void, JLong]() {
override def apply(input: Void): JLong = {
  Instant.now().getMillis
}
  })).apply(View.asSingleton[JLong]())

and then read it from a DoFn (using context.sideInput), is it guaranteed
that:
- every instance of the DoFn will read the same value?
- The value will never change?

If so it seems like it'd be safe to cache the value inside the DoFn.  It
seems like this would be the case, but I've also seen cases in dataflow
where the UI indicates that the MapElements step above produced more than
one element, so I'm curious what people have to say.

Thanks!
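
To make the caching idea concrete, a rough sketch of what it would look like,
assuming the answers above are "yes" (the class and field names are
illustrative, and the DoFn still has to be applied with .withSideInputs(view)):

// Sketch: read the singleton side input once per DoFn instance and reuse it,
// assuming the view's value is computed once and never changes.
class UsesCachedSideInput extends DoFn<String, String> {
  private final PCollectionView<Long> view;
  private transient Long cachedValue;

  UsesCachedSideInput(PCollectionView<Long> view) {
    this.view = view;
  }

  @ProcessElement
  public void process(ProcessContext context) {
    if (cachedValue == null) {
      // First element seen by this instance: read the side input and keep it.
      cachedValue = context.sideInput(view);
    }
    context.output(context.element() + "@" + cachedValue);
  }
}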


Re: Performance of stateful DoFn vs CombineByKey

2019-03-18 Thread Steve Niemitz
Interesting, thanks for the info!  Combiner lifting definitely makes sense
here, but as you mentioned I'm curious how much it helps performance in a
streaming pipeline.  The blog post you linked is great, I wonder if it
possible to make this information more visible?  It's pretty buried in the
blog list now, and I'll admit I never even got that far, because there's
another post on stateful processing almost directly above it.

I still plan on trying to do some benchmarks here because it'd be
interesting to see the differences.  I'll make sure to post results when I
do.

On Thu, Mar 14, 2019 at 3:43 PM Kenneth Knowles  wrote:

> Combine admits many more execution plans than stateful ParDo:
>
>  - "Combiner lifting" or "mapper-side combine", in which the CombineFn is
> used to reduce data before shuffling. This is tremendous in batch, but can
> still matter in streaming.
>  - Hot key fanout & recombine. This is important in both batch & streaming.
>
> I tried to cover the issues a little in this section of my blog post on
> state, because it also answers the converse question: why/when would you
> use state (without timers) when Combine is so similar?
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html#how-does-stateful-processing-fit-into-the-beam-model
>
> And here's a slide with the same idea but side-by-side illustrations:
> https://s.apache.org/ffsf-2017-beam-state#slide=id.g1dbf0d46d2_0_258
>
> Kenn
>
> On Tue, Mar 12, 2019 at 6:55 AM Steve Niemitz  wrote:
>
>> Hi all.
>>
>> I'm curious if anyone has done any comparison of the performance of a
>> pipeline that uses CombineByKey, vs one that uses a stateful DoFn with
>> combining state. [1]
>>
>> More specifically, if I had a pipeline that had a CombineByKey configured
>> with early firings every N minutes, and I replaced the CBK with a stateful
>> DoFn with combining state and a timer that fired every N minutes instead,
>> would there be a (significant?) performance difference?  Specifically I'm
>> using dataflow (with streaming engine) but I'd be curious for other runners
>> as well
>>
>> If no one has tried this I might do a benchmark to test, I'd be very
>> interested to see the results.
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/state/CombiningState.html
>>
>


Performance of stateful DoFn vs CombineByKey

2019-03-12 Thread Steve Niemitz
Hi all.

I'm curious if anyone has done any comparison of the performance of a
pipeline that uses CombineByKey, vs one that uses a stateful DoFn with
combining state. [1]

More specifically, if I had a pipeline that had a CombineByKey configured
with early firings every N minutes, and I replaced the CBK with a stateful
DoFn with combining state and a timer that fired every N minutes instead,
would there be a (significant?) performance difference?  Specifically I'm
using dataflow (with streaming engine) but I'd be curious for other runners
as well

If no one has tried this I might do a benchmark to test, I'd be very
interested to see the results.

[1]
https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/state/CombiningState.html
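
For concreteness, a rough sketch of the stateful-DoFn side of that comparison
(Sum.ofLongs() and the one-minute flush stand in for whatever CombineFn and
early-firing interval the real pipeline would use):

// Sketch: per-key running sum kept in CombiningState, flushed by a
// processing-time timer instead of early trigger firings.
class SumWithPeriodicFlushFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("sum")
  private final StateSpec<CombiningState<Long, long[], Long>> sumSpec =
      StateSpecs.combining(Sum.ofLongs());

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @StateId("sum") CombiningState<Long, long[], Long> sum,
      @StateId("key") ValueState<String> key,
      @TimerId("flush") Timer flush) {
    sum.add(element.getValue());
    key.write(element.getKey());
    // Flush one minute after the most recent element; a real pipeline might
    // instead re-arm the timer from @OnTimer to get strictly periodic output.
    flush.offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("flush")
  public void flush(
      @StateId("sum") CombiningState<Long, long[], Long> sum,
      @StateId("key") ValueState<String> key,
      OutputReceiver<KV<String, Long>> out) {
    // Emits the accumulated sum so far, like an accumulating early firing.
    out.output(KV.of(key.read(), sum.read()));
  }
}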


Re: Stateful processing : @OnWindowExpiration DoFn annotation

2019-02-26 Thread Steve Niemitz
You are correct that state is per-key.  In your case, if you just want all
elements in a window to be emitted when the window closes, you can just use
GroupByKey and assign everything the same key.  When the window closes
you'll get an iterable of all elements in that window.

Note: this won't scale well, as you'll be processing the group on a single
worker, however for smaller data sets it should be fine.

On Tue, Feb 26, 2019 at 10:26 AM Augustin Lafanechere <
augustin.lafanech...@kapten.com> wrote:

> I think I got the issue. I did not realize that state is partitioned by key.
> So if I want to limit callback fires I need to change my partitioning logic
> for a more coarse-grained one. Please tell me if I’m wrong or if a special
> feature exists to access a global state, but due to shuffling issues I
> think it may not be possible / wanted.
>
> Thanks !
>
> Augustin
>
>
>
> Le 26 févr. 2019 à 14:06, Augustin Lafanechere <
> augustin.lafanech...@kapten.com> a écrit :
>
> Many thanks for your answers !
> The GroupIntoBatches transform nearly implements the logic I am after,
> but I just want to execute the RPC call at the end of the window, not a
> flush when the batch size limit is reached.
>
> In order to do so I reimplemented the part of GroupIntoBatches that
> guarantees a batch flush at the end of the window.
>
> According to my logs it looks like the @OnTimer callback is fired for
> every element that reaches processElement. Is that the expected behavior?
> I am looking to execute the callback only once (when the window is closed).
>
> Thanks for your help! Please find below the snippet I am currently running.
>
> Augustin
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   private static final Logger LOG = LoggerFactory.getLogger(TestPipeline.class);
>
>   @TimerId("endOfWindow")
>   private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @StateId("batch")
>   private final StateSpec<BagState<KV<String, Long>>> batchSpec = StateSpecs.bag();
>
>   @ProcessElement
>   public void processElement(
>       final @TimerId("endOfWindow") Timer windowTimer,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final @Element KV<String, Long> element,
>       final BoundedWindow window,
>       final OutputReceiver<KV<String, Long>> receiver) {
>
>     Instant windowExpires = window.maxTimestamp();
>
>     LOG.info(
>         "*** SET TIMER *** to point in time {} for window {}",
>         windowExpires.toString(),
>         window.toString());
>     windowTimer.set(windowExpires);
>     batch.add(element);
>     LOG.info("*** BATCH *** Add element for window {} ", window.toString());
>   }
>
>   @OnTimer("endOfWindow")
>   public void onTimerCallback(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final @Timestamp Instant timestamp,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final BoundedWindow window) {
>     LOG.info(
>         "*** END OF WINDOW *** for timer timestamp {} in windows {}", timestamp,
>         window.toString());
>     flushBatch(receiver, batch);
>   }
>
>   private void flushBatch(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final BagState<KV<String, Long>> batch) {
>     Iterable<KV<String, Long>> values = batch.read();
>     // when the timer fires, batch state might be empty
>     if (!Iterables.isEmpty(values)) {
>       for (KV<String, Long> elem : values) {
>         receiver.output(elem);
>       }
>     }
>     batch.clear();
>     LOG.info("*** BATCH *** clear");
>   }
> }
>
>
>
> Le 26 févr. 2019 à 00:49, Kenneth Knowles  a écrit :
>
> Sorry you hit this issue.
>
> Implementation of this feature has been marked in progress [1] for a
> while. It looks to be stalled so I unassigned the ticket. There is not any
> explicit runner support, yet, though the existing implementation is clever
> enough that it may automatically work for many runners.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-1589
>
> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz  wrote:
>
>> I've noticed this doesn't seem to work either.  The workaround is to just
>> schedule an event-time timer at the end of the window + allowed lateness.
>> The built-in GroupIntoBatches transform [1] does just this, I suspect to
>> work around the issue as well.
>>
>> [1]
>> https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
>>
>> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <
>> augustin.lafanech...@kapten.com> wrote:
>>
>>> Hello dear Beam community,
>>> Sorry, I sent this email on dev list first but it’s a user support
>>> question...
>>> I would like to write to you for a question a

Re: Stateful processing : @OnWindowExpiration DoFn annotation

2019-02-25 Thread Steve Niemitz
I've noticed this doesn't seem to work either.  The workaround is to just
schedule an event-time timer at the end of the window + allowed lateness.
The built-in GroupIntoBatches transform [1] does just this, I suspect to
work around the issue as well.

[1]
https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
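
A minimal sketch of that workaround inside a stateful DoFn (the timer id is
illustrative, and the allowed lateness used here has to match whatever the
Window transform in the pipeline is configured with):

// Sketch: emulate @OnWindowExpiration with an event-time timer set to the
// window's max timestamp plus the pipeline's allowed lateness.
@TimerId("expiry")
private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(
    ProcessContext context, BoundedWindow window, @TimerId("expiry") Timer expiry) {
  // Must match the allowed lateness configured on the Window transform.
  Duration allowedLateness = Duration.ZERO;
  expiry.set(window.maxTimestamp().plus(allowedLateness));
  // ... buffer context.element() in a BagState here as usual ...
}

@OnTimer("expiry")
public void onExpiry(OnTimerContext context) {
  // Fires once per key and window when the window expires: flush the buffered
  // state and emit results here.
}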

On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <
augustin.lafanech...@kapten.com> wrote:

> Hello dear Beam community,
> Sorry, I sent this email on dev list first but it’s a user support
> question...
> I would like to write to you for a question about OnWindowExpiration
> annotation on DoFn.
> Does anyone of you have a working snippet with this ?
>
> I am trying to write a DoFn with a batch RPC on window closure. It is a BigQuery
> call for a historical metric value updated by an external process. I want
> to execute this query and sum the results with my events buffered in
> state. The OnWindowExpiration annotation looks very practical for accomplishing this.
>
> It looks like the function annotated with @OnWindowExpiration is never
> called. My pipeline runs on Dataflow, perhaps it's not a supported feature on
> this runner…
>
> Here is a snippet of what I am trying to accomplish. It seems like the annotated
> function is never called, the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
>  and
> pieces of information found in this PR.
> 
>
>
> // The window definition used in the pipeline is set in a higher transform:
> // Window<KV<String, Long>> w =
> //     Window.into(FixedWindows.of(Duration.standardMinutes(1L)))
> //         .withAllowedLateness(Duration.ZERO)
> //         .discardingFiredPanes();
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
>
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
>
>
> Thanks for your help,
>
>
> Augustin
>
>


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-19 Thread Steve Niemitz
Thanks again for all the replies everyone.  Just as a final follow up here,
are there any concrete plans on addressing these issues I could start
following?  The sink trigger doc seems like a start, but also seems like
just a starting point in a larger re-architecture of sinks.

On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles  wrote:

>
>
> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz 
>> wrote:
>>
>>>
>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz 
>>>> wrote:
>>>>
>>>>> Thanks again for the answers so far!  I really appreciate it.  As for
>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>>> read-modify-write).  We actually do track tentative vs final values
>>>>> already, but checking that at write-time would impose a pretty big 
>>>>> overhead
>>>>> in the write path.
>>>>>
>>>>> After this I actually instrumented one of my running pipelines to
>>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>>> frequently, particularly when dataflow decides to scale up/down the job, 
>>>>> so
>>>>> that was interesting.
>>>>>
>>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>>> traveling panes from overwriting newer ones is the best solution for now.
>>>>>
>>>>
>>>> Note that you can't "filter out" these time traveling panes, because at
>>>> the next fusion break they might get re-ordered again.
>>>>
>>>
>>> Ack, in a general sense.  To solve my specific problem my plan was to
>>> ensure the final writer sink would be fused to this filter step (or even
>>> build it directly into the DoFn itself that does the write), which would
>>> work in my specific case (it seems like at least).
>>>
>>>
>>>>
>>>>
>>>>> My last question / statement is just around general education and
>>>>> documentation about this.  I think the fact that PCollection are unordered
>>>>> makes sense and is pretty intuitive, but fired panes being delivered
>>>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>>>> exist that run into this (and produce incorrect results!) but people are
>>>>> unaware of.  Is there a way we can call this behavior out?  For example,
>>>>> many of the sample beam projects use early firings, but there's never any
>>>>> mention that the output may be out-of-order.
>>>>>
>>>>
>>>> +1 to improving the documentation here. Basically multiple firings
>>>> become independent elements of the resulting PCollection, they don't retain
>>>> any association/ordering.
>>>>
>>>> Multiply-triggered window are difficult to reason about (and not just
>>>> in this case), https://s.apache.org/beam-sink-triggers is IMHO the
>>>> right answer.
>>>>
>>>
>>> I was reading this yesterday, but couldn't see how it solved the
>>> out-of-order delivery problem here.  I do like the overall direction its
>>> proposing though, from my work with triggers so far I have found them very
>>> difficult to reason about (like you said).
>>>
>>
>> It moves the responsibility of doing things in the right order (and even
>> defining what order is "correct enough") to the runner (and sinks) such
>> that the side effects happen in order, even if all the processing did not.
>> To be clear there's still a fair amount of design to make that doc into a
>> workable system...
>>
>
> With or without sink triggers, transforms that write need to be
> pane-index-aware. The outputs themselves may be out of order, but they have
> sequence numbers on them, so sinks likely need to be made stateful so they
> can be idempotent in the face of reordering.
>
> Kenn
>
>
>>
>>
>>>
>>>
>>>>
>>>>
>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz 
>>>>>> wrote:
>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

2019-02-13 Thread Steve Niemitz
On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw  wrote:

> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz  wrote:
>
>> Thanks again for the answers so far!  I really appreciate it.  As for my
>> specific use-case, we're using Bigtable as the final sink, and I'd prefer
>> to keep our writes fully idempotent for other reasons (ie no
>> read-modify-write).  We actually do track tentative vs final values
>> already, but checking that at write-time would impose a pretty big overhead
>> in the write path.
>>
>> After this I actually instrumented one of my running pipelines to detect
>> these "time traveling" panes, and did see it occurring pretty frequently,
>> particularly when dataflow decides to scale up/down the job, so that was
>> interesting.
>>
>> From all this, it seems like using a stateful DoFn to prevent time
>> traveling panes from overwriting newer ones is the best solution for now.
>>
>
> Note that you can't "filter out" these time traveling panes, because at
> the next fusion break they might get re-ordered again.
>

Ack, in a general sense.  To solve my specific problem my plan was to
ensure the final writer sink would be fused to this filter step (or even
build it directly into the DoFn itself that does the write), which would
work in my specific case (it seems like at least).


>
>
>> My last question / statement is just around general education and
>> documentation about this.  I think the fact that PCollection are unordered
>> makes sense and is pretty intuitive, but fired panes being delivered
>> out-of-order seems very surprising.  I'm curious how many other pipelines
>> exist that run into this (and produce incorrect results!) but people are
>> unaware of.  Is there a way we can call this behavior out?  For example,
>> many of the sample beam projects use early firings, but there's never any
>> mention that the output may be out-of-order.
>>
>
> +1 to improving the documentation here. Basically multiple firings become
> independent elements of the resulting PCollection, they don't retain any
> association/ordering.
>
> Multiply-triggered window are difficult to reason about (and not just in
> this case), https://s.apache.org/beam-sink-triggers is IMHO the right
> answer.
>

I was reading this yesterday, but couldn't see how it solved the
out-of-order delivery problem here.  I do like the overall direction its
proposing though, from my work with triggers so far I have found them very
difficult to reason about (like you said).


>
>
>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw 
>> wrote:
>>
>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz 
>>> wrote:
>>> >
>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>> Time to go re-think how we do some of our writes w/ early firings then.
>>> >
>>> > Are there any workarounds to make things happen in-order in dataflow?
>>>  eg if the sink gets fused to the output of the GBK operation, will it
>>> always happen effectively in order (per key) even though it's not a
>>> guarantee?
>>>
>>> If things get fused, yes. Note that sinks themselves sometimes have
>>> fusion barriers though.
>>>
>>> > I also guess I could keep track of the last pane index my sink has
>>> seen, and ignore earlier ones (using state to keep track)?
>>>
>>> Yes, that would work.
>>>
>>> What kind of sink are you using? If it supports read-modify-write or
>>> some kind of transaction you may be able to mark early results as tentative
>>> (which would be useful anyway) and only overwrite tentative ones.
>>>
>>>
>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> Correct, even within the same key there's no promise of event time
>>> ordering mapping of panes to real time ordering because the downstream
>>> operations may happen on a different machine. Multiply triggered windows
>>> add an element of non-determinism to the process.
>>> >>
>>> >> You're also correct that triggering with multiple panes requires lots
>>> of care, especially when it comes to operations with side effects (like
>>> sinks). Most safe is to only write the final pane to the sink, and handle
>>> early triggering in a different way.
>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>> easier to reason about.
>>> >>
>>> >>
>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz 
>

Re: Some questions about ensuring correctness with windowing and triggering

2019-02-13 Thread Steve Niemitz
Thanks again for the answers so far!  I really appreciate it.  As for my
specific use-case, we're using Bigtable as the final sink, and I'd prefer
to keep our writes fully idempotent for other reasons (ie no
read-modify-write).  We actually do track tentative vs final values
already, but checking that at write-time would impose a pretty big overhead
in the write path.

After this I actually instrumented one of my running pipelines to detect
these "time traveling" panes, and did see it occurring pretty frequently,
particularly when dataflow decides to scale up/down the job, so that was
interesting.

From all this, it seems like using a stateful DoFn to prevent time
traveling panes from overwriting newer ones is the best solution for now.

My last question / statement is just around general education and
documentation about this.  I think the fact that PCollection are unordered
makes sense and is pretty intuitive, but fired panes being delivered
out-of-order seems very surprising.  I'm curious how many other pipelines
exist that run into this (and produce incorrect results!) but people are
unaware of.  Is there a way we can call this behavior out?  For example,
many of the sample beam projects use early firings, but there's never any
mention that the output may be out-of-order.

On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw  wrote:

> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz 
> wrote:
> >
> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
> to go re-think how we do some of our writes w/ early firings then.
> >
> > Are there any workarounds to make things happen in-order in dataflow?
>  eg if the sink gets fused to the output of the GBK operation, will it
> always happen effectively in order (per key) even though it's not a
> guarantee?
>
> If things get fused, yes. Note that sinks themselves sometimes have fusion
> barriers though.
>
> > I also guess I could keep track of the last pane index my sink has seen,
> and ignore earlier ones (using state to keep track)?
>
> Yes, that would work.
>
> What kind of sink are you using? If it supports read-modify-write or some
> kind of transaction you may be able to mark early results as tentative
> (which would be useful anyway) and only overwrite tentative ones.
>
>
> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw 
> wrote:
> >>
> >> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations may happen on a different machine. Multiply triggered windows
> add an element of non-determinism to the process.
> >>
> >> You're also correct that triggering with multiple panes requires lots
> of care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
> >>
> >>
> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz 
> wrote:
> >>>
> >>> Also to clarify here (I re-read this and realized it could be slightly
> unclear).  My question is only about in-order delivery of panes.  ie: will
> pane P always be delivered before P+1.
> >>>
> >>> I realize the use of "in-order" before could be confusing, I don't
> care about the ordering of the elements per-se, just the ordering of the
> pane delivery.
> >>>
> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
> least, the final firing is always guaranteed to be delivered after all
> early-firings (eg we could have P0, P2, P1, but then always PLast).
> >>>
> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz 
> wrote:
> >>>>
> >>>> Are you also saying also that even in the first example (Source ->
> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
> delivered in-order from the Combine -> Sink transforms?  This seems like a
> pretty big "got-cha" for correctness if you ever use accumulating
> triggering.
> >>>>
> >>>> I'd also like to point out I'm not talking about a global ordering
> across the entire PCollection, I'm talking about within the same key after
> a GBK transform.
> >>>>
> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
> wrote:
> >>>>>
> >>>>> Due to the nature of distributed processing, order is not preserved.
> You can, however, inspect the PaneInfo to determine if a

Re: Some questions about ensuring correctness with windowing and triggering

2019-02-12 Thread Steve Niemitz
wow, that's super unexpected and dangerous, thanks for clarifying!  Time to
go re-think how we do some of our writes w/ early firings then.

Are there any workarounds to make things happen in-order in dataflow?  eg
if the sink gets fused to the output of the GBK operation, will it always
happen effectively in order (per key) even though it's not a guarantee?  I
also guess I could keep track of the last pane index my sink has seen, and
ignore earlier ones (using state to keep track)?
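
A sketch of that last idea: keep the highest pane index seen so far in state
(scoped per key and window) and drop anything older before it reaches the sink
(DropStalePanesFn is an illustrative name, and it assumes the input is keyed):

// Sketch: drop panes that arrive out of order by remembering the highest
// pane index already seen for this key and window.
class DropStalePanesFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {

  @StateId("lastPaneIndex")
  private final StateSpec<ValueState<Long>> lastPaneIndexSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context, @StateId("lastPaneIndex") ValueState<Long> lastPaneIndex) {
    long paneIndex = context.pane().getIndex();
    Long lastSeen = lastPaneIndex.read();
    if (lastSeen == null || paneIndex > lastSeen) {
      lastPaneIndex.write(paneIndex);
      context.output(context.element());
    }
    // else: an earlier ("time traveling") pane arrived late, ignore it.
  }
}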


On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw  wrote:

> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations *may* happen on a different machine. Multiply triggered
> windows add an element of non-determinism to the process.
>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
>
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz  wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear).  My question is only about in-order delivery of panes.  ie: will
>> pane P always be delivered before P+1.
>>
>> I realize the use of "in-order" before could be confusing, I don't care
>> about the ordering of the elements per-se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz 
>> wrote:
>>
>>> Are you also saying also that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>> pretty big "got-cha" for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection, I'm talking about within the same key after
>>> a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
>>> wrote:
>>>
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element was
>>>> early, on-time, or late and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia 
>>>> wrote:
>>>>
>>>>> In my experience ordering is not guaranteed, you may need apply a
>>>>> transformation that sort the elements and then dispatch them sorted out.
>>>>>
>>>>> Or uses the Sorter extension for this:
>>>>>
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> Steve Niemitz  schrieb am Di., 12. Feb. 2019,
>>>>> 16:31:
>>>>>
>>>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>>>> triggering, and panes work together, and how to ensure correctness
>>>>>> throughout a pipeline.
>>>>>>
>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straight forward.  However, this can 
>>>>>> get
>>>>>> more complicated if we add steps after the CombineByKey, for instance
>>>>>> (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches of
>>>>>> N elements.  I can do this with the built-in GroupIntoBatches [1]
>>>>>> transform, now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE the firing F (because it 
>>>>>> was
>>>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever if F+1 was the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, 
>>>>>> but I
>>>>>> imagine this could happen with any GBK type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-12 Thread Steve Niemitz
Also to clarify here (I re-read this and realized it could be slightly
unclear).  My question is only about in-order delivery of panes.  ie: will
pane P always be delivered before P+1.

I realize the use of "in-order" before could be confusing, I don't care
about the ordering of the elements per-se, just the ordering of the pane
delivery.

I want to make sure that, given a GBK that produces 3 panes (P0, P1, P2) for
a key, a downstream PCollection could never see P0, P2, P1.  Or, at least,
that the final firing is always guaranteed to be delivered after all
early firings (e.g. we could have P0, P2, P1, but then always PLast).

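To make this concrete, the kind of guard I'm imagining downstream is a small
DoFn that tags each element with its pane index, so a sink that supports
conditional writes could drop a pane that shows up after a later one has
already been applied.  This is just a rough, untested sketch (the String/Long
element types and the sink-side conditional write are assumptions on my part):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

class TagWithPaneIndex extends DoFn<KV<String, Long>, KV<String, KV<Long, Long>>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    PaneInfo pane = c.pane();
    // pane.getIndex() is 0 for the first firing of the window and increases
    // with each subsequent firing; pane.isLast() marks the final firing.
    c.output(KV.of(c.element().getKey(),
        KV.of(pane.getIndex(), c.element().getValue())));
  }
}

The sink would then only apply a value whose pane index is greater than the
last one it wrote for that key/window, but that pushes the ordering problem
into the sink rather than solving it in the pipeline.
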
On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz  wrote:

> Are you also saying that even in the first example (Source ->
> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
> delivered in order from the Combine -> Sink transforms?  This seems like a
> pretty big "gotcha" for correctness if you ever use accumulating
> triggering.
>
> I'd also like to point out I'm not talking about a global ordering across
> the entire PCollection, I'm talking about within the same key after a GBK
> transform.
>
> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
> wrote:
>
>> Due to the nature of distributed processing, order is not preserved. You
>> can, however, inspect the PaneInfo to determine if an element was early,
>> on-time, or late and act accordingly.
>>
>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia 
>> wrote:
>>
>>> In my experience ordering is not guaranteed; you may need to apply a
>>> transformation that sorts the elements and then dispatches them in order.
>>>
>>> Or use the Sorter extension for this:
>>>
>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>
>>> Steve Niemitz  wrote on Tue, Feb 12, 2019, 16:31:
>>>
>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>> triggering, and panes work together, and how to ensure correctness
>>>> throughout a pipeline.
>>>>
>>>> Let's assume I have a very simple streaming pipeline that looks like:
>>>> Source -> CombineByKey (Sum) -> Sink
>>>>
>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>> accumulating panes, this is pretty straightforward.  However, this can get
>>>> more complicated if we add steps after the CombineByKey, for instance
>>>> (using the same windowing strategy):
>>>>
>>>> Say I want to buffer the results of the CombineByKey into batches of N
>>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>>> now my pipeline looks like:
>>>>
>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>
>>>> *This leads to my main question:*
>>>> Is ordering preserved somehow here?  ie: is it possible that the result
>>>> from early firing F+1 now comes BEFORE the firing F (because it was
>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>> (possibly forever if F+1 was the final firing).
>>>>
>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>>> imagine this could happen with any GBK type operation.
>>>>
>>>> Am I thinking about this the correct way?  Thanks!
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>
>>>


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-12 Thread Steve Niemitz
Are you also saying that even in the first example (Source ->
CombineByKey (Sum) -> Sink) there's no guarantee that events would be
delivered in order from the Combine -> Sink transforms?  This seems like a
pretty big "gotcha" for correctness if you ever use accumulating
triggering.

I'd also like to point out I'm not talking about a global ordering across
the entire PCollection, I'm talking about within the same key after a GBK
transform.

On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
wrote:

> Due to the nature of distributed processing, order is not preserved. You
> can, however, inspect the PaneInfo to determine if an element was early,
> on-time, or late and act accordingly.
>
> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia 
> wrote:
>
>> In my experience ordering is not guaranteed; you may need to apply a
>> transformation that sorts the elements and then dispatches them in order.
>>
>> Or use the Sorter extension for this:
>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>
>> Steve Niemitz  wrote on Tue, Feb 12, 2019, 16:31:
>>
>>> Hi everyone, I have some questions I want to ask about how windowing,
>>> triggering, and panes work together, and how to ensure correctness
>>> throughout a pipeline.
>>>
>>> Let's assume I have a very simple streaming pipeline that looks like:
>>> Source -> CombineByKey (Sum) -> Sink
>>>
>>> Given fixed windows of 1 hour, early firings every minute, and
>>> accumulating panes, this is pretty straightforward.  However, this can get
>>> more complicated if we add steps after the CombineByKey, for instance
>>> (using the same windowing strategy):
>>>
>>> Say I want to buffer the results of the CombineByKey into batches of N
>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>> now my pipeline looks like:
>>>
>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>
>>> *This leads to my main question:*
>>> Is ordering preserved somehow here?  ie: is it possible that the result
>>> from early firing F+1 now comes BEFORE the firing F (because it was
>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>> gets F+1 before F, which means my resulting store has incorrect data
>>> (possibly forever if F+1 was the final firing).
>>>
>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>> imagine this could happen with any GBK type operation.
>>>
>>> Am I thinking about this the correct way?  Thanks!
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>
>>


Some questions about ensuring correctness with windowing and triggering

2019-02-12 Thread Steve Niemitz
Hi everyone, I have some questions I want to ask about how windowing,
triggering, and panes work together, and how to ensure correctness
throughout a pipeline.

Let's assume I have a very simple streaming pipeline that looks like:
Source -> CombineByKey (Sum) -> Sink

Given fixed windows of 1 hour, early firings every minute, and accumulating
panes, this is pretty straightforward.  However, this can get more
complicated if we add steps after the CombineByKey, for instance (using the
same windowing strategy):

Say I want to buffer the results of the CombineByKey into batches of N
elements.  I can do this with the built-in GroupIntoBatches [1] transform,
now my pipeline looks like:

Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
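
In code, the middle of that pipeline looks roughly like the sketch below (the
String/Long key and value types, the batch size of 100, and the allowed
lateness are just placeholders):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// events is the PCollection<KV<String, Long>> read from the source.
PCollection<KV<String, Long>> sums = events
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(Sum.<String>longsPerKey());

// Each output element is a key plus a batch of up to 100 sums for that key.
PCollection<KV<String, Iterable<Long>>> batches =
    sums.apply(GroupIntoBatches.<String, Long>ofSize(100));

// batches then feeds the sink.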

*This leads to my main question:*
Is ordering preserved somehow here?  ie: is it possible that the result
from early firing F+1 now comes BEFORE the firing F (because it was
re-ordered in the GroupIntoBatches).  This would mean that the sink then
gets F+1 before F, which means my resulting store has incorrect data
(possibly forever if F+1 was the final firing).

If ordering is not preserved, it seems as if I'd need to introduce my own
ordering back in after GroupIntoBatches.  GIB is an example here, but I
imagine this could happen with any GBK type operation.

Am I thinking about this the correct way?  Thanks!

[1]
https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html


Re: Using gRPC with PubsubIO?

2019-01-02 Thread Steve Niemitz
Something to consider: if you're running in Dataflow, the entire Pubsub
read step becomes a noop [1], and the underlying streaming implementation
itself handles reading from pubsub (either windmill or the streaming
engine).

[1]
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373
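
For what it's worth, the pipeline-level code looks the same either way;
something like the sketch below (the subscription name is made up).  On
Dataflow streaming, the runner swaps its own Pubsub source in underneath this
step at job submission time:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.values.PCollection;

Pipeline pipeline = Pipeline.create();

// Reads raw PubsubMessages; the JSON-vs-gRPC client choice is internal to
// this transform (and moot on Dataflow, where the step is overridden).
PCollection<PubsubMessage> messages = pipeline.apply(
    "ReadFromPubsub",
    PubsubIO.readMessages()
        .fromSubscription("projects/my-project/subscriptions/my-subscription"));

pipeline.run();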

On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas  wrote:

> I see that the Beam codebase includes a PubsubGrpcClient, but there
> doesn't appear to be any way to configure PubsubIO to use that client over
> the PubsubJsonClient.
>
> There's even a PubsubIO.Read#withClientFactory, but it's marked as for
> testing only.
>
> Is gRPC support something that's still in development? Or am I missing
> something about how to configure this?
>
> I'm particularly interested in using gRPC due to the message size
> inflation of base64 encoding required for JSON transport. My payloads are
> all below the 10 MB Pubsub limit, but I need to support some near the top
> end of that range that are currently causing errors due to base64 inflation.
>


Re: Join PCollection Data with HBase Large Data - Suggestion Requested

2018-12-04 Thread Steve Niemitz
Interesting to know that the state scales so well!
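
Out of curiosity, this is roughly what I picture the bag-state approach
looking like with the Beam state API.  It's an untested sketch that assumes
String keys and values, with the HBase seed lookup stubbed out:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class JoinWithHistoryFn extends DoFn<KV<String, String>, KV<String, List<String>>> {

  @StateId("seeded")
  private final StateSpec<ValueState<Boolean>> seededSpec = StateSpecs.value(BooleanCoder.of());

  @StateId("values")
  private final StateSpec<BagState<String>> valuesSpec = StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("seeded") ValueState<Boolean> seeded,
      @StateId("values") BagState<String> values) {
    if (!Boolean.TRUE.equals(seeded.read())) {
      // One-time seed of the bag from HBase for this key (stubbed out below).
      for (String oldValue : loadHistoryFromHBase(c.element().getKey())) {
        values.add(oldValue);
      }
      seeded.write(true);
    }
    values.add(c.element().getValue());

    // Emit the key with everything seen so far (old + new).
    List<String> allValues = new ArrayList<>();
    values.read().forEach(allValues::add);
    c.output(KV.of(c.element().getKey(), allValues));
  }

  private Iterable<String> loadHistoryFromHBase(String key) {
    // Placeholder for the real HBase scan.
    return Collections.emptyList();
  }
}

One thing this makes obvious is that the bag only ever grows, so presumably
you'd want a timer to expire or compact it eventually.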

On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik  wrote:

> You're correct in saying that a StatefulDoFn is pointless if you only see
> every key+window once. The user's description wasn't exactly clear, but it
> seemed to me they were reading from a stream and wanted to store all old
> values that they had seen, implying they see keys more than once. The user
> would need to ensure that the windowing strategy triggers more than once
> for my suggestion to be useful (e.g. a global window with an after-element-count
> trigger), but without further details my suggestion is a guess.
>
> Also, the implementation for state storage is runner-dependent, but I am
> aware of users storing very large amounts (>> 1 TiB) within state on
> Dataflow, and in general it scales very well with the number of keys and
> windows.
>
> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz  wrote:
>
>> We have a similar use case, except with BigtableIO instead of HBase.
>>
>> We ended up building a custom transform that was basically
>> PCollection[ByteString] -> PCollection[BigtableRow], and would fetch rows
>> from Bigtable based on the input, however it's tricky to get right because
>> of batching, etc.
>>
>> I'm curious how a StatefulDoFn would help here; it seems like it's more
>> of just a cache than an actual join (and in my use case we're never reading
>> a key more than once, so a cache wouldn't help here anyway).  Also I'd be
>> interested to see how the state storage performs with "large" amounts of
>> state.  We're reading ~1 TB of data from Bigtable in a run, and it doesn't
>> seem reasonable to store that all in a DoFn's state.
>>
>>
>>
>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik  wrote:
>>
>>> What about a StatefulDoFn where you append the value(s) in a bag state
>>> as you see them?
>>>
>>> If you need to seed the state information, you could do a one time
>>> lookup in processElement for each key to HBase if the key hasn't yet been
>>> seen (storing the fact that you loaded the data in a boolean) but
>>> afterwards you would rely on reading the value(s) from the bag state.
>>>
>>> processElement(...) {
>>>   Value newValue = ...
>>>   Iterable<Value> values;
>>>   if (!hasSeenKeyBooleanValueState.read()) {
>>>     values = ... load from HBase ...
>>>     valuesBagState.append(values);
>>>     hasSeenKeyBooleanValueState.set(true);
>>>   } else {
>>>     values = valuesBagState.read();
>>>   }
>>>   ... perform processing using values ...
>>>
>>>   valuesBagState.append(newValue);
>>> }
>>>
>>> This blog post[1] has a good example.
>>>
>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>
>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas 
>>> wrote:
>>>
>>>> Hello All,
>>>> I have a use case where I have PCollection<KV<Key, Value>> data coming
>>>> from a Kafka source. When processing each record (KV<Key, Value>) I need all
>>>> old values for that Key stored in an HBase table. The naive approach is to
>>>> do an HBase lookup in DoFn.processElement. I considered a side input, but it's
>>>> not going to work because of the large dataset. Any suggestions?
>>>>
>>>> Thanks,
>>>> Chandan
>>>>
>>>


Re: Join PCollection Data with HBase Large Data - Suggestion Requested

2018-12-04 Thread Steve Niemitz
We have a similar use case, except with BigtableIO instead of HBase.

We ended up building a custom transform that was basically
PCollection[ByteString] -> PCollection[BigtableRow], and would fetch rows
from Bigtable based on the input; however, it's tricky to get right because
of batching, etc.

I'm curious how a StatefulDoFn would help here; it seems like it's more of
just a cache than an actual join (and in my use case we're never reading a
key more than once, so a cache wouldn't help here anyway).  Also I'd be
interested to see how the state storage performs with "large" amounts of
state.  We're reading ~1 TB of data from Bigtable in a run, and it doesn't
seem reasonable to store that all in a DoFn's state.
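
For reference, if I were rebuilding it today I'd probably let GroupIntoBatches
do the batching and then issue one multi-get per batch.  A rough sketch (the
shard count, the batch size, the Row type, and the fetchRows client call are
all stand-ins for the real Bigtable/HBase pieces):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class BatchedLookup {

  // Stand-in for whatever row type the store returns.
  @DefaultCoder(SerializableCoder.class)
  static class Row implements Serializable {}

  static PCollection<KV<String, Row>> lookupInBatches(PCollection<String> rowKeys) {
    return rowKeys
        // Assign each key to one of 32 shards so batches can form in parallel.
        .apply(WithKeys.<Integer, String>of(k -> Math.floorMod(k.hashCode(), 32))
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.<Integer, String>ofSize(100))
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, KV<String, Row>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            List<String> batch = new ArrayList<>();
            c.element().getValue().forEach(batch::add);
            // One multi-get per batch; fetchRows stands in for the client call.
            for (KV<String, Row> result : fetchRows(batch)) {
              c.output(result);
            }
          }
        }));
  }

  static List<KV<String, Row>> fetchRows(List<String> keys) {
    return new ArrayList<>(); // placeholder
  }
}

That keeps the batching (and the window/pane bookkeeping) in the model instead
of in hand-rolled buffers inside the DoFn.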



On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik  wrote:

> What about a StatefulDoFn where you append the value(s) in a bag state as
> you see them?
>
> If you need to seed the state information, you could do a one time lookup
> in processElement for each key to HBase if the key hasn't yet been seen
> (storing the fact that you loaded the data in a boolean) but afterwards you
> would rely on reading the value(s) from the bag state.
>
> processElement(...) {
>   Value newValue = ...
>   Iterable<Value> values;
>   if (!hasSeenKeyBooleanValueState.read()) {
>     values = ... load from HBase ...
>     valuesBagState.append(values);
>     hasSeenKeyBooleanValueState.set(true);
>   } else {
>     values = valuesBagState.read();
>   }
>   ... perform processing using values ...
>
>   valuesBagState.append(newValue);
> }
>
> This blog post[1] has a good example.
>
> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas 
> wrote:
>
>> Hello All,
>> I have a use case where I have PCollection<KV<Key, Value>> data coming
>> from a Kafka source. When processing each record (KV<Key, Value>) I need all
>> old values for that Key stored in an HBase table. The naive approach is to
>> do an HBase lookup in DoFn.processElement. I considered a side input, but it's
>> not going to work because of the large dataset. Any suggestions?
>>
>> Thanks,
>> Chandan
>>
>


Re:

2017-12-01 Thread Steve Niemitz
I do something almost exactly like this, but with BigtableIO instead.  I
have a pull request open here [1] (which reminds me I need to finish this
up...).  It would really be nice for most IOs to support something like
this.

Essentially you do a GroupByKey (or some CombineFn) on the output from the
BigtableIO, and then feed that into your function which will run when all
writes finish.
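
Roughly, the shape is the sketch below.  It assumes the sink hands you back
some PCollection of per-write results (the WriteResult type here is
hypothetical); ElasticsearchIO returns PDone today, so it would need a change
along those lines first:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// writeResults is the hypothetical PCollection<WriteResult> from the sink.
PCollection<Long> writesDone =
    writeResults.apply(Combine.globally(Count.<WriteResult>combineFn()).withoutDefaults());

writesDone.apply(ParDo.of(new DoFn<Long, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // In a batch pipeline this only fires once the count has seen every
    // write result, i.e. after the writes have finished; issue the index
    // alias-swap POST to the ES cluster here.
  }
}));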

You probably want to avoid doing something in the main method because
there's no guarantee it'll actually run (maybe the driver will die, get
killed, machine will explode, etc).

[1] https://github.com/apache/beam/pull/3997

On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick  wrote:

> Assuming you're in Java, you could just follow on in your main method,
> checking the state of the result.
>
> Example:
> PipelineResult result = pipeline.run();
> try {
>   result.waitUntilFinish();
>   if (result.getState() == PipelineResult.State.DONE) {
>     // Do ES work
>   }
> } catch (Exception e) {
>   result.cancel();
>   throw e;
> }
>
> Otherwise you could also use Oozie to construct a work flow.
>
> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi,
>>
>> yes, we had a similar question some days ago.
>>
>> We could imagine having a user callback fn fired when the sink batch is
>> complete.
>>
>> Let me think about that.
>>
>> Regards
>> JB
>>
>> On 12/01/2017 09:04 AM, Philip Chan wrote:
>>
>>> Hey JB,
>>>
>>> Thanks for getting back so quickly.
>>> I suppose in that case I would need a way of monitoring when the ES
>>> transform completes successfully before I can proceed with doing the swap.
>>> The problem with this is that I can't think of a good way to determine
>>> that termination state short of polling the new index to check the document
>>> count compared to the size of input PCollection.
>>> That, or maybe I'd need to use an external system like you mentioned to
>>> poll on the state of the pipeline (I'm using Google Dataflow, so maybe
>>> there's a way to do this with some API).
>>> But I would have thought that there would be an easy way of simply
>>> saying "do not process this transform until this other transform completes".
>>> Is there no established way of "signaling" between pipelines when some
>>> pipeline completes, or have some way of declaring a dependency of 1
>>> transform on another transform?
>>>
>>> Thanks again,
>>> Philip
>>>
>>> On Thu, Nov 30, 2017 at 11:44 PM, Jean-Baptiste Onofré wrote:
>>>
>>> Hi Philip,
>>>
>>> You won't be able to do (3) in the same pipeline, as the
>>> Elasticsearch Sink
>>> PTransform ends the pipeline with PDone.
>>>
>>> So, (3) has to be done in another pipeline (using a DoFn) or in
>>> another
>>> "system" (like Camel for instance). I would do a check of the data
>>> in the
>>> index and then trigger the swap there.
>>>
>>> Regards
>>> JB
>>>
>>> On 12/01/2017 08:41 AM, Philip Chan wrote:
>>>
>>> Hi,
>>>
>>> I'm pretty new to Beam, and I've been trying to use the
>>> ElasticSearchIO
>>> sink to write docs into ES.
>>> With this, I want to be able to
>>> 1. ingest and transform rows from DB (done)
>>> 2. write JSON docs/strings into a new ES index (done)
>>> 3. After (2) is complete and all documents are written into a
>>> new index,
>>> trigger an atomic index swap under an alias to replace the
>>> current
>>> aliased index with the new index generated in step 2. This is
>>> basically
>>> a single POST request to the ES cluster.
>>>
>>> The problem I'm facing is that I don't seem to be able to find a
>>> way to make (3) happen after step (2) is complete.
>>>
>>> The ElasticSearchIO.Write transform returns a PDone, and I'm not
>>> sure
>>> how to proceed from there because it doesn't seem to let me do
>>> another
>>> apply on it to "define" a dependency.
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.Write.html
>>>
>>> Is there a recommended way to construct pipelines workflows like
>>> this?
>>>
>>> Thanks in advance,
>>> Philip
>>>
>>>
>>> -- Jean-Baptiste Onofré
>>> jbono...@apache.org 
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - 

Re: Slack channel

2017-08-16 Thread Steve Niemitz
Ah interesting, I guess no one told the Mesos guys that! :D

Thanks for the invite though!

On Wed, Aug 16, 2017 at 1:25 PM, Lukasz Cwik <lc...@google.com> wrote:

> Welcome Griselda, Steve, and Apache.
>
> Steve, this has come up before, but it is against Slack's free tier policy
> to have a bot which sends invites out automatically.
>
> On Wed, Aug 16, 2017 at 10:18 AM, Apache Enthu <apacheen...@gmail.com>
> wrote:
>
>> Please could you add me too?
>>
>> Thanks,
>> Almas
>>
>> On 16 Aug 2017 22:41, "Steve Niemitz" <sniem...@apache.org> wrote:
>>
>>> I'll jump on this thread as well, can I get an invite too?
>>>
>>> Also, has anyone thought of making this self-service?  The Apache Mesos
>>> Slack has this set up [1].
>>>
>>> [1] https://mesos-slackin.herokuapp.com
>>>
>>> On Aug 16, 2017 1:08 PM, "Griselda Cuevas" <g...@google.com> wrote:
>>>
>>>> Hi Manu, I'd like to piggy back on shen's request, could you add me to
>>>> the channel as well?
>>>>
>>>> On 15 August 2017 at 21:32, Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>
>>>>> Invitation sent. Welcome.
>>>>>
>>>>> On Wed, Aug 16, 2017 at 11:41 AM shen yu <yuz...@gmail.com> wrote:
>>>>>
>>>>>> Hi, I'd like to join the Slack channel for Apache Beam. I work at
>>>>>> Klook and would like to get involved in the Apache Beam community.
>>>>>> My email is you...@klook.com
>>>>>>
>>>>>
>>>>
>


Re: Slack channel

2017-08-16 Thread Steve Niemitz
I'll jump on this thread as well, can I get an invite too?

Also, has anyone thought of making this self-service?  The Apache Mesos
Slack has this set up [1].

[1] https://mesos-slackin.herokuapp.com

On Aug 16, 2017 1:08 PM, "Griselda Cuevas"  wrote:

> Hi Manu, I'd like to piggy back on shen's request, could you add me to the
> channel as well?
>
> On 15 August 2017 at 21:32, Manu Zhang  wrote:
>
>> Invitation sent. Welcome.
>>
>> On Wed, Aug 16, 2017 at 11:41 AM shen yu  wrote:
>>
>>> Hi, I'd like to join the Slack channel for Apache Beam. I work at Klook
>>> and would like to get involved in the Apache Beam community. My email
>>> is you...@klook.com
>>>
>>
>