Re: Best practices to maintain reference data for Flink Jobs

2017-05-19 Thread Sand Stone
Also, I took a quick read on side inputs; it's still unclear to me how a
side input would solve this problem better.

At a high level, this is what I have in mind:
flatMap(byte[] value, Collector<T> out) {
    Iterable<byte[]> iter = someStoreStateObject.seek(aKeyPrefix);
    // or seek(aKeyPrefix, aKeySuffix);
    for (byte[] key : iter) { ... }
}

Thanks for your time!


On Fri, May 19, 2017 at 10:03 AM, Sand Stone <sand.m.st...@gmail.com> wrote:
> Thanks Gordon and Fabian.
>
> The enriching data is really reference data, e.g. a reverse-IP
> database. It's hard to key it the same way as the main data stream,
> since the "ip address" field in an event is not a primary key of the
> main stream.
>
> QueryableState is close, but it does not support range scan as far as
> I could tell. The remote datastore has clean semantics: a single
> logical copy that supports range scans, but the RPC to another cluster
> is not optimal.
>
> I assume this is a quite common stream processing pattern for
> Flink-based services.
>
>
> On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> +1 to what Gordon said.
>>
>> Queryable state is rather meant as an external interface to streaming jobs
>> than for lookups within jobs.
>> Accessing co-located state should give you better performance and is
>> probably easier to implement and maintain.
>>
>> Cheers,
>> Fabian
>>
>> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>>
>>> Hi,
>>>
>>> Can the enriching data be keyed? Or is it something that has to be
>>> broadcasted to each operator?
>>> Either way, I think Side Inputs (an upcoming feature) are the
>>> best fit for this. You can take a look at
>>> https://issues.apache.org/jira/browse/FLINK-6131.
>>>
>>> Regarding the 3 options you listed:
>>>
>>> By using QueryableState in option B, what you mean is that you want to
>>> feed the enriching data stream to a separate job, let that job allow
>>> queryable state, and query that state from the actual application job
>>> operators, correct? If so, I think options A and B would mean the same
>>> thing; i.e., they require accessing data external to the job.
>>>
>>> If the enriching data can somehow be keyed with the stream that requires
>>> it, I would go for option C using connected streams, with the enriching data
>>> as one input and the actual data as the other. Instead of just “caching the
>>> enriching data in memory”, you should register it as managed Flink state
>>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>>> can just access that registered state locally.
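Gordon's suggestion can be sketched roughly as follows. This is a minimal, hypothetical example (ReferenceRecord, Event, and EnrichedEvent are illustrative types, and it assumes the reference stream and the event stream can be keyed by the same field):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class EnrichingCoFlatMap
        extends RichCoFlatMapFunction<ReferenceRecord, Event, EnrichedEvent> {

    // Managed keyed state holding the reference record for the current key.
    private transient ValueState<ReferenceRecord> reference;

    @Override
    public void open(Configuration parameters) {
        reference = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", ReferenceRecord.class));
    }

    // Input 1: the enriching/reference stream updates the managed state.
    @Override
    public void flatMap1(ReferenceRecord ref, Collector<EnrichedEvent> out)
            throws Exception {
        reference.update(ref);
    }

    // Input 2: the main event stream reads the co-located state locally.
    @Override
    public void flatMap2(Event event, Collector<EnrichedEvent> out)
            throws Exception {
        ReferenceRecord ref = reference.value();
        if (ref != null) {
            out.collect(new EnrichedEvent(event, ref));
        }
    }
}
```

Wiring would then look something like `refStream.connect(events).keyBy(refKeySelector, eventKeySelector).flatMap(new EnrichingCoFlatMap())`, so that both inputs are partitioned by the shared key.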
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>>
>>> Hi. Say I have a few reference data sets that need to be used for a
>>> streaming job. The sizes range between 10M and 10GB. The data is not
>>> static; it will be refreshed at minute and/or day intervals.
>>>
>>> With the new advancements in Flink, it seems there are quite a few
>>> options.
>>> A. Store all the data in an external (kv) database cluster. And use
>>> async io calls
>>> * data refresh can be done in a few different ways
>>> B. Use the new Queryable State feature
>>> * it seems there is no "easy" API to discover the
>>> queryable state at the moment. One needs to use the REST API to figure
>>> out the job id.
>>> C. Ingest the reference data into the job and cache them in memory
>>> Any other option?
>>>
>>> On paper, it seems option B with the Queryable State is the cleanest
>>> solution.
>>>
>>> Any comment/suggestion is greatly appreciated in particular in terms
>>> of robustness and consistent recovery.
>>>
>>> Thanks much!
>>
>>


Re: Best practices to maintain reference data for Flink Jobs

2017-05-19 Thread Sand Stone
Thanks Gordon and Fabian.

The enriching data is really reference data, e.g. a reverse-IP
database. It's hard to key it the same way as the main data stream,
since the "ip address" field in an event is not a primary key of the
main stream.

QueryableState is close, but it does not support range scan as far as
I could tell. The remote datastore has clean semantics: a single
logical copy that supports range scans, but the RPC to another cluster
is not optimal.

I assume this is a quite common stream processing pattern for
Flink-based services.


On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> +1 to what Gordon said.
>
> Queryable state is rather meant as an external interface to streaming jobs
> than for lookups within jobs.
> Accessing co-located state should give you better performance and is
> probably easier to implement and maintain.
>
> Cheers,
> Fabian
>
> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>>
>> Hi,
>>
>> Can the enriching data be keyed? Or is it something that has to be
>> broadcasted to each operator?
>> Either way, I think Side Inputs (an upcoming feature) are the
>> best fit for this. You can take a look at
>> https://issues.apache.org/jira/browse/FLINK-6131.
>>
>> Regarding the 3 options you listed:
>>
>> By using QueryableState in option B, what you mean is that you want to
>> feed the enriching data stream to a separate job, let that job allow
>> queryable state, and query that state from the actual application job
>> operators, correct? If so, I think options A and B would mean the same
>> thing; i.e., they require accessing data external to the job.
>>
>> If the enriching data can somehow be keyed with the stream that requires
>> it, I would go for option C using connected streams, with the enriching data
>> as one input and the actual data as the other. Instead of just “caching the
>> enriching data in memory”, you should register it as managed Flink state
>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>> can just access that registered state locally.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.st...@gmail.com) wrote:
>>
>> Hi. Say I have a few reference data sets that need to be used for a
>> streaming job. The sizes range between 10M and 10GB. The data is not
>> static; it will be refreshed at minute and/or day intervals.
>>
>> With the new advancements in Flink, it seems there are quite a few
>> options.
>> A. Store all the data in an external (kv) database cluster. And use
>> async io calls
>> * data refresh can be done in a few different ways
>> B. Use the new Queryable State feature
>> * it seems there is no "easy" API to discover the
>> queryable state at the moment. One needs to use the REST API to figure
>> out the job id.
>> C. Ingest the reference data into the job and cache them in memory
>> Any other option?
>>
>> On paper, it seems option B with the Queryable State is the cleanest
>> solution.
>>
>> Any comment/suggestion is greatly appreciated in particular in terms
>> of robustness and consistent recovery.
>>
>> Thanks much!
>
>


Best practices to maintain reference data for Flink Jobs

2017-05-18 Thread Sand Stone
Hi. Say I have a few reference data sets that need to be used for a
streaming job. The sizes range between 10M and 10GB. The data is not
static; it will be refreshed at minute and/or day intervals.

With the new advancements in Flink, it seems there are quite a few options.
   A. Store all the data in an external (kv) database cluster. And use
async io calls
  * data refresh can be done in a few different ways
   B. Use the new Queryable State feature
* it seems there is no "easy" API to discover the
queryable state at the moment. One needs to use the REST API to figure
out the job id.
   C. Ingest the reference data into the job and cache them in memory
Any other option?

On paper, it seems option B with the Queryable State is the cleanest solution.

Any comment/suggestion is greatly appreciated in particular in terms
of robustness and consistent recovery.

Thanks much!


Re: Flink docs in regards to State

2017-04-27 Thread Sand Stone
Thanks Stefan. The logical data model of Map<EventKey, Map<UserKey,
Value>> makes total sense. A related question: MapState supports
iteration. What's the encoding format at the RocksDB layer? Or rather,
how can a user control the user-key encoding?

I assume the implementation uses a compound key format:
EventKey+UserKey. Let's assume UserKey is an int or long. If big-endian
encoding is used, iteration will return UserKeys in the order they are
stored in RocksDB.
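The big-endian ordering assumption can be checked without RocksDB: for non-negative longs, big-endian byte encodings compare in numeric order under the unsigned lexicographic comparison that RocksDB's default comparator uses (negative longs would sort after positive ones because of the sign bit). A small self-contained sketch:

```java
import java.nio.ByteBuffer;

public class BigEndianKeys {

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order),
    // the way it would be laid out as RocksDB key material.
    static byte[] encode(long userKey) {
        return ByteBuffer.allocate(Long.BYTES).putLong(userKey).array();
    }

    // Unsigned lexicographic comparison, like RocksDB's default comparator.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long[] keys = {0L, 1L, 255L, 256L, 1L << 32};
        for (int i = 0; i < keys.length - 1; i++) {
            // Byte order matches numeric order for non-negative longs.
            assert compareUnsigned(encode(keys[i]), encode(keys[i + 1])) < 0;
        }
        System.out.println("big-endian keys sort numerically");
    }
}
```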

Thanks!


On Thu, Apr 27, 2017 at 6:34 AM, Stefan Richter
<s.rich...@data-artisans.com> wrote:
> Hi,
>
> you can imagine the internals of keyed map state working like a Map<EventKey, 
> Map<UserKey, Value>>, but you only deal with the Map<UserKey, Value> part in 
> your user code. Under the hood, Flink will always present you the map that 
> corresponds to the currently processed event’s key. So for each element, it
> will always swap in the inner map, basically doing lookup to the outer map by 
> event key for you.
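This keyed MapState model might look roughly like the following hypothetical word-count sketch; user code only ever sees the inner Map<UserKey, Value>, while Flink scopes it to the current event key:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerKeyCounts extends RichFlatMapFunction<String, String> {

    // Conceptually Map<EventKey, Map<UserKey, Value>>; Flink swaps in the
    // inner map for the current key, so this behaves like Map<String, Long>.
    private transient MapState<String, Long> counts;

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", String.class, Long.class));
    }

    @Override
    public void flatMap(String word, Collector<String> out) throws Exception {
        Long current = counts.get(word);
        long updated = (current == null ? 0L : current) + 1;
        counts.put(word, updated);
        out.collect(word + "=" + updated);
    }
}
```

Used on a keyed stream, e.g. `events.keyBy(someKeySelector).flatMap(new PerKeyCounts())`, each event key gets its own independent map.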
>
> For operator state (which is state that is not scoped by key) there are currently no
> map states and also no implementation for RocksDB. We might introduce this in 
> the future but until now, it was never really required because large state is 
> typically by key. So what you can do is just maintain, e.g., a
> java.util.Map<UserKey, Value> yourself and write it to a ListState at
> checkpointing time. The list aspect in operator state is different from the 
> keyed ListState: list elements build the atoms of state re-distribution 
> (think you scaling in or out). So you could store your complete map as one 
> list element, or each entry as one list element, or anything in between - 
> depending on if and how your operator state can be re-sharded. You could take 
> a look at FlinkKafkaConsumerBase::initializeState and 
> FlinkKafkaConsumerBase::snapshotState as an example, where Kafka partition 
> offsets are the operator state and individual offsets become list elements so 
> that they can be individually redistributed.
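The ListState approach Stefan describes might be sketched as below, under the assumption that each map entry becomes one list element so entries can be redistributed individually on rescaling (types and names are illustrative, not from the thread):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MapBackedOperator
        implements MapFunction<String, String>, CheckpointedFunction {

    // Plain heap map used for fast lookups between checkpoints.
    private final Map<String, Long> cache = new HashMap<>();
    private transient ListState<Tuple2<String, Long>> checkpointed;

    @Override
    public String map(String key) {
        Long v = cache.merge(key, 1L, Long::sum);
        return key + "=" + v;
    }

    // At checkpoint time, write the map out entry by entry.
    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointed.clear();
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            checkpointed.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    // On restore, rebuild the heap map from whatever entries this
    // parallel instance was assigned after redistribution.
    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointed = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("entries",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        for (Tuple2<String, Long> e : checkpointed.get()) {
            cache.put(e.f0, e.f1);
        }
    }
}
```

Storing the whole map as a single list element instead would keep all entries on one instance after rescaling, which is the trade-off Stefan mentions.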
>
> Best,
> Stefan
>
>
>> Am 26.04.2017 um 17:24 schrieb Sand Stone <sand.m.st...@gmail.com>:
>>
>> To be clear, I like the direction Flink is going with state:
>> Queryable State, MapState, etc. MapState in particular is a great
>> feature and I am trying to find more documentation and/or usage
>> patterns with it before I dive into the deep end of the code. As far
>> as I can tell, the key in MapState does not have to be associated with
>> the key in keyed stream. So in theory, I should be able to use
>> MapState almost anywhere that accepts "RichXXX" functions.
>>
>> Also, I wonder if it makes sense to have "global state" (stored in a
>> RocksDB backend) instantiated by the Env and maintained by the
>> JobManager. Sure, state access would be via RPC, but the database
>> lifetime would be maintained by the Flink cluster. Right now I think I
>> could use a "long
>> running" job to expose a Queryable State to emulate this.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Apr 26, 2017 at 8:01 AM, Timo Walther <twal...@apache.org> wrote:
>>> Hi,
>>>
>>> you are right. There are some limitations with RichReduceFunctions on
>>> windows. Maybe the new AggregateFunction (`window.aggregate()`) could solve
>>> your problem: you can provide an accumulator, which is your custom state
>>> that you update for each record. I couldn't find a documentation page; it
>>> might be created in the next weeks after the feature freeze.
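Such an AggregateFunction could be sketched as a running average, where the accumulator (a sum and a count) is the custom per-window state; note this is an illustrative sketch and the exact add/merge signatures have varied slightly across Flink versions:

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Accumulator layout: long[]{sum, count}.
public class AverageAggregate
        implements AggregateFunction<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    // Called once per record: fold the value into the accumulator.
    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    // Called when the window fires.
    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // Needed for merging window panes (e.g. session windows).
    @Override
    public long[] merge(long[] a, long[] b) {
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }
}
```

Usage would be along the lines of `stream.keyBy(keySelector).timeWindow(size).aggregate(new AverageAggregate())`.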
>>>
>>> Regarding the MapState I loop in Stefan, maybe he can give you some advice
>>> here.
>>>
>>> Timo
>>>
>>>
>>>
>>>
>>> Am 26/04/17 um 04:25 schrieb Sand Stone:
>>>
>>>> Hi, Flink newbie here.
>>>>
>>>> I played with the API (built from GitHub master), I encountered some
>>>> issues but I am not sure if they are limitations or actually by
>>>> design:
>>>> 1. the data stream reduce method does not take a
>>>> RichReduceFunction. The code compiles but throws a runtime exception
>>>> when submitted. [My intent is to maintain a MapState, more below]
>>>>
>>>>  2. Flink seems to be picky about where MapState is used at
>>>> runtime. MapState is restricted to keyed streams and cannot be used
>>>> with certain operators. However I might need to maintain a MapState
>>>> for certain (persistent) keyed state for processing contexts. [I could
>>>> use an external kv store via async io API, but I am hoping Flink could
>>>> help to maintain the (rocksdb) db instances so I could avoid another
>>>> layer of external store].
>>>>
>>>> Any pointer to blog/doc/video is greatly appreciated.
>>>>
>>>> Thanks!
>>>
>>>
>>>
>


Re: Flink docs in regards to State

2017-04-26 Thread Sand Stone
To be clear, I like the direction Flink is going with state:
Queryable State, MapState, etc. MapState in particular is a great
feature and I am trying to find more documentation and/or usage
patterns with it before I dive into the deep end of the code. As far
as I can tell, the key in MapState does not have to be associated with
the key in keyed stream. So in theory, I should be able to use
MapState almost anywhere that accepts "RichXXX" functions.

Also, I wonder if it makes sense to have "global state" (stored in a
RocksDB backend) instantiated by the Env and maintained by the
JobManager. Sure, state access would be via RPC, but the database
lifetime would be maintained by the Flink cluster. Right now I think I
could use a "long
running" job to expose a Queryable State to emulate this.

Thanks!



On Wed, Apr 26, 2017 at 8:01 AM, Timo Walther <twal...@apache.org> wrote:
> Hi,
>
> you are right. There are some limitations with RichReduceFunctions on
> windows. Maybe the new AggregateFunction (`window.aggregate()`) could solve
> your problem: you can provide an accumulator, which is your custom state
> that you update for each record. I couldn't find a documentation page; it
> might be created in the next weeks after the feature freeze.
>
> Regarding the MapState I loop in Stefan, maybe he can give you some advice
> here.
>
> Timo
>
>
>
>
> Am 26/04/17 um 04:25 schrieb Sand Stone:
>
>> Hi, Flink newbie here.
>>
>> I played with the API (built from GitHub master), I encountered some
>> issues but I am not sure if they are limitations or actually by
>> design:
>>  1. the data stream reduce method does not take a
>> RichReduceFunction. The code compiles but throws a runtime exception
>> when submitted. [My intent is to maintain a MapState, more below]
>>
>>   2. Flink seems to be picky about where MapState is used at
>> runtime. MapState is restricted to keyed streams and cannot be used
>> with certain operators. However I might need to maintain a MapState
>> for certain (persistent) keyed state for processing contexts. [I could
>> use an external kv store via async io API, but I am hoping Flink could
>> help to maintain the (rocksdb) db instances so I could avoid another
>> layer of external store].
>>
>> Any pointer to blog/doc/video is greatly appreciated.
>>
>> Thanks!
>
>
>


Flink docs in regards to State

2017-04-25 Thread Sand Stone
Hi, Flink newbie here.

I played with the API (built from GitHub master), I encountered some
issues but I am not sure if they are limitations or actually by
design:
1. the data stream reduce method does not take a
RichReduceFunction. The code compiles but throws a runtime exception
when submitted. [My intent is to maintain a MapState, more below]

 2. Flink seems to be picky about where MapState is used at
runtime. MapState is restricted to keyed streams and cannot be used
with certain operators. However I might need to maintain a MapState
for certain (persistent) keyed state for processing contexts. [I could
use an external kv store via async io API, but I am hoping Flink could
help to maintain the (rocksdb) db instances so I could avoid another
layer of external store].

Any pointer to blog/doc/video is greatly appreciated.

Thanks!