Re: Cyclic ConnectedStream

2017-02-05 Thread Matt
I really don't know what you mean. I've been reading the documentation and
the examples showing iterations, but I don't believe they will work for me.
Could you write a quick example? The details don't matter, only the
topology.

If anyone else has an idea, it's very welcome!

Matt



Re: Cyclic ConnectedStream

2017-01-31 Thread Gábor Gévay
I still suspect that iterations might work for your use case. Note that in
the streaming API, iterations are currently nothing more than a back-edge in
the topology, i.e. a low-level tool for creating a cyclic topology, much
like your hypothetical setter syntax. (This is quite different from the
iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream
as your iteration head, which should receive the elements from the back-edge
separately from the normal input. You could simulate this by using not
ConnectedStream.flatMap but just a simple Stream.flatMap whose input element
type is an Either type, whose two alternatives would be the normal input and
the back-edge input. (You would also add maps before the closeWith and on
your input1, which would wrap the elements into the appropriate alternative
of the Either type.)
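
A minimal sketch of that topology, in plain Java rather than against the
Flink API, so it only models the wiring. The deque stands in for the
iteration head, and adding a Right element stands in for the closeWith
back-edge. The Either class, getStats, the integer "model" and the class
name are all made up for illustration. input2 is left out, and the feedback
is pushed to the front of the queue only to make the run deterministic; a
real job gives no such ordering guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the suggested topology; none of this is Flink API.
class EitherIterationSketch {

    // Stand-in for an Either type: Left = normal input, Right = back-edge input.
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }
    }

    // Hypothetical stats step (the role of statsStream); input2 is omitted.
    static int getStats(int prediction) {
        return prediction;
    }

    // Runs the loop until nothing is in flight and returns the predictions.
    static List<Integer> run(List<Integer> input1) {
        // The iteration head: a single stream of Either elements.
        Deque<Either<Integer, Integer>> head = new ArrayDeque<>();
        // Map on input1: wrap every normal element as Left.
        for (Integer obj : input1) {
            head.addLast(Either.<Integer, Integer>left(obj));
        }

        int model = 0; // state shared by both branches of the single flatMap
        List<Integer> predictions = new ArrayList<>();

        while (!head.isEmpty()) {
            Either<Integer, Integer> element = head.pollFirst();
            if (element.isLeft) {
                // Former flatMap1: predict from the normal input.
                int p = element.left + model;
                predictions.add(p);
                // Map before the closeWith: wrap the stat as Right and feed it
                // back. addFirst delivers it before the next normal input.
                head.addFirst(Either.<Integer, Integer>right(getStats(p)));
            } else {
                // Former flatMap2: update the model from the back-edge input.
                model = element.right;
            }
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3))); // prints [1, 3, 6]
    }
}
```

Each prediction here is the input plus the stat fed back from the previous
prediction, which is why the output is a running sum. The point is only
that one operator with an Either-typed input can play both roles of the
CoFlatMapFunction.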

Best,
Gábor





Re: Cyclic ConnectedStream

2017-01-29 Thread Matt
Check this image for clarification, this is what I'm trying to do:
http://i.imgur.com/iZxPv04.png

The rectangles are the two CoFlatMapFunctions, each sharing state between
process and update (map1 and map2). It's clear from the image that I need
input1 and the green box to create the blue box, and input2 and the blue
box to create the green one.

---
blue  = input1.connect(green).keyBy(...).flatMap(...);
green = input2.connect(blue).keyBy(...).flatMap(...);
---

As you can see, there's no cycle in the flow of data, so I guess this
topology is valid. The problem is that there is no way to define such a flow.

For instance, with the appropriate setters we would be able to do this:

---
blue  = input1.connect();
green = input2.connect();

blue.setConnection(green);
green.setConnection(blue);

blue.keyBy(...).flatMap(...);
green.keyBy(...).flatMap(...);
---

Any idea is welcome.

Matt



Re: Cyclic ConnectedStream

2017-01-28 Thread Matt
I'm aware of IterativeStream but I don't think it's useful in this case.

As shown in the example in my original message, my use case is "cyclic" in
that the same object goes from *input* to *predictionStream* (flatMap1),
then to *statsStream* (flatMap2, where it's updated with an object from
*input2*) and finally back to *predictionStream* (flatMap2).

The same operator is never applied twice to the object, so I would say
this dataflow is cyclic only in the dependencies between the streams
(predictionStream depends on statsStream, but statsStream depends on
predictionStream in the first place).

I hope it is clear now.

Matt

On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay  wrote:

> Hello,
>
> Cyclic dataflows can be built using iterations:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>
> Best,
> Gábor


Cyclic ConnectedStream

2017-01-28 Thread Matt
I have a ConnectedStream (A) that depends on another ConnectedStream (B),
which depends on the first one (A).

Simplified code:

predictionStream = input
  .connect(statsStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj, output) {
      p = prediction(obj)
      output.collect(p)
    }
    flatMap2(stat, output) {
      updateModel(stat)
    }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj2, output) {
      s = getStats(obj2, p)
      output.collect(s)
    }
    flatMap2(prediction, output) {
      p = prediction
    }
  })

I'm guessing this should be possible to achieve. One way would be to add a
sink on statsStream that saves the elements into Kafka, and to read from that
topic in predictionStream instead of initializing it with a reference to
statsStream. But I would rather avoid writing into Kafka unnecessarily.

Is there any other way to achieve this?

Thanks,
Matt