Re: Reconstruct object through partial select query

2019-05-10 Thread Shahar Cizer Kobrinsky
Hi Fabian,

I'm having trouble implementing the type for this operation and wonder how I
can do it.
Given a generic type T, I want to create a TypeInformation for:
class TaggedEvent<T> {
   String[] tags;
   T originalEvent;
}

I was trying a few different things but am not sure how to do it.
It doesn't seem like I can use a TypeHint, as I would need to know the actual
generic class for it, right?
Do I need a TaggedEventTypeFactory? If so, how do I create the
TaggedEventTypeInfo for it? Do you have an example? I was trying to
follow this [1], but it doesn't seem to work: I'm getting null as my
genericParameter for some reason. Also, how would you create the serializer
for the type info? Can I reuse some built-in Kryo functionality?
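
For context, here is roughly the direction I have been trying (just a sketch, not
working code; it assumes TaggedEvent is accepted by Flink as a POJO and that a
TypeInformation for the wrapped event is available at hand):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class TaggedEventTypes {

    // Builds a TypeInformation<TaggedEvent<T>> from the TypeInformation of the
    // wrapped event, assuming TaggedEvent is a POJO with public fields
    // "tags" and "originalEvent".
    @SuppressWarnings("unchecked")
    public static <T> TypeInformation<TaggedEvent<T>> of(TypeInformation<T> eventType) {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("tags", Types.OBJECT_ARRAY(Types.STRING));
        fields.put("originalEvent", eventType);
        return Types.POJO((Class<TaggedEvent<T>>) (Class<?>) TaggedEvent.class, fields);
    }
}

The idea would then be to declare the return type explicitly, e.g.
stream.map(...).returns(TaggedEventTypes.of(carTypeInfo)). Not sure if this is the
intended way, though.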

Thanks


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer





On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks Fabian,
>
> I'm looking into a way to enrich it without having to know the internal
> fields of the original event type.
> Right now what I managed to do is to map Car into a TaggedEvent prior
> to the SQL query, with tags being empty, and then run the SQL query selecting *origin,
> enrich(..) as tags*.
> Not sure there's a better way, but I guess that works.
>
>
>
> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> you can use the value construction function ROW to create a nested row
>> (or object).
>> However, you have to explicitly reference all attributes that you will
>> add.
>>
>> If you have a table Cars with (year, modelName) a query could look like
>> this:
>>
>> SELECT
>>   ROW(year, modelName) AS car,
>>   enrich(year, modelName) AS tags
>> FROM Cars;
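>>
>> The enrich function itself could be a scalar UDF, something along these lines
>> (just a sketch; the tagging rule is only an assumed example based on the Car
>> class further down):
>>
>> import java.util.ArrayList;
>> import java.util.List;
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.table.functions.ScalarFunction;
>>
>> public class Enrich extends ScalarFunction {
>>
>>     public String[] eval(Integer year, String modelName) {
>>         List<String> tags = new ArrayList<>();
>>         if (year != null && year > 2015
>>                 && modelName != null && modelName.contains("Luxury")) {
>>             tags.add("NiceCar");
>>         }
>>         return tags.toArray(new String[0]);
>>     }
>>
>>     @Override
>>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>>         // declare the result as an array of strings
>>         return Types.OBJECT_ARRAY(Types.STRING);
>>     }
>> }
>>
>> // registered once before use in SQL:
>> // tableEnv.registerFunction("enrich", new Enrich());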
>>
>> Handling many attributes is always a bit painful in SQL.
>> There is an effort to make the Table API easier to use for these use
>> cases (for example Column Operations [1]).
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>
>>
>>
>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>> shahar.kobrin...@gmail.com>:
>>
>>> Just to be more clear on my goal -
>>> I'm trying to enrich the incoming stream with some meaningful tags based
>>> on conditions from the event itself.
>>> So the input stream could be an event that looks like:
>>> class Car {
>>>   int year;
>>>   String modelName;
>>> }
>>>
>>> I will have a config that defines tags as:
>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>
>>> So ideally my output will be in the structure of:
>>>
>>> class TaggedEvent {
>>>    Car origin;
>>>    String[] tags;
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Use case for StreamingFileSink: Different parquet writers within the Sink

2019-05-10 Thread Kailash Dayanand
Hello,

I was able to solve this by creating a data model where all the
incoming events are wrapped in a message envelope and writing a sink for a
DataStream of these message envelopes. I also ended up creating the
Parquet writers not when constructing the writer factory but instead inside
the addElement method of the BulkWriter. Adding this information
for posterity in case someone needs to handle a similar use case.
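
A rough sketch of what that looks like (the names are made up for illustration;
the envelope simply carries the Avro schema together with the record, and the
concrete Parquet writer is only created on the first addElement call):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

// Assumed wrapper carrying the Avro schema together with the record.
class MessageEnvelope {
    private final Schema schema;
    private final GenericRecord record;

    MessageEnvelope(Schema schema, GenericRecord record) {
        this.schema = schema;
        this.record = record;
    }

    Schema getSchema() { return schema; }
    GenericRecord getRecord() { return record; }
}

// BulkWriter that defers creating the actual Parquet writer until the first
// element arrives, so the element's schema can be used.
class EnvelopeBulkWriter implements BulkWriter<MessageEnvelope> {

    private final FSDataOutputStream out;
    private BulkWriter<GenericRecord> delegate;

    EnvelopeBulkWriter(FSDataOutputStream out) {
        this.out = out;
    }

    @Override
    public void addElement(MessageEnvelope envelope) throws IOException {
        if (delegate == null) {
            delegate = ParquetAvroWriters
                .forGenericRecord(envelope.getSchema())
                .create(out);
        }
        delegate.addElement(envelope.getRecord());
    }

    @Override
    public void flush() throws IOException {
        if (delegate != null) {
            delegate.flush();
        }
    }

    @Override
    public void finish() throws IOException {
        if (delegate != null) {
            delegate.finish();
        }
    }
}

class EnvelopeBulkWriterFactory implements BulkWriter.Factory<MessageEnvelope> {
    @Override
    public BulkWriter<MessageEnvelope> create(FSDataOutputStream out) {
        return new EnvelopeBulkWriter(out);
    }
}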

Thanks
Kailash

On Sun, Apr 28, 2019 at 3:57 PM Kailash Dayanand  wrote:

> We have the following use case: We are reading a stream of events which we
> want to write to different parquet files based on data within the element.
> The end goal is to register these parquet files in Hive to query. I
> was exploring the option of using StreamingFileSink for this use case but
> found a few things which I could not customize.
>
> It looks like StreamingFileSink takes a single schema / provides a
> ParquetWriter for a specific schema. Since the elements need to have
> different Avro schemas based on the data in the elements, I could not use the
> sink as-is (AvroParquetWriters needs to specify the same Schema for the
> parquetBuilder). So looking a bit deeper I found that there is a
> WriterFactory here: https://tinyurl.com/y68drj35 . This can be extended
> to create a BulkPartWriter based on BucketID. Something like this:
> BulkWriter.Factory writerFactory. In this way you can
> create a unique ParquetWriter for each bucket.  Is there any other option
> to do this?
>
> I considered another option of possibly using a common schema for all the
> different schemas but I have not fully explored that option.
>
> Thanks
> Kailash
>


[no subject]

2019-05-10 Thread an0
> Q2: after a, map(A), and map(B) would work fine. Assigning watermarks
> immediately after a keyBy() is not a good idea, because 1) the records are
> shuffled and it's hard to reason about ordering, and 2) you lose the
> KeyedStream property and would have to keyBy() again (unless you use
> reinterpretAsKeyedStream).

I know it is better to assignTimestampsAndWatermarks as early as possible. I 
intentionally put it after keyBy to check my understanding of this specific 
situation because I may have to use assignTimestampsAndWatermarks after keyBy 
in my application. I didn't make my question's intention clear though.

I'm glad to learn another trick from you: reinterpretAsKeyedStream :). Let's
assume the pipeline is
keyBy.assignTimestampsAndWatermarks.reinterpretAsKeyedStream.timeWindow(C).
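
In Java, a rough sketch of that shape (MyEvent with its key and timestamp fields,
and the 10s out-of-orderness bound, are just placeholders I made up):

// afterMapB is the stream after map(A).map(B); MyEvent is assumed to be a POJO
// with a String "key" field and a long "timestamp" field
DataStream<MyEvent> afterMapB = ...;

KeyedStream<MyEvent, String> reKeyed = DataStreamUtils.reinterpretAsKeyedStream(
    afterMapB
        .keyBy(e -> e.key)
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(MyEvent e) {
                    return e.timestamp;
                }
            }),
    e -> e.key);

// the keyed window referred to as C; each key's windows fire based on the
// watermarks generated by the subtask that actually receives data for it
reKeyed.timeWindow(Time.minutes(1)).reduce((a, b) -> b);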

What I wanted to ask is:
Because it is using keyed windows, every key's window is fired independently, so
even if I assignTimestampsAndWatermarks after keyBy and C.2 doesn't have any
data and thus generates no watermarks, it won't affect the firing of C.1's windows.
Is this understanding correct?

> Although C.2 does not receive data, it receives watermarks because WMs are
> broadcasted. They flow to any task that is reachable by any event. The case
> that all keys fall into C.1 is not important because a record for C.2 might
> arrive at any point in time. Also Flink does not give any guarantees
> about how keys (or rather key groups) are assigned to tasks. If you rescale
> the application to a parallelism of 3, the active key group might be
> scheduled to C.2 or C.3.
> 
> Long story short, D makes progress in event time because watermarks are
> broadcasted.

I know watermarks are broadcasted. But I'm using assignTimestampsAndWatermarks
after keyBy, so C.2 doesn't receive watermarks; it creates watermarks from its
received data. Since it doesn't receive any data, it doesn't create any
watermarks. D couldn't make progress because one of its inputs, C.2, doesn't
make progress. Is this understanding correct?

On 2019/05/10 10:38:35, Fabian Hueske  wrote: 
> Hi,
> 
> Again answers below ;-)
> 
> Am Do., 9. Mai 2019 um 17:12 Uhr schrieb an0 :
> 
> > You are right, thanks. But something is still not totally clear to me.
> > I'll reuse your diagram with a little modification:
> >
> > DataStream a = ...
> > a.map(A).map(B).keyBy().timeWindow(C)
> >
> > and execute this with parallelism 2. However, keyBy only generates one
> > single key value, and assume they all go to C.1. Does the data flow look
> > like this?
> >
> > A.1 -- B.1 -/-- C.1
> > /
> > A.2 -- B.2 --/   C.2
> >
> > Will the lack of data into C.2 prevent C.1's windows from firing? Will the
> > location of assignTimestampsAndWatermarks(after a, after map(A), after
> > map(B), after keyBy) matter for the firing of C.1's windows
> 
> By my understanding, the answers are "no" and "no". Correct?
> >
> > Q1: no. Watermarks are propagated even in case of skewed key distribution.
> C.2 will also advance its event-time clock (based on the WMs) and forward
> new WMs.
> Q2: after a, map(A), and map(B) would work fine. Assigning watermarks
> immediately after a keyBy() is not a good idea, because 1) the records are
> shuffled and it's hard to reason about ordering, and 2) you lose the
> KeyedStream property and would have to keyBy() again (unless you use
> reinterpretAsKeyedStream).
> 
> 
> > Now comes the *silly* question: does C.2 exist at all? Since there is only
> > one key value, only one C instance is needed. I could see that C.2 as a
> > physical instance may exist, but as a logical instance it shouldn't exist
> > in the diagram because it is unused. I feel the answer to this silly
> > question may be the most important in understanding my and(perhaps many
> > others') misunderstanding of situations like this.
> >
> > If C.2 exists just because parallelism is set to 2, even though it is not
> > logically needed, and it also serves as an input to the next operator if
> > there is one, then the mystery is completely solved for me.
> >
> > C.2 exists. Flink does not create a flow topology based on data values. As
> soon as there is a record with a key that would need to go to C.2, we would
> need it.
> 
> 
> > Use a concrete example:
> >
> > DataStream a = ...
> >
> > a.map(A).map(B).keyBy().assignTimestampsAndWatermarks(C).timeWindowAll(D)
> >
> > A.1 -- B.1 -/-- C.1 --\
> > / D
> > A.2 -- B.2 --/   C.2 --/
> >
> > D's watermark can not progress because C.2's watermark can not progress,
> > because C.2 doesn't have any input data, even though C.2 is not logically
> > needed but it does exist and it ruins everything :p. Is this understanding
> > correct?
> >
> 
> Although C.2 does not receive data, it receives watermarks because WMs are
> broadcasted. They flow to any task that is reachable by any event. The case
> that all keys fall into C.1 is not important because a record for C.2 might
> arrive at any poin

Re: I want to use MapState on an unkeyed stream

2019-05-10 Thread an0
Got it, thanks.

On 2019/05/10 10:20:40, Fabian Hueske  wrote: 
> Hi,
> 
> RocksDB is only used as local state store. Operator state is not stored in
> RocksDB but only on the TM JVM heap.
> When a checkpoint is taken, the keyed state from RocksDB and the operator
> state from the heap are both copied to a persistent data store (HDFS, S3,
> ...).
> 
> I was trying to find the documentation that explains how operator state is
> managed, but couldn't find it.
> I'll create a Jira to fix that.
> 
> Best, Fabian
> 
> Am Do., 9. Mai 2019 um 16:10 Uhr schrieb an0 :
> 
> > Thanks, I didn't know that. But it is checkpointed to RocksDB, isn't it?
> > BTW, is this special treatment of operator state documented anywhere?
> >
> > On 2019/05/09 07:39:34, Fabian Hueske  wrote:
> > > Hi,
> > >
> > > Yes, IMO it is more clear.
> > > However, you should be aware that operator state is maintained on heap
> > only
> > > (not in RocksDB).
> > >
> > > Best, Fabian
> > >
> > >
> > > Am Mi., 8. Mai 2019 um 20:44 Uhr schrieb an0 :
> > >
> > > > I switched to using operator list state. It is more clear. It is also
> > > > supported by RocksDBKeyedStateBackend, isn't it?
> > > >
> > > > On 2019/05/08 14:42:36, Till Rohrmann  wrote:
> > > > > Hi,
> > > > >
> > > > > if you want to increase the parallelism you could also pick a key
> > > > randomly
> > > > > from a set of keys. The price you would pay is a shuffle operation
> > > > (network
> > > > > I/O) which would not be needed if you were using the unkeyed stream
> > and
> > > > > used the operator list state.
> > > > >
> > > > > However, with keyed state you could also use Flink's
> > > > > RocksDBKeyedStateBackend which allows to go out of core if your state
> > > > size
> > > > > should grow very large.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, May 7, 2019 at 5:57 PM an0  wrote:
> > > > >
> > > > > > But I only have one stream, nothing to connect it to.
> > > > > >
> > > > > > On 2019/05/07 00:15:59, Averell  wrote:
> > > > > > > From my understanding, having a fake keyBy (stream.keyBy(r =>
> > > > > > "dummyString"))
> > > > > > > means there would be only one slot handling the data.
> > > > > > > Would a broadcast function [1] work for your case?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Averell
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Sent from:
> > > > > >
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 


Re: Rich and incrementally aggregating window functions

2019-05-10 Thread an0
Thanks. I know reimplementing windowing myself will work but that's a very bad 
last resort.

I believe it is a very reasonable request. But since someone else has already 
filed a Jira and it was closed as Won't Fix[1], I won't bother refiling it 
again. I'll try something else first. 

[1]
https://issues.apache.org/jira/browse/FLINK-10250
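
Roughly what I have in mind to try (just a sketch; MyEvent, the key field and the
counting logic are placeholders): keep the incremental AggregateFunction non-rich
and read/update the per-key global state from the ProcessWindowFunction's Context
instead of from a RichFunction's runtime context.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// assumed input type
class MyEvent {
    public String key;
}

// Incremental aggregate (a simple count) per window, no buffering of elements.
class CountAgg implements AggregateFunction<MyEvent, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(MyEvent value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}

// Receives only the pre-aggregated count per window and reads/updates
// per-key global state through the Context.
class EmitWithGlobalState
        extends ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {

    private final ValueStateDescriptor<Long> totalDesc =
            new ValueStateDescriptor<>("totalSoFar", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Long> counts,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        ValueState<Long> total = ctx.globalState().getState(totalDesc);
        long windowCount = counts.iterator().next();
        long newTotal = (total.value() == null ? 0L : total.value()) + windowCount;
        total.update(newTotal);
        out.collect(Tuple2.of(key, newTotal));
    }
}

// usage: stream.keyBy(e -> e.key)
//              .timeWindow(Time.minutes(5))
//              .aggregate(new CountAgg(), new EmitWithGlobalState());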

On 2019/05/09 02:07:00, Hequn Cheng  wrote: 
> Hi,
> 
> There is a discussion about this before, you can take a look at it[1].
> Best, Hequn
> 
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-implementation-of-aggregate-function-using-a-ProcessFunction-td23473.html#a23531
> 
> On Thu, May 9, 2019 at 5:14 AM an0  wrote:
> 
> > I want to use ProcessWindowFunction.Context#globalState in my window
> > function. But I don't want to apply ProcessWindowFunction directly to my
> > WindowedStream because I don't want to buffer all the elements of each
> > window. Currently I'm using WindowedStream#aggregate(AggregateFunction,
> > ProcessWindowFunction).
> >
> > My understanding is that RichFunction.runtimeContext also give access to
> > those global states. That thought naturally pointed me to
> > RichAggregateFunction, RichReduceFunction and RichFoldFunction. However,
> > they all cause runtime error like this:
> > "AggregateFunction can not be a RichFunction. Please use
> > fold(AggregateFunction, WindowFunction) instead."
> >
> > So how can I use an incrementally aggregating window function and have
> > access to global states at the same time?
> >
> 


Re: Checkpointing and save pointing

2019-05-10 Thread Fabian Hueske
For checkpoints and savepoints, the JM and all TMs need access to the same
storage system.
This can be a shared NFS volume that is mounted on each machine.
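
For illustration, a minimal sketch (the mount path /mnt/flink is just an example;
the same URI must be resolvable on the JobManager and on every TaskManager):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s
// checkpoints go to the shared volume, readable/writable by JM and all TMs
env.setStateBackend(new FsStateBackend("file:///mnt/flink/checkpoints"));

The default savepoint target directory (state.savepoints.dir in flink-conf.yaml)
would point into the same shared mount.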

Best, Fabian

Am Fr., 10. Mai 2019 um 15:15 Uhr schrieb Boris Lublinsky <
boris.lublin...@lightbend.com>:

> For now it is a regular Flink cluster,
> but even there I want to use both checkpointing and savepointing.
> We do not want to use Hadoop, but rather a shared fs - NFS/Gluster.
> I was trying to see whether volumes need to be mounted only for the Job
> Manager or for both.
> HA is the next step. Trying to find the code that saves the checkpointing location.
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On May 10, 2019, at 2:47 AM, Fabian Hueske  wrote:
>
> Hi Boris,
>
> Is your question in the context of replacing Zookeeper by a different
> service for highly-available setups or are you setting up a regular Flink
> cluster?
>
> Best, Fabian
>
>
>
> Am Mi., 8. Mai 2019 um 06:20 Uhr schrieb Congxian Qiu <
> qcx978132...@gmail.com>:
>
>> Hi, Boris
>>
>> TM will also need to write to the external volume.
>>
>> Best, Congxian
>> On May 8, 2019, 03:56 +0800, Boris Lublinsky <
>> boris.lublin...@lightbend.com>, wrote:
>>
>> I am planning to use external volume for this. My understanding is that
>> it needs to be mounted only to the job manager, not the task managers. Is
>> this correct, or it needs to be mounted to both?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>


Re: Checkpointing and save pointing

2019-05-10 Thread Boris Lublinsky
For now it is a regular Flink cluster,
but even there I want to use both checkpointing and savepointing.
We do not want to use Hadoop, but rather a shared fs - NFS/Gluster.
I was trying to see whether volumes need to be mounted only for the Job Manager or
for both.
HA is the next step. Trying to find the code that saves the checkpointing location.


Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On May 10, 2019, at 2:47 AM, Fabian Hueske  wrote:
> 
> Hi Boris,
> 
> Is your question in the context of replacing Zookeeper by a different 
> service for highly-available setups or are you setting up a regular Flink 
> cluster?
> 
> Best, Fabian
> 
> 
> 
> Am Mi., 8. Mai 2019 um 06:20 Uhr schrieb Congxian Qiu  >:
> Hi, Boris
> 
> TM will also need to write to the external volume.
> 
> Best, Congxian
> On May 8, 2019, 03:56 +0800, Boris Lublinsky  >, wrote:
>> I am planning to use external volume for this. My understanding is that it 
>> needs to be mounted only to the job manager, not the task managers. Is this 
>> correct, or it needs to be mounted to both?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 
>> https://www.lightbend.com/ 



Re: How to export all not-null keyed ValueState

2019-05-10 Thread Averell Huyen Levan
Thank you very much, Fabian.

Regards,
Averell

On Fri, May 10, 2019 at 9:46 PM Fabian Hueske  wrote:

> Hi Averell,
>
> I'd go with your approach. Any state access (given that you use RocksDB
> keyed state) or deduplication of messages is going to be more expensive
> than a simple cast.
>
> Best, Fabian
>
> Am Fr., 10. Mai 2019 um 13:08 Uhr schrieb Averell Huyen Levan <
> lvhu...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for that. However, as I mentioned in my previous email, that
>> implementation requires a lot of typecasting/object wrapping.
>> I tried to broadcast that Toggle stream - the toggles will be saved as a
>> MapState, and whenever an export trigger record arrived, I send out that
>> MapState. There's no use of applyToKeyedState in this implementation.
>> And the problem I got is I received duplicated output (one from each
>> parallelism-instance).
>>
>> Is there any option to modify the keyed state from within the
>> processBroadcastElement() method?
>>
>> Thanks a lot for your help.
>>
>> Regards,
>> Averell
>>
>>
>> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske  wrote:
>>
>>> Hi Averell,
>>>
>>> Ah, sorry. I had assumed the toggle events were broadcasted anyway.
>>> Since you had both streams keyed, your current solution looks fine to me.
>>>
>>> Best,
>>> Fabian
>>>
>>> Am Fr., 10. Mai 2019 um 03:13 Uhr schrieb Averell Huyen Levan <
>>> lvhu...@gmail.com>:
>>>
 Hi Fabian,

 Sorry, but I am still confused about your guide. If I union the Toggle
 stream with the StateReportTrigger stream, would that mean I need to make
 my Toggles broadcasted states? Or there's some way to modify the keyed
 states from within the processBroadcastElement() method?

 I tried to implement the other direction (which I briefed in my
 previous email). It seems working, but I am not confident in that, not sure
 whether it has any flaws. Could you please give your comment?
 In my view, this implementation causes a lot of type-casting for my
 main data stream, which might cause a high impact on performance (my toggle
 is on in only about 1% of the keys, and the rate of input1.left is less
 than a millionth compared to the rate of input1.right)

 /**
   * This KeyedBroadcastProcessFunction has:
   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
   *  When Toggle._2 == true, records from input1.right for the 
 same key will be forwarded to the main output.
   *  If it is false, records from input1.right for that same key 
 will be dropped
   *   input1.right: the main data stream
   *
   *input2: a broadcasted stream of StateReport triggers. When a record 
 arrived on this stream,
   *   the current value of Toggles will be sent out via the outputTag
   */
 class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
 Any, MyEvent] {

val toggleStateDescriptor = new 
 ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])

override def processElement(in1: Either[Toggle, MyEvent],
 readOnlyContext: 
 KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], Any, 
 MyEvent]#ReadOnlyContext,
 collector: Collector[MyEvent]): Unit = {
   in1 match {
  case Left(toggle) =>
 
 getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
  case Right(event) =>
 if (getRuntimeContext.getState(toggleStateDescriptor).value())
collector.collect(event)
   }
}

override def processBroadcastElement(in2: Any,
context: KeyedBroadcastProcessFunction[Key, 
 Either[Toggle, MyEvent], Any, MyEvent]#Context,
collector: Collector[MyEvent]): Unit = {
   context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: 
 ValueState[Boolean]) =>
  if (s != null) context.output(outputTag, (k, s.value())))
}
 }

 Thanks for your help.
 Regards,
 Averell

 On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:

> Hi,
>
> Passing a Context through a DataStream definitely does not work.
> You'd need to have the keyed state that you want to scan over in the
> KeyedBroadcastProcessFunction.
>
> For the toggle filter use case, you would need to have a unioned
> stream with Toggle and StateReport events.
> For the output, you can use side outputs to route the different
> outputs to different streams.
>
> Best, Fabian
>
> Am Do., 9. Mai 2019 um 10:34 Uhr schrieb Averell :
>
>> Thank you Congxian and Fabian.
>>
>> @Fab

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Fabian Hueske
Hi Averell,

I'd go with your approach. Any state access (given that you use RocksDB
keyed state) or deduplication of messages is going to be more expensive
than a simple cast.

Best, Fabian

Am Fr., 10. Mai 2019 um 13:08 Uhr schrieb Averell Huyen Levan <
lvhu...@gmail.com>:

> Hi Fabian,
>
> Thanks for that. However, as I mentioned in my previous email, that
> implementation requires a lot of typecasting/object wrapping.
> I tried to broadcast that Toggle stream - the toggles will be saved as a
> MapState, and whenever an export trigger record arrived, I send out that
> MapState. There's no use of applyToKeyedState in this implementation. And
> the problem I got is I received duplicated output (one from each
> parallelism-instance).
>
> Is there any option to modify the keyed state from within the
> processBroadcastElement() method?
>
> Thanks a lot for your help.
>
> Regards,
> Averell
>
>
> On Fri, May 10, 2019 at 8:52 PM Fabian Hueske  wrote:
>
>> Hi Averell,
>>
>> Ah, sorry. I had assumed the toggle events were broadcasted anyway.
>> Since you had both streams keyed, your current solution looks fine to me.
>>
>> Best,
>> Fabian
>>
>> Am Fr., 10. Mai 2019 um 03:13 Uhr schrieb Averell Huyen Levan <
>> lvhu...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Sorry, but I am still confused about your guide. If I union the Toggle
>>> stream with the StateReportTrigger stream, would that mean I need to make
>>> my Toggles broadcasted states? Or there's some way to modify the keyed
>>> states from within the processBroadcastElement() method?
>>>
>>> I tried to implement the other direction (which I briefed in my previous
>>> email). It seems working, but I am not confident in that, not sure whether
>>> it has any flaws. Could you please give your comment?
>>> In my view, this implementation causes a lot of type-casting for my main
>>> data stream, which might cause a high impact on performance (my toggle is
>>> on in only about 1% of the keys, and the rate of input1.left is less than a
>> millionth compared to the rate of input1.right)
>>>
>>> /**
>>>   * This KeyedBroadcastProcessFunction has:
>>>   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>>   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
>>>   *  When Toggle._2 == true, records from input1.right for the same 
>>> key will be forwarded to the main output.
>>>   *  If it is false, records from input1.right for that same key 
>>> will be dropped
>>>   *   input1.right: the main data stream
>>>   *
>>>   *input2: a broadcasted stream of StateReport triggers. When a record 
>>> arrived on this stream,
>>>   *   the current value of Toggles will be sent out via the outputTag
>>>   */
>>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
>>> Any, MyEvent] {
>>>
>>>val toggleStateDescriptor = new 
>>> ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>>
>>>override def processElement(in1: Either[Toggle, MyEvent],
>>> readOnlyContext: KeyedBroadcastProcessFunction[Key, 
>>> Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>>> collector: Collector[MyEvent]): Unit = {
>>>   in1 match {
>>>  case Left(toggle) =>
>>> 
>>> getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>>  case Right(event) =>
>>> if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>>collector.collect(event)
>>>   }
>>>}
>>>
>>>override def processBroadcastElement(in2: Any,
>>>context: KeyedBroadcastProcessFunction[Key, 
>>> Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>>collector: Collector[MyEvent]): Unit = {
>>>   context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: 
>>> ValueState[Boolean]) =>
>>>  if (s != null) context.output(outputTag, (k, s.value())))
>>>}
>>> }
>>>
>>> Thanks for your help.
>>> Regards,
>>> Averell
>>>
>>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:
>>>
 Hi,

 Passing a Context through a DataStream definitely does not work.
 You'd need to have the keyed state that you want to scan over in the
 KeyedBroadcastProcessFunction.

 For the toggle filter use case, you would need to have a unioned stream
 with Toggle and StateReport events.
 For the output, you can use side outputs to route the different outputs
 to different streams.

 Best, Fabian

 Am Do., 9. Mai 2019 um 10:34 Uhr schrieb Averell :

> Thank you Congxian and Fabian.
>
> @Fabian: could you please give a bit more details? My understanding
> is: to
> pass the context itself and an OutputTag to the KeyedStateFunction
> parameter
> of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Averell Huyen Levan
Hi Fabian,

Thanks for that. However, as I mentioned in my previous email, that
implementation requires a lot of typecasting/object wrapping.
I tried to broadcast that Toggle stream - the toggles will be saved as a
MapState, and whenever an export trigger record arrived, I send out that
MapState. There's no use of applyToKeyedState in this implementation. And
the problem I got is I received duplicated output (one from each
parallelism-instance).

Is there any option to modify the keyed state from within the
processBroadcastElement() method?

Thanks a lot for your help.

Regards,
Averell


On Fri, May 10, 2019 at 8:52 PM Fabian Hueske  wrote:

> Hi Averell,
>
> Ah, sorry. I had assumed the toggle events were broadcasted anyway.
> Since you had both streams keyed, your current solution looks fine to me.
>
> Best,
> Fabian
>
> Am Fr., 10. Mai 2019 um 03:13 Uhr schrieb Averell Huyen Levan <
> lvhu...@gmail.com>:
>
>> Hi Fabian,
>>
>> Sorry, but I am still confused about your guide. If I union the Toggle
>> stream with the StateReportTrigger stream, would that mean I need to make
>> my Toggles broadcasted states? Or there's some way to modify the keyed
>> states from within the processBroadcastElement() method?
>>
>> I tried to implement the other direction (which I briefed in my previous
>> email). It seems working, but I am not confident in that, not sure whether
>> it has any flaws. Could you please give your comment?
>> In my view, this implementation causes a lot of type-casting for my main
>> data stream, which might cause a high impact on performance (my toggle is
>> on in only about 1% of the keys, and the rate of input1.left is less than a
>> millionth compared to the rate of input1.right)
>>
>> /**
>>   * This KeyedBroadcastProcessFunction has:
>>   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>>   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
>>   *  When Toggle._2 == true, records from input1.right for the same 
>> key will be forwarded to the main output.
>>   *  If it is false, records from input1.right for that same key 
>> will be dropped
>>   *   input1.right: the main data stream
>>   *
>>   *input2: a broadcasted stream of StateReport triggers. When a record 
>> arrived on this stream,
>>   *   the current value of Toggles will be sent out via the outputTag
>>   */
>> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
>> Any, MyEvent] {
>>
>>val toggleStateDescriptor = new 
>> ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>>
>>override def processElement(in1: Either[Toggle, MyEvent],
>> readOnlyContext: KeyedBroadcastProcessFunction[Key, 
>> Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
>> collector: Collector[MyEvent]): Unit = {
>>   in1 match {
>>  case Left(toggle) =>
>> 
>> getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>>  case Right(event) =>
>> if (getRuntimeContext.getState(toggleStateDescriptor).value())
>>collector.collect(event)
>>   }
>>}
>>
>>override def processBroadcastElement(in2: Any,
>>context: KeyedBroadcastProcessFunction[Key, 
>> Either[Toggle, MyEvent], Any, MyEvent]#Context,
>>collector: Collector[MyEvent]): Unit = {
>>   context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: 
>> ValueState[Boolean]) =>
>>  if (s != null) context.output(outputTag, (k, s.value())))
>>}
>> }
>>
>> Thanks for your help.
>> Regards,
>> Averell
>>
>> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Passing a Context through a DataStream definitely does not work.
>>> You'd need to have the keyed state that you want to scan over in the
>>> KeyedBroadcastProcessFunction.
>>>
>>> For the toggle filter use case, you would need to have a unioned stream
>>> with Toggle and StateReport events.
>>> For the output, you can use side outputs to route the different outputs
>>> to different streams.
>>>
>>> Best, Fabian
>>>
>>> Am Do., 9. Mai 2019 um 10:34 Uhr schrieb Averell :
>>>
 Thank you Congxian and Fabian.

 @Fabian: could you please give a bit more details? My understanding is:
 to
 pass the context itself and an OutputTag to the KeyedStateFunction
 parameter
 of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
 within that KeyedStateFunction.process() send out the side output. Do I
 understand your idea correctly?

 BTW, I have another question regarding KeyedBroadcastProcessFunction
 best
 practice: I am having two streams: Data and Toggle. The stream Toggle is
 just a keyed boolean stream, being used to filter data from the stream
 Data.
 And I am implementing that filter us

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Fabian Hueske
Hi Averell,

Ah, sorry. I had assumed the toggle events were broadcasted anyway.
Since you had both streams keyed, your current solution looks fine to me.

Best,
Fabian

Am Fr., 10. Mai 2019 um 03:13 Uhr schrieb Averell Huyen Levan <
lvhu...@gmail.com>:

> Hi Fabian,
>
> Sorry, but I am still confused about your guide. If I union the Toggle
> stream with the StateReportTrigger stream, would that mean I need to make
> my Toggles broadcasted states? Or there's some way to modify the keyed
> states from within the processBroadcastElement() method?
>
> I tried to implement the other direction (which I briefed in my previous
> email). It seems working, but I am not confident in that, not sure whether
> it has any flaws. Could you please give your comment?
> In my view, this implementation causes a lot of type-casting for my main
> data stream, which might cause a high impact on performance (my toggle is
> on in only about 1% of the keys, and the rate of input1.left is less than a
> millionth compared to the rate of input1.right)
>
> /**
>   * This KeyedBroadcastProcessFunction has:
>   *input1: a keyed `DataStream[Either[Toggle, MyEvent]]`:
>   *   input1.left: Toggles in the form of a tuple (Key, Boolean).
>   *  When Toggle._2 == true, records from input1.right for the same 
> key will be forwarded to the main output.
>   *  If it is false, records from input1.right for that same key will 
> be dropped
>   *   input1.right: the main data stream
>   *
>   *input2: a broadcasted stream of StateReport triggers. When a record 
> arrived on this stream,
>   *   the current value of Toggles will be sent out via the outputTag
>   */
> class EventFilterAndExportToggles(outputTag: OutputTag[Toggle])
>   extends KeyedBroadcastProcessFunction[Key, Either[Toggle, MyEvent], 
> Any, MyEvent] {
>
>val toggleStateDescriptor = new 
> ValueStateDescriptor[Boolean]("MyEventToggle", classOf[Boolean])
>
>override def processElement(in1: Either[Toggle, MyEvent],
> readOnlyContext: KeyedBroadcastProcessFunction[Key, 
> Either[Toggle, MyEvent], Any, MyEvent]#ReadOnlyContext,
> collector: Collector[MyEvent]): Unit = {
>   in1 match {
>  case Left(toggle) =>
> 
> getRuntimeContext.getState(toggleStateDescriptor).update(toggle._2)
>  case Right(event) =>
> if (getRuntimeContext.getState(toggleStateDescriptor).value())
>collector.collect(event)
>   }
>}
>
>override def processBroadcastElement(in2: Any,
>context: KeyedBroadcastProcessFunction[Key, 
> Either[Toggle, MyEvent], Any, MyEvent]#Context,
>collector: Collector[MyEvent]): Unit = {
>   context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: 
> ValueState[Boolean]) =>
>  if (s != null) context.output(outputTag, (k, s.value())))
>}
> }
>
> Thanks for your help.
> Regards,
> Averell
>
> On Thu, May 9, 2019 at 7:31 PM Fabian Hueske  wrote:
>
>> Hi,
>>
>> Passing a Context through a DataStream definitely does not work.
>> You'd need to have the keyed state that you want to scan over in the
>> KeyedBroadcastProcessFunction.
>>
>> For the toggle filter use case, you would need to have a unioned stream
>> with Toggle and StateReport events.
>> For the output, you can use side outputs to route the different outputs
>> to different streams.
>>
>> Best, Fabian
>>
>> Am Do., 9. Mai 2019 um 10:34 Uhr schrieb Averell :
>>
>>> Thank you Congxian and Fabian.
>>>
>>> @Fabian: could you please give a bit more details? My understanding is:
>>> to
>>> pass the context itself and an OutputTag to the KeyedStateFunction
>>> parameter
>>> of  KeyedBroadcastProcessFunction#Context.applyToKeyedState(), and from
>>> within that KeyedStateFunction.process() send out the side output. Do I
>>> understand your idea correctly?
>>>
>>> BTW, I have another question regarding KeyedBroadcastProcessFunction best
>>> practice: I am having two streams: Data and Toggle. The stream Toggle is
>>> just a keyed boolean stream, being used to filter data from the stream
>>> Data.
>>> And I am implementing that filter using a simple RichCoFlatMapFunction.
>>>
>>> Now that I want to export the list of keys which are currently toggled
>>> on.
>>> Should I
>>> (1) have one additional KeyedBroadcastProcessFunction operator (which has
>>> Toggle and BroadCast as the input streams), or
>>> (2) replace that RichCoFlatMapFunction with a new
>>> KeyedBroadcastProcessFunction, which has both functionalities: filter and
>>> export? Doing this would require unioning Toggle and Data into one single
>>> keyed stream.
>>>
>>> Thanks and best regards,
>>> Averell
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re:

2019-05-10 Thread Fabian Hueske
Hi,

Again answers below ;-)

Am Do., 9. Mai 2019 um 17:12 Uhr schrieb an0 :

> You are right, thanks. But something is still not totally clear to me.
> I'll reuse your diagram with a little modification:
>
> DataStream a = ...
> a.map(A).map(B).keyBy().timeWindow(C)
>
> and execute this with parallelism 2. However, keyBy only generates one
> single key value, and assume they all go to C.1. Does the data flow look
> like this?
>
> A.1 -- B.1 -/-- C.1
> /
> A.2 -- B.2 --/   C.2
>
> Will the lack of data into C.2 prevent C.1's windows from firing? Will the
> location of assignTimestampsAndWatermarks(after a, after map(A), after
> map(B), after keyBy) matter for the firing of C.1's windows

By my understanding, the answers are "no" and "no". Correct?
>
> Q1: no. Watermarks are propagated even in case of skewed key distribution.
C.2 will also advance its event-time clock (based on the WMs) and forward
new WMs.
Q2: after a, map(A), and map(B) would work fine. Assigning watermarks
immediately after a keyBy() is not a good idea, because 1) the records are
shuffled and it's hard to reason about ordering, and 2) you lose the
KeyedStream property and would have to keyBy() again (unless you use
reinterpretAsKeyedStream).


> Now comes the *silly* question: does C.2 exist at all? Since there is only
> one key value, only one C instance is needed. I could see that C.2 as a
> physical instance may exist, but as a logical instance it shouldn't exist
> in the diagram because it is unused. I feel the answer to this silly
> question may be the most important in understanding my and(perhaps many
> others') misunderstanding of situations like this.
>
> If C.2 exists just because parallelism is set to 2, even though it is not
> logically needed, and it also serves as an input to the next operator if
> there is one, then the mystery is completely solved for me.
>
> C.2 exists. Flink does not create a flow topology based on data values. As
soon as there is a record with a key that would need to go to C.2, we would
need it.


> Use a concrete example:
>
> DataStream a = ...
>
> a.map(A).map(B).keyBy().assignTimestampsAndWatermarks(C).timeWindowAll(D)
>
> A.1 -- B.1 -/-- C.1 --\
> / D
> A.2 -- B.2 --/   C.2 --/
>
> D's watermark can not progress because C.2's watermark can not progress,
> because C.2 doesn't have any input data, even though C.2 is not logically
> needed but it does exist and it ruins everything :p. Is this understanding
> correct?
>

Although C.2 does not receive data, it receives watermarks because WMs are
broadcasted. They flow to any task that is reachable by any event. The case
that all keys fall into C.1 is not important because a record for C.2 might
arrive at any point in time. Also Flink does not give any guarantees
about how keys (or rather key groups) are assigned to tasks. If you rescale
the application to a parallelism of 3, the active key group might be
scheduled to C.2 or C.3.

Long story short, D makes progress in event time because watermarks are
broadcasted.


>
> On 2019/05/09 10:01:44, Fabian Hueske  wrote:
> > Hi,
> >
> > Please find my response below.
> >
> > Am Fr., 3. Mai 2019 um 16:16 Uhr schrieb an0 :
> >
> > > Thanks, but it does't seem covering this rule:
> > > --- Quote
> > > Watermarks are generated at, or directly after, source functions. Each
> > > parallel subtask of a source function usually generates its watermarks
> > > independently. These watermarks define the event time at that
> particular
> > > parallel source.
> > >
> > > As the watermarks flow through the streaming program, they advance the
> > > event time at the operators where they arrive. Whenever an operator
> > > advances its event time, it generates a new watermark downstream for
> its
> > > successor operators.
> > >
> > > Some operators consume multiple input streams; a union, for example, or
> > > operators following a keyBy(…) or partition(…) function. Such an
> operator’s
> > > current event time is the minimum of its input streams’ event times.
> As its
> > > input streams update their event times, so does the operator.
> > > --- End Quote
> > >
> > > The most relevant part, I believe, is this:
> > > "Some operators consume multiple input streams…operators following a
> > > keyBy(…) function. Such an operator’s current event time is the
> minimum of
> > > its input streams’ event times."
> > >
> > > But the wording of "current event time is the minimum of its input
> > > streams’ event times" actually implies that the input streams(produced
> by
> > > keyBy) have different watermarks, the exactly opposite of what you just
> > > explained.
> > >
> > >
> > IMO, the description in the documentation is correct, but looks at the
> > issue from a different angle.
> > An operator task typically has many inputs from which it receives records.
> > Depending on the number of input operators (one or more) and the
> > connection between 

Re: Reduce key state

2019-05-10 Thread Fabian Hueske
Hi Frank,

By default, Flink does not remove any state. It is the responsibility of
the developer to ensure that an application does not leak state.
Typically, you would use timers [1] to discard state that expired and is
not useful anymore.

In the last release 1.8, we added lazy cleanup strategies for State TTL
(time-to-live) [2] [3].
You configure a time-to-live when you define state and state is removed if
it wasn't touched for the configured interval of time. There are a bunch of
config options (whether reads and/or writes count as a touch, strict read
protection, etc.). Right now, only processing time is supported.
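
For illustration, a minimal sketch of the TTL configuration (the 7-day interval
and the descriptor are just example values):

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))                                   // expire 7 days after the last write
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // writes refresh the TTL
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Long> lastSeen = new ValueStateDescriptor<>("lastSeen", Long.class);
lastSeen.enableTimeToLive(ttlConfig); // Time above is org.apache.flink.api.common.time.Time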

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#timers
[2]
https://flink.apache.org/news/2019/04/09/release-1.8.0.html#new-features-and-improvements
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl

Am Do., 9. Mai 2019 um 15:01 Uhr schrieb Frank Wilson :

> Hi,
>
> In an unwindowed keyed stream using event time semantics, is state
> stored indefinitely or does it get expired eventually? (I was wondering if the
> state inherits the event time of the element that updated it, and if it
> expires when the watermark goes past it.)
>
> Thanks,
>
> Frank
>


Re: I want to use MapState on an unkeyed stream

2019-05-10 Thread Fabian Hueske
Hi,

RocksDB is only used as local state store. Operator state is not stored in
RocksDB but only on the TM JVM heap.
When a checkpoint is taken, the keyed state from RocksDB and the operator
state from the heap are both copied to a persistent data store (HDFS, S3,
...).
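
For illustration, a minimal sketch of heap-backed operator list state declared via
CheckpointedFunction (the counting function is just an example):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMap extends RichMapFunction<String, String> implements CheckpointedFunction {

    private transient ListState<Long> checkpointedCount; // heap-backed operator state
    private long count;

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedCount = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        count = 0L;
        for (Long c : checkpointedCount.get()) { // restore: sum the redistributed splits
            count += c;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public String map(String value) {
        count++;
        return value;
    }
}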

I was trying to find the documentation that explains how operator state is
managed, but couldn't find it.
I'll create a Jira to fix that.

Best, Fabian

Am Do., 9. Mai 2019 um 16:10 Uhr schrieb an0 :

> Thanks, I didn't know that. But it is checkpointed to RocksDB, isn't it?
> BTW, is this special treatment of operator state documented anywhere?
>
> On 2019/05/09 07:39:34, Fabian Hueske  wrote:
> > Hi,
> >
> > Yes, IMO it is more clear.
> > However, you should be aware that operator state is maintained on heap
> only
> > (not in RocksDB).
> >
> > Best, Fabian
> >
> >
> > Am Mi., 8. Mai 2019 um 20:44 Uhr schrieb an0 :
> >
> > > I switched to using operator list state. It is more clear. It is also
> > > supported by RocksDBKeyedStateBackend, isn't it?
> > >
> > > On 2019/05/08 14:42:36, Till Rohrmann  wrote:
> > > > Hi,
> > > >
> > > > if you want to increase the parallelism you could also pick a key
> > > randomly
> > > > from a set of keys. The price you would pay is a shuffle operation
> > > (network
> > > > I/O) which would not be needed if you were using the unkeyed stream
> and
> > > > used the operator list state.
> > > >
> > > > However, with keyed state you could also use Flink's
> > > > RocksDBKeyedStateBackend which allows to go out of core if your state
> > > size
> > > > should grow very large.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, May 7, 2019 at 5:57 PM an0  wrote:
> > > >
> > > > > But I only have one stream, nothing to connect it to.
> > > > >
> > > > > On 2019/05/07 00:15:59, Averell  wrote:
> > > > > > From my understanding, having a fake keyBy (stream.keyBy(r =>
> > > > > "dummyString"))
> > > > > > means there would be only one slot handling the data.
> > > > > > Would a broadcast function [1] work for your case?
> > > > > >
> > > > > > Regards,
> > > > > > Averell
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Sent from:
> > > > >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Checkpointing and save pointing

2019-05-10 Thread Fabian Hueske
Hi Boris,

Is your question in the context of replacing Zookeeper by a different
service for highly-available setups or are you setting up a regular Flink
cluster?

Best, Fabian



Am Mi., 8. Mai 2019 um 06:20 Uhr schrieb Congxian Qiu <
qcx978132...@gmail.com>:

> Hi, Boris
>
> TM will also need to write to the external volume.
>
> Best, Congxian
> On May 8, 2019, 03:56 +0800, Boris Lublinsky <
> boris.lublin...@lightbend.com>, wrote:
>
> I am planning to use external volume for this. My understanding is that it
> needs to be mounted only to the job manager, not the task managers. Is this
> correct, or it needs to be mounted to both?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>