Re: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies

2023-05-02 Thread Brule, Joshua L. (Josh), CISSP via user
The SnakeYAML analysis is exactly what I was looking for. The library of 
concern is org.codehaus.jackson jackson-mapper-asl 1.9.13. Our scanner is 
reporting ~20 CVEs with a CVSS of >= 7 and ~60 CVEs total.

Thank you,
Josh

From: Bruno Volpato 
Date: Monday, May 1, 2023 at 9:04 PM
To: user@beam.apache.org , Brule, Joshua (Josh) L., CISSP 

Subject: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies
Hi Joshua,

It may take a lot of effort and knowledge to review whether CVEs are 
exploitable or not.
I have seen this kind of analysis done in a few cases, such as SnakeYAML 
recently: https://s.apache.org/beam-and-cve-2022-1471 
(https://github.com/apache/beam/issues/25449)

If there is a patch available, I believe we should err on the side of caution 
and update them (if possible).

For the example that you mentioned, there is some work started by Alexey 
Romanenko to remove/decouple Avro from Beam core, so we can upgrade to newer 
versions: https://github.com/apache/beam/issues/25252.
Another recent development is Beam releasing a new version of its vendored gRPC to 
move past some CVEs originating from protobuf-java: 
https://github.com/apache/beam/issues/25746


Is there any other particular dependency that you are concerned about?
Please consider filing an issue at https://github.com/apache/beam/issues so we 
can start tracking it.


Best,
Bruno



On Mon, May 1, 2023 at 5:28 PM Brule, Joshua L. (Josh), CISSP via user 
<user@beam.apache.org> wrote:
Hello,

I am hoping you could help me with our vulnerability remediation process. We 
have several development teams using Apache Beam in their projects. When 
performing our Software Composition Analysis (Third-Party Software) scan, 
projects utilizing Apache Beam have an incredible number of CVEs, Jackson Data 
Mapper being an extreme outlier.

I believe Jackson Data Mapper is a transitive dependency via Avro, but I am wondering: 
has the Apache Beam team reviewed these CVEs and found them NOT EXPLOITABLE as 
implemented? Or, if exploitable, has the team implemented mitigations pre/post usage of the 
library?

Thank you for your time,
Josh

Joshua Brule | Sr Information Security Engineer


Vulnerabilities in Transitive dependencies

2023-05-01 Thread Brule, Joshua L. (Josh), CISSP via user
Hello,

I am hoping you could help me with our vulnerability remediation process. We 
have several development teams using Apache Beam in their projects. When 
performing our Software Composition Analysis (Third-Party Software) scan, 
projects utilizing Apache Beam have an incredible number of CVEs, Jackson Data 
Mapper being an extreme outlier.

I believe Jackson Data Mapper is a transitive dependency via Avro, but I am wondering: 
has the Apache Beam team reviewed these CVEs and found them NOT EXPLOITABLE as 
implemented? Or, if exploitable, has the team implemented mitigations pre/post usage of the 
library?

Thank you for your time,
Josh

Joshua Brule | Sr Information Security Engineer


Accumulator with Map field in CombineFn not serializing correctly

2020-08-06 Thread Josh
Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running
locally is no problem, but when I run the job on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.<init>()
java.lang.Class.getConstructor0(Class.java:3082)
java.lang.Class.getDeclaredConstructor(Class.java:2178)
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
accumulator class. Is there anything special I need to do because one of
the fields in my accumulator class is a Map? I have pasted an outline of my
CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn {

private static class ExpiringLinkedHashMap<K, V> extends
LinkedHashMap<K, V> {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return this.size() > 10;
}
}

@DefaultCoder(AvroCoder.class)
private static class PartialEventUpdate implements Serializable {
Long incrementCountBy = 0L;
Map recentEvents = new ExpiringLinkedHashMap<>();
Long lastSeenMillis = 0L;

PartialEventUpdate() {}
}

@DefaultCoder(AvroCoder.class)
private static class Accum implements Serializable {
Map eventIdToUpdate = new HashMap<>();

Accum() {}
}

@Override
public MyCombineFn.Accum createAccumulator() {
return new MyCombineFn.Accum();
}

...

}
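
A possible workaround, sketched here as an assumption rather than a confirmed fix for this exact job: since the accumulator already implements Serializable, its coder can be switched from Avro reflection to Java serialization, which avoids Avro trying to instantiate the Map interface. The field and type names mirror the outline above; the map's type parameters are guesses.

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;

// Use Java serialization for the accumulator instead of AvroCoder. Alternatively,
// keep AvroCoder but declare the field with a concrete type such as HashMap so the
// reflect-based instantiation has a no-arg constructor to call.
@DefaultCoder(SerializableCoder.class)
private static class Accum implements Serializable {
    Map<String, PartialEventUpdate> eventIdToUpdate = new HashMap<>();

    Accum() {}
}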


Re: Python Development Environments for Apache Beam

2018-06-20 Thread Josh McGinley
Great idea!  Here is a link to the post in a tweet.
https://twitter.com/jmcginley/status/1009517852892770309


On Wed, Jun 20, 2018 at 12:04 PM Holden Karau 
wrote:

> Do you happen to have a tweet we should RT for reach?
>
> On Wed, Jun 20, 2018, 11:26 AM Josh McGinley  wrote:
>
>> Beam Users and Dev -
>>
>> I recently published a medium article
>> <https://medium.com/google-cloud/python-development-environments-for-apache-beam-on-google-cloud-platform-b6f276b344df>
>>  showing how to set up Python Apache Beam pipelines for debugging in an
>> IDE.
>>
>> I thought I would share the article with this community.  If you have any
>> feedback let me know.  Otherwise keep up the great work on Beam!
>>
>> --
>> Josh McGinley
>>
>

-- 
Josh McGinley


Python Development Environments for Apache Beam

2018-06-20 Thread Josh McGinley
Beam Users and Dev -

I recently published a medium article
<https://medium.com/google-cloud/python-development-environments-for-apache-beam-on-google-cloud-platform-b6f276b344df>
 showing how to set up Python Apache Beam pipelines for debugging in an
IDE.

I thought I would share the article with this community.  If you have any
feedback let me know.  Otherwise keep up the great work on Beam!

-- 
Josh McGinley


Scio 0.5.3 released

2018-05-01 Thread Josh Baer
Hi all,

We just released Scio 0.5.3 with a few enhancements and bug fixes.

Cheers,
Josh

https://github.com/spotify/scio/releases/tag/v0.5.3

*"Lasiorhinus latifrons"*
Features

   - Add enabled-parameter to SCollection#debug #1107
   <https://github.com/spotify/scio/pull/1107>
   - Support batching in BigtableIO #1057
   <https://github.com/spotify/scio/pull/1057> #1112
   <https://github.com/spotify/scio/pull/1112>
   - Update TensorFlow to 1.8.0 #1003
   <https://github.com/spotify/scio/issues/1003>
   - Upgrade sparkey <https://github.com/spotify/sparkey> to 2.3.0 #1105
   <https://github.com/spotify/scio/pull/1105>
   - Add support for setting max memory usage for sparkey objects #1106
   <https://github.com/spotify/scio/pull/1106>

Bug fixes

   - Fix step names in the saveAsTypedBigQuery transform #1061
   <https://github.com/spotify/scio/issues/1061> #1127
   <https://github.com/spotify/scio/pull/1127>


Advice on parallelizing network calls in DoFn

2018-03-09 Thread Josh Ferge
Hello all:

Our team has a pipeline that make external network calls. These pipelines
are currently super slow, and the hypothesis is that they are slow because
we are not threading for our network calls. The github issue below provides
some discussion around this:

https://github.com/apache/beam/pull/957

In beam 1.0, there was IntraBundleParallelization, which helped with this.
However, this was removed because it didn't comply with a few BEAM
paradigms.

Questions going forward:

What is advised for jobs that make blocking network calls? It seems
bundling the elements into groups of size X prior to passing to the DoFn,
and managing the threading within the function might work. Thoughts?
Are these types of jobs even suitable for beam?
Are there any plans to develop features that help with this?

Thanks
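
For reference, a minimal sketch of the "batch upstream, thread inside the DoFn" idea mentioned above, assuming elements have already been grouped into List batches; callApi and the pool size are hypothetical placeholders, not an established pattern from this thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.sdk.transforms.DoFn;

// Each input element is a pre-built batch; the blocking calls for a batch are fanned
// out on a per-worker thread pool and joined before the bundle finishes.
class ParallelNetworkCallFn extends DoFn<List<String>, String> {
  private transient ExecutorService pool;

  @Setup
  public void setup() {
    pool = Executors.newFixedThreadPool(16); // tune to the worker's capacity
  }

  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    List<Future<String>> pending = new ArrayList<>();
    for (String request : c.element()) {
      pending.add(pool.submit(() -> callApi(request))); // hypothetical blocking call
    }
    for (Future<String> response : pending) {
      c.output(response.get()); // failures propagate so the runner can retry the bundle
    }
  }

  @Teardown
  public void teardown() {
    if (pool != null) {
      pool.shutdown();
    }
  }

  // Placeholder for the real network call.
  private String callApi(String request) {
    return request;
  }
}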


Re: BigQueryIO streaming inserts - poor performance with multiple tables

2018-03-01 Thread Josh
Hi Cham,

Thanks, I have emailed the dataflow-feedback email address with the details.

Best regards,
Josh

On Thu, Mar 1, 2018 at 12:26 AM, Chamikara Jayalath <chamik...@google.com>
wrote:

> Could be a DataflowRunner specific issue. Would you mind reporting this
> with corresponding Dataflow job IDs to either Dataflow stackoverflow
> channel [1] or dataflow-feedb...@google.com ?
>
> I suspect Dataflow split writing to multiple tables into multiple workers
> which may be keep all workers busy but we have to look at the job to
> confirm.
>
> Thanks,
> Cham
>
> [1] https://stackoverflow.com/questions/tagged/google-cloud-dataflow
>
> On Tue, Feb 27, 2018 at 11:56 PM Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> We are using BigQueryIO.write() to stream data into BigQuery, and are
>> seeing very poor performance in terms of number of writes per second per
>> worker.
>>
>> We are currently using *32* x *n1-standard-4* workers to stream ~15,000
>> writes/sec to BigQuery. Each worker has ~90% CPU utilisation. Strangely the
>> number of workers and worker CPU utilisation remains constant at ~90% even
>> when the rate of input fluctuates down to below 10,000 writes/sec. The job
>> always keeps up with the stream (no backlog).
>>
>> I've seen BigQueryIO benchmarks which show ~20k writes/sec being achieved
>> with a single node, when streaming data into a *single* BQ table... So
>> my theory is that writing to multiple tables is somehow causing the
>> performance issue. Our writes are spread (unevenly) across 200+ tables. The
>> job itself does very little processing, and looking at the Dataflow metrics
>> pretty much all of the wall time is spent in the *StreamingWrite* step
>> of BigQueryIO. The Beam version is 2.2.0.
>>
>> Our code looks like this:
>>
>> stream.apply(BigQueryIO.write()
>> .to(new ToDestination())
>> .withFormatFunction(new FormatForBigQuery())
>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>> where ToDestination is a:
>>
>> SerializableFunction<ValueInSingleWindow, TableDestination>
>>
>> which returns a:
>>
>> new TableDestination(tableName, "")
>>
>> where tableName looks like "myproject:dataset.tablename$20180228"
>>
>> Has anyone else seen this kind of poor performance when streaming writes 
>> to multiple BQ tables? Is there anything here that sounds wrong, or any 
>> optimisations we can make?
>>
>> Thanks for any advice!
>>
>> Josh
>>
>


Re: Partitioning a stream randomly and writing to files with TextIO

2018-02-23 Thread Josh
I see, thanks Lukasz - I will try setting that up. Good shout on using
hashcode / ensuring the pipeline is deterministic!

On 23 Feb 2018 01:27, "Lukasz Cwik" <lc...@google.com> wrote:

> 1) Creating a PartitionFn is the right way to go. I would suggest using
> something which would give you stable output so you could replay your
> pipeline and this would be useful for tests as well. Use something like the
> object's hashcode and divide the hash space into 80%/10%/10% segments could
> work just make sure that if you go with hashcode the hashcode function
> distribute elements well.
>
> 2) This is runner dependent but most runners don't require storing
> everything in memory. For example if you were using Dataflow, you would
> only need to store a couple of elements in memory not the entire
> PCollection.
>
> On Thu, Feb 22, 2018 at 11:38 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I want to read a large dataset using BigQueryIO, and then randomly
>> partition the rows into three chunks, where one partition has 80% of the
>> data and there are two other partitions with 10% and 10%. I then want to
>> write the three partitions to three files in GCS.
>>
>> I have a couple of quick questions:
>> (1) What would be the best way to do this random partitioning with Beam?
>> I think I can just use a PartitionFn which uses Math.random to determine
>> which of the three partitions an element should go to, but not sure if
>> there is a better approach.
>>
>> (2) I would then take the resulting PCollectionList and use TextIO to
>> write each partition to a GCS file. For this, would I need all data for the
>> largest partition to fit into the memory of a single worker?
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>
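
A minimal sketch of the deterministic 80/10/10 split Lukasz describes, assuming String elements for brevity (with BigQueryIO you would hash a stable field of the TableRow instead); the partition names are placeholders.

PCollection<String> rows = ...;

PCollectionList<String> split = rows.apply(
    Partition.of(3, (Partition.PartitionFn<String>) (row, numPartitions) -> {
        // Deterministic bucket in [0, 100) so reruns and tests see the same split.
        int bucket = Math.floorMod(row.hashCode(), 100);
        if (bucket < 80) {
            return 0; // 80% partition
        } else if (bucket < 90) {
            return 1; // 10% partition
        } else {
            return 2; // 10% partition
        }
    }));

PCollection<String> bigPart = split.get(0);
PCollection<String> smallPartA = split.get(1);
PCollection<String> smallPartB = split.get(2);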


Partitioning a stream randomly and writing to files with TextIO

2018-02-22 Thread Josh
Hi all,

I want to read a large dataset using BigQueryIO, and then randomly
partition the rows into three chunks, where one partition has 80% of the
data and there are two other partitions with 10% and 10%. I then want to
write the three partitions to three files in GCS.

I have a couple of quick questions:
(1) What would be the best way to do this random partitioning with Beam? I
think I can just use a PartitionFn which uses Math.random to determine
which of the three partitions an element should go to, but not sure if
there is a better approach.

(2) I would then take the resulting PCollectionList and use TextIO to write
each partition to a GCS file. For this, would I need all data for the
largest partition to fit into the memory of a single worker?

Thanks for any advice,

Josh


Re: PubSubIO withTimestampAttribute - what are the implications?

2017-08-04 Thread Josh
Ok great, thanks Lukasz. I will try turning off the timestamp attribute on
some of these jobs then!

On Thu, Aug 3, 2017 at 10:14 PM, Lukasz Cwik <lc...@google.com> wrote:

> To my knowledge, autoscaling is dependent on how many messages are
> backlogged within Pubsub and independent of the second subscription (the
> second subscription is really to compute a better watermark).
>
> On Thu, Aug 3, 2017 at 1:34 PM, <jof...@gmail.com> wrote:
>
>> Thanks Lukasz that's good to know! It sounds like we can halve our PubSub
>> costs then!
>>
>> Just to clarify, if I were to remove withTimestampAttribute from a job,
>> this would cause the watermark to always be up to date (processing time)
>> even if the job starts to lag behind (build up of unacknowledged PubSub
>> messages). In this case would Dataflow's autoscaling still scale up? I
>> thought the reason the autoscaler scales up is due to the watermark lagging
>> behind, but is it also aware of the acknowledged PubSub messages?
>>
>> On 3 Aug 2017, at 18:58, Lukasz Cwik <lc...@google.com> wrote:
>>
>> You understanding is correct - the data watermark will only matter for
>> windowing. It will not affect auto-scaling. If the pipeline is not doing
>> any windowing, triggering, etc then there is no need to pay for the cost of
>> the second subscription.
>>
>> On Thu, Aug 3, 2017 at 8:17 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> We've been running a few streaming Beam jobs on Dataflow, where each job
>>> is consuming from PubSub via PubSubIO. Each job does something like this:
>>>
>>> PubsubIO.readMessagesWithAttributes()
>>> .withIdAttribute("unique_id")
>>> .withTimestampAttribute("timestamp");
>>>
>>> My understanding of `withTimestampAttribute` is that it means we use the
>>> timestamp on the PubSub message as Beam's concept of time (the watermark) -
>>> so that any windowing we do in the job uses "event time" rather than
>>> "processing time".
>>>
>>> My question is: is my understanding correct, and does using
>>> `withTimestampAttribute` have any effect in a job that doesn't do any
>>> windowing? I have a feeling it may also have an effect on Dataflow's
>>> autoscaling, since I think Dataflow scales up when the watermark timestamp
>>> lags behind, but I'm not sure about this.
>>>
>>> The reason I'm concerned about this is because we've been using it in
>>> all our Dataflow jobs, and have now realised that whenever
>>> `withTimestampAttribute` is used, Dataflow creates an additional PubSub
>>> subscription (suffixed with `__streaming_dataflow_internal`), which
>>> appears to be doubling PubSub costs (since we pay per subscription)! So I
>>> want to remove `withTimestampAttribute` from jobs where possible, but want
>>> to first understand the implications.
>>>
>>> Thanks for any advice,
>>> Josh
>>>
>>
>>
>


PubSubIO withTimestampAttribute - what are the implications?

2017-08-03 Thread Josh
Hi all,

We've been running a few streaming Beam jobs on Dataflow, where each job is
consuming from PubSub via PubSubIO. Each job does something like this:

PubsubIO.readMessagesWithAttributes()
.withIdAttribute("unique_id")
.withTimestampAttribute("timestamp");

My understanding of `withTimestampAttribute` is that it means we use the
timestamp on the PubSub message as Beam's concept of time (the watermark) -
so that any windowing we do in the job uses "event time" rather than
"processing time".

My question is: is my understanding correct, and does using
`withTimestampAttribute` have any effect in a job that doesn't do any
windowing? I have a feeling it may also have an effect on Dataflow's
autoscaling, since I think Dataflow scales up when the watermark timestamp
lags behind, but I'm not sure about this.

The reason I'm concerned about this is because we've been using it in all
our Dataflow jobs, and have now realised that whenever
`withTimestampAttribute` is used, Dataflow creates an additional PubSub
subscription (suffixed with `__streaming_dataflow_internal`), which appears
to be doubling PubSub costs (since we pay per subscription)! So I want to
remove `withTimestampAttribute` from jobs where possible, but want to first
understand the implications.

Thanks for any advice,
Josh


Re: What state is buffered when using Combine.perKey with an accumulator?

2017-06-20 Thread Josh
Hi Kenn,

Thanks for the reply, that makes sense.
As far as I can tell, the DirectPipelineRunner doesn't do this optimisation
(when I test the pipeline locally) but I guess the DataflowRunner will.

Josh

On Tue, Jun 20, 2017 at 4:26 PM, Kenneth Knowles <k...@google.com> wrote:

> Hi Josh,
>
> Exactly what is stored technically depends on optimization decisions by
> the runner. But you can generally expect that only the accumulator is
> stored across trigger firings, not the input elements.
>
> Kenn
>
> On Tue, Jun 20, 2017 at 6:32 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have a question about how much state is buffered when using
>> Combine.perKey with a custom accumulator. For example, I have:
>>
>> PCollection<KV<String, String>> elements = ...;
>>
>> PCollection<KV<String, List<String>>> topValuesPerKey = elements
>>
>> .apply(Window.into(new GlobalWindows())
>>
>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>
>> .plusDelayOf(Duration.standardSeconds(10))))
>>
>> .accumulatingFiredPanes())
>>
>> .apply(Combine.perKey(new MyCombineFunction()));
>>
>>
>> Here MyCombineFunction is for each key, counting the occurrences of each
>> value. Its output for each key is a List of the values that occur
>> most frequently. In this case the accumulator for each key just stores a
>> Map<String, Long> of values and their associated counts.
>>
>>
>> My question is - since I am accumulatingFiredPanes forever on the global
>> window - is every element going to be buffered forever (i.e. amount of
>> space needed will constantly increase)? Or, is the amount of state buffered
>> determined by my accumulator (i.e. determined by the number of unique
>> values across all keys)? If the former is the case, how can I optimise my
>> job so that the accumulator is the only state stored across panes?
>>
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>


What state is buffered when using Combine.perKey with an accumulator?

2017-06-20 Thread Josh
Hi all,

I have a question about how much state is buffered when using
Combine.perKey with a custom accumulator. For example, I have:

PCollection<KV<String, String>> elements = ...;

PCollection<KV<String, List<String>>> topValuesPerKey = elements

.apply(Window.into(new GlobalWindows())

.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()

.plusDelayOf(Duration.standardSeconds(10))))

.accumulatingFiredPanes())

.apply(Combine.perKey(new MyCombineFunction()));


Here MyCombineFunction is for each key, counting the occurrences of each
value. Its output for each key is a List of the values that occur
most frequently. In this case the accumulator for each key just stores a
Map<String, Long> of values and their associated counts.


My question is - since I am accumulatingFiredPanes forever on the global
window - is every element going to be buffered forever (i.e. amount of
space needed will constantly increase)? Or, is the amount of state buffered
determined by my accumulator (i.e. determined by the number of unique
values across all keys)? If the former is the case, how can I optimise my
job so that the accumulator is the only state stored across panes?


Thanks for any advice,

Josh
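
For reference, a minimal sketch of a CombineFn of the shape described above (an assumed reconstruction, not the actual MyCombineFunction): the accumulator is only the per-value count map, which is what gets carried across pane firings. Depending on the runner, the accumulator coder may need to be declared explicitly (for example by overriding getAccumulatorCoder).

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.Combine;

static class TopValuesFn extends Combine.CombineFn<String, HashMap<String, Long>, List<String>> {

  @Override
  public HashMap<String, Long> createAccumulator() {
    return new HashMap<>();
  }

  @Override
  public HashMap<String, Long> addInput(HashMap<String, Long> acc, String value) {
    acc.merge(value, 1L, Long::sum); // count occurrences, not the raw elements
    return acc;
  }

  @Override
  public HashMap<String, Long> mergeAccumulators(Iterable<HashMap<String, Long>> accs) {
    HashMap<String, Long> merged = new HashMap<>();
    for (HashMap<String, Long> acc : accs) {
      acc.forEach((v, n) -> merged.merge(v, n, Long::sum));
    }
    return merged;
  }

  @Override
  public List<String> extractOutput(HashMap<String, Long> acc) {
    // Emit the 10 most frequent values seen so far.
    return acc.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(10)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}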


Re: How to partition a stream by key before writing with FileBasedSink?

2017-06-06 Thread Josh
I see, thanks for the tips!

Last question about this! How could this be adapted to work in a
unbounded/streaming job? To work in an unbounded job, I need to put a
Window.into with a trigger before GroupByKey.
I guess this would mean that the "shard gets processed by a single thread
in MyDofn" guarantee will only apply to messages within a single window,
and would not apply across windows?
If this is the case, is there a better solution? I would like to avoid
buffering data in windows, and want the shard guarantee to apply across
windows.



On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:

> Your code looks like what I was describing. My only comment would be to
> use a deterministic hashing function which is stable across JVM versions
> and JVM instances as it will help in making your pipeline consistent across
> different runs/environments.
>
> Parallelizing across 8 instances instead of 4 would break the contract
> around GroupByKey (since it didn't group all the elements for a key
> correctly). Also, each element is the smallest unit of work and
> specifically in your pipeline you have chosen to reduce all your elements
> into 4 logical elements (each containing some proportion of your original
> data).
>
> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks for the reply, Lukasz.
>>
>>
>> What I meant was that I want to shard my data by a "shard key", and be
>> sure that any two elements with the same "shard key" are processed by the
>> same thread on the same worker. (Or if that's not possible, by the same
>> worker JVM with no thread guarantee would be good enough). It doesn't
>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>> processing the data.
>>
>>
>> It sounds like what you suggested will work for this, with the downside
>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>
>> It seems a bit long and messy but am I right in thinking it would look
>> like this? ...
>>
>>
>> PCollection<MyElement> elements = ...;
>>
>> elements
>>
>> .apply(MapElements
>>
>> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>> TypeDescriptor.of(MyElement.class)))
>>
>> .via((MyElement e) -> KV.of(
>>
>> e.getKey().toString().hashCode() % 4, e)))
>>
>> .apply(GroupByKey.create())
>>
>> .apply(Partition.of(4,
>>
>> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>> kv.getKey()))
>>
>> .apply(ParDo.of(new MyDofn()));
>>
>> // Where MyDofn must be changed to handle a KV<Integer,
>> Iterable<MyElement>> as input instead of just a MyElement
>>
>>
>> I was wondering is there a guarantee that the runner won't parallelise
>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>> input elements with the same key are they actually guaranteed to be
>> processed on the same instance?
>>
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>>
>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I think this is what you're asking for, but your statement about 4
>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>> completely different DoFns. Also it's unclear what you mean by
>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn
>>> each being processed by a single thread.
>>>
>>> This is a bad idea because you limit your parallelism but this is
>>> similar to what the default file sharding logic does. In Apache Beam the
>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>> exploit this by assigning all our values to a fixed number of keys and then
>>> performing a GroupByKey. This is the same trick that powers the file
>>> sharding logic in AvroIO/TextIO/...
>>>
>>> Your pipeline would look like (fixed width font diagram):
>>> your data      -> apply shard key   -> GroupByKey->
>>> partition by key -> your dofn #1
>>>
>>>  \> your dofn #2
>>>
>>>  \> ...
>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>
>>> This is not exactly the same as processing a single DoFn instance/thread
>>> because it relies on the Runner to be able to schedule each key to be
>>> processed on a different machine. For example a Runner may choose to
>>> process value 1,[a,c] and 2,[b,d] sequentially on the sam

Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Raghu,

My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a
look!

Yes I'm using BigtableIO for the sink and I am measuring the end-to-end
latency. It seems to take 3-6 seconds typically, I would like to get it
down to ~1s.

Thanks,
Josh

On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <rang...@google.com> wrote:

> Josh,
>
> Can you share your job_id? I could take look. Are you measuring latency
> end-to-end (publisher to when it appears on BT?). Are you using BigtableIO
> for sink?
>
> There is no easy way to use more workers when auto-scaling is enabled. It
> thinks your backlog and CPU are low enough and does not need to scale.
> Raghu.
>
> On Wed, May 24, 2017 at 10:14 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>
>> About the "No operations completed" message - there are a few of these in
>> the logs (but very few, like 1 an hour or something) - so probably no need
>> to scale up Bigtable.
>> I did however see a lot of INFO messages "Wrote 0 records" in the logs. 
>> Probably
>> about 50% of the "Wrote n records" messages are zero. While the other 50%
>> are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
>> bad setting?
>>
>> Josh
>>
>>
>>
>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com>
>> wrote:
>>
>>> There are two main things to see here:
>>>
>>> * In the logs, are there any messages like "No operations completed
>>> within the last 61 seconds. There are still 1 simple operations and 1
>>> complex operations in progress.” This means you are underscaled on the
>>> bigtable side and would benefit from  increasing the node count.
>>> * We also saw some improvement in performance (workload dependent) by
>>> going to a bigger worker machine type.
>>> * Another optimization that worked for our use case:
>>>
>>> // streaming dataflow has larger machines with smaller bundles, so we can 
>>> queue up a lot more without blowing up
>>> private static BigtableOptions 
>>> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>> return new BigtableOptions.Builder()
>>> .setProjectId(opts.getProject())
>>> .setInstanceId(opts.getBigtableInstanceId())
>>> .setUseCachedDataPool(true)
>>> .setDataChannelCount(32)
>>> .setBulkOptions(new BulkOptions.Builder()
>>> .setUseBulkApi(true)
>>> .setBulkMaxRowKeyCount(2048)
>>> .setBulkMaxRequestSize(8_388_608L)
>>> .setAsyncMutatorWorkerCount(32)
>>> .build())
>>> .build();
>>> }
>>>
>>>
>>> There is a lot of trial and error involved in getting the end-to-end
>>> latency down so I would suggest enabling the profiling using the
>>> —saveProfilesToGcs option and get a sense of what is exactly happening.
>>>
>>> — Ankur Chauhan
>>>
>>> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>>>
>>> Ah ok - I am using the Dataflow runner. I didn't realise about the
>>> custom implementation being provided at runtime...
>>>
>>> Any ideas of how to tweak my job to either lower the latency consuming
>>> from PubSub or to lower the latency in writing to Bigtable?
>>>
>>>
>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>>>> ...)?
>>>>
>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>>>> wrote:
>>>>
>>>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>>>> runner are you using? If you are using google cloud dataflow then the
>>>>> PubsubIO class is not the one doing the reading from the pubsub topic. 
>>>>> They
>>>>> provide a custom implementation at run time.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>> Hi Ankur,
>>>>>
>>>>> What do you mean by runner address?
>>>>> Would you be able to l

Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!

On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:

> Google Cloud Dataflow won't override your setting. The dynamic sharding
> occurs if you don't explicitly set a numShard value.
>
> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> Thanks for the example. That sounds like a nice solution -
>> I am running on Dataflow though, which dynamically sets numShards - so if
>> I set numShards to 1 on each of those AvroIO writers, I can't be sure that
>> Dataflow isn't going to override my setting right? I guess this should work
>> fine as long as I partition my stream into a large enough number of
>> partitions so that Dataflow won't override numShards.
>>
>> Josh
>>
>>
>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Since your using a small number of shards, add a Partition transform
>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>> Write each partition with a single shard.
>>>
>>> (Fixed width diagram below)
>>> Pipeline -> AvroIO(numShards = 4)
>>> Becomes:
>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   |-> AvroIO(numShards = 1)
>>>   \-> AvroIO(numShards = 1)
>>>
>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>
>>>> I would like to partition the stream by some key in the element, so
>>>> that all elements with the same key will get processed by the same shard
>>>> writer, and therefore written to the same file. Is there a way to do this?
>>>> Note that in my stream the number of keys is very large (most elements have
>>>> a unique key, while a few elements share a key).
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>
>>>
>>
>
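
A minimal sketch of the Partition-then-fixed-shard layout in Lukasz's diagram, assuming the stream is already windowed as described; MyElement, getKey() and the output path are placeholders, and the element class must be something AvroIO.write can encode.

int numPartitions = 4;
PCollectionList<MyElement> partitions = elements.apply(
    Partition.of(numPartitions, (Partition.PartitionFn<MyElement>) (e, n) ->
        // Deterministic hash of the shard key, stable across runs.
        Math.floorMod(e.getKey().hashCode(), n)));

for (int i = 0; i < numPartitions; i++) {
  partitions.get(i).apply("WritePartition" + i,
      AvroIO.write(MyElement.class)
          .to("gs://my-bucket/output/partition-" + i + "/part") // hypothetical path
          .withWindowedWrites()
          .withNumShards(1));
}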


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Thanks Ankur, that's super helpful! I will give these optimisations a go.

About the "No operations completed" message - there are a few of these in
the logs (but very few, like 1 an hour or something) - so probably no need
to scale up Bigtable.
I did however see a lot of INFO messages "Wrote 0 records" in the
logs. Probably
about 50% of the "Wrote n records" messages are zero. While the other 50%
are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a
bad setting?

Josh



On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <an...@malloc64.com> wrote:

> There are two main things to see here:
>
> * In the logs, are there any messages like "No operations completed within
> the last 61 seconds. There are still 1 simple operations and 1 complex
> operations in progress.” This means you are underscaled on the bigtable
> side and would benefit from  increasing the node count.
> * We also saw some improvement in performance (workload dependent) by
> going to a bigger worker machine type.
> * Another optimization that worked for our use case:
>
> // streaming dataflow has larger machines with smaller bundles, so we can 
> queue up a lot more without blowing up
> private static BigtableOptions 
> createStreamingBTOptions(AnalyticsPipelineOptions opts) {
> return new BigtableOptions.Builder()
> .setProjectId(opts.getProject())
> .setInstanceId(opts.getBigtableInstanceId())
> .setUseCachedDataPool(true)
> .setDataChannelCount(32)
> .setBulkOptions(new BulkOptions.Builder()
> .setUseBulkApi(true)
> .setBulkMaxRowKeyCount(2048)
> .setBulkMaxRequestSize(8_388_608L)
> .setAsyncMutatorWorkerCount(32)
> .build())
> .build();
> }
>
>
> There is a lot of trial and error involved in getting the end-to-end
> latency down so I would suggest enabling the profiling using the
> —saveProfilesToGcs option and get a sense of what is exactly happening.
>
> — Ankur Chauhan
>
> On May 24, 2017, at 9:09 AM, Josh <jof...@gmail.com> wrote:
>
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
> implementation being provided at runtime...
>
> Any ideas of how to tweak my job to either lower the latency consuming
> from PubSub or to lower the latency in writing to Bigtable?
>
>
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex,
>> ...)?
>>
>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com>
>> wrote:
>>
>>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>>> runner are you using? If you are using google cloud dataflow then the
>>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>>> provide a custom implementation at run time.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>>
>>> Hi Ankur,
>>>
>>> What do you mean by runner address?
>>> Would you be able to link me to the comment you're referring to?
>>>
>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>> o/gcp/pubsub/PubsubIO.java
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>>> wrote:
>>>
>>>> What runner address you using. Google cloud dataflow uses a closed
>>>> source version of the pubsub reader as noted in a comment on Read class.
>>>>
>>>> Ankur Chauhan
>>>> Sent from my iPhone
>>>>
>>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>> seconds between the messages being published and being written to Bigtable.
>>>>
>>>> I want to try and decrease the latency to <1s if possible - does anyone
>>>> have any tips for doing this?
>>>>
>>>> I noticed that there is a PubsubGrpcClient
>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>>> o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource
>>>> is initialised with a PubsubJsonClient, so the Grpc client doesn't appear
>>>> to be being used. Is there a way to switch to the Grpc client - as perhaps
>>>> that would give better performance?
>>>>
>>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>>> allocated one n1-standard-4 instance to the job, which is running at
>>>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>>
>>>> Thanks for any advice,
>>>> Josh
>>>>
>>>>
>>>
>>
>
>
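
For context, a sketch of how options like the ones above could be handed to the Bigtable sink, assuming the Beam 2.x BigtableIO.write().withBigtableOptions(...) setter; "mutations" and the table name are placeholders, and the write expects KV<ByteString, Iterable<Mutation>> elements.

mutations.apply(
    BigtableIO.write()
        .withBigtableOptions(createStreamingBTOptions(options)) // helper shown above
        .withTableId("my-table")); // hypothetical table name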


Re: How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi Lukasz,

Thanks for the example. That sounds like a nice solution -
I am running on Dataflow though, which dynamically sets numShards - so if I
set numShards to 1 on each of those AvroIO writers, I can't be sure that
Dataflow isn't going to override my setting right? I guess this should work
fine as long as I partition my stream into a large enough number of
partitions so that Dataflow won't override numShards.

Josh

On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:

> Since your using a small number of shards, add a Partition transform which
> uses a deterministic hash of the key to choose one of 4 partitions. Write
> each partition with a single shard.
>
> (Fixed width diagram below)
> Pipeline -> AvroIO(numShards = 4)
> Becomes:
> Pipeline -> Partition --> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   |-> AvroIO(numShards = 1)
>   \-> AvroIO(numShards = 1)
>
> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>> (withWindowedWrites, hourly windows, numShards=4).
>>
>> I would like to partition the stream by some key in the element, so that
>> all elements with the same key will get processed by the same shard writer,
>> and therefore written to the same file. Is there a way to do this? Note
>> that in my stream the number of keys is very large (most elements have a
>> unique key, while a few elements share a key).
>>
>> Thanks,
>> Josh
>>
>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Ah ok - I am using the Dataflow runner. I didn't realise about the custom
implementation being provided at runtime...

Any ideas of how to tweak my job to either lower the latency consuming from
PubSub or to lower the latency in writing to Bigtable?


On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lc...@google.com> wrote:

> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <an...@malloc64.com> wrote:
>
>> Sorry that was an autocorrect error. I meant to ask - what dataflow
>> runner are you using? If you are using google cloud dataflow then the
>> PubsubIO class is not the one doing the reading from the pubsub topic. They
>> provide a custom implementation at run time.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 07:52, Josh <jof...@gmail.com> wrote:
>>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/
>> io/gcp/pubsub/PubsubIO.java
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com>
>> wrote:
>>
>>> What runner address you using. Google cloud dataflow uses a closed
>>> source version of the pubsub reader as noted in a comment on Read class.
>>>
>>> Ankur Chauhan
>>> Sent from my iPhone
>>>
>>> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>> seconds between the messages being published and being written to Bigtable.
>>>
>>> I want to try and decrease the latency to <1s if possible - does anyone
>>> have any tips for doing this?
>>>
>>> I noticed that there is a PubsubGrpcClient
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/
>>> io/google-cloud-platform/src/main/java/org/apache/beam/sdk/i
>>> o/gcp/pubsub/PubsubGrpcClient.java however the PubsubUnboundedSource is
>>> initialised with a PubsubJsonClient, so the Grpc client doesn't appear to
>>> be being used. Is there a way to switch to the Grpc client - as perhaps
>>> that would give better performance?
>>>
>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>> allocated one n1-standard-4 instance to the job, which is running at
>>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>
>>> Thanks for any advice,
>>> Josh
>>>
>>>
>>
>


Re: How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi Ankur,

What do you mean by runner address?
Would you be able to link me to the comment you're referring to?

I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Thanks,
Josh

On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <an...@malloc64.com> wrote:

> What runner address you using. Google cloud dataflow uses a closed source
> version of the pubsub reader as noted in a comment on Read class.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 04:05, Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
> between the messages being published and being written to Bigtable.
>
> I want to try and decrease the latency to <1s if possible - does anyone
> have any tips for doing this?
>
> I noticed that there is a PubsubGrpcClient https://github.com/apache/beam
> /blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/
> main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java however
> the PubsubUnboundedSource is initialised with a PubsubJsonClient, so the
> Grpc client doesn't appear to be being used. Is there a way to switch to
> the Grpc client - as perhaps that would give better performance?
>
> Also, I am running my job on Dataflow using autoscaling, which has only
> allocated one n1-standard-4 instance to the job, which is running at ~50%
> CPU. Could forcing a higher number of nodes help improve latency?
>
> Thanks for any advice,
> Josh
>
>


How to decrease latency when using PubsubIO.Read?

2017-05-24 Thread Josh
Hi all,

I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
between the messages being published and being written to Bigtable.

I want to try and decrease the latency to <1s if possible - does anyone
have any tips for doing this?

I noticed that there is a PubsubGrpcClient https://github.com/apache/
beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/
src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
however the PubsubUnboundedSource is initialised with a PubsubJsonClient,
so the Grpc client doesn't appear to be being used. Is there a way to
switch to the Grpc client - as perhaps that would give better performance?

Also, I am running my job on Dataflow using autoscaling, which has only
allocated one n1-standard-4 instance to the job, which is running at ~50%
CPU. Could forcing a higher number of nodes help improve latency?

Thanks for any advice,
Josh


How to partition a stream by key before writing with FileBasedSink?

2017-05-24 Thread Josh
Hi,

I am using a FileBasedSink (AvroIO.write) on an unbounded stream
(withWindowedWrites, hourly windows, numShards=4).

I would like to partition the stream by some key in the element, so that
all elements with the same key will get processed by the same shard writer,
and therefore written to the same file. Is there a way to do this? Note
that in my stream the number of keys is very large (most elements have a
unique key, while a few elements share a key).

Thanks,
Josh


Re: Using PubSubIO.read with windowing

2017-05-09 Thread Josh
I'm just testing this locally at the moment, using the direct runner.

No worries if you can't help! I've been looking at the code and feel like
maybe this is related to the PubSub source logic - for example here
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L178
there is some hard coded configuration which says there must be minimum 10
messages before the watermark is updated. I will keep looking into it -
hopefully someone who understands the PubSub source can give some insight...

Best,
Josh

On Tue, May 9, 2017 at 10:30 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> What is this running on? I suspect the Dataflow service? In that case I’m
> afraid I can’t help because I know to little about it.
>
> Best,
> Aljoscha
>
> On 8. May 2017, at 14:25, Josh <jof...@gmail.com> wrote:
>
> Hi Aljoscha, what's the best way to investigate that?
>
> I can see the logs from PubSubIO (example pasted below). They show a "last
> reported watermark" however it is not consistent (it is often logged as a
> min timestamp until quite a few messages have been consumed) - I assume
> this is because there are several threads consuming from PubSub and some
> threads have not seen any messages yet?
>
> In my logs below you can see that one of the threads reported a watermark:
> 2017-05-08T11:45:53.798Z last reported watermark - however even though
> these watermarks are being reported, my SendWindowToAPI DoFn is not
> processing any windows until later.
>
>
>
> 05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 0 current
> in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
> in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>
> 05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 3 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
> max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
> extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
> expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
> watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
> reported watermark
>
> 05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource
> - Pubsub projects/myproject/subscriptions/mysubscription has 1 received
> messages, 0 current unread messages, 0 current unread bytes, 1 current
> in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
> max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
> extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
> expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
> recent late messages, -290308-12-21T19:59:05.225Z last reported watermark
>
> On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> One suspicion I have is that the watermark could be lagging behind a bit.
>> Have you looked at that?
>>
>> On 7. May 2017, at 22:44, Josh <jof...@gmail.com> wrote:
>>
>> Thanks for the replies.
>> @Ankur I tried putting a GroupByKey between the Window.into and the sink,
>> and it didn't seem to make any difference...
>> @Aljoscha I see, that makes sense - so the windowed write code (which
>> uses TextIO.write().withWindowedWrites()) is not closing the files as
>> soon as the window has ended?
>>
>> I was trying this out with windowed writes, but what I really want to do
>> doesn't involve windowed writes. I am actually trying to do this:
>>
>> pipeline
>> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
>> .apply(ParDo.of(new MapToKV())
>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>> .apply(Combine.perKey(new MyCombineFn()))
>> .apply(ParDo.of(new SendWindowToAPI()));
>>
>> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
>> aggregate the values for each key. I then want to use another DoFn
>> SendWindowToAPI which will ping the aggregate result for 

Re: Using PubSubIO.read with windowing

2017-05-08 Thread Josh
Hi Aljoscha, what's the best way to investigate that?

I can see the logs from PubSubIO (example pasted below). They show a "last
reported watermark" however it is not consistent (it is often logged as a
min timestamp until quite a few messages have been consumed) - I assume
this is because there are several threads consuming from PubSub and some
threads have not seen any messages yet?

In my logs below you can see that one of the threads reported a
watermark: 2017-05-08T11:45:53.798Z
last reported watermark - however even though these watermarks are being
reported, my SendWindowToAPI DoFn is not processing any windows until
later.



05/08 12:46:55 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 1 received
messages, 0 current unread messages, 0 current unread bytes, 0 current
in-flight msgs, no oldest in-flight, 1 current in-flight checkpoints, 1 max
in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
extended, 1 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
recent late messages, -290308-12-21T19:59:05.225Z last reported watermark


05/08 12:46:56 INFO [pool-3-thread-4] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 3 received
messages, 0 current unread messages, 0 current unread bytes, 1 current
in-flight msgs, 7554ms oldest in-flight, 1 current in-flight checkpoints, 2
max in-flight checkpoints, 13B/s recent read, 3 recent received, 0 recent
extended, 2 recent late extended, 1 recent ACKed, 0 recent NACKed, 0 recent
expired, 14093ms recent message timestamp skew, 9224866280808573ms recent
watermark skew, 0 recent late messages, 2017-05-08T11:45:53.798Z last
reported watermark


05/08 12:46:57 INFO [pool-3-thread-2] o.a.b.s.i.g.p.PubsubUnboundedSource -
Pubsub projects/myproject/subscriptions/mysubscription has 1 received
messages, 0 current unread messages, 0 current unread bytes, 1 current
in-flight msgs, 6423ms oldest in-flight, 1 current in-flight checkpoints, 1
max in-flight checkpoints, 4B/s recent read, 1 recent received, 0 recent
extended, 0 recent late extended, 0 recent ACKed, 0 recent NACKed, 0 recent
expired, 0ms recent message timestamp skew, 0ms recent watermark skew, 0
recent late messages, -290308-12-21T19:59:05.225Z last reported watermark

On Mon, May 8, 2017 at 9:56 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> One suspicion I have is that the watermark could be lagging behind a bit.
> Have you looked at that?
>
> On 7. May 2017, at 22:44, Josh <jof...@gmail.com> wrote:
>
> Thanks for the replies.
> @Ankur I tried putting a GroupByKey between the Window.into and the sink,
> and it didn't seem to make any difference...
> @Aljoscha I see, that makes sense - so the windowed write code (which uses
> TextIO.write().withWindowedWrites()) is not closing the files as soon as
> the window has ended?
>
> I was trying this out with windowed writes, but what I really want to do
> doesn't involve windowed writes. I am actually trying to do this:
>
> pipeline
> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
> .apply(ParDo.of(new MapToKV())
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
> .apply(Combine.perKey(new MyCombineFn()))
> .apply(ParDo.of(new SendWindowToAPI()));
>
> So here Combine.perKey will do a GroupByKey and then MyCombineFn will
> aggregate the values for each key. I then want to use another DoFn
> SendWindowToAPI which will ping the aggregate result for each window to a
> REST API. I am trying to hack it this way for now since there is no RestIO
> sink yet.
>
> I'm having the same problem doing this as when running my write windowed
> files example - the SendWindowToAPI DoFn seems to only ping the API after a
> few minutes / 30+ messages have been sent, rather than immediately after
> each window.
>
> Any ideas what's going on here?
>
> Thanks,
> Josh
>
>
>
>
> On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> First, a bit of clarification (or refinement): a windowing strategy is
>> used in all subsequent GroupByKey operations until another windowing
>> strategy is specified. That being said, from quickly glancing at the
>> windowed write-code I have the suspicion that triggers are not used for
>> windowed writing and that instead some other scheme is used for determining
>> when to close a file.
>>
>> I’m sure others with more knowledge of those parts will jump in later but
>> I nevertheless wanted to give you this quick answer.
>>
>> Best,
>> Aljoscha
>>
>> On 7. May 2017, at 00:57, Ankur Chauhan <an...@m

Re: Using PubSubIO.read with windowing

2017-05-07 Thread Josh
Thanks for the replies.
@Ankur I tried putting a GroupByKey between the Window.into and the sink,
and it didn't seem to make any difference...
@Aljoscha I see, that makes sense - so the windowed write code (which uses
TextIO.write().withWindowedWrites()) is not closing the files as soon as
the window has ended?

I was trying this out with windowed writes, but what I really want to do
doesn't involve windowed writes. I am actually trying to do this:

pipeline
.apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
.apply(ParDo.of(new MapToKV())
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
.apply(Combine.perKey(new MyCombineFn()))
.apply(ParDo.of(new SendWindowToAPI()));

So here Combine.perKey will do a GroupByKey and then MyCombineFn will
aggregate the values for each key. I then want to use another DoFn
SendWindowToAPI which will ping the aggregate result for each window to a
REST API. I am trying to hack it this way for now since there is no RestIO
sink yet.

I'm having the same problem doing this as when running my write windowed
files example - the SendWindowToAPI DoFn seems to only ping the API after a
few minutes / 30+ messages have been sent, rather than immediately after
each window.

Any ideas what's going on here?

Thanks,
Josh




On Sun, May 7, 2017 at 12:18 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> First, a bit of clarification (or refinement): a windowing strategy is
> used in all subsequent GroupByKey operations until another windowing
> strategy is specified. That being said, from quickly glancing at the
> windowed write-code I have the suspicion that triggers are not used for
> windowed writing and that instead some other scheme is used for determining
> when to close a file.
>
> I’m sure others with more knowledge of those parts will jump in later but
> I nevertheless wanted to give you this quick answer.
>
> Best,
> Aljoscha
>
> On 7. May 2017, at 00:57, Ankur Chauhan <an...@malloc64.com> wrote:
>
> That I believe is expected behavior. The windowing strategy is applied at
> the following group by key operation. To have the windows fire the way you
> want, try putting a group by key immediately after the desired windowing
> function.
>
> The messages right now are being bundled aggressively for performance
> reasons and doing a gbk would ensure desired bundle delineations.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 6, 2017, at 14:11, Josh <jof...@gmail.com> wrote:
>
> Hi all,
> I am using a PubSubIO source, windowing every 10 seconds and then doing
> something with the windows, for example:
>
> pipeline
> .apply(PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(..))
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
> .apply(MapElements
> .into(TypeDescriptors.strings())
> .via((PubsubMessage msg) -> msg.getAttribute(*..*)))
> .apply(new WriteOneFilePerWindow(..));
>
> My expectation was that if I publish a pubsub message, and then publish 
> another 10+ seconds later, a single file should be written for the previous 
> 10 second window. However I find that I need to publish a lot of messages for 
> any files to be written at all (e.g. 30+ messages).
>
> Is this expected behaviour when using PubSubIO? Is there a way to tweak it to 
> fire the windows more eagerly?
>
> Thanks,
>
> Josh
>
>
>
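
One way to make windows fire more eagerly, sketched as an assumption rather than something confirmed in this thread, is to attach an explicit trigger with early firings to the window; the element type and the delays below are placeholders.

.apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(10)))
    // Fire an early pane a few seconds after the first element arrives, then a final
    // on-time pane when the watermark passes the end of the window.
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))))
    .withAllowedLateness(Duration.standardMinutes(1))
    .discardingFiredPanes())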


Re: Fwd: Slack Invite

2017-05-05 Thread Josh
Could someone add me too please? at j...@permutive.com

On Fri, May 5, 2017 at 9:08 AM, Jean-Baptiste Onofré 
wrote:

> Done
>
> Regards
> JB
>
>
> On 05/05/2017 10:02 AM, Edward Bosher wrote:
>
>> Hi,
>>
>> Whenever you have time I'd love to get an invite to slack on this email
>> address.
>>
>> edbosher at gmail com
>>
>> Thanks,
>> Ed
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Http sink - Does it make sense?

2017-05-04 Thread Josh
Hi all,

I'm computing some counts in a Beam job and want to sink the results to a
REST API via HTTP. For example, group incoming elements by a key, count by
key, and then every minute write the count to an API which supports
incremental updates.

Is this something that other people have done with Beam? I was unable to
find any examples of an Http sink online. If I write my own custom sink to
do this, is there anything to be wary of?
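
For what it's worth, the shape of thing I had in mind is a plain DoFn that
POSTs each aggregate (a rough sketch only - the endpoint URL and JSON payload
below are placeholders, and there's no retry or batching yet):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch of an HTTP "sink": POST each per-key count to a placeholder REST endpoint.
public class HttpSinkFn extends DoFn<KV<String, Long>, Void> {
  private static final String ENDPOINT = "https://example.com/api/counts"; // placeholder

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) new URL(ENDPOINT).openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json");
    String body = String.format(
        "{\"key\":\"%s\",\"count\":%d}", c.element().getKey(), c.element().getValue());
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }
    int status = conn.getResponseCode();
    conn.disconnect();
    if (status >= 400) {
      // No retry logic yet - just fail loudly so I notice.
      throw new RuntimeException("HTTP sink got status " + status);
    }
  }
}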

Thanks for any advice,
Josh


Slack channel invite

2017-05-02 Thread Josh Di Fabio
Please will someone kindly invite joshdifa...@gmail.com to the Beam slack
channel?


Re: How to skip processing on failure at BigQueryIO sink?

2017-04-12 Thread Josh
Thanks for the replies,
@Lukasz that sounds like a good option. It's just that it may be hard to catch
and filter out every case that will result in a 4xx error. I just want to
avoid the whole pipeline failing in the case of a few elements in the
stream being bad.
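
To make that concrete, the kind of prefilter I'm imagining is a DoFn with a
dead-letter output, so bad rows get diverted instead of failing the write (a
sketch only - matchesSchema is a placeholder for whatever validation I end up
doing, rows stands in for the PCollection<TableRow> I'd otherwise hand straight
to BigQueryIO, and TupleTag / TupleTagList / PCollectionTuple come from
org.apache.beam.sdk.values):

final TupleTag<TableRow> validTag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> deadLetterTag = new TupleTag<TableRow>() {};

PCollectionTuple tagged = rows.apply(
    ParDo.of(new DoFn<TableRow, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        TableRow row = c.element();
        if (matchesSchema(row)) {        // placeholder validation
          c.output(row);                 // main output, goes on to BigQueryIO
        } else {
          c.output(deadLetterTag, row);  // diverted; older SDKs call this sideOutput
        }
      }
    }).withOutputTags(validTag, TupleTagList.of(deadLetterTag)));

// tagged.get(validTag) then goes to BigQueryIO as before;
// tagged.get(deadLetterTag) gets logged or written somewhere for inspection.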

@Dan that sounds promising, I will keep an eye on BEAM-190. do you have any
idea if there will be an initial version of this to try out in the next
couple of weeks?

On Tue, Apr 11, 2017 at 11:37 PM, Dan Halperin <dhalp...@google.com> wrote:

> I believe this is BEAM-190, which is actually being worked on today.
> However, it will probably not be ready in time for the first stable release.
>
> https://issues.apache.org/jira/browse/BEAM-190
>
> On Tue, Apr 11, 2017 at 7:52 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Have you thought of fetching the schema upfront from BigQuery and
>> prefiltering out any records in a preceding DoFn instead of relying on
>> BigQuery telling you that the schema doesn't match?
>>
>> Otherwise you are correct in believing that you will need to update
>> BigQueryIO to have the retry/error semantics that you want.
>>
>> On Tue, Apr 11, 2017 at 1:12 AM, Josh <jof...@gmail.com> wrote:
>>
>>> What I really want to do is configure BigQueryIO to log an error and
>>> skip the write if it receives a 4xx response from BigQuery (e.g. element
>>> does not match table schema). And for other errors (e.g. 5xx) I want it to
>>> retry n times with exponential backoff.
>>>
>>> Is there any way to do this at the moment? Will I need to make some
>>> custom changes to BigQueryIO?
>>>
>>>
>>>
>>> On Mon, Apr 10, 2017 at 7:11 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using BigQueryIO to write the output of an unbounded streaming job
>>>> to BigQuery.
>>>>
>>>> In the case that an element in the stream cannot be written to
>>>> BigQuery, the BigQueryIO seems to have some default retry logic which
>>>> retries the write a few times. However, if the write fails repeatedly, it
>>>> seems to cause the whole pipeline to halt.
>>>>
>>>> How can I configure beam so that if writing an element fails a few
>>>> times, it simply gives up on writing that element and moves on without
>>>> affecting the pipeline?
>>>>
>>>> Thanks for any advice,
>>>> Josh
>>>>
>>>
>>>
>>
>


Re: How to skip processing on failure at BigQueryIO sink?

2017-04-11 Thread Josh
What I really want to do is configure BigQueryIO to log an error and skip
the write if it receives a 4xx response from BigQuery (e.g. element does
not match table schema). And for other errors (e.g. 5xx) I want it to retry
n times with exponential backoff.

Is there any way to do this at the moment? Will I need to make some custom
changes to BigQueryIO?



On Mon, Apr 10, 2017 at 7:11 PM, Josh <jof...@gmail.com> wrote:

> Hi,
>
> I'm using BigQueryIO to write the output of an unbounded streaming job to
> BigQuery.
>
> In the case that an element in the stream cannot be written to BigQuery,
> the BigQueryIO seems to have some default retry logic which retries the
> write a few times. However, if the write fails repeatedly, it seems to
> cause the whole pipeline to halt.
>
> How can I configure beam so that if writing an element fails a few times,
> it simply gives up on writing that element and moves on without affecting
> the pipeline?
>
> Thanks for any advice,
> Josh
>


How to skip processing on failure at BigQueryIO sink?

2017-04-10 Thread Josh
Hi,

I'm using BigQueryIO to write the output of an unbounded streaming job to
BigQuery.

In the case that an element in the stream cannot be written to BigQuery,
the BigQueryIO seems to have some default retry logic which retries the
write a few times. However, if the write fails repeatedly, it seems to
cause the whole pipeline to halt.

How can I configure beam so that if writing an element fails a few times,
it simply gives up on writing that element and moves on without affecting
the pipeline?

Thanks for any advice,
Josh


Re: BigQueryIO - Why is CREATE_NEVER not supported when using a tablespec?

2017-04-07 Thread Josh
Hi Dan,

Ok great thanks for confirming. I will create a JIRA and submit a PR to
remove this check then.

Thanks,
Josh

On Fri, Apr 7, 2017 at 6:09 PM, Dan Halperin <dhalp...@apache.org> wrote:

> Hi Josh,
> You raise a good point. I think we had put this check in (long before
> partition tables existed) because we need a schema to create a table and we
> assumed the number of tables would be unbounded. But now it's an outdated
> check, overly conservative, and probably should be removed.
>
> Would you like to send a PR to fix this?
>
> Thanks,
> Dan
>
>
> On Fri, Apr 7, 2017 at 10:03 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have a use case where I want to stream into BigQuery, using a tablespec
>> but with CreateDisposition.CREATE_NEVER. I want to partition/shard my
>> data by date, and use BigQuery's date partitioning feature within a single
>> table (rather than creating a new BigQuery table for every day). In this
>> case writes would be made to a partition in a single table, e.g.
>> `my-project:dataset.my_table$20170407`, and in my tablespec I would just
>> be choosing the partition decorator using the window.
>>
>> Unfortunately this doesn't seem possible with BigQueryIO at the moment,
>> because it requires me to use CreateDisposition.CREATE_IF_NEEDED. I
>> can't use CreateDisposition.CREATE_IF_NEEDED because it requires me to
>> provide a table schema and my BigQuery schema isn't available at compile
>> time.
>>
>> Is there any good reason why CREATE_NEVER is not allowed when using a
>> tablespec?
>>
>> Thanks,
>> Josh
>>
>
>


BigQueryIO - Why is CREATE_NEVER not supported when using a tablespec?

2017-04-07 Thread Josh
Hi all,

I have a use case where I want to stream into BigQuery, using a tablespec
but with CreateDisposition.CREATE_NEVER. I want to partition/shard my data
by date, and use BigQuery's date partitioning feature within a single table
(rather than creating a new BigQuery table for every day). In this case
writes would be made to a partition in a single table, e.g.
`my-project:dataset.my_table$20170407`, and in my tablespec I would just be
choosing the partition decorator using the window.
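
For context, the mapping I'd like to express is roughly the following (a sketch
only - the project/dataset/table names are placeholders, and I'm assuming I can
plug this into whichever BigQueryIO.Write.to(...) overload accepts a function
of the window):

import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Sketch: pick the $YYYYMMDD partition decorator from the window's start time,
// always targeting the same (placeholder) table.
public class PartitionedTableSpec {
  private static final DateTimeFormatter DAY =
      DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

  public static String forWindow(IntervalWindow window) {
    return "my-project:dataset.my_table$" + DAY.print(window.start());
  }
}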

Unfortunately this doesn't seem possible with BigQueryIO at the moment,
because it requires me to use CreateDisposition.CREATE_IF_NEEDED. I can't
use CreateDisposition.CREATE_IF_NEEDED because it requires me to provide a
table schema and my BigQuery schema isn't available at compile time.

Is there any good reason why CREATE_NEVER is not allowed when using a
tablespec?

Thanks,
Josh


Re: Having a local cache (per JVM) to use in DoFns

2017-04-06 Thread Josh
Thanks Lukasz, that's very helpful!

On Thu, Apr 6, 2017 at 1:34 PM, Lukasz Cwik <lc...@google.com> wrote:

> You should follow any valid singleton pattern and preferably initialize on
> class load or within a method annotated with @Setup [1]
>
> @Setup/@Teardown is called each time an instance of a DoFn is
> created/discarded respectively. @Setup/@Teardown generally will be called
> fewer times then startBundle/finishBundle but more than one instance of a
> DoFn may be created within a single JVM still which is why you still are
> required to follow any valid singleton pattern.
>
> For example:
> class MyDoFn {
>   private static volatile CachedService cachedService;
>
>   @Setup
>   public void setup() {
>     // Initialize and store as a static member if not already initialized
>     if (cachedService == null) {
>       synchronized (MyDoFn.class) {
>         if (cachedService == null) {
>           cachedService = ...
>         }
>       }
>     }
>   }
> }
>
> [1]: https://github.com/apache/beam/blob/master/sdks/
> java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L496
>
> On Thu, Apr 6, 2017 at 4:46 AM, Josh <jof...@gmail.com> wrote:
>
>> Hello!
>>
>> I'm just getting started with Beam (in java), and have a question about
>> the best way to initialise and keep a local cache.
>>
>> In my case my DoFn needs to occasionally look up some info in an external
>> service. I have a Service class which interacts with the external service
>> and I have a CachedService which wraps an instance of Service and caches
>> the responses.
>>
>> I want this CachedService to be initialised once per JVM. What's the best
>> way to do this in Beam? Should the cache just be a static field in the
>> DoFn? Or should I be using the DoFn.StartBundle method and initialising the
>> cache in there? What if I want my cache to be used in two separate DoFns
>> (which sometimes run in the same JVM) - how can I ensure one cache per JVM
>> rather than one cache per DoFn?
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>


Having a local cache (per JVM) to use in DoFns

2017-04-06 Thread Josh
Hello!

I'm just getting started with Beam (in java), and have a question about the
best way to initialise and keep a local cache.

In my case my DoFn needs to occasionally look up some info in an external
service. I have a Service class which interacts with the external service
and I have a CachedService which wraps an instance of Service and caches
the responses.

I want this CachedService to be initialised once per JVM. What's the best
way to do this in Beam? Should the cache just be a static field in the
DoFn? Or should I be using the DoFn.StartBundle method and initialising the
cache in there? What if I want my cache to be used in two separate DoFns
(which sometimes run in the same JVM) - how can I ensure one cache per JVM
rather than one cache per DoFn?
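
To make the last question concrete, what I was picturing is a shared holder
class that any DoFn can reference (just a sketch - CachedService and Service
stand in for my own classes, and I'm assuming CachedService wraps a Service via
its constructor):

// Sketch: one JVM-wide cache, shared by every DoFn that calls ServiceCacheHolder.get().
public final class ServiceCacheHolder {
  private static volatile CachedService instance;

  private ServiceCacheHolder() {}

  public static CachedService get() {
    if (instance == null) {
      synchronized (ServiceCacheHolder.class) {
        if (instance == null) {
          instance = new CachedService(new Service()); // my own classes
        }
      }
    }
    return instance;
  }
}

// Then in each DoFn (e.g. in @Setup): CachedService cache = ServiceCacheHolder.get();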

Thanks for any advice,

Josh