Siyu - The Beam metrics interface includes the Distribution metric type
which can be used for histograms:
https://beam.apache.org/documentation/programming-guide/#types-of-metrics
Particulars of support depend on the runner. For Cloud Dataflow, the
reported values are MAX, MIN, MEAN, and COUNT.
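A minimal sketch of updating a Distribution from a Java DoFn (the DoFn and metric name here are hypothetical):

```java
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that records payload sizes; the runner aggregates the
// updates, and Dataflow reports them as MAX, MIN, MEAN, and COUNT.
class RecordPayloadSize extends DoFn<String, String> {
  private final Distribution payloadSizes =
      Metrics.distribution(RecordPayloadSize.class, "payload_size_bytes");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    payloadSizes.update(element.length());
    out.output(element);
  }
}
```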
On Fri, May 21, 2021 at 5:10 AM Jeff Klukas wrote:
Beam users,
We're attempting to write a Java pipeline that uses Count.perKey() to
collect event counts, and then flush those to an HTTP API every ten minutes
based on processing time.
We've tried expressing this using GlobalWindows with an AfterProcessingTime
trigger, but we find that when we
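The windowing setup being described looks roughly like this (a sketch; `events` and its key/value types are hypothetical):

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical: `events` is an unbounded PCollection<KV<String, String>>.
PCollection<KV<String, Long>> counts = events
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(Count.perKey());
```

With discardingFiredPanes(), each fired pane contains only the counts accumulated since the previous firing, which matches a periodic-flush pattern; accumulatingFiredPanes() would emit running totals instead.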
Hi Thiago,
Note that Dataflow has a custom implementation of PubSub interaction, so
the code you see in PubsubIO in the Beam codebase does not necessarily
reflect Pubsub handling in Dataflow.
Dataflow acks messages as soon as they are first checkpointed, so the first
step in your pipeline that
(although not sure if these errors are problematic)
>
> I’ll definitely give the SSD option a shot.
>
> Thanks.
>
> --
> Mark Kelly
> Sent with Airmail
>
> On 14 July 2020 at 15:12:46, Jeff Klukas (jklu...@mozilla.com) wrote:
In my experience with writing to BQ via BigQueryIO in the Java SDK, the
bottleneck tends to be disk I/O. The BigQueryIO logic requires several
shuffles that cause checkpointing even in the case of streaming inserts,
which in the Dataflow case means writing to disk. I assume the Python logic
is
>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich
>> wrote:
>>
>>> Thanks for explaining. Is it documented somewhere that TableRow contains
>>> Map?
>>> I don't construct it; I fetch it from a Google Analytics export to BigQuery
>>> table.
>
> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote:
>
>> I would expect the following line to fail:
>>
>> List h = ((List) bigQueryRow.get("hits"));
>>
>> The top-level bigQueryRow will be
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
> cannot be cast to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
> com.google.api.services.bigquery.model.Table
> ParDo.of(new CreateSessionMetrics(boolean and
> string params)))
>// few more transformations
>
> }
>
> This is basically similar to examples you can find here
> https://beam.apache.org/documentation/io/built-in/google-bigquery/
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich
wrote:
> So from what I understand, it works like this by design and it's not
> possible to test my code with the current coder implementation. Is that
> correct?
>
I would argue that this test failure is indicating an area of potential
failure
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich
wrote:
> So it's correct behavior of the TableRow coder that decode(encode(a)) != a?
>
A TableRow can contain fields of any map implementation. It makes sense to
me that once a TableRow object is serialized and deserialized, that the
coder must make
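The cast failure can be reproduced with the JDK alone, using a stand-in for TableRow (which is itself a Map implementation); this is an illustration only, not Beam code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedCastDemo {
  // Stand-in for TableRow, which ultimately extends a Map implementation.
  static class FakeTableRow extends LinkedHashMap<String, Object> {}

  public static void main(String[] args) {
    FakeTableRow row = new FakeTableRow();
    // A JSON-style decoder has no way to know the nested value was once a
    // FakeTableRow, so it rebuilds it as a plain LinkedHashMap.
    Map<String, Object> decodedNested = new LinkedHashMap<>();
    row.put("hits", decodedNested);

    System.out.println(row.get("hits") instanceof FakeTableRow); // false
    try {
      FakeTableRow nested = (FakeTableRow) row.get("hits"); // mirrors the failure
    } catch (ClassCastException e) {
      System.out.println("ClassCastException, as in the stack trace above");
    }
  }
}
```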
have issues running this pipeline in production. I have this
> issue, only when I tried to write end to end test.
> Do you know if there are existing coders for TableRow that I can use? I've
> tried TableRowJsonCoder, but it seems to convert all objects inside a
> TableRow to LinkedHashMap.
Kirill - Can you tell us more about what Job.runJob is doing? I would not
expect the Beam SDK itself to do any casting to TableRow, so is there a
line in your code where you're explicitly casting to TableRow? There may be
a point where you need to explicitly set the coder on a PCollection to
Yeah - we are issuing a query rather than reading a table. Materializing
>> the results myself and reading them back seems simple enough. I will give
>> that a try!
>>
>> Thanks,
>> Matt
>>
>> On Thu, Jul 2, 2020 at 9:42 AM Jeff Klukas wrote:
>>
>>>
It sounds like your pipeline is issuing a query rather than reading a whole
table.
Are you using Java or Python? I'm only familiar with the Java SDK so my
answer may be Java-biased.
I would recommend materializing the query results to a table, and then
configuring your pipeline to read that
> And if a single element in the
> bundle fails and we ignore the error on the single element, then the bundle
> is considered still successfully processed am I correct? Then it would just
> ACK everything in the bundle
>
> Kishore
>
> On Mon, Jun 1, 2020 at 10:27 AM Jeff Klukas wrote:
Is this a Python or Java pipeline?
I'm familiar with PubsubIO in Java, though I expect the behavior in Python
is similar. It will ack messages at the first checkpoint step in the
pipeline, so the behavior in your case depends on whether there is a
GroupByKey operation happening before the
e them as they are very lightweight.
>
> 1:
> https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier)
>
> On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote:
Beam Java users,
I've run into a few cases where I want to present a single thread-safe data
structure to all threads on a worker, and I end up writing a good bit of
custom code each time involving a synchronized method that handles creating
the resource exactly once, and then each thread has its
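What I end up writing each time is roughly the following plain-JDK sketch, similar in spirit to Guava's Suppliers.memoize (the class and names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-JDK memoized supplier: the delegate runs at most once, and all
// threads on the worker share the result. Double-checked locking requires
// the field to be volatile and the value to be non-null.
class Memoize<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private volatile T value;

  Memoize(Supplier<T> delegate) { this.delegate = delegate; }

  @Override
  public T get() {
    T result = value;
    if (result == null) {            // first check, no lock
      synchronized (this) {
        result = value;
        if (result == null) {        // second check, under the lock
          result = delegate.get();   // runs at most once
          value = result;
        }
      }
    }
    return result;
  }
}

public class MemoizeDemo {
  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    Supplier<String> resource = new Memoize<>(() -> {
      calls.incrementAndGet();
      return "expensive-shared-resource";
    });
    resource.get();
    resource.get();
    System.out.println("delegate calls: " + calls.get()); // prints 1
  }
}
```

In a DoFn you would hold such a supplier in a static field so every thread on the worker shares one lazily created instance.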
Aniruddh - Using BigQueryIO.Read with EXPORT method involves a potentially
long wait on BQ to complete the export.
I have experience with running Dataflow batch jobs which use this read
method to ingest ~10 TB of data in a single job. The behavior I generally
see is that the job will progress
Mozilla hosts the code for our data ingestion system publicly on GitHub. A
good chunk of that architecture consists of Beam pipelines running on
Dataflow.
See:
https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam
and rendered usage documentation at:
We also had throughput issues in writing to BQ in a streaming pipeline and
we mitigated by provisioning a large quantity of SSD storage to improve I/O
throughput to disk for checkpoints.
I also second Erik's suggestion to look into Streaming Engine. We are currently
looking into migrating our streaming
That's normal; I also never saw those heap dump options display in the
Dataflow UI. I think Dataflow doesn't show any options that originate from
"Debug" options interfaces.
> On Mon, Nov 18, 2019 at 11:59 AM Jeff Klukas wrote:
Using default Dataflow workers, this is the set of options I passed:
--dumpHeapOnOOM --saveHeapDumpsToGcsPath=$MYBUCKET/heapdump --diskSizeGb=100
On Mon, Nov 18, 2019 at 11:57 AM Jeff Klukas wrote:
It sounds like you're generally doing the right thing. I've successfully
used --saveHeapDumpsToGcsPath in a Java pipeline running on Dataflow and
inspected the results in Eclipse MAT.
I think that --saveHeapDumpsToGcsPath will automatically turn on
--dumpHeapOnOOM but worth setting that
I ran into exactly this same problem of finding some accented characters
getting replaced with "?" in a pipeline only when running on Dataflow and
not when using the Direct Runner. KafkaIO was not involved, but I'd bet the
root cause is the same.
In my case, the input turned out to be properly
> variables and you have to
> resort to some hackery that is JVM version dependent[2].
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
> 2: https://blog.sebastian-daschner.com/entries/changing_env_java
>
I have also wrestled with throughput for FileIO and BigQueryIO on Dataflow,
and in my case the bottleneck came down to disk I/O throughput on the
worker machines. Writing with FileIO or BigQueryIO involves several group
by key operations that in the Dataflow case require checkpointing state to
You can use the read* and write* methods of FileIO to read and write
arbitrary binary files. The examples in the Javadoc for FileIO [0] include
an example of reading the entire contents of a file as a string into a Beam
record, along with metadata about the file.
If a one-to-one mapping of files
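As a sketch of that whole-file read pattern (the filepattern here is hypothetical; for binary content you would use readFullyAsBytes() instead of readFullyAsUTF8String()):

```java
import java.io.IOException;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Sketch, inside pipeline construction: read each matched file whole,
// keyed by its resource name.
PCollection<KV<String, String>> contents = p
    .apply(FileIO.match().filepattern("gs://my-bucket/input/*.txt"))  // hypothetical path
    .apply(FileIO.readMatches())
    .apply(MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via((FileIO.ReadableFile f) -> {
          try {
            return KV.of(f.getMetadata().resourceId().toString(),
                f.readFullyAsUTF8String());
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }));
```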
>> delivery). Drain should work for a pipeline like this without any
>> issues.
>>
>> On Fri, Apr 5, 2019 at 3:33 PM Jeff Klukas wrote:
>> >
>> > Thanks for the suggestions. Ideally, I'd like to avoid GBK completely,
>> which if I understand semantics
> > discussion on
> > this ticket:
> > https://issues.apache.org/jira/browse/BEAM-6886
> >
> > On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas wrote:
As far as I can tell, Beam expects runners to have full control over
separation of individual elements into bundles and this is something users
have no control over. Is that true? Or are there any ways that I might
exert some influence over bundle sizes?
My main interest at the moment is
I'm not aware that there's currently any way to trigger based on data size.
As you state, AfterPane.elementCountAtLeast lets you trigger based on
number of elements, but from my reading of the implementations of triggers
in the Java SDK, triggers don't have access to sufficient data to maintain
To question 1, I also would have expected the pipeline to fail in the case
of files failing to load; I'm not sure why it's not. I thought the BigQuery
API returns a 400 level response code in the case of files failing and that
would bubble up to a pipeline execution error, but I haven't dug
> Maybe share the code of your pipeline?
>
> Cheers,
> Tobi
>
> On Tue, Jan 22, 2019 at 9:28 PM Jeff Klukas wrote:
Hi Sri,
To Question 1, you should be able to set `...
.to("/tmp/parquet/").withNumShards(1)` to produce a single output file.
To Question 2, yes if your desired output file name depends on contents of
the record itself, that's exactly what FileIO.writeDynamic() is for. If you
can get the name
I'm attempting to deploy a fairly simple job on the Dataflow runner that
reads from PubSub and writes to BigQuery using file loads, but I have so
far not been able to tune it to keep up with the incoming data rate.
I have configured BigQueryIO.write to trigger loads every 5 minutes, and
I've let
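For reference, the configuration under discussion looks roughly like this (the table name and shard count are placeholders):

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

// Sketch: streaming write to BigQuery via periodic load jobs rather than
// streaming inserts. withNumFileShards is required when a triggering
// frequency is set.
rows.apply(BigQueryIO.writeTableRows()
    .to("my-project:dataset.events")                       // placeholder table
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(5))  // load every 5 minutes
    .withNumFileShards(100)                                // placeholder shard count
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
```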
> this means that I am losing data or if this will be retried by the sink?
I don't have direct experience with KafkaIO, but noting that this exception
happened in the finishBundle method, Beam will not have committed the
bundle.
More specifically, looking at the KafkaWriter code, I see that
Related to a previous thread about custom triggering on GlobalWindows [0],
are there general recommendations for controlling size of output files from
FileIO.Write?
A general pattern I've seen in systems that need to batch individual
records to files is that they offer both a maximum file size
>> do with Flattening two PCollections together) with their original trigger.
>> Without this, we also know that you can have three PCollections with
>> identical triggering and you can CoGroupByKey them together but you cannot
>> do this three-way join as a sequence of binary joins.
ing issue here but writing
> unbounded input to files when using GlobalWindows for unsharded output is a
> valid usecase so sounds like a bug. Feel free to create a JIRA.
>
> - Cham
>
> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas wrote:
>
>> I've read more deeply in
path that doesn't flatten collections and no
exception is thrown.
So, this might really be considered a bug of WriteFiles (and thus FileIO).
But I'd love to hear other interpretations.
On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas wrote:
I'm building a pipeline that streams from Pubsub and writes to files. I'm
using FileIO's dynamic destinations to place elements into different
directories according to date and I really don't care about ordering of
elements beyond the date buckets.
So, I think GlobalWindows is appropriate in this
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373
>
> On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas wrote:
>
>> I see that the Beam codebase includes a PubsubGrpcClient, but there
>> doesn
at runtime.
On Thu, Dec 20, 2018 at 4:47 PM Jeff Klukas wrote:
> I did some more testing this afternoon, and it looks like I'm incorrect on
> some details here.
>
> Parameters implemented as ValueProviders can be provided at compile time
> and they do end up in the template. A
? Or experience handling default values for ValueProviders?
On Thu, Dec 20, 2018 at 3:49 PM Jeff Klukas wrote:
> I am deploying Beam pipelines with the DataflowRunner and would like to
> move more of the pipeline options to use the ValueProvider interface so
> they can be specified at runti
I am deploying Beam pipelines with the DataflowRunner and would like to
move more of the pipeline options to use the ValueProvider interface so
they can be specified at runtime rather than at template compile time, but
running into various issues.
First, it's been unclear to the operations
pipeline like this?
In effect, would you recommend we write a custom application using the
Pub/Sub and Storage SDKs directly rather than trying to use Beam's
abstractions?
On Wed, Dec 5, 2018 at 2:24 PM Raghu Angadi wrote:
>
> On Wed, Dec 5, 2018 at 11:13 AM Jeff Klukas wrote:
We are attempting to build a Beam pipeline running on Dataflow that reads
from a Cloud Pub/Sub subscription and writes 10 minute windows to files on
GCS via FileIO. At-least-once completeness is critical in our use case, so
we are seeking the simplest possible solution with obvious and verifiable
You can likely achieve what you want using FileIO with dynamic
destinations, which is described in the "Advanced features" section of the
TextIO docs [0].
Your case might look something like:
PCollection<Event> events = ...;
events.apply(FileIO.writeDynamic()
.by(event ->
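Filled out, that sketch might look like the following (the Event type, its accessors, and the output path are all hypothetical):

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;

// Sketch: route each event to a date-named destination; Event.getDate()
// and Event.toJson() are assumed accessors.
events.apply(FileIO.<String, Event>writeDynamic()
    .by(event -> event.getDate())                    // destination = date bucket
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(Event::toJson), TextIO.sink())
    .to("gs://my-bucket/events")                     // placeholder path
    .withNaming(date ->
        FileIO.Write.defaultNaming("date=" + date + "/part", ".json")));
```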
I'm trying to write a robust pipeline that takes input from PubSub and
writes to BigQuery. For every PubsubMessage that is not successfully
written to BigQuery, I'd like to get the original PubsubMessage back and be
able to write to an error output collection. I'm not sure this is quite
possible,
Another option here would be to make the perl script operate on batches.
Your DoFn could then store the records to a buffer rather than outputting
them and then periodically flush the buffer, sending records through the
perl script and sending to output.
On Wed, Oct 24, 2018 at 3:03 PM Robert
> * can't supply display data?
>
> +user@beam.apache.org , do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep 17, 2018 at 12:57 P