Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Thomas, I reworked using the GroupIntoBatches PTransform, and things are
working great (with fewer lines of code).

Thanks

Jacob
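
For readers following along, here is a minimal plain-Java sketch of the idea behind batching by element count: buffer values per key and emit a batch whenever the buffer reaches a maximum size. It is not Beam code and does not use the GroupIntoBatches API; the class name CountBatcher and its methods are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam: batches values per key by count.
class CountBatcher<K, V> {
  private final int maxBatchSize;
  private final Map<K, List<V>> buffers = new HashMap<>();
  private final List<List<V>> emitted = new ArrayList<>();

  CountBatcher(int maxBatchSize) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.maxBatchSize = maxBatchSize;
  }

  // Buffer one element and emit the key's buffer once it is full.
  void add(K key, V value) {
    List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(value);
    if (buffer.size() >= maxBatchSize) {
      emitted.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  // Emit whatever is left over. A streaming pipeline would typically
  // do this from a timer rather than from an explicit call.
  void flush() {
    for (List<V> buffer : buffers.values()) {
      if (!buffer.isEmpty()) {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
      }
    }
  }

  // All batches emitted so far, in emission order.
  List<List<V>> batches() {
    return emitted;
  }
}
```

With a single key, 7 elements and a maximum batch size of 3 yield ceil(7 / 3) = 3 batches, which is the grouping asked for in the original question.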


Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
That gist isn't working right now, but I'll update it when I find the bug.

The direct runner grows memory, but never writes files.
The dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.

Jacob


Re: How to window by quantity of data?

2017-10-18 Thread Thomas Groh
The calls are essentially atomic per-key.

More specifically, the two calls can occur in one of two ways:

1) They are for elements which share a key. If so, the calls _must_ be made
serially, so the second read() will see the result of the first write()
2) They are for elements which do not share a key. If so, the state
instances are unique and independent, so both will contain 1.

As an aside, I unfortunately missed some of this - part of the change may
be duplication of the 'GroupIntoBatches' PTransform within the Core SDK.
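
The two cases above can be sketched in plain Java. This is only a model of the semantics, under the assumption that calls for one key run one after another; the class PerKeyCounter is hypothetical and is not the Beam state API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-key state; not the Beam state API.
class PerKeyCounter {
  // One independent counter cell per key, like a separate state instance per key.
  private final Map<String, Integer> stateByKey = new HashMap<>();

  // Mirrors the read / increment / write sequence from the question.
  // Calls for the same key are assumed to be made serially.
  int processElement(String key) {
    Integer stored = stateByKey.get(key);       // countState.read()
    int count = (stored == null) ? 0 : stored;  // firstNonNull(read, 0)
    count += 1;
    stateByKey.put(key, count);                 // countState.write(count)
    return count;
  }

  // Current value for a key, or 0 if the key has never been seen.
  int read(String key) {
    return stateByKey.getOrDefault(key, 0);
  }
}
```

Two elements sharing key "a" leave that key's counter at 2, while a single element for key "b" leaves its own counter at 1.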


Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {

  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read countState and then write countState, will countState
be incremented by 1 rather than by 2?

Jacob


Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
What do you mean by non-atomic?

All output/state changes/timers from a process bundle are an all or nothing
change. So if processing a bundle fails, any state changes are discarded
and the state is reset to what it was before the bundle was processed.
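
As a rough illustration of the all-or-nothing behaviour described above, the following plain-Java sketch applies a bundle's changes to a working copy and commits them only if every element succeeds. The class BundleCommitSketch and the failOn parameter are hypothetical; real runners implement this internally and in their own way.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of bundle-level commit; not how any particular runner is implemented.
class BundleCommitSketch {
  private final Map<String, Integer> committed = new HashMap<>();

  // Processes a bundle against a working copy of the state.
  // Returns true and commits if all elements succeed; otherwise discards the changes.
  boolean processBundle(List<String> keys, String failOn) {
    Map<String, Integer> working = new HashMap<>(committed);
    try {
      for (String key : keys) {
        if (key.equals(failOn)) {
          throw new IllegalStateException("simulated failure on " + key);
        }
        working.merge(key, 1, Integer::sum);
      }
    } catch (IllegalStateException e) {
      return false; // working copy is dropped; committed state is untouched
    }
    committed.clear();
    committed.putAll(working);
    return true;
  }

  // Committed count for a key, or 0 if nothing has been committed for it.
  int count(String key) {
    return committed.getOrDefault(key, 0);
  }
}
```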

On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble  wrote:

> Here's a gist:
> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>
> Should I consider StateId value mutations to be non-atomic?
>
> Jacob

Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Eugene Kirpichov
So far we seem to have unanimous consensus in favor of dropping Java7, and
we seem to also be converging on declaring that this doesn't require
increasing major version - but the discussion has been going on for only a
couple of days and we might not have reached some users.

Robert - I think it's a great idea to make 2.2 release notes mention that
Beam is considering dropping Java 7 support in 2.3. We could also link to
this thread and encourage people to chime in; and we would need to make the
release notes explain the implications: 1) people will need to use a
Java 8 compiler; 2) if they are running on an on-prem cluster such as Spark
or Flink, they'll need to make sure their cluster runs Java 8; for example,
Spark supports Java 8 only starting from 1.6.

We should probably also tweet a link to this thread from the Apache Beam
account. Something like: "Apache Beam is discussing ending Java7 support in
a subsequent release; please chime in on
https://lists.apache.org/thread.html/2e1890c62d9f022f09b20e9f12f130fe9f1042e391979087f725d2e0@%3Cuser.beam.apache.org%3E
"

The Dataflow team can probably poll Dataflow customers in parallel with
this (Dataflow runner per se already supports Java8 - AFAIK the Dataflow
worker runs a Java 8 JVM).

I'd say if ~2 weeks after 2.2 release notes are out we still have no voices
from people for whom it's a show-stopper, then we should declare victory
and start implementation, targeting 2.3. Perhaps we can already file an
umbrella JIRA for dropping Java7.

Does this sound reasonable to people?


Re: KafkaIO and Avro

2017-10-18 Thread Eugene Kirpichov
It seems that KafkaAvroDeserializer implements Deserializer<Object>, though
I suppose with proper configuration that Object will at run-time be your
desired type. Have you tried adding some Java type casts to make it compile?
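
To make the type-cast suggestion concrete, here is a self-contained sketch of the general Java technique: passing a class token through a raw Class so the compiler accepts a narrower generic type. The Deserializer interface, ObjectDeserializer and CastSketch below are hypothetical stand-ins defined only so the example compiles on its own; they are not the Kafka or Confluent classes. Whether the same cast is enough for KafkaIO depends on how the real deserializer is configured, so treat this as an illustration of the cast only.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a generic deserializer interface.
interface Deserializer<T> {
  T deserialize(byte[] data);
}

// Plays the role of a deserializer declared against Object.
class ObjectDeserializer implements Deserializer<Object> {
  @Override
  public Object deserialize(byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }
}

class CastSketch {
  // Plays the role of an API that demands Class<? extends Deserializer<V>>.
  static <V> V decode(Class<? extends Deserializer<V>> type, byte[] data) {
    try {
      return type.getDeclaredConstructor().newInstance().deserialize(data);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  // Going through a raw Class makes the compiler accept the narrower generic type.
  // The cast is unchecked: nothing verifies that the decoded values really are V.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static <V> Class<? extends Deserializer<V>> narrow(
      Class<? extends Deserializer<Object>> type) {
    return (Class) type;
  }

  // Decodes a fixed payload through the narrowed class token.
  static String demo() {
    Class<? extends Deserializer<String>> type =
        CastSketch.<String>narrow(ObjectDeserializer.class);
    return decode(type, "hello".getBytes(StandardCharsets.UTF_8));
  }
}
```

The trade-off is that a wrong assumption about the run-time type only shows up later as a ClassCastException, not at compile time.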

On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson 
wrote:

> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>> Hi,
>>
>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>> it should be as simple as:
>>
>> p.apply(KafkaIO.read()
>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>   AvroCoder.of(Envelope.class))
>>
>> Where Envelope is the name of the Avro class. However, that does not
>> compile and I get the following error:
>>
>> incompatible types:
>> java.lang.Class
>> cannot be converted to java.lang.Class>
>> org.apache.kafka.common.serialization.Deserializer>
>>
>> I've tried a number of variations on this theme but haven't yet worked
>> it out and am starting to run out of ideas...
>>
>> Has anyone successfully read Avro data from Kafka?
>>
>> The code I'm using can be found at
>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>> environment can be created with Docker.
>>
>> Thanks,
>> Andrew
>>
>
>


Re: Limit the number of DoFn instances per worker?

2017-10-18 Thread Lukasz Cwik
Using the semaphore approach will lead to unprocessed elements getting
buffered internally as well; it's just that you won't be splitting the CPU
time too thinly if every element is heavy on compute.
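
A small stand-alone sketch of this effect, using only java.util.concurrent: many tasks are submitted, but at most a fixed number are inside the guarded section at once while the rest wait on the semaphore. The class name, the thread counts and the 5 ms sleep standing in for heavy compute are all illustrative assumptions, not taken from any real pipeline.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names and numbers are illustrative only.
class SemaphoreLimitSketch {
  // Runs `tasks` jobs on `threads` threads, allowing at most `permits` jobs
  // inside the guarded section. Returns the highest concurrency observed there.
  static int maxObservedConcurrency(int threads, int permits, int tasks) {
    Semaphore semaphore = new Semaphore(permits, true);
    AtomicInteger active = new AtomicInteger();
    AtomicInteger maxActive = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        try {
          semaphore.acquire(); // tasks without a permit wait here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        try {
          int now = active.incrementAndGet();
          maxActive.accumulateAndGet(now, Math::max);
          Thread.sleep(5); // stand-in for heavy compute
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          active.decrementAndGet();
          semaphore.release();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return maxActive.get();
  }
}
```

Calling maxObservedConcurrency(8, 4, 16) never reports more than 4, even though 8 threads are available; the other tasks are simply queued, which is the buffering mentioned above.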

On Tue, Oct 17, 2017 at 11:37 PM, Raghu Angadi  wrote:

> One way you can limit DoFn parallelism is to reshuffle the input into a fixed
> number of shards. If you want to limit it to 32 across 8 workers, you can
> reshuffle into 32 shards. In Dataflow, this distributes them roughly evenly
> among the workers. You can take a look at this Stack Overflow question, where
> the user wanted to increase parallelism.
> In Dataflow, DoFn parallelism depends not just on the cores on the workers; it
> can have much higher parallelism if it is after a GroupByKey (depending on
> key cardinality).
>
> Regarding autoscaling: if you have a small number of records with a lot of
> processing (on the order of minutes), it might be hard to trigger upscaling if
> this skew is very high. All the pending records might fit in internal
> queues and the runner might see zero backlog. Limiting parallelism with
> reshuffle should help with this.
>
>
> On Tue, Oct 17, 2017 at 5:33 PM, Derek Hao Hu 
> wrote:
>
>> Thanks Rafal and Lukasz! These are great suggestions! One quick question
>> about using semaphore though, would it be possible for multiple elements to
>> pile up in a particular worker instance, waiting to acquire the semaphore
>> but can't? I'll definitely test it though.
>>
>> Lukasz, let me try to explain why I feel this autoscaling might not be
>> the ideal solution first. I'll definitely contact
>> dataflow-feedb...@google.com as well but I'll try to give some of my
>> [probably incorrect] thoughts.
>>
>> So basically based on my understanding if Beam tries to allocate multiple
>> elements to a single machine, let's assume an ideal computational model
>> where each single core takes T time to finish processing an element but if
>> all 32 cores can be used to process this element then it takes T/32 time.
>>
>> Therefore, if we have 32 incoming elements, if Beam allocates 32 threads
>> on a worker instance for this DoFn, each element using a single core will
>> be finished in T time and therefore there would be no back log during this
>> time since all the elements are being processed. But if we can tune the
>> parameter to say Beam should allocate fewer elements per worker instance,
>> then this creates a backlog and autoscaling might trigger earlier, so
>> technically the overall system lag might actually be better?
>>
>> I haven't tested this hypothesis yet but basically the above is my
>> reasoning.
>>
>> Thanks,
>>
>> Derek
>>
>> On Tue, Oct 17, 2017 at 8:49 AM, Lukasz Cwik  wrote:
>>
>>> The `numberOfWorkerHarnessThreads` is worker wide and not per DoFn.
>>> Setting this value to constrain how many threads are executing will impact
>>> all parts of your pipeline. One idea is to use a Semaphore as a static
>>> object within your DoFn with a fixed number of allowed actors that can
>>> enter and execute your Tensorflow.
>>>
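>>> import java.util.concurrent.Semaphore;
>>>
>>> class TfDoFn {
>>>   private static final int MAX_TF_ACTORS = 4;
>>>   private static final Semaphore semaphore =
>>>       new Semaphore(MAX_TF_ACTORS, true);
>>>
>>>   @ProcessElement
>>>   public void processElement(X x) throws InterruptedException {
>>>     // Acquire outside the try block so that an interrupted acquire()
>>>     // never releases a permit it does not hold.
>>>     semaphore.acquire();
>>>     try {
>>>       // Do TF work
>>>     } finally {
>>>       semaphore.release();
>>>     }
>>>   }
>>> }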
>>>
>>> This will ensure that you're processing each TF item in a more timely
>>> manner, but it will still mean that there could be many other TF items which
>>> are still sitting around waiting for the semaphore to be acquired.
>>>
>>> As an alternative, I would recommend contacting
>>> dataflow-feedb...@google.com specifically referencing how you believe
>>> autoscaling is not working well for your usecase/pipeline. Also provide a
>>> description of your pipeline and some job ids (if possible).
>>>
>>>
>>> On Mon, Oct 16, 2017 at 6:26 PM, Rafal Wojdyla  wrote:
>>>
 Hi.
 To answer your question: if we limit ourselves to DataflowRunner, you
 could use `numberOfWorkerHarnessThreads`. See more here
 .
 That said, I'm not gonna comment whether that is a good remedy for your
 actual problem.
 - rav

 On Mon, Oct 16, 2017 at 8:48 PM, Derek Hao Hu 
 wrote:

> Hi,
>
> Is there an easy way to limit the number of DoFn instances per worker?
>
> The use case is like this: we are calling TensorFlow in our DoFn and
> each TensorFlow call would automatically try to allocate the available CPU
> resources. So in a streaming pipeline, what I'm seeing is the inference
> time will become longer over 

Re: How to window by quantity of data?

2017-10-18 Thread Lukasz Cwik
Feel free to share it with an online paste or a link to a github repo
containing the code.

Other users may be interested in your solution.

On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble  wrote:

> Lukasz-
>
> That worked. I created a stateful DoFn with a stale timer, an initial
> timestamp state, and a counter state, along with a buffer of elements to
> bundle. When the counter or timer exceeds max values, outputWithTimestamp().
>
> I'm happy to post the entire implementation somewhere, not sure about
> etiquette and how this mailing list handles attachments.
>
> Jacob
>
> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik  wrote:
>
>> Have you considered using a stateful DoFn? Buffering/batching based upon
>> a certain number of elements is shown in this blog [1] and could be extended
>> for your use case.
>>
>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble 
>> wrote:
>>
>>> My first streaming pipeline is pretty simple, it just pipes a queue into
>>> files:
>>>
>>> - read JSON objects from PubsubIO
>>> - event time = processing time
>>> - 5 minute windows (
>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>
>>> When the pipeline gets behind (for example, when the pipeline is stopped
>>> for an hour and restarted) this creates problems because the amount of data
>>> per file becomes too much, and the pipeline stays behind.
>>>
>>> I believe that what is needed is a new step, just before "write to GCS":
>>>
>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>
>>> My next idea is to implement my own Partition and PartitionDoFn that
>>> accept a PCollectionView from Count.perElement().
>>>
>>> Is there a more built-in way to accomplish dynamic partitions by element
>>> quantity?
>>>
>>> Jacob
>>>
>>
>>
>


Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Robert Bradshaw
On Oct 18, 2017 3:25 AM, "Jean-Baptiste Onofré"  wrote:

> What happens for the users using Spark 1.5 that run with Java 7 only?


One of the goals of this thread is to tease out such users if any.

To reach a wider audience, maybe the next set of release notes could
mention that we're considering dropping support for Java 7 in the
subsequent release.


On Oct 18, 2017, at 12:06, "Ismaël Mejía"  wrote:
>
> +1
>
> I forgot to vote yesterday, I don't really think this is a change
> worth requiring a major version of Beam. Just clear information in the
> site/release notes should make it. Also I am afraid that if we wait
> until we have enough changes to switch Beam to a new major version the
> switch to Java 8 will happen too late, probably after Java 8's end of
> life. And I am not exaggerating, Java 8 is planned to EOL next march
> 2018! (of course Oracle usually changes this), in any case go go Java
> 8 ASAP !
>
>
> On Wed, Oct 18, 2017 at 8:08 AM, Prabeesh K.  wrote:
>
>>  +1
>>
>>  On 18 October 2017 at 05:16, Griselda Cuevas  wrote:
>>
>>>
>>>  +1
>>>
>>>  On 17 October 2017 at 16:36, Robert Bradshaw  wrote:
>>>

  +1 to removing Java 7 support, pending no major user outcry to the
  contrary.

  In terms of versioning, I fall into the camp that this isn't
  sufficiently incompatible to warrant a major version increase.
  Semantic versioning is all about messaging, and upgrading the major
  version so soon after GA for such a minor change would IMHO cause more
  confusion than clarity. Hitting 3.0 should signal a major improvement
  to Beam itself.

  On Tue, Oct 17, 2017 at 3:52 PM, Eugene Kirpichov 
  wrote:

>  +1 to removing Java 7 support.
>
>  In terms of release 3.0, we can handle this two ways:
>  - Wait until enough other potentially incompatible changes accumulate,
>  do
>  all of them, and call it a "3.0" release, so that 3.0 will truly differ
>  in a
>  lot of incompatible and hopefully nice ways from 2.x. This might well
>  take a
>  year or so.
>  - Make a release in which Java 7 support is removed, and call it a
>  "3.0"
>  release just to signal the incompatibility, and other potentially
>  incompatible changes will wait until "4.0" etc.
>
>  I suppose the decision depends on whether we have a lot of other
>  incompatible changes we would like to do, and whether we have any other
>  truly great features enabled by those changes, or at least truly great
>  features justifying increasing the major version number. If we go with
>  #1,
>  I'd say, among the current work happening in Beam, portability comes to
>  mind
>  as a sufficiently huge milestone, so maybe drop Java 7 in the same
>  release
>  that offers a sufficient chunk of the portability work?
>
>  (There's also a third path: declare that dropping Java7 support is not
>  sufficiently "incompatible" to warrant a major version increase,
>  because
>  people don't have to rewrite their code but only switch their compiler
>  version, and people who already use a Java8 compiler won't even notice.
>  This
>  path could perhaps be considered if we had evidence that switching to a
>  Beam
>  release without Java7 support would require 0 work for an overwhelming
>  majority of users)
>
>
>
>  On Tue, Oct 17, 2017 at 3:34 PM Randal Moore 
>  wrote:
>
>>
>>  +1
>>
>>  On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi 
>>  wrote:
>>
>>>
>>>  +1.
>>>
>>>  On Tue, Oct 17, 2017 at 2:11 PM, David McNeill 
>>>  wrote:
>>>

  The final version of Beam that supports Java 7 should be clearly
  stated
  in the docs, so those stuck on old production infrastructure for
  other java
  app dependencies know where to stop upgrading.

  David McNeill
  021 721 015



  On 18 October 2017 at 05:16, Ismaël Mejía  wrote:

>
>  We have discussed recently in the developer mailing list about the
>  idea of removing support for Java 7 on Beam. There are multiple
>  reasons for this:
>
>  - Java 7 has not received public updates for almost two years and
>  most
>  companies are moving / have already moved to Java 8.
>  - A good amount of the systems Beam users rely on have decided to
>  drop
>  Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans
>  to
>  do it on version 3.
>  - Most Big data distributions and Cloud managed Spark/Hadoop
>  services
>  

Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Neville Dipale
+1

On 18 October 2017 at 16:00, Ismaël Mejía  wrote:

> Small correction: the EOL of Java 8 is Sep. 2018, not Mar. 2018.
> http://www.oracle.com/technetwork/java/eol-135779.html
>
> JB, the goal of this thread is to hear from the users of all the
> runners about their opinions/constraints, but we have to reach some
> consensus and deal with the tradeoffs of existing users vs the future
> of the project. So far we don't have many reports from users on Spark
> 1.5 or, more importantly, from people constrained by the need for Java 7
> support, but we need to wait and see before making a decision.
>
>
> On Wed, Oct 18, 2017 at 12:59 PM, Srinivas Reddy
>  wrote:
> > +1
> >
> > -
> > Srinivas
> >
> > - Typed on tiny keys. pls ignore typos.{mobile app}
> >
> > On 17-Oct-2017 9:47 PM, "Ismaël Mejía"  wrote:
> >>
> >> We have discussed recently in the developer mailing list about the
> >> idea of removing support for Java 7 on Beam. There are multiple
> >> reasons for this:
> >>
> >> - Java 7 has not received public updates for almost two years and most
> >> companies are moving / have already moved to Java 8.
> >> - A good amount of the systems Beam users rely on have decided to drop
> >> Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans to
> >> do it on version 3.
> >> - Most Big data distributions and Cloud managed Spark/Hadoop services
> >> have already moved to Java 8.
> >> - Recent versions of core libraries Beam uses are moving to be Java 8
> >> only (or mostly), e.g. Guava, Google Auto, etc.
> >> - Java 8 has some nice features that can make Beam code nicer e.g.
> >> lambdas, streams.
> >>
> >> Considering that Beam is a ‘recent’ project we expect users to be
> >> already using Java 8. However we wanted first to ask the opinion of
> >> the Beam users on this subject. It could be the case that some of the
> >> users are still dealing with some old cluster running on Java 7 or
> >> have another argument to keep the Java 7 compatibility.
> >>
> >> So, please vote:
> >> +1 Yes, go ahead and move Beam support to Java 8.
> >>  0 Do whatever you want. I don’t have a preference.
> >> -1 Please keep Java 7 compatibility (if possible add your argument to
> >> keep supporting for Java 7).
>


Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Ismaël Mejía
Small correction: the EOL of Java 8 is Sep. 2018, not Mar. 2018.
http://www.oracle.com/technetwork/java/eol-135779.html

JB, the goal of this thread is to hear from the users of all the
runners about their opinions/constraints, but we have to reach some
consensus and deal with the tradeoffs of existing users vs the future
of the project. So far we don't have many reports from users on Spark
1.5 or, more importantly, from people constrained by the need for Java 7
support, but we need to wait and see before making a decision.


On Wed, Oct 18, 2017 at 12:59 PM, Srinivas Reddy
 wrote:
> +1
>
> -
> Srinivas
>
> - Typed on tiny keys. pls ignore typos.{mobile app}
>
> On 17-Oct-2017 9:47 PM, "Ismaël Mejía"  wrote:
>>
>> We have discussed recently in the developer mailing list about the
>> idea of removing support for Java 7 on Beam. There are multiple
>> reasons for this:
>>
>> - Java 7 has not received public updates for almost two years and most
>> companies are moving / have already moved to Java 8.
>> - A good amount of the systems Beam users rely on have decided to drop
>> Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans to
>> do it on version 3.
>> - Most Big data distributions and Cloud managed Spark/Hadoop services
>> have already moved to Java 8.
>> - Recent versions of core libraries Beam uses are moving to be Java 8
>> only (or mostly), e.g. Guava, Google Auto, etc.
>> - Java 8 has some nice features that can make Beam code nicer e.g.
>> lambdas, streams.
>>
>> Considering that Beam is a ‘recent’ project we expect users to be
>> already using Java 8. However we wanted first to ask the opinion of
>> the Beam users on this subject. It could be the case that some of the
>> users are still dealing with some old cluster running on Java 7 or
>> have another argument to keep the Java 7 compatibility.
>>
>> So, please vote:
>> +1 Yes, go ahead and move Beam support to Java 8.
>>  0 Do whatever you want. I don’t have a preference.
>> -1 Please keep Java 7 compatibility (if possible add your argument to
>> keep supporting for Java 7).


Re: How does Beam set up the bundle size in streaming mode (like Pub/Sub)?

2017-10-18 Thread Kenneth Knowles
Bundles are decidedly not windows, so let's keep the two terms separate. It
sounds like you are asking about bundles.

The bundle size is a performance tuning parameter, chosen arbitrarily and
dynamically by the runner. The runner makes a best-effort choice so that
the cost of @StartBundle/@FinishBundle is amortized across multiple
@ProcessElement/@OnTimer calls. Your code must yield correct results for
any bundling: you should be implementing per-element logic, where
@StartBundle/@FinishBundle are implementation details.

Kenn
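
To make the "correct for any bundling" requirement concrete, here is a
minimal plain-Java sketch with no Beam dependency. BundleSketch,
UppercaseFn and runWithBundleSize are hypothetical names; the three
methods only stand in for @StartBundle, @ProcessElement and
@FinishBundle, and the loop plays the role of a runner that happened to
pick a given bundle size. The point is just that the output should be
the same whichever size is picked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BundleSketch {
    /** Hypothetical stand-in for a DoFn that buffers per bundle and flushes at the end. */
    static final class UppercaseFn {
        private final List<String> output = new ArrayList<>();
        private List<String> buffer;

        // Plays the role of @StartBundle: set up per-bundle scratch state.
        void startBundle() { buffer = new ArrayList<>(); }

        // Plays the role of @ProcessElement: purely per-element logic.
        void processElement(String element) { buffer.add(element.toUpperCase()); }

        // Plays the role of @FinishBundle: flush whatever this bundle buffered.
        void finishBundle() { output.addAll(buffer); buffer = null; }

        List<String> output() { return output; }
    }

    /** Simulates a runner that chose bundleSize for this run. */
    static List<String> runWithBundleSize(List<String> input, int bundleSize) {
        UppercaseFn fn = new UppercaseFn();
        for (int start = 0; start < input.size(); start += bundleSize) {
            int end = Math.min(start + bundleSize, input.size());
            fn.startBundle();
            for (String element : input.subList(start, end)) {
                fn.processElement(element);
            }
            fn.finishBundle();
        }
        return fn.output();
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        for (int size : new int[] {1, 2, 5}) {
            System.out.println("bundle size " + size + " -> " + runWithBundleSize(input, size));
        }
    }
}
```

If a function like this gave different results for different bundle
sizes, it would be relying on bundling for correctness, which is what
the message above warns against.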

On Tue, Oct 17, 2017 at 5:37 PM, Derek Hao Hu 
wrote:

> Hi,
>
> Is there any more detailed explanation on how Beam chooses the window size
> (bundle size) in streaming mode? It seems there is no clear answer in the
> [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/)
> and I can't find how PubsubIO implements this windowing strategy as well. :(
>
> Could someone kindly provide some pointers? Thanks!
> --
> Derek Hao Hu
>
> Software Engineer | Snapchat
> Snap Inc.
>


KafkaIO and Avro

2017-10-18 Thread Andrew Jones
Hi,

I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
it should be as simple as:

p.apply(KafkaIO.read()
    .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class))

Where Envelope is the name of the Avro class. However, that does not
compile and I get the following error:

incompatible types:
java.lang.Class
cannot be converted to java.lang.Class>

I've tried a number of variations on this theme but haven't yet worked
it out and am starting to run out of ideas...

Has anyone successfully read Avro data from Kafka?

The code I'm using can be found at
https://github.com/andrewrjones/debezium-kafka-beam-example and a full
environment can be created with Docker.

Thanks,
Andrew
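
One possible reading of the compile error, sketched in plain Java
without Kafka or Beam on the classpath: if a deserializer class is
declared against Object rather than against Envelope, its Class object
cannot be passed where a Class of "some deserializer of Envelope" is
expected. Everything below (Deserializer, ObjectDeserializer, readWith,
decode, and this Envelope) is a hypothetical stand-in, and the
assumption that the real deserializer is typed to Object is not
confirmed anywhere in this thread. The unchecked double cast is shown
only as one way to get past the type checker; it is safe only if the
deserializer really does return Envelope instances at runtime.

```java
import java.nio.charset.StandardCharsets;

final class DeserializerGenericsSketch {
    /** Hypothetical stand-in for a generic deserializer interface. */
    interface Deserializer<T> {
        T deserialize(byte[] data);
    }

    /** Hypothetical stand-in for the Avro-generated value class. */
    static final class Envelope {
        final String payload;
        Envelope(String payload) { this.payload = payload; }
    }

    /** ASSUMPTION being illustrated: the deserializer is typed to Object. */
    static final class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(byte[] data) {
            return new Envelope(new String(data, StandardCharsets.UTF_8));
        }
    }

    /** Mirrors the shape of a method that wants Class<? extends Deserializer<V>>. */
    static <V> V readWith(Class<? extends Deserializer<V>> type, byte[] data) {
        try {
            return type.getDeclaredConstructor().newInstance().deserialize(data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static Envelope decode(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);

        // Does not compile: ObjectDeserializer is a Deserializer<Object>,
        // not a Deserializer<Envelope>.
        // Envelope bad = DeserializerGenericsSketch.<Envelope>readWith(ObjectDeserializer.class, data);

        // Unchecked narrowing through Class<?>; the compiler can no longer verify it.
        @SuppressWarnings("unchecked")
        Class<? extends Deserializer<Envelope>> narrowed =
            (Class<? extends Deserializer<Envelope>>) (Class<?>) ObjectDeserializer.class;
        return readWith(narrowed, data);
    }

    public static void main(String[] args) {
        System.out.println(decode("hello").payload);
    }
}
```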


Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Srinivas Reddy
+1

-
Srinivas

- Typed on tiny keys. pls ignore typos.{mobile app}

On 17-Oct-2017 9:47 PM, "Ismaël Mejía"  wrote:

> We have discussed recently in the developer mailing list about the
> idea of removing support for Java 7 on Beam. There are multiple
> reasons for this:
>
> - Java 7 has not received public updates for almost two years and most
> companies are moving / have already moved to Java 8.
> - A good amount of the systems Beam users rely on have decided to drop
> Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans to
> do it on version 3.
> - Most Big data distributions and Cloud managed Spark/Hadoop services
> have already moved to Java 8.
> - Recent versions of core libraries Beam uses are moving to be Java 8
> only (or mostly), e.g. Guava, Google Auto, etc.
> - Java 8 has some nice features that can make Beam code nicer e.g.
> lambdas, streams.
>
> Considering that Beam is a ‘recent’ project we expect users to be
> already using Java 8. However we wanted first to ask the opinion of
> the Beam users on this subject. It could be the case that some of the
> users are still dealing with some old cluster running on Java 7 or
> have another argument to keep the Java 7 compatibility.
>
> So, please vote:
> +1 Yes, go ahead and move Beam support to Java 8.
>  0 Do whatever you want. I don’t have a preference.
> -1 Please keep Java 7 compatibility (if possible add your argument to
> keep supporting for Java 7).
>


Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Jean-Baptiste Onofré
What happens to users on Spark 1.5 who run with Java 7 only?

On Oct 18, 2017, 12:06, at 12:06, "Ismaël Mejía"  wrote:
>+1
>
>I forgot to vote yesterday, I don't really think this is a change
>worth requiring a major version of Beam. Just clear information in the
>site/release notes should make it. Also I am afraid that if we wait
>until we have enough changes to switch Beam to a new major version the
>switch to Java 8 will happen too late, probably after Java 8's end of
>life. And I am not exaggerating, Java 8 is planned to EOL next march
>2018! (of course Oracle usually changes this), in any case go go Java
>8 ASAP !
>
>
>On Wed, Oct 18, 2017 at 8:08 AM, Prabeesh K. 
>wrote:
>> +1
>>
>> On 18 October 2017 at 05:16, Griselda Cuevas  wrote:
>>>
>>> +1
>>>
>>> On 17 October 2017 at 16:36, Robert Bradshaw 
>wrote:

 +1 to removing Java 7 support, pending no major user outcry to the
 contrary.

 In terms of versioning, I fall into the camp that this isn't
 sufficiently incompatible to warrant a major version increase.
 Semantic versioning is all about messaging, and upgrading the major
 version so soon after GA for such a minor change would IMHO cause more
 confusion than clarity. Hitting 3.0 should signal a major improvement
 to Beam itself.

 On Tue, Oct 17, 2017 at 3:52 PM, Eugene Kirpichov
>
 wrote:
 > +1 to removing Java 7 support.
 >
 > In terms of release 3.0, we can handle this two ways:
 > - Wait until enough other potentially incompatible changes
>accumulate,
 > do
 > all of them, and call it a "3.0" release, so that 3.0 will truly
>differ
 > in a
 > lot of incompatible and hopefully nice ways from 2.x. This might
>well
 > take a
 > year or so.
 > - Make a release in which Java 7 support is removed, and call it
>a
 > "3.0"
 > release just to signal the incompatibility, and other potentially
 > incompatible changes will wait until "4.0" etc.
 >
 > I suppose the decision depends on whether we have a lot of other
 > incompatible changes we would like to do, and whether we have any
>other
 > truly great features enabled by those changes, or at least truly
>great
 > features justifying increasing the major version number. If we go
>with
 > #1,
 > I'd say, among the current work happening in Beam, portability
>comes to
 > mind
 > as a sufficiently huge milestone, so maybe drop Java 7 in the
>same
 > release
 > that offers a sufficient chunk of the portability work?
 >
 > (There's also a third path: declare that dropping Java7 support
>is not
 > sufficiently "incompatible" to warrant a major version increase,
 > because
 > people don't have to rewrite their code but only switch their
>compiler
 > version, and people who already use a Java8 compiler won't even
>notice.
 > This
 > path could perhaps be considered if we had evidence that
>switching to a
 > Beam
 > release without Java7 support would require 0 work for an
>overwhelming
 > majority of users)
 >
 >
 >
 > On Tue, Oct 17, 2017 at 3:34 PM Randal Moore
>
 > wrote:
 >>
 >> +1
 >>
 >> On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi
>
 >> wrote:
 >>>
 >>> +1.
 >>>
 >>> On Tue, Oct 17, 2017 at 2:11 PM, David McNeill
>
 >>> wrote:
 
  The final version of Beam that supports Java 7 should be
>clearly
  stated
  in the docs, so those stuck on old production infrastructure
>for
  other java
  app dependencies know where to stop upgrading.
 
  David McNeill
  021 721 015
 
 
 
  On 18 October 2017 at 05:16, Ismaël Mejía 
>wrote:
 >
 > We have discussed recently in the developer mailing list
>about the
 > idea of removing support for Java 7 on Beam. There are
>multiple
 > reasons for this:
 >
 > - Java 7 has not received public updates for almost two years
>and
 > most
 > companies are moving / have already moved to Java 8.
 > - A good amount of the systems Beam users rely on have
>decided to
 > drop
 > Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop
>plans
 > to
 > do it on version 3.
 > - Most Big data distributions and Cloud managed Spark/Hadoop
 > services
 > have already moved to Java 8.
 > - Recent versions of core libraries Beam uses are moving to
>be Java
 > 8
 > only (or mostly), e.g. Guava, Google Auto, etc.
 > - Java 8 has some nice features that can make Beam code nicer
>e.g.
 > lambdas, streams.
 >
 > 

Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Ismaël Mejía
+1

I forgot to vote yesterday. I don't really think this change is worth
a new major version of Beam; clear information on the site and in the
release notes should be enough. I am also afraid that if we wait until
we have enough changes to justify a new major version, the switch to
Java 8 will happen too late, probably after Java 8's end of life. And I
am not exaggerating: Java 8 is planned to reach EOL next March (2018)!
Of course Oracle usually changes this, but in any case let's go to Java
8 ASAP!


On Wed, Oct 18, 2017 at 8:08 AM, Prabeesh K.  wrote:
> +1
>
> On 18 October 2017 at 05:16, Griselda Cuevas  wrote:
>>
>> +1
>>
>> On 17 October 2017 at 16:36, Robert Bradshaw  wrote:
>>>
>>> +1 to removing Java 7 support, pending no major user outcry to the
>>> contrary.
>>>
>>> In terms of versioning, I fall into the camp that this isn't
>>> sufficiently incompatible to warrant a major version increase.
>>> Semantic versioning is all about messaging, and upgrading the major
>>> version so soon after GA for such a minor change would IMHO cause more
>>> confusion than clarity. Hitting 3.0 should signal a major improvement
>>> to Beam itself.
>>>
>>> On Tue, Oct 17, 2017 at 3:52 PM, Eugene Kirpichov 
>>> wrote:
>>> > +1 to removing Java 7 support.
>>> >
>>> > In terms of release 3.0, we can handle this two ways:
>>> > - Wait until enough other potentially incompatible changes accumulate,
>>> > do
>>> > all of them, and call it a "3.0" release, so that 3.0 will truly differ
>>> > in a
>>> > lot of incompatible and hopefully nice ways from 2.x. This might well
>>> > take a
>>> > year or so.
>>> > - Make a release in which Java 7 support is removed, and call it a
>>> > "3.0"
>>> > release just to signal the incompatibility, and other potentially
>>> > incompatible changes will wait until "4.0" etc.
>>> >
>>> > I suppose the decision depends on whether we have a lot of other
>>> > incompatible changes we would like to do, and whether we have any other
>>> > truly great features enabled by those changes, or at least truly great
>>> > features justifying increasing the major version number. If we go with
>>> > #1,
>>> > I'd say, among the current work happening in Beam, portability comes to
>>> > mind
>>> > as a sufficiently huge milestone, so maybe drop Java 7 in the same
>>> > release
>>> > that offers a sufficient chunk of the portability work?
>>> >
>>> > (There's also a third path: declare that dropping Java7 support is not
>>> > sufficiently "incompatible" to warrant a major version increase,
>>> > because
>>> > people don't have to rewrite their code but only switch their compiler
>>> > version, and people who already use a Java8 compiler won't even notice.
>>> > This
>>> > path could perhaps be considered if we had evidence that switching to a
>>> > Beam
>>> > release without Java7 support would require 0 work for an overwhelming
>>> > majority of users)
>>> >
>>> >
>>> >
>>> > On Tue, Oct 17, 2017 at 3:34 PM Randal Moore 
>>> > wrote:
>>> >>
>>> >> +1
>>> >>
>>> >> On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi 
>>> >> wrote:
>>> >>>
>>> >>> +1.
>>> >>>
>>> >>> On Tue, Oct 17, 2017 at 2:11 PM, David McNeill 
>>> >>> wrote:
>>> 
>>>  The final version of Beam that supports Java 7 should be clearly
>>>  stated
>>>  in the docs, so those stuck on old production infrastructure for
>>>  other java
>>>  app dependencies know where to stop upgrading.
>>> 
>>>  David McNeill
>>>  021 721 015
>>> 
>>> 
>>> 
>>>  On 18 October 2017 at 05:16, Ismaël Mejía  wrote:
>>> >
>>> > We have discussed recently in the developer mailing list about the
>>> > idea of removing support for Java 7 on Beam. There are multiple
>>> > reasons for this:
>>> >
>>> > - Java 7 has not received public updates for almost two years and
>>> > most
>>> > companies are moving / have already moved to Java 8.
>>> > - A good amount of the systems Beam users rely on have decided to
>>> > drop
>>> > Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans
>>> > to
>>> > do it on version 3.
>>> > - Most Big data distributions and Cloud managed Spark/Hadoop
>>> > services
>>> > have already moved to Java 8.
>>> > - Recent versions of core libraries Beam uses are moving to be Java
>>> > 8
>>> > only (or mostly), e.g. Guava, Google Auto, etc.
>>> > - Java 8 has some nice features that can make Beam code nicer e.g.
>>> > lambdas, streams.
>>> >
>>> > Considering that Beam is a ‘recent’ project we expect users to be
>>> > already using Java 8. However we wanted first to ask the opinion of
>>> > the Beam users on this subject. It could be the case that some of
>>> > the
>>> > users are still dealing with some old cluster running on Java 7 

Re: Limit the number of DoFn instances per worker?

2017-10-18 Thread Raghu Angadi
One way you can limit DoFn parallelism is to reshuffle the input into a
fixed number of shards. If you want to limit it to 32 across 8 workers, you
can reshuffle into 32 shards. In Dataflow, these are distributed roughly
evenly among the workers. You can take a look at this Stack Overflow
question, where the user wanted to increase parallelism.
In Dataflow, DoFn parallelism depends not just on the number of cores on
the workers; it can be much higher if the DoFn comes after a GroupByKey
(depending on key cardinality).

Regarding autoscaling: if you have a small number of records that each need
a lot of processing (on the order of minutes), it might be hard to trigger
upscaling if this skew is very high. All the pending records might fit in
internal queues, and the runner might see zero backlog. Limiting parallelism
with a reshuffle should help with this.
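
As a rough illustration of "reshuffle into a fixed number of shards",
the plain-Java sketch below assigns every record to one of 32 shard
keys, so at most 32 groups exist downstream no matter how many records
arrive. In a pipeline the shard id would be the key handed to the
grouping step. ShardSketch, shardFor and groupByShard are hypothetical
names, and hashing is only one possible way to pick a shard (a random
or round-robin assignment would serve the same purpose).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShardSketch {
    static final int NUM_SHARDS = 32;

    /** Maps a record to a shard id in [0, NUM_SHARDS). */
    static int shardFor(Object record) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(record.hashCode(), NUM_SHARDS);
    }

    /** Groups records by shard id, the way a keyed grouping step would see them. */
    static Map<Integer, List<String>> groupByShard(List<String> records) {
        Map<Integer, List<String>> shards = new TreeMap<>();
        for (String record : records) {
            shards.computeIfAbsent(shardFor(record), k -> new ArrayList<>()).add(record);
        }
        return shards;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add("record-" + i);
        }
        Map<Integer, List<String>> shards = groupByShard(records);
        System.out.println("distinct shards used: " + shards.size());
    }
}
```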


On Tue, Oct 17, 2017 at 5:33 PM, Derek Hao Hu 
wrote:

> Thanks Rafal and Lukasz! These are great suggestions! One quick question
> about using semaphore though, would it be possible for multiple elements to
> pile up in a particular worker instance, waiting to acquire the semaphore
> but can't? I'll definitely test it though.
>
> Lukasz, let me try to explain why I feel this autoscaling might not be the
> ideal solution first. I'll definitely contact dataflow-feedb...@google.com
> as well but I'll try to give some of my [probably incorrect] thoughts.
>
> So basically based on my understanding if Beam tries to allocate multiple
> elements to a single machine, let's assume an ideal computational model
> where each single core takes T time to finish processing an element but if
> all 32 cores can be used to process this element then it takes T/32 time.
>
> Therefore, if we have 32 incoming elements, if Beam allocates 32 threads
> on a worker instance for this DoFn, each element using a single core will
> be finished in T time and therefore there would be no back log during this
> time since all the elements are being processed. But if we can tune the
> parameter to say Beam should allocate fewer elements per worker instance,
> then this creates a backlog and autoscaling might trigger earlier, so
> technically the overall system lag might actually be better?
>
> I haven't tested this hypothesis yet but basically the above is my
> reasoning.
>
> Thanks,
>
> Derek
>
> On Tue, Oct 17, 2017 at 8:49 AM, Lukasz Cwik  wrote:
>
>> The `numberOfWorkerHarnessThreads` is worker wide and not per DoFn.
>> Setting this value to constrain how many threads are executing will impact
>> all parts of your pipeline. One idea is to use a Semaphore as a static
>> object within your DoFn with a fixed number of allowed actors that can
>> enter and execute your Tensorflow.
>>
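>> class TfDoFn {
>>   private static final int MAX_TF_ACTORS = 4;
>>   // Static, so the permits are shared by every TfDoFn instance in the JVM.
>>   private static final Semaphore semaphore =
>>       new Semaphore(MAX_TF_ACTORS, true);
>>
>>   @ProcessElement
>>   public void processElement(X x) throws InterruptedException {
>>     // Acquire before the try block so that a failed or interrupted
>>     // acquire() never releases a permit it does not hold.
>>     semaphore.acquire();
>>     try {
>>       // Do TF work
>>     } finally {
>>       semaphore.release();
>>     }
>>   }
>> }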
>>
>> This will ensure that you're processing each TF item in a more timely
>> manner, but it will still mean that there could be many other TF items
>> sitting around waiting for the semaphore to be acquired.
>>
>> As an alternative, I would recommend contacting
>> dataflow-feedb...@google.com specifically referencing how you believe
>> autoscaling is not working well for your usecase/pipeline. Also provide a
>> description of your pipeline and some job ids (if possible).
>>
>>
>> On Mon, Oct 16, 2017 at 6:26 PM, Rafal Wojdyla  wrote:
>>
>>> Hi.
>>> To answer your question: if we limit ourselves to DataflowRunner, you
>>> could use `numberOfWorkerHarnessThreads`. See more here
>>> .
>>> That said, I'm not gonna comment whether that is a good remedy for your
>>> actual problem.
>>> - rav
>>>
>>> On Mon, Oct 16, 2017 at 8:48 PM, Derek Hao Hu 
>>> wrote:
>>>
 Hi,

 ​Is there an easy way to limit the number of DoFn instances per worker?

 The use case is like this: we are calling TensorFlow in our DoFn and
 each TensorFlow call would automatically try to allocate the available CPU
 resources. So in a streaming pipeline, what I'm seeing is the inference
 time will become longer over time if autoscaling didn't catch up. My
 hypothesis is that Beam is trying to allocate a specific number of elements
 (maybe the number of cores?) on each worker for a particular DoFn and then
 these TensorFlow threads contend for CPU cycles. Therefore, I would like to
 know whether it's possible to limit the number of threads a pipeline runner
 can allocate for a DoFn per worker. 
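
The semaphore idea from the quoted messages in this thread can be tried
outside a pipeline. The sketch below is plain Java, not Beam: pool
threads stand in for worker harness threads, and a short sleep stands
in for the TensorFlow call. SemaphoreSketch, MAX_ACTORS and maxObserved
are hypothetical names. It records the highest number of threads that
were inside the guarded section at the same time; that number should
never exceed the permit count, while the remaining threads wait on
acquire(), which is the pile-up Derek asks about.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreSketch {
    static final int MAX_ACTORS = 4;

    /** Runs `tasks` jobs on `threads` threads and returns the peak concurrency inside the guard. */
    static int maxObserved(int threads, int tasks) {
        Semaphore semaphore = new Semaphore(MAX_ACTORS, true);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    semaphore.acquire();   // blocks while MAX_ACTORS threads are busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                // no permit held, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);       // stand-in for the expensive call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency inside guard: " + maxObserved(32, 64));
    }
}
```

The guard caps how many calls run at once, but it does nothing to
shrink the queue of waiting elements, so it addresses CPU contention
rather than backlog.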

Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-18 Thread Prabeesh K.
+1

On 18 October 2017 at 05:16, Griselda Cuevas  wrote:

> +1
>
> On 17 October 2017 at 16:36, Robert Bradshaw  wrote:
>
>> +1 to removing Java 7 support, pending no major user outcry to the
>> contrary.
>>
>> In terms of versioning, I fall into the camp that this isn't
>> sufficiently incompatible to warrant a major version increase.
>> Semantic versioning is all about messaging, and upgrading the major
>> version so soon after GA for such a minor change would IMHO cause more
>> confusion than clarity. Hitting 3.0 should signal a major improvement
>> to Beam itself.
>>
>> On Tue, Oct 17, 2017 at 3:52 PM, Eugene Kirpichov 
>> wrote:
>> > +1 to removing Java 7 support.
>> >
>> > In terms of release 3.0, we can handle this two ways:
>> > - Wait until enough other potentially incompatible changes accumulate,
>> do
>> > all of them, and call it a "3.0" release, so that 3.0 will truly differ
>> in a
>> > lot of incompatible and hopefully nice ways from 2.x. This might well
>> take a
>> > year or so.
>> > - Make a release in which Java 7 support is removed, and call it a "3.0"
>> > release just to signal the incompatibility, and other potentially
>> > incompatible changes will wait until "4.0" etc.
>> >
>> > I suppose the decision depends on whether we have a lot of other
>> > incompatible changes we would like to do, and whether we have any other
>> > truly great features enabled by those changes, or at least truly great
>> > features justifying increasing the major version number. If we go with
>> #1,
>> > I'd say, among the current work happening in Beam, portability comes to
>> mind
>> > as a sufficiently huge milestone, so maybe drop Java 7 in the same
>> release
>> > that offers a sufficient chunk of the portability work?
>> >
>> > (There's also a third path: declare that dropping Java7 support is not
>> > sufficiently "incompatible" to warrant a major version increase, because
>> > people don't have to rewrite their code but only switch their compiler
>> > version, and people who already use a Java8 compiler won't even notice.
>> This
>> > path could perhaps be considered if we had evidence that switching to a
>> Beam
>> > release without Java7 support would require 0 work for an overwhelming
>> > majority of users)
>> >
>> >
>> >
>> > On Tue, Oct 17, 2017 at 3:34 PM Randal Moore 
>> wrote:
>> >>
>> >> +1
>> >>
>> >> On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi 
>> wrote:
>> >>>
>> >>> +1.
>> >>>
>> >>> On Tue, Oct 17, 2017 at 2:11 PM, David McNeill 
>> >>> wrote:
>> 
>>  The final version of Beam that supports Java 7 should be clearly
>> stated
>>  in the docs, so those stuck on old production infrastructure for
>> other java
>>  app dependencies know where to stop upgrading.
>> 
>>  David McNeill
>>  021 721 015
>> 
>> 
>> 
>>  On 18 October 2017 at 05:16, Ismaël Mejía  wrote:
>> >
>> > We have discussed recently in the developer mailing list about the
>> > idea of removing support for Java 7 on Beam. There are multiple
>> > reasons for this:
>> >
>> > - Java 7 has not received public updates for almost two years and
>> most
>> > companies are moving / have already moved to Java 8.
>> > - A good amount of the systems Beam users rely on have decided to
>> drop
>> > Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans
>> to
>> > do it on version 3.
>> > - Most Big data distributions and Cloud managed Spark/Hadoop
>> services
>> > have already moved to Java 8.
>> > - Recent versions of core libraries Beam uses are moving to be Java
>> 8
>> > only (or mostly), e.g. Guava, Google Auto, etc.
>> > - Java 8 has some nice features that can make Beam code nicer e.g.
>> > lambdas, streams.
>> >
>> > Considering that Beam is a ‘recent’ project we expect users to be
>> > already using Java 8. However we wanted first to ask the opinion of
>> > the Beam users on this subject. It could be the case that some of
>> the
>> > users are still dealing with some old cluster running on Java 7 or
>> > have another argument to keep the Java 7 compatibility.
>> >
>> > So, please vote:
>> > +1 Yes, go ahead and move Beam support to Java 8.
>> >  0 Do whatever you want. I don’t have a preference.
>> > -1 Please keep Java 7 compatibility (if possible add your argument
>> to
>> > keep supporting for Java 7).
>> 
>> 
>> >>>
>> >
>>
>
>