Re: termination of stream#iterate on finite streams

2017-09-03 Thread Peter Ertl
Hi Xingcan!

If a _finite_ stream emitted a special, trailing "End-Of-Stream Message" at its
end that flowed downstream through the operators, wouldn't this enable us to
deterministically end the iteration without needing a timeout?

Having an arbitrary timeout that must be longer than any iteration step takes 
seems really awkward.
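
To make the idea a bit more concrete, here is a toy model of the two shutdown
strategies for the feedback queue (plain Scala, deliberately NOT Flink code;
the Msg / Record / EndOfStream types are made up purely for illustration):

import java.util.concurrent.LinkedBlockingQueue

object FeedbackLoopSketch {

  sealed trait Msg
  case class Record(value: Long) extends Msg
  case object EndOfStream extends Msg // the hypothetical trailing marker

  def main(args: Array[String]): Unit = {
    val feedback = new LinkedBlockingQueue[Msg]()
    val elements: Seq[Msg] = Seq(Record(3), Record(2), Record(1), EndOfStream)
    elements.foreach(feedback.put)

    // today the iteration head polls the feedback queue with a timeout and
    // gives up after a period of silence; with a trailing marker it could
    // block indefinitely and still stop deterministically:
    var running = true
    while (running) {
      feedback.take() match {
        case Record(v)   => println(s"re-feed $v")
        case EndOfStream => running = false // deterministic shutdown, no timeout
      }
    }
  }
}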

What do you think?

Best regards
Peter


> On 02.09.2017 at 17:16, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi Peter,
> 
> I just omitted the filter part. Sorry for that.
> 
> Actually, as the javadoc explains, by default a DataStream with iteration
> will never terminate. That's because in a stream environment with iteration,
> the operator can never know whether the feedback stream has reached its end
> (even though the data source has terminated, there may still be subsequent
> data), and that's why it needs a timeout value to make the judgement, much
> like many other calls over a network connection. In other words, you know
> the feedback stream will be empty in the future, but the operator doesn't.
> Thus we provide it with a maximum waiting time for the next record.
> 
> Internally, this mechanism is implemented via a blocking queue (the related 
> code can be found here 
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>).
> 
> Hope everything is considered this time : )
> 
> Best,
> Xingcan
> 
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
> 
>> On 02.09.2017 at 04:45, Xingcan Cui <xingc...@gmail.com> wrote:
>> 
>> In your code, all the long values will be decremented by 1 and sent back to
>> the iterate operator, endlessly.
> 
> 
> Is this true? Shouldn't
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     // dump meaningless 'y' chars just to do anything
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
>   })
>   iterationResult2.print()
> 
> produce the following _feedback_ streams?
> 
> initial input to #iterate(): [1 2 3 4]
> 
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which 
> actually only happens when setting a timeout value)
> 
> Best regards
> Peter



Re: termination of stream#iterate on finite streams

2017-09-02 Thread Peter Ertl

> On 02.09.2017 at 04:45, Xingcan Cui wrote:
> 
> In your code, all the long values will be decremented by 1 and sent back to
> the iterate operator, endlessly.


Is this true? Shouldn't
  val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
    // dump meaningless 'y' chars just to do anything
    (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
  })
  iterationResult2.print()

produce the following _feedback_ streams?

initial input to #iterate(): [1 2 3 4]

iteration #1 : [1 2 3]
iteration #2 : [1 2]
iteration #3 : [1]
iteration #4 : []  => empty feedback stream => cause termination? (which 
actually only happens when setting a timeout value)

Best regards
Peter




termination of stream#iterate on finite streams

2017-09-01 Thread Peter Ertl
Hi folks,

I was doing some experiments with DataStream#iterate and what felt strange to 
me is the fact that #iterate() does not terminate on its own when consuming a 
_finite_ stream.

I think this is awkward and unexpected. The only thing that "helped" was setting
an arbitrary and meaningless timeout on iterate.

Imho this should not be necessary (maybe send an internal "poison message"
down the iteration stream to signal shutdown of the streaming task?)

example:

// ---
// does terminate by introducing a meaningless timeout
// ---
val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
  // dump meaningless 'x' chars just to do anything
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
}, 1000, keepPartitioning = false)

iterationResult1.print()

// ---
// does NEVER terminate
// ---
val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
  // dump meaningless 'y' chars just to do anything
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
})
iterationResult2.print()

Can someone elaborate on this - should I file a ticket?

Regards
Peter

load + update global state

2017-08-07 Thread Peter Ertl
Hi folks,

I am coding a streaming task that processes http requests from our web site and 
enriches these with additional information.

The enrichment data contains session ids from historic requests and the related
emails that were used within these sessions in the past:


lookup - hashtable: session_id: String => emails: Set[String]


During processing of these NEW http requests:

- the lookup table should be used to get previous emails and enrich the current
  stream item
- new candidates for the lookup table will be discovered during processing of
  these items and should be added to the lookup table (these changes should also
  be visible throughout the cluster)

I see at least the following issues:

(1) loading the state as a whole from the data store into memory burns a huge
amount of memory (also, making changes visible cluster-wide is an issue)

(2) not loading it into memory but using something like cassandra / redis as a
lookup store would certainly work but introduces a lot of network requests
(possible ideas: use a distributed cache? broadcast updates in the flink cluster?)

(3) how should I integrate the changes to the table with flink's checkpointing?

I really don't know how to solve this best and my current solution is far from
elegant.

So is there any best practice for supporting "large lookup tables that change
during stream processing"?
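
For what it's worth, the closest I have come so far is keyed state inside a rich
function, roughly like this (just a sketch; Request / EnrichedRequest and their
fields are made up, and it assumes the stream is keyed by session id so each
parallel instance only sees its own sessions):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Request(sessionId: String, email: Option[String])
case class EnrichedRequest(request: Request, knownEmails: Set[String])

class EnrichWithEmails extends RichFlatMapFunction[Request, EnrichedRequest] {

  // keyed state: one email set per session id, checkpointed by flink together
  // with the rest of the job state
  @transient private var emailsState: ValueState[Set[String]] = _

  override def open(parameters: Configuration): Unit = {
    emailsState = getRuntimeContext.getState(
      new ValueStateDescriptor("emails", classOf[Set[String]]))
  }

  override def flatMap(in: Request, out: Collector[EnrichedRequest]): Unit = {
    val known = Option(emailsState.value()).getOrElse(Set.empty[String])
    out.collect(EnrichedRequest(in, known))
    // newly discovered emails go back into the state for later requests
    // of the same session
    in.email.foreach(e => emailsState.update(known + e))
  }
}

// usage: requests.keyBy(_.sessionId).flatMap(new EnrichWithEmails)

This keeps the per-session sets inside flink's checkpointed state, but it
obviously does not answer (1)/(2) for lookups that are not keyed by session id.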

Cheers
Peter






json mapper

2017-08-03 Thread Peter Ertl
Hi flink users,

I just wanted to ask whether this kind of scala map function is correct:

object JsonMapper {
  private val mapper: ObjectMapper = new ObjectMapper()
}

class JsonMapper extends MapFunction[String, ObjectNode] {
  override def map(value: String): ObjectNode =
    JsonMapper.mapper.readValue(value, classOf[ObjectNode])
}

Is using a static reference to ObjectMapper fine or will this cause issues on a
distributed cluster / with checkpointing / serializing state / whatever?

Or should I instead use a non-transient property initialized in the constructor
(ObjectMapper is java.io.Serializable)?

Or should I initialize it into a transient property in RichMapFunction.open()?
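
For reference, this is the open() variant I have in mind (just a sketch):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class JsonRichMapper extends RichMapFunction[String, ObjectNode] {

  // created per parallel task instance on the worker, never serialized
  // as part of the function object
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}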

Also I am wondering if replacing 'class' with 'object' (=> singleton)

object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }

is ok (the mapper is stateless, so there is no obvious need to re-instantiate it
again and again)?

Thanks and best regards
Peter

state inside functions

2017-08-03 Thread Peter Ertl
Hi,

can someone elaborate on when I should declare properties transient vs.
non-transient within operators (e.g. map / flatMap / reduce)?

I see these two possibilities:

(1) initialize a non-transient property from the constructor
(2) initialize a transient property inside a Rich???Function when
open(Configuration) is invoked

On what criteria should I choose (1) or (2)?

how is this related to checkpointing / rebalancing?
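
To make sure we are talking about the same thing, here is a minimal sketch of
the two variants (class names and helpers are made up):

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration

// (1) non-transient property set in the constructor: it must be serializable,
//     because it is shipped to the task managers inside the function object
class PrefixMapper(prefix: String) extends MapFunction[String, String] {
  override def map(value: String): String = prefix + value
}

// (2) transient property built in open(): never serialized, re-created on each
//     parallel task instance -- the usual choice for non-serializable helpers
class DayPrefixMapper extends RichMapFunction[String, String] {

  @transient private var format: SimpleDateFormat = _

  override def open(parameters: Configuration): Unit = {
    format = new SimpleDateFormat("yyyy-MM-dd")
  }

  override def map(value: String): String =
    format.format(new Date()) + " " + value
}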

Thanks in advance
Peter

replacement for KeyedStream.fold(..) ?

2017-08-02 Thread Peter Ertl
Hi folks,

since KeyedStream.fold(..) is marked as @deprecated, what is the proper
replacement for that kind of functionality?

Are mapWithState() and flatMapWithState() a *full* replacement?
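
For context, here is the kind of migration I have in mind, e.g. a per-key
running sum (just a sketch; I am not sure it covers every fold use case):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[(String, Long)] =
  env.fromElements(("a", 1L), ("a", 2L), ("b", 3L))

// deprecated: input.keyBy(_._1).fold(0L) { (acc, in) => acc + in._2 }

// with mapWithState, keeping the running sum per key in the Option[Long] state:
val sums: DataStream[(String, Long)] = input
  .keyBy(_._1)
  .mapWithState[(String, Long), Long] { (in, state) =>
    val acc = state.getOrElse(0L) + in._2
    ((in._1, acc), Some(acc))
  }

sums.print()
env.execute()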

Cheers
Peter

multiple streams with multiple actions - proper way?

2017-07-29 Thread Peter Ertl
Hello Flink People :-)


I am trying to get my head around flink - is it a supported use case to 
register multiple streams with possibly more than one transformation / action 
per stream?


def main(args: Array[String]): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "vmi:9092")
  
  // first stream
  val ins = env.addSource(new FlinkKafkaConsumer010("foo", new SimpleStringSchema(), prop))
    .map(s => "transformation-1: " + s)

  ins.map(s => "transformation-2:" + s).print() // one action
  ins.map(s => "transformation-3:" + s).print() // one more action
  ins.map(s => "transformation-4:" + s).print() // another action on the same stream

  // second, different stream
  val ins2 = env.addSource(new FlinkKafkaConsumer010("bar", new SimpleStringSchema(), prop))
    .map(s => "transformation-5: " + s)

  ins2.map(s => "transformation-7:" + s).print() // action
  ins2.map(s => "transformation-6:" + s).print() // different action
  
  env.execute("run all streams with multiple actions attached")
}


Is this program abusing flink or is this just how you are supposed to do things?

Also, how many threads will this program consume when running with parallelism = 4?


Best regards
Peter