[jira] [Assigned] (BEAM-1273) Error with FlinkPipelineOptions serialization after setStateBackend

2017-01-16 Thread Alexey Diomin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Diomin reassigned BEAM-1273:
---

Assignee: Alexey Diomin  (was: Aljoscha Krettek)

> Error with FlinkPipelineOptions serialization after setStateBackend
> ---
>
> Key: BEAM-1273
> URL: https://issues.apache.org/jira/browse/BEAM-1273
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Alexey Diomin
>Assignee: Alexey Diomin
>
> Calling FlinkPipelineOptions.setStateBackend causes an error:
> {code}
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not 
> construct instance of org.apache.flink.runtime.state.AbstractStateBackend: 
> abstract types either need to be mapped to concrete types, have custom 
> deserializer, or contain additional type information.
> {code}
> The exception is thrown in SerializedPipelineOptions.
> The root problem is that AbstractStateBackend and its implementations cannot be 
> mapped to a JSON schema for serialization.
> The error first appeared after:
> [BEAM-617][flink] introduce option to set state backend



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1273) Error with FlinkPipelineOptions serialization after setStateBackend

2017-01-15 Thread Alexey Diomin (JIRA)
Alexey Diomin created BEAM-1273:
---

 Summary: Error with FlinkPipelineOptions serialization after 
setStateBackend
 Key: BEAM-1273
 URL: https://issues.apache.org/jira/browse/BEAM-1273
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Reporter: Alexey Diomin
Assignee: Aljoscha Krettek


Calling FlinkPipelineOptions.setStateBackend causes an error:

{code}
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not 
construct instance of org.apache.flink.runtime.state.AbstractStateBackend: 
abstract types either need to be mapped to concrete types, have custom 
deserializer, or contain additional type information.
{code}

The exception is thrown in SerializedPipelineOptions.

The root problem is that AbstractStateBackend and its implementations cannot be 
mapped to a JSON schema for serialization.

The error first appeared after:
[BEAM-617][flink] introduce option to set state backend
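For context, Jackson's default bean deserializer instantiates the target class reflectively, which is exactly what cannot work for an abstract class such as AbstractStateBackend. A minimal stdlib-only sketch of that failure mode (the StateBackend/FsStateBackend classes below are illustrative stand-ins, not the real Flink types):

```java
import java.lang.reflect.Constructor;

public class AbstractConstruction {

    // Stand-ins for the real Flink types: a deserializer must pick a
    // concrete subclass; the abstract base cannot be instantiated.
    static abstract class StateBackend {}
    static class FsStateBackend extends StateBackend {}

    static StateBackend construct(Class<? extends StateBackend> type) throws Exception {
        // Jackson's default bean deserializer does essentially this,
        // which is why an abstract type needs a registered concrete type
        // or a custom deserializer.
        Constructor<? extends StateBackend> ctor = type.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Concrete type: construction succeeds.
        System.out.println(construct(FsStateBackend.class).getClass().getSimpleName());
        // Abstract type: fails, mirroring the JsonMappingException above.
        try {
            construct(StateBackend.class);
        } catch (InstantiationException e) {
            System.out.println("cannot construct abstract StateBackend");
        }
    }
}
```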





[jira] [Assigned] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-11 Thread Alexey Diomin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Diomin reassigned BEAM-1255:
---

Assignee: Alexey Diomin  (was: Maximilian Michels)

> java.io.NotSerializableException in flink on UnboundedSource
> 
>
> Key: BEAM-1255
> URL: https://issues.apache.org/jira/browse/BEAM-1255
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.5.0
>Reporter: Alexey Diomin
>Assignee: Alexey Diomin
>
> After introducing the new Coders with TypeDescriptor in the Flink runner, we hit this issue:
> {code}
> Caused by: java.io.NotSerializableException: 
> sun.reflect.generics.reflectiveObjects.TypeVariableImpl
>   - element of array (index: 0)
>   - array (class "[Ljava.lang.Object;", size: 2)
>   - field (class 
> "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", 
> type: "class [Ljava.lang.Object;")
>   - object (class 
> "com.google.common.collect.ImmutableList$SerializedForm", 
> com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
>   - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", 
> name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
>   - object (class 
> "com.google.common.reflect.Types$ParameterizedTypeImpl", 
> org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "com.google.common.reflect.TypeToken", name: 
> "runtimeType", type: "interface java.lang.reflect.Type")
>   - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", 
> org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: 
> "token", type: "class com.google.common.reflect.TypeToken")
>   - object (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1",
>  org.apache.beam.sdk.io.UnboundedSource)
>   - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: 
> "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
>   - object (class "org.apache.beam.sdk.coders.SerializableCoder", 
> SerializableCoder)
>   - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", 
> type: "interface org.apache.beam.sdk.coders.Coder")
>   - object (class "org.apache.beam.sdk.coders.KvCoder", 
> KvCoder(SerializableCoder,AvroCoder))
>   - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: 
> "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
>   - object (class "org.apache.beam.sdk.coders.ListCoder", 
> ListCoder(KvCoder(SerializableCoder,AvroCoder)))
>   - field (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
>  name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
>   - root object (class 
> "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
>  
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> The bug was introduced by commit:
> 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
> pull request:
> https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class FlinkSerialisationError {
>
>     public static void main(String[] args) {
>         FlinkPipelineOptions options =
>             PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         options.setStreaming(true);
>
>         Pipeline pipeline = Pipeline.create(options);
>
>         pipeline.apply(
>             KafkaIO.read()
>                 .withBootstrapServers("localhost:9092")
>                 .withTopics(ImmutableList.of("test"))
>                 // set ConsumerGroup
>                 .withoutMetadata());
>
>         pipeline.run();
>     }
> }
> {code}





[jira] [Comment Edited] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-11 Thread Alexey Diomin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818082#comment-15818082
 ] 

Alexey Diomin edited comment on BEAM-1255 at 1/11/17 12:20 PM:
---

This bug relates to the serialization of UnboundedSourceWrapper:

{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();

  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);

  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}

UnboundedSourceWrapper.java:147
{code}
Coder<UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
    SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {
    });
{code}

The anonymous subclass created by
new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {}
captures unresolved type variables, so it produces a non-serializable object.
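The same failure mode can be reproduced with the JDK alone. The Token class below is a hypothetical stand-in for a type token such as Beam's TypeDescriptor (which wraps Guava's TypeToken): when the anonymous subclass is created inside a generic method, the captured type argument is a TypeVariableImpl, which does not implement Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeVarSerialization {

    // Stand-in for a type token: an anonymous subclass captures its
    // generic supertype, and we keep the captured type argument around.
    static abstract class Token<T> implements Serializable {
        final Type type = ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
    }

    // T is an unresolved type variable here, so the captured Type is a
    // (non-serializable) TypeVariableImpl -- the root of the exception above.
    static <T> Token<T> capture() {
        return new Token<T>() {};
    }

    public static void main(String[] args) throws Exception {
        // With a concrete argument the captured Type is String.class,
        // which is a plain (serializable) Class object.
        Token<String> concrete = new Token<String>() {};
        System.out.println(concrete.type);

        // Serializing the token built from a type variable fails.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(capture());
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```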


was (Author: humanoid):
This bug relates to the serialization of UnboundedSourceWrapper:

{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();

  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);

  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}

> java.io.NotSerializableException in flink on UnboundedSource
> 
>
> Key: BEAM-1255
> URL: https://issues.apache.org/jira/browse/BEAM-1255
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.5.0
>Reporter: Alexey Diomin
>Assignee: Maximilian Michels
>

[jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-11 Thread Alexey Diomin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15818082#comment-15818082
 ] 

Alexey Diomin commented on BEAM-1255:
-

This bug relates to the serialization of UnboundedSourceWrapper:

{code}
@Test
public void testSerialization() throws Exception {
  PipelineOptions options = PipelineOptionsFactory.create();

  TestCountingSource source = new TestCountingSource(1);
  UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
      new UnboundedSourceWrapper<>(options, source, 1);

  InstantiationUtil.serializeObject(flinkWrapper);
}
{code}

> java.io.NotSerializableException in flink on UnboundedSource
> 
>
> Key: BEAM-1255
> URL: https://issues.apache.org/jira/browse/BEAM-1255
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 0.5.0
>Reporter: Alexey Diomin
>Assignee: Maximilian Michels
>
[jira] [Commented] (BEAM-1227) Release 0.4.0

2017-01-10 Thread Alexey Diomin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815919#comment-15815919
 ] 

Alexey Diomin commented on BEAM-1227:
-

Maybe we can close this task ;)

> Release 0.4.0
> -
>
> Key: BEAM-1227
> URL: https://issues.apache.org/jira/browse/BEAM-1227
> Project: Beam
>  Issue Type: Task
>  Components: project-management
>Affects Versions: 0.4.0
>Reporter: Daniel Halperin
>Assignee: Jean-Baptiste Onofré
>
> Umbrella bug for the 0.4.0 (incubating or not) release.
> JB is the release manager.





[jira] [Created] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource

2017-01-10 Thread Alexey Diomin (JIRA)
Alexey Diomin created BEAM-1255:
---

 Summary: java.io.NotSerializableException in flink on 
UnboundedSource
 Key: BEAM-1255
 URL: https://issues.apache.org/jira/browse/BEAM-1255
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Affects Versions: 0.5.0
Reporter: Alexey Diomin
Assignee: Maximilian Michels


After introducing the new Coders with TypeDescriptor in the Flink runner, we hit this issue:

{code}
Caused by: java.io.NotSerializableException: 
sun.reflect.generics.reflectiveObjects.TypeVariableImpl
- element of array (index: 0)
- array (class "[Ljava.lang.Object;", size: 2)
- field (class 
"com.google.common.collect.ImmutableList$SerializedForm", name: "elements", 
type: "class [Ljava.lang.Object;")
- object (class 
"com.google.common.collect.ImmutableList$SerializedForm", 
com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", 
name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
- object (class 
"com.google.common.reflect.Types$ParameterizedTypeImpl", 
org.apache.beam.sdk.io.UnboundedSource)
- field (class "com.google.common.reflect.TypeToken", name: 
"runtimeType", type: "interface java.lang.reflect.Type")
- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", 
org.apache.beam.sdk.io.UnboundedSource)
- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: 
"token", type: "class com.google.common.reflect.TypeToken")
- object (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1",
 org.apache.beam.sdk.io.UnboundedSource)
- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: 
"typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
- object (class "org.apache.beam.sdk.coders.SerializableCoder", 
SerializableCoder)
- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", 
type: "interface org.apache.beam.sdk.coders.Coder")
- object (class "org.apache.beam.sdk.coders.KvCoder", 
KvCoder(SerializableCoder,AvroCoder))
- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: 
"elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
- object (class "org.apache.beam.sdk.coders.ListCoder", 
ListCoder(KvCoder(SerializableCoder,AvroCoder)))
- field (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
 name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
- root object (class 
"org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper",
 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
{code}

The bug was introduced by commit:
7b98fa08d14e8121e8885f00a9a9a878b73f81a6

pull request:
https://github.com/apache/beam/pull/1537

Code to reproduce the error:
{code}
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkSerialisationError {

    public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(
            KafkaIO.read()
                .withBootstrapServers("localhost:9092")
                .withTopics(ImmutableList.of("test"))
                // set ConsumerGroup
                .withoutMetadata());

        pipeline.run();
    }
}
{code}
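One general way to avoid this class of failure (a hypothetical sketch, not the actual fix from the linked pull request) is to carry a Class token, which is Serializable, instead of a captured java.lang.reflect.Type:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableTypeHolder<T> implements Serializable {

    // Class implements Serializable, unlike the reflective Type objects
    // (e.g. TypeVariableImpl) captured by anonymous type tokens.
    private final Class<T> clazz;

    public SerializableTypeHolder(Class<T> clazz) {
        this.clazz = clazz;
    }

    public Class<T> type() {
        return clazz;
    }

    public static void main(String[] args) throws Exception {
        // Round-tripping through Java serialization succeeds.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(new SerializableTypeHolder<>(String.class));
        out.flush();
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        SerializableTypeHolder<?> back = (SerializableTypeHolder<?>) in.readObject();
        System.out.println(back.type());
    }
}
```

The trade-off is losing the full generic type information that a TypeDescriptor carries; only the raw class survives.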







[jira] [Updated] (BEAM-106) Native support for conditional iteration

2017-01-09 Thread Alexey Diomin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Diomin updated BEAM-106:
---
Description: 
Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50

There are a variety of use cases which would benefit from native support for 
conditional iteration.

For instance, 
http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923
 asks about being able to write a loop like the following:

{code}
PCollection data  = ...
while(needsMoreWork(data)) {
  data = doAStep(data)
}
{code}
If there are specific use cases please let us know the details. In the future 
we will use this issue to post progress updates.

  was:
Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50

There are a variety of use cases which would benefit from native support for 
conditional iteration.

For instance, 
http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923
 asks about being able to write a loop like the following:

PCollection data  = ...
while(needsMoreWork(data)) {
  data = doAStep(data)
}
If there are specific use cases please let us know the details. In the future 
we will use this issue to post progress updates.


> Native support for conditional iteration
> 
>
> Key: BEAM-106
> URL: https://issues.apache.org/jira/browse/BEAM-106
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-ideas
>Reporter: Luke Cwik
>Assignee: James Malone
>
> Ported from: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/50
> There are a variety of use cases which would benefit from native support for 
> conditional iteration.
> For instance, 
> http://stackoverflow.com/questions/31654421/conditional-iterations-in-google-cloud-dataflow/31659923?noredirect=1#comment51264604_31659923
>  asks about being able to write a loop like the following:
> {code}
> PCollection data  = ...
> while(needsMoreWork(data)) {
>   data = doAStep(data)
> }
> {code}
> If there are specific use cases please let us know the details. In the future 
> we will use this issue to post progress updates.
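Without native support, a loop like the one above has to run on the driver side, outside the pipeline. A minimal plain-Java sketch of that pattern (the names iterate/needsMoreWork/doAStep are illustrative, not a Beam API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class IterateUntil {

    // Driver-side loop: reapply a step until the condition says stop.
    // Native support would express this inside a single pipeline instead of
    // materializing intermediate results between pipeline runs.
    static <T> T iterate(T data, Predicate<T> needsMoreWork, Function<T, T> doAStep) {
        while (needsMoreWork.test(data)) {
            data = doAStep.apply(data);
        }
        return data;
    }

    public static void main(String[] args) {
        // Toy example: double every element until the sum reaches 100.
        List<Integer> result = iterate(
                Arrays.asList(1, 2, 3),
                d -> d.stream().mapToInt(Integer::intValue).sum() < 100,
                d -> d.stream().map(x -> x * 2).collect(Collectors.toList()));
        System.out.println(result);  // [32, 64, 96]
    }
}
```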


