Re: Multi-stream SQL-like processing

2020-11-05 Thread Krzysztof Zarzycki
Yes,
Kafka Connect supports the topics.regex option for sink connectors. The
connector automatically discovers new topics that match the regex pattern.
It's similar for source connectors, which discover tables in a SQL
database and save them to Kafka topics.
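
A minimal sketch of such a sink configuration, expressed here as a Java map
of the key/value properties Kafka Connect accepts (the JDBC sink class and
option names follow the Confluent JDBC connector and are assumptions on my
side):

Map<String, String> config = new HashMap<>();
config.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
config.put("topics.regex", "ingest\\..*");      // sink every topic matching the pattern
config.put("connection.url", "jdbc:postgresql://db:5432/warehouse");  // hypothetical target
config.put("table.name.format", "${topic}");    // one table per topic
config.put("auto.create", "true");              // derive the table from the record schema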


On Thu, 5 Nov 2020 at 04:16, Jark Wu  wrote:

> Yes. The dynamism might be a problem.
> Does Kafka Connect support discovering new tables and synchronizing them
> dynamically?
>
> Best,
> Jark
>
> On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki 
> wrote:
>
>> Hi Jark, thanks for joining the discussion!
>> I understand your point of view that a SQL environment is probably not the
>> best fit for what I'm looking to achieve.
>> The idea of a configuration tool sounds almost perfect :) Almost,
>> because:
>> Without the "StatementSet" that you mentioned at the end, I would be
>> worried about the resource consumption (job & task manager objects, buffers,
>> connections) of having one topology per table. That would be a significant
>> loss compared to a Kafka Connect-style architecture.
>> With StatementSet I understand this is not the case, but there is another
>> issue: we lose the dynamism. That is, the job won't be able to discover new
>> tables. We would always need to restart the whole (reconfigured)
>> StatementSet job. (Anyway, this approach sounds good enough to try out
>> in my current assignment.)
>> The other issue I see is that I still need to define the DSL for the
>> configuration (something like Kafka Connect's config). SQL will not be that
>> DSL; it will probably just be the means of implementing the tool.
>>
>> I would appreciate your comments, Jark.
>> Also if anyone would like to add other ideas, feel welcome!
>>
>> Best,
>> Krzysztof
>>
>> On Wed, 4 Nov 2020 at 09:37, Jark Wu  wrote:
>>
>>> Hi Krzysztof,
>>>
>>> This is a very interesting idea.
>>>
>>> I think SQL is not a suitable tool for this use case, because SQL is a
>>> structured query language
>>> where the table schema is fixed and never changes while the job is running.
>>>
>>> However, I think it can be a configuration tool project on top of Flink
>>> SQL.
>>> The configuration tool can dynamically generate all the queries
>>> according to the config
>>>  and submit them in one job.
>>>
>>> For example, if the configuration says "synchronize from mysql address
>>> '' to kafka broker ''",
>>> then the generated Flink SQL would look like:
>>>
>>> CREATE TABLE db (
>>>   `database_name` STRING,
>>>   `table_name` STRING,
>>>   `data` BYTES  -- encodes all the column values; could be a better
>>>                 -- structure for performance
>>> ) WITH (
>>>   'connector' = '...',  -- a new connector that scans all tables from
>>>                         -- the MySQL address
>>>   'url' = 'jdbc:mysql://localhost:3306/flink-test'
>>> );
>>>
>>> -- the configuration tool will generate one INSERT INTO per table in the DB
>>> INSERT INTO kafka_table1
>>> SELECT parse_data(table_name, data)  -- the parse_data UDF will infer the
>>> FROM db WHERE table_name = 'table1'  -- schema from the database (or a schema
>>>                                      -- registry) and deserialize the data
>>>                                      -- into columns with the right types.
>>>
>>> INSERT INTO kafka_table2
>>> SELECT parse_data(table_name, data)
>>> FROM db WHERE table_name = 'table2'
>>>
>>> ...
>>>
>>> The configuration tool can use `StatementSet` to package all the INSERT
>>> INTO queries together and submit them in one job.
>>> With the `StatementSet`, the job will share the common source task, so
>>> the tables in MySQL are only read once.
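>>>
>>> A minimal sketch of that packaging step (assuming Flink 1.11+; the list of
>>> discovered tables and the sink table names are made up for illustration):
>>>
>>> TableEnvironment tEnv = TableEnvironment.create(
>>>         EnvironmentSettings.newInstance().inStreamingMode().build());
>>> // DDL for the shared `db` source and the per-table Kafka sinks goes here
>>> StatementSet set = tEnv.createStatementSet();
>>> for (String table : discoveredTables) {  // hypothetical list built by the tool
>>>     set.addInsertSql(
>>>         "INSERT INTO kafka_" + table
>>>         + " SELECT parse_data(table_name, data) FROM db"
>>>         + " WHERE table_name = '" + table + "'");
>>> }
>>> set.execute();  // submits all INSERT INTOs as one job sharing the source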
>>>
>>> Best,
>>> Jark
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki 
>>> wrote:
>>>
>>>> Hi community, I would like to confront one idea with you.
>>>>
>>>> I was thinking that Flink SQL could be a Flink's answer for Kafka
>>>> Connect (more powerful, with advantages like being decoupled from Kafka).
>>>> Flink SQL would be the configuration language for Flink "connectors",
>>>> sounds great!.
>>>> But one thing does not allow me to implement this idea: There is no
>>>> possibility to run SQL-based processing over multiple similar inputs and
>>>> produce multiple similar outp

Multi-stream SQL-like processing

2020-11-02 Thread Krzysztof Zarzycki
Hi community, I would like to confront one idea with you.

I was thinking that Flink SQL could be Flink's answer to Kafka Connect
(more powerful, with advantages like being decoupled from Kafka). Flink SQL
would be the configuration language for Flink "connectors" - sounds great!
But one thing does not allow me to implement this idea: there is no
possibility to run SQL-based processing over multiple similar inputs and
produce multiple similar outputs (counted in tens or hundreds).
As an example of a problem I need to solve, consider that I have a hundred
Kafka topics, with similar data in each, and I would like to sink them
to a SQL database. With Kafka Connect, I can use a single JDBC sink
connector that, properly configured, will dump each topic to a separate
table while properly keeping the schema (based on what is in the schema
registry).
With Flink SQL I would need to run a query per topic/table, I believe.
It is similar with sourcing data. There is this cool project,
flink-cdc-connectors [1], that leverages Debezium in Flink to apply CDC on
a SQL database, but when used with SQL it can only pull in one table per
query.
These cases can be solved using the DataStream API. With it I can code
pulling in/pushing out multiple table streams (a rough sketch follows
below). But then "the configuration" is a much bigger effort, because it
requires writing Java code. And that is a few-hours vs. a few-days case, an
enormous difference.
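
For completeness, a rough DataStream sketch of the multi-topic part
(assuming the universal Kafka connector; deserialization and the
topic-to-table JDBC routing are only hinted at, and MyDynamicJdbcSink is a
hypothetical sink):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "multi-topic-sink");
// new topics matching the pattern can be picked up at runtime if
// partition discovery is enabled
props.setProperty("flink.partition-discovery.interval-millis", "60000");

FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(
        Pattern.compile("ingest\\..*"),
        new JSONKeyValueDeserializationSchema(true),  // keeps topic name in metadata
        props);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer)
   // the sink inspects node.get("metadata").get("topic") to pick the target
   // table - this routing is the part Kafka Connect gives you for free
   .addSink(new MyDynamicJdbcSink());
env.execute("regex-topics-to-jdbc");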

So in the end some questions:
* Do you know how SQL could be extended to support handling such cases
elegantly, with a single job in the end?
* Or do you believe SQL should not be used for this case and we should come
up with a different tool and configuration language, i.e. something like
Kafka Connect?
* Do you know of any other project that implements this idea?

I definitely believe that this is a great use case for Flink to be an
easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
there is a need for a solution for my case.

Thanks for answering!
Krzysztof

[1] https://github.com/ververica/flink-cdc-connectors


Re: Dynamic Flink SQL

2020-04-07 Thread Krzysztof Zarzycki
Hi Maciej, thanks for joining. I answer your comments below.

>
> the idea is quite interesting - although maintaining some coordination to
> be able to handle checkpoints would probably pretty tricky. Did you figure
> out how to handle proper distribution of tasks between TMs? As far as I
> understand you have to guarantee that all sources reading from cache are on
> the same TM as sinks writing data from Kafka? Or you think about some
> distributed caches?
>
No, we haven't figured that out yet. Yes, I've heard that it is indeed a
problem to force Flink to distribute the tasks as one wants. I only
hoped that we would be able to solve it when we get there :-)
One of the ideas was to actually use an in-memory grid co-located with Flink
(e.g. based on Apache Ignite), but then the problem of the network shuffle
just moves from Kafka to that grid. Which might be a smaller problem, but
still.

> As for your original question - we are also looking for solutions/ideas
> for this problem in Nussknacker. We have similar problem, however we had
> different constraints (on premise, not have to care too much about
> bandwidth) and we went with "one job per scenario". It works ok, but the
> biggest problem for me is that it does not scale with the number of jobs:
> Flink job is quite heavy entity - all the threads, classloaders etc. Having
> more than a few dozens of jobs is also not so easy to handle on JobManager
> part - especially when it's restarted etc. I guess your idea would also
> suffer from this problem?
>
Unfortunately yes, good point. Maybe it could be mitigated by distributing
the jobs among several Flink clusters - a solution that is globally heavier,
but lighter on each cluster. Then the data would have to go to the
distributed in-memory grid, hopefully with local reads only.

I see a lot of difficulties in the discussed approach. But my willingness
to use SQL/Table API and CEP is so strong that I want to do the PoC
regardless. I hope we will be able to provide benchmarks proving that the
performance of such an approach is significantly better, justifying the work
by us, and maybe also the community, on overcoming these difficulties.

>
> thanks,
>
> maciek
>
>
>
> On 27/03/2020 10:18, Krzysztof Zarzycki wrote:
>
> I want to do a bit different hacky PoC:
> * I will write a sink, that caches the results in "JVM global" memory.
> Then I will write a source, that reads this cache.
> * I will launch one job, that reads from Kafka source, shuffles the data
> to the desired partitioning and then sinks to that cache.
> * Then I will launch multiple jobs (DataStream based or Flink SQL based)
> that use the source from the cache to read the data out and then
> reinterpret it as a keyed stream [1].
> * Using JVM global memory is necessary, because AFAIK the jobs use
> different classloaders. The class of cached object also needs to be
> available in the parent classloader i.e. in the cluster's classpath.
> This is just to prove the idea, the performance and usefulness of it. All
> the problems of checkpointing this data I will leave for later.
>
> I'm very very interested in your, community, comments about this idea and
> later productization of it.
> Thanks!
>
> Answering your comments:
>
>> Unless you need reprocessing for newly added rules, I'd probably just
>> cancel with savepoint and restart the application with the new rules. Of
>> course, it depends on the rules themselves and how much state they require
>> if a restart is viable. That's up to a POC.
>>
> No, I don't need reprocessing (yet). The rule starts processing the data
> from the moment it is defined.
> The cancellation with savepoint was considered, but because the number of
> new rules defined/changed daily might be large enough, that will generate
> too much of downtime. There is a lot of state kept in those rules making
> the restart heavy. What's worse, that would be cross-tenant downtime,
> unless the job was somehow per team/tenant. Therefore we reject this option.
> BTW, the current design of our system is similar to the one from the blog
> series by Alexander Fedulov about dynamic rules pattern [2] he's just
> publishing.
>
>
>> They will consume the same high intensive source(s) therefore I want to
>>> optimize for that by consuming the messages in Flink only once.
>>>
>> That's why I proposed to run one big query instead of 500 small ones.
>> Have a POC where you add two of your rules manually to a Table and see how
>> the optimized logical plan looks like. I'd bet that the source is only
>> tapped once.
>>
>
> I can do that PoC, no problem. But AFAIK it will only work with the
> "restart with savepoint" pattern discussed above.
>
>
> [1]
> https://ci.apache.org

Complex graph-based sessionization (potential use for stateful functions)

2020-03-30 Thread Krzysztof Zarzycki
Hi!  Interesting problem to solve ahead :)
I need to implement a streaming sessionization algorithm (splitting a stream
of events into groups of correlated events). It's pretty non-standard, as we
DON'T have a key like a user ID that separates the stream into substreams
which we would then just need to chunk based on time.
Instead and simplifying a lot, our events bear tuples, that I compare to
graph edges, e.g.:
event 1: A -> B
event 2: B -> C
event 3: D -> E
event 4: D -> F
event 5: G -> F
I need to group them into subgroups reachable by following these edges from
some specific nodes. E.g. here:
{ A->B, B->C}
{ D->E, D->F}
{ G->F }
(note: I need to group the events, which are represented by edges here, not
the nodes).
As far as I understand, to solve this problem I need to leverage the
feedback loops/iterations feature in Flink (generally, I believe I need to
apply a Bulk Synchronous Parallel approach).
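
To make the grouping itself concrete, here is a tiny non-streaming Java
sketch (my own illustration, not a Flink solution), under the simplifying
assumption that each group is the set of edges reachable from a node with no
incoming edges; the streaming/iterative version would have to maintain this
incrementally:

import java.util.*;

class EdgeGrouping {
    public static void main(String[] args) {
        String[][] edges = {{"A","B"},{"B","C"},{"D","E"},{"D","F"},{"G","F"}};

        Map<String, List<String[]>> out = new HashMap<>();  // outgoing edges per node
        Set<String> hasIncoming = new HashSet<>();
        for (String[] e : edges) {
            out.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e);
            hasIncoming.add(e[1]);
        }

        // roots = nodes that never appear as a target; collect the edges
        // reachable from each root by a depth-first walk
        for (String root : out.keySet()) {
            if (hasIncoming.contains(root)) continue;
            List<String> group = new ArrayList<>();
            Deque<String> stack = new ArrayDeque<>(List.of(root));
            Set<String> seen = new HashSet<>();
            while (!stack.isEmpty()) {
                String n = stack.pop();
                if (!seen.add(n)) continue;
                for (String[] e : out.getOrDefault(n, List.of())) {
                    group.add(e[0] + "->" + e[1]);
                    stack.push(e[1]);
                }
            }
            System.out.println(root + ": " + group);
        }
        // prints (in some order): A: [A->B, B->C], D: [D->E, D->F], G: [G->F]
    }
}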

Has anyone seen this kind of sessionization implemented in the wild?
Would you suggest implementing such an algorithm using *stateful functions*?
(AFAIK, they use feedback loops underneath.) Can you suggest how these
would be used here?
I know there are some problems with checkpointing when using iterations;
does that mean the implementation may experience data loss on stops?

Side comment: I'm not sure which graph algorithm derivative needs to be
applied here, but the candidate is transitive closure.

Thanks for joining the discussion!
Krzysztof


Re: Dynamic Flink SQL

2020-03-27 Thread Krzysztof Zarzycki
I want to do a slightly different, hacky PoC:
* I will write a sink that caches the results in "JVM global" memory. Then
I will write a source that reads this cache.
* I will launch one job that reads from the Kafka source, shuffles the data
into the desired partitioning and then sinks it to that cache.
* Then I will launch multiple jobs (DataStream based or Flink SQL based)
that use the source from the cache to read the data out and then
reinterpret it as a keyed stream [1].
* Using JVM global memory is necessary, because AFAIK the jobs use
different classloaders. The class of the cached object also needs to be
available in the parent classloader, i.e. in the cluster's classpath.
This is just to prove the idea and its performance and usefulness (a rough
sketch of the cache part follows below). All the problems of checkpointing
this data I will leave for later.
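
The sketch below is only my own illustration of the hand-over point: a static
map stands in for "JVM global" memory, all checkpointing and ordering
concerns are ignored, and it silently assumes that the caching sink and the
reading source subtasks with the same index end up co-located, which Flink
does not guarantee:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Must live on the cluster classpath (parent classloader), not in the user
// jar, so that both jobs see the same static instance.
class GlobalCache {
    static final Map<Integer, BlockingQueue<byte[]>> QUEUES = new ConcurrentHashMap<>();
    static BlockingQueue<byte[]> forSubtask(int idx) {
        return QUEUES.computeIfAbsent(idx, i -> new LinkedBlockingQueue<>());
    }
}

// Used by the single Kafka-reading job after the shuffle.
class CacheSink extends RichSinkFunction<byte[]> {
    @Override
    public void invoke(byte[] value, Context ctx) {
        GlobalCache.forSubtask(getRuntimeContext().getIndexOfThisSubtask()).add(value);
    }
}

// Used by the many downstream jobs; its output would then be wrapped with
// reinterpretAsKeyedStream [1].
class CacheSource extends RichParallelSourceFunction<byte[]> {
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<byte[]> ctx) throws Exception {
        BlockingQueue<byte[]> queue =
                GlobalCache.forSubtask(getRuntimeContext().getIndexOfThisSubtask());
        while (running) {
            ctx.collect(queue.take());
        }
    }
    @Override
    public void cancel() { running = false; }
}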

I'm very interested in the community's comments about this idea and about
productizing it later.
Thanks!

Answering your comments:

> Unless you need reprocessing for newly added rules, I'd probably just
> cancel with savepoint and restart the application with the new rules. Of
> course, it depends on the rules themselves and how much state they require
> if a restart is viable. That's up to a POC.
>
No, I don't need reprocessing (yet). The rule starts processing the data
from the moment it is defined.
Cancellation with a savepoint was considered, but because the number of
new rules defined/changed daily might be large, it would generate too much
downtime. There is a lot of state kept in those rules, making the restart
heavy. What's worse, that would be cross-tenant downtime, unless the job
was somehow per team/tenant. Therefore we reject this option.
BTW, the current design of our system is similar to the one from the blog
series about the dynamic rules pattern [2] that Alexander Fedulov is just
publishing.


> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
> That's why I proposed to run one big query instead of 500 small ones. Have
> a POC where you add two of your rules manually to a Table and see how the
> optimized logical plan looks like. I'd bet that the source is only tapped
> once.
>

I can do that PoC, no problem. But AFAIK it will only work with the
"restart with savepoint" pattern discussed above.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html



> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki 
> wrote:
>
>> Hello Arvid,
>> Thanks for joining to the thread!
>> First, did you take into consideration that I would like to dynamically
>> add queries on the same source? That means first define one query, later
>> the day add another one, then another one, and so on. A week later kill
>> one of those, start yet another one, etc... There will be hundreds of these
>> queries running at once, but the set of queries change several times a day.
>> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
>> Regarding the temporary tables AFAIK they are only the metadata (let's
>> say Kafka topic details) and store it in the scope of a SQL session.
>> Therefore multiple queries against that temp table will behave the same way
>> as querying normal table, that is will read the datasource multiple times.
>>
>> It looks like the feature I want or could use is defined by the way of
>> FLIP-36 about Interactive Programming, more precisely caching the stream
>> table [1].
>> While I wouldn't like to limit the discussion to that non-existing yet
>> feature. Maybe there are other ways of achieving this dynamic querying
>> capability.
>>
>> Kind Regards,
>> Krzysztof
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>>
>>
>>
>> * You want to use primary Table API as that allows you to
>>> programmatically introduce structural variance (changing rules).
>>>
>> * You start by registering the source as temporary table.
>>>
>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>>> * Lastly you unionAll the results.
>>>
>>> Then I'd perform some experiment if indeed the optimizer figured out
>>> that it needs to only read the source once. The resulting code would be
>>> minimal and easy to maintain. If the performance is not satisfying, you can
>>> always make it

Re: Dynamic Flink SQL

2020-03-25 Thread Krzysztof Zarzycki
Hello Arvid,
Thanks for joining to the thread!
First, did you take into consideration that I would like to dynamically add
queries on the same source? That means I first define one query, later in
the day add another one, then another one, and so on. A week later I kill
one of those, start yet another one, etc. There will be hundreds of these
queries running at once, but the set of queries changes several times a day.
They will consume the same high-intensity source(s), therefore I want to
optimize for that by consuming the messages in Flink only once.

Regarding temporary tables: AFAIK they are only metadata (let's say Kafka
topic details) stored in the scope of a SQL session. Therefore multiple
queries against such a temp table will behave the same way as queries
against a normal table, that is, they will read the data source multiple
times.

It looks like the feature I want, or could use, is defined by FLIP-36 about
Interactive Programming, more precisely caching a stream table [1].
However, I wouldn't like to limit the discussion to that not-yet-existing
feature. Maybe there are other ways of achieving this dynamic querying
capability.

Kind Regards,
Krzysztof

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable



* You want to use primary Table API as that allows you to programmatically
> introduce structural variance (changing rules).
>
* You start by registering the source as temporary table.
>
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
> * Lastly you unionAll the results.
>
> Then I'd perform some experiment if indeed the optimizer figured out that
> it needs to only read the source once. The resulting code would be minimal
> and easy to maintain. If the performance is not satisfying, you can always
> make it more complicated.
>
> Best,
>
> Arvid
>
>
> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki 
> wrote:
>
>> Dear Flink community!
>>
>> In our company we have implemented a system that realize the dynamic
>> business rules pattern. We spoke about it during Flink Forward 2019
>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>> The system is a great success and we would like to improve it. Let me
>> shortly mention what the system does:
>> * We have a Flink job with the engine that applies business rules on
>> multiple data streams. These rules find patterns in data, produce complex
>> events on these patterns.
>> * The engine is built on top of CoProcessFunction, the rules are
>> preimplemented using state and timers.
>> * The engine accepts control messages, that deliver configuration of the
>> rules, and start the instances of the rules. There might be many rule
>> instances with different configurations running in parallel.
>> * Data streams are routed to those rules, to all instances.
>>
>> The *advantages* of this design are:
>>   * *The performance is superb. *The key to it is that we read data from
>> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
>> one partitioning key) and then apply over 100 rule instances needing the
>> same data.
>> * We are able to deploy multiple rule instances dynamically without
>> starting/stopping the job.
>>
>> Especially the performance is crucial, we have up to 500K events/s
>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>> cluster.
>>
>> The main *painpoints *of the design is:
>> * to deploy new business rule kind, we need to predevelop the rule
>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>> MATCH_RECOGNIZE would fit perfectly for our cases.
>> * The isolation of the rules is weak. There are many rules running per
>> job. One fails, the whole job fails.
>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>> all the rules.
>> * We have one just distribution key. Although that can be overcome.
>>
>> I would like to focus on solving the *first point*. We can live with the
>> rest.
>>
>> *Question to the community*: Do you have ideas how to make it possible
>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>
>> My current ideas are:
>> 1. *A possibility to dynamically modify the job topology. *
>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>> sources.
>>
>> 2. *A possibility to save data streams internally to Flink,
>> predistributed*. Then Flink SQL quer

Dynamic Flink SQL

2020-03-23 Thread Krzysztof Zarzycki
Dear Flink community!

In our company we have implemented a system that realizes the dynamic
business rules pattern. We spoke about it during Flink Forward 2019:
https://www.youtube.com/watch?v=CyrQ5B0exqU.
The system is a great success and we would like to improve it. Let me
briefly describe what the system does:
* We have a Flink job with an engine that applies business rules on
multiple data streams. These rules find patterns in the data and produce
complex events from these patterns.
* The engine is built on top of CoProcessFunction; the rules are
pre-implemented using state and timers.
* The engine accepts control messages that deliver the configuration of the
rules and start rule instances. There might be many rule instances with
different configurations running in parallel.
* Data streams are routed to all instances of those rules (a simplified
sketch of this mechanism follows below).
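
For readers of the archive, a heavily simplified sketch of such a
control-message mechanism, written with the broadcast state pattern rather
than our actual CoProcessFunction-based SDK; RuleConfig, Event, Alert and
getPartitionKey are made-up types/names:

MapStateDescriptor<String, RuleConfig> rulesDesc =
        new MapStateDescriptor<>("rules", String.class, RuleConfig.class);
BroadcastStream<RuleConfig> ruleBroadcast = ruleConfigStream.broadcast(rulesDesc);

events
    .keyBy(Event::getPartitionKey)
    .connect(ruleBroadcast)
    .process(new KeyedBroadcastProcessFunction<String, Event, RuleConfig, Alert>() {
        @Override
        public void processBroadcastElement(RuleConfig rule, Context ctx,
                                            Collector<Alert> out) throws Exception {
            // a new or updated rule instance becomes visible on all subtasks
            ctx.getBroadcastState(rulesDesc).put(rule.getId(), rule);
        }
        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                    Collector<Alert> out) throws Exception {
            // evaluate the event against every currently configured rule instance;
            // per-rule keyed state and timers would live here
            for (Map.Entry<String, RuleConfig> e :
                    ctx.getBroadcastState(rulesDesc).immutableEntries()) {
                // e.getValue() describes one rule instance
            }
        }
    });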

The *advantages* of this design are:
  * *The performance is superb.* The key to it is that we read data from
the Kafka topic once, deserialize it once, shuffle it once (thankfully we
have one partitioning key) and then apply over 100 rule instances that need
the same data.
* We are able to deploy multiple rule instances dynamically without
starting/stopping the job.

The performance is especially crucial: we have up to 500K events/s
processed by about 100 rules on fewer than 100 cores. I can't imagine
having 100 Flink SQL queries, each consuming these streams from Kafka, on
such a cluster.

The main *pain points* of the design are:
* To deploy a new kind of business rule, we need to pre-develop the rule
template using our SDK. *We can't use the great Flink CEP and Flink SQL
libraries*, which are getting stronger every day. Flink SQL with
MATCH_RECOGNIZE would fit our cases perfectly.
* The isolation of the rules is weak. There are many rules running per job;
if one fails, the whole job fails.
* There is one set of Kafka offsets, one watermark, one checkpoint for all
the rules.
* We have just one distribution key, although that can be overcome.

I would like to focus on solving the *first point*. We can live with the
rest.

*Question to the community*: Do you have ideas on how to make it possible
to develop the rules using Flink SQL with MATCH_RECOGNIZE?

My current ideas are:
1. *A possibility to dynamically modify the job topology. *
Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
sources.

2. *A possibility to save data streams internally to Flink, predistributed*.
Then Flink SQL queries should be able to read these streams.

The ideal imaginary solution would be this simple to use:
CREATE TABLE my_stream(...) with (,
cached = 'true')
PARTITIONED BY my_partition_key

(the cached table can also be a result of CREATE TABLE and INSERT INTO
my_stream_cached SELECT ... FROM my_stream).

Then I can run multiple parallel Flink SQL queries reading from that cached
table in Flink.

Technical implementation: ideally, I imagine saving events in Flink state
before they are consumed, and then implementing a Flink source that can read
the Flink state of the state-filling job. It's a different job, I know! Of
course it needs to run on the same Flink cluster.
A lot of options are possible: building on top of Flink, modifying Flink
(even keeping our own fork for the time being), or using an external
component.

In my opinion the keys to maximizing performance are:
* avoiding pulling data over the network from Kafka,
* avoiding deserializing messages separately for each query/processor.

Comments, ideas - Any feedback is welcome!
Thank you!
Krzysztof

P.S.   I'm writing to both dev and users groups because I suspect I would
need to modify Flink to achieve what I wrote above.


Re: Join a datastream with tables stored in Hive

2019-12-16 Thread Krzysztof Zarzycki
Thanks Kurt for your answers.

Summing up, I feel like option 1 (i.e. the join with a temporal table
function) requires some coding around a source that needs to pull data
once a day, but otherwise brings the following benefits:
* I don't have to put the dicts in another store like HBase. Everything
stays in Hive + Flink.
* I'll be able to do a true temporal join - event-time based.
* I believe I will be able to build a history reprocessing program based on
the same logic (i.e. the same SQL). At least for a particular day -
processing multiple days would be tricky, because I would need to pull
multiple versions of the dictionary.
Plus, looking up dict values will be much faster and more resource-efficient
when the dict is stored in state instead of in uncached HBase. That is
especially important when we want to reprocess a historical, archived stream
at a speed of millions of events/sec.

I understand that option 2 is easier to implement. I may do a PoC of it as
well.
OK, I believe I know enough to get my hands dirty with the code. I can
share later on what I was able to accomplish. And probably more questions
will show up when I finally start the implementation.

Thanks
Krzysztof

On Mon, 16 Dec 2019 at 03:14, Kurt Young  wrote:

> Hi Krzysztof, thanks for the discussion, you raised lots of good
> questions, I will try to reply them
> one by one.
>
> Re option 1:
>
> > Question 1: do I need to write that Hive source or can I use something
> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
> class?
>
> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
> `HiveTableSource` works
> as batch mode, it will read all data at once and stop. But what you need
> is wait until next day after
> finish. What you can try is reuse the logic of `HiveTableInputFormat`, and
> wrap the "monitoring"
> logic outside.
>
> > Question/worry 2:  the state would grow inifinitely if I had infinite
> number of keys, but not only infinite number of versions of all keys.
>
> The temporal table function doesn't support watermark based state clean up
> yet, but what you can
> try is idle state retention [1]. So even if you have infinite number of
> keys, for example say you have
> different join keys every day, the old keys will not be touched in next
> days and become idle and will
> be deleted by framework.
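>
> For example, something along these lines (a sketch only; the exact method
> names depend on the Flink version, see [1]):
>
> // drop state for join keys that have been idle for longer than the max time
> tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(36));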
>
> > Question 3: Do you imagine that I could use the same logic for both
> stream processing and reprocessing just by replacing sources and sinks?
>
> Generally speaking, yes I think so. With event time based join, we should
> be able to reuse the logic
> of normal stream processing and reprocessing historical data. Although
> there will definitely exists some
> details should be addressed, like event time and watermarks.
>
> Re option 2:
>
> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
> whole dictionary to memory
>
> You can do this manually but I would recommend you go with the first
> choice which loads hive table
> to HBase periodically. It's much more easier and efficient. And this
> approach you mentioned also
> seems a little bit duplicate with the temporal table function solution.
>
> > this option is available only with Blink engine and also only with use
> of Flink SQL, no Table API?
>
> I'm afraid yes, you can only use it with SQL for now.
>
> > do you think it would be possible to use the same logic / SQL for
> reprocessing?
>
> Given the fact this solution is based on processing time, I don't think it
> can cover the use case of
> reprocessing, except if you can accept always joining with latest day even
> during backfilling. But we
> are also aiming to resolve this shortcoming maybe in 1 or 2 releases.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>
> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki 
> wrote:
>
>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>> case. In my company we currently have 3 clients wanting this functionality.
>> I also just realized this slight difference between Temporal Join and
>> Temporal Table Function Join, that there are actually two methods:)
>>
>> Regarding option 1:
>> So I would need to:
>> * write a Datastream API source, that pulls Hive dictionary table every
>> let's say day, assigns event time column to rows and creates a stream of
>> it. It does that and only that.
>> * create a table (from Table API) out of it, assigning one of the columns
>> as an event time column.
>> * then use table.createTemporalTableFunction(> column>)
>> * finally join my main 

Re: Join a datastream with tables stored in Hive

2019-12-13 Thread Krzysztof Zarzycki
Very interesting, Kurt! Yes, I also imagined it's a rather common case. In
my company we currently have 3 clients wanting this functionality.
I also just realized the slight difference between a Temporal Join and a
Temporal Table Function Join, namely that there are actually two methods :)

Regarding option 1:
So I would need to:
* write a DataStream API source that pulls the Hive dictionary table every
day (say), assigns an event time column to the rows and creates a stream of
it. It does that and only that.
* create a table (from the Table API) out of it, assigning one of the
columns as an event time column.
* then use table.createTemporalTableFunction()
* finally join my main data stream with the temporal table function (let me
use the short name TTF from now on) of my dictionary, using Flink SQL and
the LATERAL TABLE (Rates(o.rowtime)) AS r construct.
And so I should achieve my temporal, event-time based join with versioned
dictionaries (a rough sketch follows below)!
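
A minimal sketch of those steps (my own illustration; the column names and
the dictStream/eventStream variables are made up, and fromDataStream /
registerTable follow the pre-1.10 Table API):

// dictStream: DataStream<Row> produced by the custom source that re-reads
// the Hive table once a day and stamps every row with the snapshot time
Table dict = tEnv.fromDataStream(dictStream,
        "dict_key, dict_value, snapshot_time.rowtime");
tEnv.registerFunction("Dict",
        dict.createTemporalTableFunction("snapshot_time", "dict_key"));

Table events = tEnv.fromDataStream(eventStream,
        "event_key, payload, event_time.rowtime");
tEnv.registerTable("events", events);

// event-time temporal join: each event sees the dictionary version that was
// valid at the event's timestamp
Table joined = tEnv.sqlQuery(
    "SELECT e.event_key, e.payload, d.dict_value " +
    "FROM events e, LATERAL TABLE (Dict(e.event_time)) AS d " +
    "WHERE e.event_key = d.dict_key");
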
Question 1: do I need to write that Hive source or can I use something
ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
class?

Question/worry 2: One thing that worried me is this comment in the docs:

*Note: State retention defined in a query configuration
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html>
is not yet implemented for temporal joins. This means that the required
state to compute the query result might grow infinitely depending on the
number of distinct primary keys for the history table.*

On the other side, I find this comment: *By definition of event time,
watermarks
<https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html>
allow the join operation to move forward in time and discard versions of
the build table that are no longer necessary because no incoming row with
lower or equal timestamp is expected.*
So I believe that the state would grow infinitely only if I had an infinite
number of keys, not merely an infinite number of versions of a fixed set of
keys. Which is fine. Do you confirm?

Question 3: I also need to be able to cover reprocessing or backfilling of
historical data. Let's say I would need to join a data stream with
(versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
could use the same logic for both stream processing and reprocessing just
by replacing the sources and sinks? Maybe after some slight modifications?


Regarding option 2:
Here I understand the current limitation (which will stay for some time)
is that the join can happen only on processing time, which means joining
only with the latest version of the dictionaries.
Accepting that, I understand I would need to either:
a) load the Hive table into e.g. HBase and then use HBaseTableSource on it,
OR
b) maybe implement a Hive/JDBC-based LookupableTableSource that pulls the
whole dictionary into memory (or even into Flink state, if it is possible
to use it from a TableFunction).
Then use this table and my Kafka stream table in a temporal join expressed
in Flink SQL.
What do you think, is that feasible?
Do I understand correctly that this option is available only with the Blink
engine and only through Flink SQL, not the Table API?

Same question comes up regarding reprocessing: do you think it would be
possible to use the same logic / SQL for reprocessing?

Thank you for continuing the discussion with me. I believe we're touching
on a really important design subject for the community.
Krzysztof

On Fri, 13 Dec 2019 at 09:39, Kurt Young  wrote:

> Sorry I forgot to paste the reference url.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young  wrote:
>
>> Hi Krzysztof,
>>
>> What you raised also interested us a lot to achieve in Flink.
>> Unfortunately, there
>> is no in place solution in Table/SQL API yet, but you have 2 options
>> which are both
>> close to this thus need some modifications.
>>
>> 1. The first one is use temporal table function [1]. It needs you to
>> write the logic of
>> reading hive tables and do the daily update inside the table function.
>> 2. The second choice is to use temporal table join [2], which only works
>> with processing
>> time now (just like the simple solution you mentioned), and need the
>> table source has
>> look up capability (like hbase). Currently, hive connector doesn't
>> support look up, so to
>> make this work, you need to sync the content to other storages which
>> support look up,
>> like HBase.
>>
>> Both solutions are not ideal now, and we also aims to improve this maybe
>> in the following
>> release.
>>
>> Best,
>> Kurt

Join a datastream with tables stored in Hive

2019-12-12 Thread Krzysztof Zarzycki
Hello dear Flinkers,
If this kind of question has already been asked on the group, I'm sorry for
the duplicate; feel free to just point me to the thread.
I have to solve a probably pretty common case of joining a datastream with a
dataset.
Let's say I have the following setup:
* I have a high-pace stream of events coming in through Kafka.
* I have some dimension tables stored in Hive. These tables are changed
daily. I can keep a snapshot for each day.

Now, conceptually I would like to join the stream of incoming events to the
dimension tables (a simple hash join). We can consider two cases:
1) simpler, where I join the stream with the most recent version of the
dictionaries (so the result is accepted to be nondeterministic if the job
is retried);
2) more advanced, where I would like to do a temporal join of the stream
with the dictionary snapshots that were valid at the time of the event
(this result should be deterministic).

The end goal is to aggregate that joined stream and store the results in
Hive or in a more real-time analytical store (Druid).

Now, could you please help me understand whether any of these cases is
implementable with the declarative Table/SQL API - using temporal joins,
catalogs, the Hive integration, JDBC connectors, or whatever beta features
there are now? (I've read quite a lot of Flink docs about each of those,
but I have a problem compiling this information into the final design.)
Could you please help me understand how these components should cooperate?
If that is impossible with the Table API, can we come up with the easiest
implementation using the DataStream API?

Thanks a lot for any help!
Krzysztof


Re: Does Kafka connector leverage Kafka message keys?

2016-05-12 Thread Krzysztof Zarzycki
If I can throw in my 2 cents, I agree with what Elias says. Without that
feature (not re-partitioning already-partitioned Kafka data), Flink is in a
bad position for common, simpler processing that doesn't involve shuffling
at all, for example a simple readKafka-enrich-writeKafka pipeline. Systems
like the new Kafka Streams processing library, which leverage Kafka
partitioning, will probably beat Flink in performance (of course, that's
just an intuition).

Are you planning to provide such a feature? Is it simple to do with Flink's
current engine and API?
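
(Archive note: the keyBy variant Elias describes below was later added to
Flink as an experimental feature, DataStreamUtils.reinterpretAsKeyedStream.
A minimal sketch, with the documented caveat that it is only correct when
the existing partitioning matches exactly how keyBy would have distributed
the keys; Event and getUserId are made-up names:)

// Kafka source whose records are already partitioned by key on the Kafka side
DataStream<Event> source = env.addSource(kafkaConsumer);

// Treat the stream as keyed without an extra shuffle
KeyedStream<Event, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(source, Event::getUserId);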




On Thu, 14 Apr 2016 at 03:11, Elias Levy 
wrote:

> On Wed, Apr 13, 2016 at 2:10 AM, Stephan Ewen  wrote:
>
>> If you want to use Flink's internal key/value state, however, you need to
>> let Flink re-partition the data by using "keyBy()". That is because Flink's
>> internal sharding of state (including the re-sharding to adjust parallelism
>> we are currently working on) follows a dedicated hashing scheme which is
>> with all likelihood different from the partition function that writes the
>> key/value pairs to the Kafka Topics.
>>
>
> That is interesting, if somewhat disappointing.  I was hoping that
> performing a keyBy from a Kafka source would perform no reshuffling if you
> used the same value as you used for the Kafka message key.  But it makes
> sense if you are using different hash functions.
>
> It may be useful to have a variant of keyBy() that converts the stream to
> a KeyedStream but performs no shuffling if the caller is certain that the
> DataStream is already partitioned by the given key.
>
>
>


multi-application correlated savepoints

2016-05-10 Thread Krzysztof Zarzycki
Hi!
I'm thinking about using a great Flink feature: savepoints. I would
like to be able to stop my streaming application, roll back its state
and restart it (for example to update the code or to fix a bug). Let's say I
would like to travel back in time and reprocess some data.
But what if I had many streaming applications running whose states are
correlated, as in a microservice architecture? I would like to travel back
in time in all of my services to a common point in time. Is there a
possibility to somehow manage correlated savepoints? Of course I can
trigger savepointing at approximately the same time, but it's just an
approximation, right?
Is there something in Flink that could support this advanced use case?
Maybe someone else has hit this issue already and thought about a solution?
I'll be grateful for any comments,
Cheers,
Krzysztof


Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
OK, Thanks Aljoscha for the info!
Guys, great work on Flink, I really love it :)

Cheers,
Krzysztof

On Thu, 7 Apr 2016 at 10:48, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you are right. Currently there is no incremental checkpointing and
> therefore, at each checkpoint, we essentially copy the whole RocksDB
> database to HDFS (or whatever filesystem you chose as a backup location).
> As far as I know, Stephan will start working on adding support for
> incremental snapshots this week or next week.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 09:55 Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Hi,
>> I saw the documentation and source code of the state management with
>> RocksDB and before I use it, I'm concerned of one thing: Am I right that
>> currently when state is being checkpointed, the whole RocksDB state is
>> snapshotted? There is no incremental, diff snapshotting, is it? If so, this
>> seems to be unfeasible for keeping state counted in tens or hundreds of GBs
>> (and you reach that size of a state, when you want to keep an embedded
>> state of the streaming application instead of going out to Cassandra/Hbase
>> or other DB). It will just cost too much to do snapshots of such large
>> state.
>>
>> Samza as a good example to compare, writes every state change to Kafka
>> topic, considering it a snapshot in the shape of changelog. Of course in
>> the moment of app restart, recovering the state from the changelog would be
>> too costly, that is why the changelog topic is compacted. Plus, I think
>> Samza does a state snapshot from time to time anyway (but I'm not sure of
>> that).
>>
>> Thanks for answering my doubts,
>> Krzysztof
>>
>>


RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
Hi,
I saw the documentation and source code of the state management with
RocksDB, and before I use it I'm concerned about one thing: am I right that
currently, when state is checkpointed, the whole RocksDB state is
snapshotted? There is no incremental, diff-based snapshotting, is there? If
so, this seems unfeasible for keeping state measured in tens or hundreds of
GBs (and you reach that state size when you want to keep an embedded state
in the streaming application instead of going out to Cassandra/HBase or
another DB). It will just cost too much to snapshot such a large state.
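
(Archive note: incremental RocksDB checkpoints were added in a later Flink
release; a minimal sketch of enabling them, assuming the RocksDB state
backend dependency is on the classpath:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the boolean flag enables incremental checkpoints: only newly created
// RocksDB SST files are uploaded at each checkpoint, not the full database
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
env.enableCheckpointing(60_000);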

Samza, as a good point of comparison, writes every state change to a Kafka
topic, treating it as a snapshot in the shape of a changelog. Of course, at
the moment of an app restart, recovering the state from the changelog would
be too costly; that is why the changelog topic is compacted. Plus, I think
Samza does a state snapshot from time to time anyway (but I'm not sure
about that).

Thanks for answering my doubts,
Krzysztof


Re: Working with protobuf wrappers

2015-12-02 Thread Krzysztof Zarzycki
Thanks guys for your answers, that is exactly the information I was looking
for.
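
For the archive, this is roughly what the manual registration Till points
to below looks like (a sketch assuming the com.twitter:chill-protobuf
dependency is on the classpath, as described in the linked best-practices
page):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// register Kryo's Protobuf serializer for the protoc-generated class
env.getConfig().registerTypeWithKryoSerializer(
        MyObject.class, com.twitter.chill.protobuf.ProtobufSerializer.class);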

Krzysztof

2015-12-01 19:22 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> Hi Flavio,
>
> 1. you don't have to register serializers if its working for you. I would
> add a custom serializer if its not working or if the performance is poor.
> 2. I don't think that there is such a performance comparison. Tuples are a
> little faster than POJOs, other types (serialized with Kryo's standard
> serializer) are usually slower.
> 3. There are some plans for the table api to do various optimizations
> (projection/filter push down), which also have some assumptions about the
> serializers. So yes, this might change for the table api.
>
>
>
> On Tue, Dec 1, 2015 at 11:26 AM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Sorry for the long question but I take advantage of this discussion to
>> ask for something I've never fully understood.. Let's say I have for
>> example a thrift/protobuf/avro object Person.
>>
>>1. Do I have really to register a custom serializer? In my code I
>>create a dataset from parquet-thrift but I never had to register
>>anything...Does this change something if I
>>call registerTypeWithKryoSerializer?
>>2. How are performance of Flink affected by using one serialization
>>wrt another? For example, is there a simple snippet of a Flink program 
>> that
>>show when it's better to the original Person, its POJO version or it's
>>Tuple version (assuming that is a flat object)?
>>3. Does this further change when I use Table APIs?
>>
>>
>> Best,
>> Flavio
>>
>> On Tue, Dec 1, 2015 at 10:25 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Also, we don't add serializers automatically for DataStream programs.
>>> I've opened a JIRA for this a while ago.
>>>
>>> On Tue, Dec 1, 2015 at 10:20 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Kryzsztof,
>>>>
>>>> it's true that we once added the Protobuf serializer automatically.
>>>> However, due to versioning conflicts (see
>>>> https://issues.apache.org/jira/browse/FLINK-1635), we removed it
>>>> again. Now you have to register the ProtobufSerializer manually:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program
>>>> .
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Nov 30, 2015 at 8:48 PM, Krzysztof Zarzycki <
>>>> k.zarzy...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>> I'm trying to use generated Protobuf wrappers compiled with protoc and
>>>>> pass them as objects between functions of Flink. I'm using Flink 0.10.0.
>>>>> Unfortunately, I get an exception on runtime:
>>>>>
>>>>> [...]
>>>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>>>> java.lang.UnsupportedOperationException
>>>>> Serialization trace:
>>>>> enrichments_ (com.company$MyObject)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
>>>>> ... 11 more
>>>>> Caused by: java.lang.UnsupportedOperationException
>>>>> at
>>>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>> ... 15 more
>>>>>
>>>>>
>>>>> I believed that protobuf are now serializable on default Flink
>>>>> configuration after fixing this issue in 0.9/0.8.1:
>>>>> https://issues.apache.org/jira/browse/FLINK-1392
>>>>>
>>>>> Maybe it really is, but Flink just requires some configuration?
>>>>> I'll be grateful for your help with this issue.
>>>>> Cheers,
>>>>> Krzysztof
>>>>>
>>>>>
>>>>
>>>
>>
>


Working with protobuf wrappers

2015-11-30 Thread Krzysztof Zarzycki
Hi!
I'm trying to use generated Protobuf wrappers compiled with protoc and pass
them as objects between Flink functions. I'm using Flink 0.10.0.
Unfortunately, I get an exception at runtime:

[...]
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
enrichments_ (com.company$MyObject)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
... 11 more
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 15 more


I believed that Protobuf objects are now serializable with the default Flink
configuration after this issue was fixed in 0.9/0.8.1:
https://issues.apache.org/jira/browse/FLINK-1392

Maybe they really are, but Flink just requires some configuration?
I'll be grateful for your help with this issue.
Cheers,
Krzysztof