Re: Histogram metrics in Dataflow/Beam

2022-04-04 Thread Jeff Klukas
Siyu - The Beam metrics interface includes the Distribution metric type which can be used for histograms: https://beam.apache.org/documentation/programming-guide/#types-of-metrics Particulars of support depend on the runner. For Cloud Dataflow, the reported values are MAX, MIN, MEAN, and COUNT,
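A minimal sketch of reporting a Distribution from a DoFn (the class and metric name below are hypothetical); on Dataflow the named metric surfaces with the MIN/MAX/MEAN/COUNT aggregates described above:
  import org.apache.beam.sdk.metrics.Distribution;
  import org.apache.beam.sdk.metrics.Metrics;
  import org.apache.beam.sdk.transforms.DoFn;

  class ElementSizeFn extends DoFn<String, String> {
    // Named distribution; the runner aggregates the values passed to update().
    private final Distribution elementSize =
        Metrics.distribution(ElementSizeFn.class, "element_size");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
      elementSize.update(element.length());
      out.output(element);
    }
  }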

Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
On Fri, May 21, 2021 at 5:10 AM Jeff Klukas wrote: >> Beam users, >> We're attempting to write a Java pipeline that uses Count.perKey() to collect event counts, and then flush those to an HTTP API every ten minutes based on processing time.

How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
Beam users, We're attempting to write a Java pipeline that uses Count.perKey() to collect event counts, and then flush those to an HTTP API every ten minutes based on processing time. We've tried expressing this using GlobalWindows with an AfterProcessingTime trigger, but we find that when we
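For reference, a minimal sketch of the triggering setup described above (keyedEvents and the Event type are hypothetical), independent of the drain behavior in question:
  // Uses org.apache.beam.sdk.transforms.windowing.* and org.joda.time.Duration.
  // Emit per-key counts every ten minutes of processing time in the global window.
  PCollection<KV<String, Long>> counts = keyedEvents  // PCollection<KV<String, Event>>
      .apply(Window.<KV<String, Event>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(10))))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(Count.perKey());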

Re: DataflowRunner and PubSub Acknowledge

2020-10-05 Thread Jeff Klukas
Hi Thiago, Note that Dataflow has a custom implementation of PubSub interaction, so the code you see in PubsubIO in the Beam codebase does not necessarily reflect Pubsub handling in Dataflow. Dataflow acks messages as soon as they are first checkpointed, so the first step in your pipeline that

Re: WriteToBigQuery - performance issues?

2020-07-14 Thread Jeff Klukas
(although not sure if these errors are problematic) > I’ll definitely give the SSD option a shot. > Thanks. > -- Mark Kelly > Sent with Airmail > On 14 July 2020 at 15:12:46, Jeff Klukas (jklu...@mozilla.com) wrote: > In my experience with writing to BQ

Re: WriteToBigQuery - performance issues?

2020-07-14 Thread Jeff Klukas
In my experience with writing to BQ via BigQueryIO in the Java SDK, the bottleneck tends to be disk I/O. The BigQueryIO logic requires several shuffles that cause checkpointing even in the case of streaming inserts, which in the Dataflow case means writing to disk. I assume the Python logic is

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
.html >> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich wrote: >>> Thanks for explaining. Is it documented somewhere that TableRow contains Map? I don't construct it, I fetch from Google Analytics export to BigQuery

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
Google Analytics export to BigQuery table. > On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote: >> I would expect the following line to fail: >> List h = ((List) bigQueryRow.get("hits")); >> The top-level bigQueryRow will be

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.Table

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
ParDo.of(new CreateSessionMetrics(boolean and string params))) > // few more transformations > } > This is basically similar to examples you can find here https://beam.apache.org/documentation/io/built-in/google-bigquery/

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich wrote: > So from what I understand, it works like this by design and it's not possible to test my code with the current coder implementation. Is that correct? I would argue that this test failure is indicating an area of potential failure

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich wrote: > So it's correct implementation of TableRow that encode(decode(a)) != a? A TableRow can contain fields of any map implementation. It makes sense to me that once a TableRow object is serialized and deserialized, that the coder must make

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
have issues running this pipeline in production. I have this issue only when I tried to write end to end test. Do you know if there are existing coders for TableRow that I can use? I've tried TableRowJsonCoder, but seems like it converts all object inside TableRow to LinkedHash

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
Kirill - Can you tell us more about what Job.runJob is doing? I would not expect the Beam SDK itself to do any casting to TableRow, so is there a line in your code where you're explicitly casting to TableRow? There may be a point where you need to explicitly set the coder on a PCollection to
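As a sketch of that last point (input and ParseRowFn are hypothetical), explicitly setting the coder might look like:
  // TableRowJsonCoder round-trips nested records as plain Maps/Lists, so downstream
  // code should read nested fields as Map rather than casting them to TableRow.
  PCollection<TableRow> rows = input
      .apply("ParseRows", ParDo.of(new ParseRowFn()))
      .setCoder(TableRowJsonCoder.of());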

Re: BigQuery query caching?

2020-07-02 Thread Jeff Klukas
Yeah - we are issuing a query rather than reading a table. Materializing the results myself and reading them back seems simple enough. I will give that a try! >> Thanks, >> Matt >> On Thu, Jul 2, 2020 at 9:42 AM Jeff Klukas wrote:

Re: BigQuery query caching?

2020-07-02 Thread Jeff Klukas
It sounds like your pipeline is issuing a query rather than reading a whole table. Are you using Java or Python? I'm only familiar with the Java SDK so my answer may be Java-biased. I would recommend materializing the query results to a table, and then configuring your pipeline to read that
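A sketch of the read side under that approach (project, dataset, and table names are hypothetical); the query itself would be run and materialized outside the pipeline, e.g. with the bq CLI or the BigQuery client:
  PCollection<TableRow> rows = pipeline.apply(
      "ReadMaterializedResults",
      BigQueryIO.readTableRows().from("my-project:my_dataset.materialized_results"));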

Re: Exceptions PubsubUnbounded source ACK

2020-06-01 Thread Jeff Klukas
And if a single element in the bundle fails and we ignore the error on the single element, then the bundle is considered still successfully processed am I correct? Then it would just ACK everything in the bundle > Kishore > On Mon, Jun 1, 2020 at 10:27 AM Jeff Klukas wr

Re: Exceptions PubsubUnbounded source ACK

2020-06-01 Thread Jeff Klukas
Is this a Python or Java pipeline? I'm familiar with PubsubIO in Java, though I expect the behavior in Python is similar. It will ack messages at the first checkpoint step in the pipeline, so the behavior in your case depends on whether there is a GroupByKey operation happening before the

Re: Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-05-01 Thread Jeff Klukas
e them as they are very lightweight. > 1: https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier) > On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote: >> Beam Java users, >> I've run in

Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-04-30 Thread Jeff Klukas
Beam Java users, I've run into a few cases where I want to present a single thread-safe data structure to all threads on a worker, and I end up writing a good bit of custom code each time involving a synchronized method that handles creating the resource exactly once, and then each thread has its
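A sketch of the pattern using Guava's Suppliers.memoize, which came up later in the thread (ExpensiveClient is hypothetical):
  import com.google.common.base.Supplier;
  import com.google.common.base.Suppliers;
  import org.apache.beam.sdk.transforms.DoFn;

  class EnrichFn extends DoFn<String, String> {
    // Static so the memoized value is shared by all DoFn instances (and threads) in the
    // same JVM; memoize builds the resource at most once and is thread-safe.
    private static final Supplier<ExpensiveClient> CLIENT =
        Suppliers.memoize(ExpensiveClient::create);

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
      out.output(CLIENT.get().enrich(element));
    }
  }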

Re:

2020-04-23 Thread Jeff Klukas
Aniruddh - Using BigQueryIO.Read with EXPORT method involves a potentially long wait on BQ to complete the export. I have experience with running Dataflow batch jobs which use this read method to ingest ~10 TB of data in a single job. The behavior I generally see is that the job will progress
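For context, a sketch of such a read (table name hypothetical); the export job must finish before workers can start consuming the extracted files, which accounts for the initial wait:
  PCollection<TableRow> rows = pipeline.apply(
      BigQueryIO.readTableRows()
          .from("my-project:my_dataset.big_table")
          .withMethod(BigQueryIO.TypedRead.Method.EXPORT));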

Re: Large public Beam projects?

2020-04-21 Thread Jeff Klukas
Mozilla hosts the code for our data ingestion system publicly on GitHub. A good chunk of that architecture consists of Beam pipelines running on Dataflow. See: https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam and rendered usage documentation at:

Re: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Jeff Klukas
We also had throughput issues in writing to BQ in a streaming pipeline and we mitigated by provisioning a large quantity of SSD storage to improve I/O throughput to disk for checkpoints. I also second Erik's suggestion to look into Streaming Engine. We are currently looking into migrating our streaming
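Provisioning SSDs for Dataflow workers amounts to worker options along these lines (the size is illustrative and the placeholders are hypothetical; the exact disk-type resource path format is per the Dataflow docs):
  --diskSizeGb=500
  --workerDiskType=compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd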

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
That's normal; I also never saw those heap dump options display in the Dataflow UI. I think Dataflow doesn't show any options that originate from "Debug" options interfaces. > On Mon, Nov 18, 2019 at 11:59 AM Jeff Klukas wrote: >> Using default Dataflow w

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
Using default Dataflow workers, this is the set of options I passed: --dumpHeapOnOOM --saveHeapDumpsToGcsPath=$MYBUCKET/heapdump --diskSizeGb=100 On Mon, Nov 18, 2019 at 11:57 AM Jeff Klukas wrote: > It sounds like you're generally doing the right thing. I've successfully >

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
It sounds like you're generally doing the right thing. I've successfully used --saveHeapDumpsToGcsPath in a Java pipeline running on Dataflow and inspected the results in Eclipse MAT. I think that --saveHeapDumpsToGcsPath will automatically turn on --dumpHeapOnOOM but worth setting that

Re: Encoding Problem: Kafka - DataFlow

2019-10-31 Thread Jeff Klukas
I ran into exactly this same problem of finding some accented characters getting replaced with "?" in a pipeline only when running on Dataflow and not when using the Direct Runner. KafkaIO was not involved, but I'd bet the root cause is the same. In my case, the input turned out to be properly
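A sketch of the kind of fix that applies when the bytes are valid UTF-8 but the decode relies on the platform default charset (messageBytes is hypothetical):
  import java.nio.charset.StandardCharsets;

  // Decode and encode with an explicit charset; the JVM default charset can differ
  // between the Direct Runner environment and Dataflow workers.
  String text = new String(messageBytes, StandardCharsets.UTF_8);
  byte[] roundTripped = text.getBytes(StandardCharsets.UTF_8);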

Re: Setting environment and system properties on Dataflow workers

2019-08-30 Thread Jeff Klukas
variables and you have to resort to some hackery that is JVM version dependent[2]. > 1: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java > 2: https://blog.sebastian-daschner.com/entries/changing_env_java
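A sketch of a JvmInitializer per [1] that sets a system property on worker startup, assuming the worker harness honors the interface (the property name is hypothetical; registration here uses AutoService to generate the ServiceLoader entry):
  import com.google.auto.service.AutoService;
  import org.apache.beam.sdk.harness.JvmInitializer;
  import org.apache.beam.sdk.options.PipelineOptions;

  @AutoService(JvmInitializer.class)
  public class PropertyInitializer implements JvmInitializer {
    @Override
    public void beforeProcessing(PipelineOptions options) {
      // Runs in the worker JVM before elements are processed.
      System.setProperty("my.library.mode", "worker");
    }
  }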

Re: Cost efficient loading of Kafka high throughput event stream to Bigquery

2019-05-17 Thread Jeff Klukas
I have also wrestled with throughput for FileIO and BigQueryIO on Dataflow, and in my case the bottleneck came down to disk I/O throughput on the worker machines. Writing with FileIO or BigQueryIO involves several group by key operations that in the Dataflow case require checkpointing state to

Re: Reading and Writing Binary files in Beam

2019-04-26 Thread Jeff Klukas
You can use the read* and write* methods of FileIO to read and write arbitrary binary files. The examples in the Javadoc for FileIO [0] include an example of reading the entire contents of a file as a string into a Beam record, along with metadata about the file. If a one-to-one mapping of files
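A sketch along the lines of the javadoc example in [0], reading each matched file's full contents as bytes keyed by its name (the file pattern is hypothetical):
  PCollection<KV<String, byte[]>> fileContents = pipeline
      .apply(FileIO.match().filepattern("gs://my-bucket/binary/*.bin"))
      .apply(FileIO.readMatches())
      .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, byte[]>>() {
        @ProcessElement
        public void process(@Element FileIO.ReadableFile file,
            OutputReceiver<KV<String, byte[]>> out) throws IOException {
          // Reads the whole file into memory; appropriate for reasonably small files.
          out.output(KV.of(file.getMetadata().resourceId().toString(),
              file.readFullyAsBytes()));
        }
      }));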

Re: Do I have any control over bundle sizes?

2019-04-05 Thread Jeff Klukas
>> delivery). Drain should work for a pipeline like this without any issues. >> On Fri, Apr 5, 2019 at 3:33 PM Jeff Klukas wrote: > Thanks for the suggestions. Ideally, I'd like to avoid GBK completely, which if I understand semantics

Re: Do I have any control over bundle sizes?

2019-04-05 Thread Jeff Klukas
discussion on this ticket: https://issues.apache.org/jira/browse/BEAM-6886 > On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas wrote: > As far as I can tell, Beam expects runners to have full control over separation of individual el

Do I have any control over bundle sizes?

2019-04-04 Thread Jeff Klukas
As far as I can tell, Beam expects runners to have full control over separation of individual elements into bundles and this is something users have no control over. Is that true? Or are there any ways that I might exert some influence over bundle sizes? My main interest at the moment is

Re: Push Kafka data to S3 based on window size in bytes

2019-02-14 Thread Jeff Klukas
I'm not aware that there's currently any way to trigger based on data size. As you state, AfterPane.elementCountAtLeast lets you trigger based on number of elements, but from my reading of the implementations of triggers in the Java SDK, triggers don't have access to sufficient data to maintain
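For reference, a sketch of the element-count trigger mentioned above (the count and input collection are hypothetical), which is the closest built-in proxy for a size-based trigger:
  PCollection<String> batched = records.apply(
      Window.<String>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());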

Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

2019-02-13 Thread Jeff Klukas
To question 1, I also would have expected the pipeline to fail in the case of files failing to load; I'm not sure why it's not. I thought the BigQuery API returns a 400 level response code in the case of files failing and that would bubble up to a pipeline execution error, but I haven't dug

Re: Tuning BigQueryIO.Write

2019-01-30 Thread Jeff Klukas
Maybe share the code of your pipeline? > Cheers, > Tobi > On Tue, Jan 22, 2019 at 9:28 PM Jeff Klukas wrote: >> I'm attempting to deploy a fairly simple job on the Dataflow runner that reads from PubSub and writes to BigQuery using file loads, but I ha

Re: How to disable sharding with FileIO.write()/FileIO.writeDynamic

2019-01-24 Thread Jeff Klukas
Hi Sri, To Question 1, you should be able to set `... .to("/tmp/parquet/").withNumShards(1)` to produce a single output file. To Question 2, yes if your desired output file name depends on contents of the record itself, that's exactly what FileIO.writeDynamic() is for. If you can get the name

Tuning BigQueryIO.Write

2019-01-22 Thread Jeff Klukas
I'm attempting to deploy a fairly simple job on the Dataflow runner that reads from PubSub and writes to BigQuery using file loads, but I have so far not been able to tune it to keep up with the incoming data rate. I have configured BigQueryIO.write to trigger loads every 5 minutes, and I've let
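The configuration in question looks roughly like the sketch below (table name and shard count are illustrative placeholders, not the values from the job):
  tableRows.apply(
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.events")
          .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
          .withTriggeringFrequency(Duration.standardMinutes(5))
          .withNumFileShards(100)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));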

Re: KafkaIO error

2019-01-22 Thread Jeff Klukas
> this means that I am losing data or if this will be retried by the sink? I don't have direct experience with KafkaIO, but noting that this exception happened in the finishBundle method, Beam will not have committed the bundle. More specifically, looking at the KafkaWriter code, I see that

Recommendations for controlling FileIO.Write size

2019-01-16 Thread Jeff Klukas
Related to a previous thread about custom triggering on GlobalWindows [0], are there general recommendations for controlling size of output files from FileIO.Write? A general pattern I've seen in systems that need to batch individual records to files is that they offer both a maximum file size

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-11 Thread Jeff Klukas
do with Flattening two PCollections together) with their original trigger. >> Without this, we also know that you can have three PCollections with identical triggering and you can CoGroupByKey them together but you cannot do this three-way join as a sequence of binary joins.

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
ing issue here but writing unbounded input to files when using GlobalWindows for unsharded output is a valid usecase so sounds like a bug. Feel free to create a JIRA. > - Cham > On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas wrote: >> I've read more deeply in

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
path that doesn't flatten collections and no exception is thrown. So, this might really be considered a bug of WriteFiles (and thus FileIO). But I'd love to hear other interpretations. On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas wrote: > I'm building a pipeline that streams from Pubsub and wri

Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
I'm building a pipeline that streams from Pubsub and writes to files. I'm using FileIO's dynamic destinations to place elements into different directories according to date and I really don't care about ordering of elements beyond the date buckets. So, I think GlobalWindows is appropriate in this

Re: Using gRPC with PubsubIO?

2019-01-02 Thread Jeff Klukas
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373 > On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas wrote: >> I see that the Beam codebase includes a PubsubGrpcClient, but there doesn

Re: Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
at runtime. On Thu, Dec 20, 2018 at 4:47 PM Jeff Klukas wrote: > I did some more testing this afternoon, and it looks like I'm incorrect on some details here. > Parameters implemented as ValueProviders can be provided at compile time and they do end up in the template. A

Re: Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
? Or experience handling default values for ValueProviders? On Thu, Dec 20, 2018 at 3:49 PM Jeff Klukas wrote: > I am deploying Beam pipelines with the DataflowRunner and would like to move more of the pipeline options to use the ValueProvider interface so they can be specified at runti

Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
I am deploying Beam pipelines with the DataflowRunner and would like to move more of the pipeline options to use the ValueProvider interface so they can be specified at runtime rather than at template compile time, but running into various issues. First, it's been unclear to the operations
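For context, a ValueProvider-based option with a compile-time default looks roughly like this sketch (the option name and default value are hypothetical):
  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.Description;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.ValueProvider;

  public interface MyOptions extends PipelineOptions {
    @Description("BigQuery output table, resolvable at template run time")
    @Default.String("my-project:my_dataset.my_table")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> value);
  }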

Re: [Dataflow] Delaying PubSubIO acks until FileIO completes writes

2018-12-06 Thread Jeff Klukas
pipeline like this? In effect, would you recommend we write a custom application using the Pub/Sub and Storage SDKs directly rather than trying to use Beam's abstractions? On Wed, Dec 5, 2018 at 2:24 PM Raghu Angadi wrote: > On Wed, Dec 5, 2018 at 11:13 AM Jeff Klukas wrote:

[Dataflow] Delaying PubSubIO acks until FileIO completes writes

2018-12-05 Thread Jeff Klukas
We are attempting to build a Beam pipeline running on Dataflow that reads from a Cloud Pub/Sub subscription and writes 10 minute windows to files on GCS via FileIO. At-least-once completeness is critical in our use case, so we are seeking the simplest possible solution with obvious and verifiable

Re: TextIO setting file dynamically issue

2018-11-28 Thread Jeff Klukas
You can likely achieve what you want using FileIO with dynamic destinations, which is described in the "Advanced features" section of the TextIO docs [0]. Your case might look something like: PCollection events = ...; events.apply(FileIO.writeDynamic() .by(event ->
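A completed sketch of that pattern (the Event type, its getDate() and toJson() methods, and the bucket path are hypothetical):
  // Uses org.apache.beam.sdk.io.{FileIO, TextIO}, Contextful, and StringUtf8Coder.
  PCollection<Event> events = ...;
  events.apply(FileIO.<String, Event>writeDynamic()
      .by(event -> event.getDate())            // destination = date string
      .withDestinationCoder(StringUtf8Coder.of())
      .via(Contextful.fn(event -> event.toJson()), TextIO.sink())
      .to("gs://my-bucket/output")
      .withNaming(date -> FileIO.Write.defaultNaming("date=" + date, ".json")));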

BigQueryIO failure handling for writes

2018-11-16 Thread Jeff Klukas
I'm trying to write a robust pipeline that takes input from PubSub and writes to BigQuery. For every PubsubMessage that is not successfully written to BigQuery, I'd like to get the original PubsubMessage back and be able to write to an error output collection. I'm not sure this is quite possible,
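For streaming inserts specifically, a sketch of what is retrievable (table name hypothetical): the failed TableRow comes back via WriteResult, but not the originating PubsubMessage, which is the gap described above:
  WriteResult result = tableRows.apply(
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.events")
          .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

  PCollection<TableRow> failedRows = result.getFailedInserts();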

Re: Is it possible to run a perl scrip in Dataflow worker?

2018-10-24 Thread Jeff Klukas
Another option here would be to make the perl script operate on batches. Your DoFn could then store the records to a buffer rather than outputting them and then periodically flush the buffer, sending records through the perl script and sending to output. On Wed, Oct 24, 2018 at 3:03 PM Robert
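A sketch of that buffering shape (the batch size and the runScript() helper are hypothetical placeholders for invoking the perl script on a batch):
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
  import org.joda.time.Instant;

  class BatchThroughScriptFn extends DoFn<String, String> {
    private static final int MAX_BATCH_SIZE = 500;
    private transient List<String> buffer;

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
    }

    @ProcessElement
    public void process(@Element String record, OutputReceiver<String> out) throws IOException {
      buffer.add(record);
      if (buffer.size() >= MAX_BATCH_SIZE) {
        runScript(buffer).forEach(out::output);
        buffer.clear();
      }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext context) throws IOException {
      // Flush whatever remains; outputs here need an explicit timestamp and window.
      for (String line : runScript(buffer)) {
        context.output(line, Instant.now(), GlobalWindow.INSTANCE);
      }
      buffer.clear();
    }

    private List<String> runScript(List<String> batch) throws IOException {
      // Hypothetical: pipe the batch through the perl script and collect its stdout.
      return batch;
    }
  }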

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Jeff Klukas
? > * can't supply display data? > +user@beam.apache.org, do users think that the provided API would be useful enough for it to be added to the core SDK or would the addition of the method provide noise/detract from the existing API? > On Mon, Sep 17, 2018 at 12:57 P