Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-21 Thread Manjusha Vuyyuru
Hello,

Does Flink 1.7.1 support connecting to a relational database (MySQL)?
I want to use MySQL as my streaming source to read some configuration.

Thanks,
Manju


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-21 Thread Jeff Zhang
Thanks Chesnay for raising this discussion thread. I think there are 3
major use scenarios for the Flink binary distribution.

1. Use it to set up standalone cluster
2. Use it to experience features of flink, such as via scala-shell,
sql-client
3. Downstream project use it to integrate with their system

I did a size estimation of the flink-dist folder: the lib folder takes around 100M
and the opt folder takes around 200M. Overall I agree to make a thin flink-dist.
So the next problem is which components to drop. I checked the opt folder,
and I think the filesystem components and metrics components could be moved
out, because they are pluggable components and are only used in scenario 1 I
think (setting up a standalone cluster). Other components like flink-table,
flink-ml, flink-gelly, we should still keep them IMHO, because new users may
still use them to try the features of Flink. For me, scala-shell is the first
option to try new features of Flink.



Fabian Hueske wrote on Fri, Jan 18, 2019 at 7:34 PM:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> On Fri, Jan 18, 2019 at 10:59 AM, Chesnay Schepler <ches...@apache.org> wrote:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

> But I also have a requirement for event time based sliding window
aggregation

Yes, you can achieve this with the Flink Table API/SQL. However, currently,
sliding windows don't support early fire, i.e., they only output results when
the event time reaches the end of the window. Once a window fires, its state
is cleared and late data belonging to that window is ignored. In order to wait
for late events, you can extract the watermark with an offset from the
timestamp, for example, watermark = timestamp - 5min.
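
A minimal sketch of such a watermark extractor (the OrderEvent type and its
eventTime field, in epoch milliseconds, are assumptions for illustration):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DelayedWatermarks extends BoundedOutOfOrdernessTimestampExtractor<OrderEvent> {

    public DelayedWatermarks() {
        // Watermarks trail the highest seen timestamp by 5 minutes, so events that
        // are up to 5 minutes late still reach their (not yet fired) windows.
        super(Time.minutes(5));
    }

    @Override
    public long extractTimestamp(OrderEvent event) {
        return event.eventTime;
    }
}

// Usage: stream.assignTimestampsAndWatermarks(new DelayedWatermarks());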

If event time and early fire are strong requirements in your scenario, you
can probably use an over window [1] to solve your problem, say an over
window with 1h preceding. An over window outputs a result for each input.
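
A rough SQL sketch of such an over window (assuming the table has an event-time
attribute called rowtime; the table and column names are illustrative):

SELECT
  uid,
  COUNT(*) OVER (
    PARTITION BY uid
    ORDER BY rowtime
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt
FROM changelog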

If the above solutions can't meet your requirements, you can write a
DataStream job in which you define your own window logic [2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html



On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal 
wrote:

> Thank you guys. It's great to hear multiple solutions to achieve this. I
> understand that records once emitted to Kafka cannot be deleted, and that's
> acceptable for our use case as the last updated value should always be correct.
> However, as I understand it, most of these solutions work for the global
> aggregation that was asked about in the original question. But I also have a
> requirement for event-time based sliding window aggregation, where the same
> order count needs to be maintained for the past x hours (sliding at, say,
> every 5 minutes). Is it possible to achieve this with the Table API / SQL at the
> moment, or will it require some custom implementation?
>
> For a window-based upsert stream, there can be a few scenarios.
>
> 1. An update to a record key arrives in the same window, e.g. Pending (t1) ->
> Success (t2) both happen in window w1. In this case, once the window
> aggregation is triggered/emitted, such records will be counted as 0.
> 2. An update to a record key belongs to the same window but arrives late. In
> this case the old (and already emitted) window (w1) needs to be re-emitted with
> a decreased value.
> 3. An update to a record key arrives in a different window, e.g. Pending (t1) in
> window w1 and Success (t2) in w2. I think in this case it may not be necessary
> to re-emit the old window w1, as it represents the pending count up to that
> window time (w1), which is still valid since the record moved to Success in the
> next window w2 (based on event time).
>
> Gagan
>
>
> On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
> wrote:
>
>> @Jeff: It depends if user can define a time window for his condition.
>> As Gagan described his problem it was about “global” threshold of pending
>> orders.
>>
>>
>>
>> I have just thought about another solution that should work without any
>> custom code. Converting “status” field to status_value int:
>> - "+1” for pending
>> - “-1” for success/failure
>> - “0” otherwise
>>
>> Then running:
>>
>> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>>
>> Query on top of such stream. Conversion to integers could be made by
>> using `CASE` expression.
>>
>> One thing to note here is that probably all of the proposed solutions
>> would work based on the order of the records, not based on the event_time.
>>
>> Piotrek
>>
>> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
>>
>> I am thinking of another approach instead of retract stream. Is it
>> possible to define a custom window to do this ? This window is defined for
>> each order. And then you just need to analyze the events in this window.
>>
>> Piotr Nowojski wrote on Mon, Jan 21, 2019 at 8:44 PM:
>>
>>> Hi,
>>>
>>> There is a missing feature in Flink Table API/SQL of supporting
>>> retraction streams as the input (or conversions from append stream to
>>> retraction stream) at the moment. With that your problem would simplify to
>>> one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is
>>> ongoing related work [1], so this might be supported in the next couple
>>> of months.
>>>
>>> There might be a workaround at the moment that could work. I think you
>>> would need to write your own custom `LAST_ROW(x)` aggregation function,
>>> which would just return the value of the most recent aggregated row. With
>>> that you could write a query like this:
>>>
>>> SELECT
>>> uid, count(*)
>>> FROM (
>>> SELECT
>>> *
>>> FROM (
>>> SELECT
>>> uid, LAST_ROW(status)
>>> FROM
>>> changelog
>>> GROUP BY
>>> uid, oid)
>>> WHERE status = `pending`)
>>> GROUP BY
>>> uid
>>>
>>> Where `changelog` is an append only stream with the following content:
>>>
>>> *user, order, status, event_time*
>>> u1, o1, pending, t1
>>> u2, o2, failed, t2
>>> *u1, o3, pending, t3*
>>> *u1, o3, success, t4*
>>> u2, o4, pending, t5
>>> u2, o4, pending, t6
>>>
>>>
>>>
>>> Besides that, you could also write your own relatively simple DataStream
>>> application to do the same thing.
>>>
>>> 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-21 Thread Ufuk Celebi
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've
filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
ticket description to see whether it's in line with what you are
experiencing? Most importantly, do you see the same exception being
reported after cancelling and restarting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

2019-01-21 22:53:49,863 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

2019-01-21 22:53:49,864 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
Options:
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xms1024m
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Xmx1024m
You are looking for this line > 2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
-Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Program Arguments:
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--configDir
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
/.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
--executionMode
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
cluster
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -


Can you verify that you see the log messages as expected?

(3) As noted in FLINK-11402, is it possible to package the snappy library
as part of your user code instead of loading the library via
java.library.path? In my example, that seems to work fine.

– Ufuk

On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin  wrote:
>
> Hello!
>
> *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is 
> canceled or fails and then is restarted (with or without 
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` 
> seem to start having impact again and our job will run without failure. More 
> below.
>
> We consume Snappy-compressed sequence files in our Flink job. This 
> requires access to the hadoop native libraries. In our `flink-conf.yaml` for 
> both the task manager and the job manager, we put:
>
> ```
> env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> ```
>
> If I launch our job on freshly-restarted task managers, the job operates 
> fine. If at some point I cancel the job or if the job restarts for some other 
> reason, the job will begin to crashloop because it tries to open a 
> Snappy-compressed file but doesn't have access to the codec from the native 
> hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the 
> task manager while the job is crashlooping, the job starts running without 
> any codec failures.
>
> The only reason I can conjure that would cause the Snappy compression to fail 
> is if the `env.java.opts` were not being passed through to the job on restart 
> for some reason.
>
> Does anyone know what's going on? Am I missing some additional configuration? 
> I really appreciate any help!
>
> About our setup:
>
> - Flink Version: 1.7.0
> - Deployment: Standalone in HA
> - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded 
> jars to access our files in S3. We do not use the `bundled-with-hadoop` 
> distribution of Flink.
>
> Best,
>
> Aaron Levin


Re: Query on retract stream

2019-01-21 Thread Gagan Agrawal
Thank you guys. It's great to hear multiple solutions to achieve this. I
understand that records once emitted to Kafka cannot be deleted, and that's
acceptable for our use case as the last updated value should always be correct.
However, as I understand it, most of these solutions work for the global
aggregation that was asked about in the original question. But I also have a
requirement for event-time based sliding window aggregation, where the same
order count needs to be maintained for the past x hours (sliding at, say,
every 5 minutes). Is it possible to achieve this with the Table API / SQL at the
moment, or will it require some custom implementation?

For a window-based upsert stream, there can be a few scenarios.

1. An update to a record key arrives in the same window, e.g. Pending (t1) ->
Success (t2) both happen in window w1. In this case, once the window
aggregation is triggered/emitted, such records will be counted as 0.
2. An update to a record key belongs to the same window but arrives late. In
this case the old (and already emitted) window (w1) needs to be re-emitted with
a decreased value.
3. An update to a record key arrives in a different window, e.g. Pending (t1) in
window w1 and Success (t2) in w2. I think in this case it may not be necessary
to re-emit the old window w1, as it represents the pending count up to that
window time (w1), which is still valid since the record moved to Success in the
next window w2 (based on event time).

Gagan


On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
wrote:

> @Jeff: It depends if user can define a time window for his condition.
> As Gagan described his problem it was about “global” threshold of pending
> orders.
>
>
>
> I have just thought about another solution that should work without any
> custom code. Converting “status” field to status_value int:
> - "+1” for pending
> - “-1” for success/failure
> - “0” otherwise
>
> Then running:
>
> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>
> Query on top of such stream. Conversion to integers could be made by using
> `CASE` expression.
>
> One thing to note here is that probably all of the proposed solutions
> would work based on the order of the records, not based on the event_time.
>
> Piotrek
>
> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
>
> I am thinking of another approach instead of retract stream. Is it
> possible to define a custom window to do this ? This window is defined for
> each order. And then you just need to analyze the events in this window.
>
> Piotr Nowojski wrote on Mon, Jan 21, 2019 at 8:44 PM:
>
>> Hi,
>>
>> There is a missing feature in Flink Table API/SQL of supporting
>> retraction streams as the input (or conversions from append stream to
>> retraction stream) at the moment. With that your problem would simplify to
>> one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is
>> ongoing related work [1], so this might be supported in the next couple
>> of months.
>>
>> There might be a workaround at the moment that could work. I think you would
>> need to write your own custom `LAST_ROW(x)` aggregation function, which
>> would just return the value of the most recent aggregated row. With that
>> you could write a query like this:
>>
>> SELECT
>> uid, count(*)
>> FROM (
>> SELECT
>> *
>> FROM (
>> SELECT
>> uid, LAST_ROW(status)
>> FROM
>> changelog
>> GROUP BY
>> uid, oid)
>> WHERE status = `pending`)
>> GROUP BY
>> uid
>>
>> Where `changelog` is an append only stream with the following content:
>>
>> *user, order, status, event_time*
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> *u1, o3, pending, t3*
>> *u1, o3, success, t4*
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>>
>>
>>
>> Besides that, you could also write your own relatively simple DataStream
>> application to do the same thing.
>>
>> I’m CC’ing Timo, maybe he will have another better idea.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8577
>>
>> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
>>
>> Hi,
>> I have a requirement and need to understand if same can be achieved with
>> Flink retract stream. Let's say we have stream with 4 attributes userId,
>> orderId, status, event_time where orderId is unique and hence any change in
>> same orderId updates previous value as below
>>
>> *Changelog* *Event Stream*
>>
>> *user, order, status, event_time*
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> *u1, o3, pending, t3*
>> *u1, o3, success, t4*
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>>
>> *Snapshot view at time t6 (as viewed in mysql)*
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, success, t4
>> u4, o4, pending, t6
>> (Here rows at time t3 and t5 are deleted as they have been updated for
>> respective order ids)
>>
>> What I need is to maintain count of "Pending" orders against a user and
>> if they go beyond configured threshold, then push that user and pending
>> count to Kafka. Here there can be multiple updates to order status e.g
>> Pending -> Success or Pending -> Failed. Also in some cases there may not
>> be any change in status but we may 

Re: Kafka stream fed in batches throughout the day

2019-01-21 Thread miki haiat
In Flink you can't read data from Kafka with the DataSet API (batch),
and you don't want to mess with starting and stopping your job every few hours.
Can you elaborate more on your use case?
Are you going to use keyBy? Is there any way to use a trigger ... ?



On Mon, Jan 21, 2019 at 4:43 PM Jonny Graham 
wrote:

> We have a Kafka stream of events that we want to process with a Flink
> datastream process. However, the stream is populated by an upstream batch
> process that only executes every few hours. So the stream has very 'bursty'
> behaviour. We need a window based on event time to await the next events
> for the same key. Due to this batch population of the stream, these windows
> can remain open (with no event activity on the stream) for many hours. From
> what I understand we could indeed leave the Flink datastream process up and
> running all this time and the window would remain open. We could even use a
> savepoint and then stop the process and restart it (with the window state
> being restored) when we get the next batch and the events start appearing
> in the stream again.
>
>
>
> One rationale for this mode of operation is that we have a future usecase
> where this stream will be populated in real-time and would behave like a
> normal stream.
>
>
>
> Is that a best-practice approach for this scenario? Or should we be
> treating these batches as individual batches (Flink job that ends with the
> end of the batch) and manually handle the windowing that needs to cross
> multiple batches.
>
>
>
> Thanks,
>
> Jonny
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>


Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
@Jeff: It depends on whether the user can define a time window for his condition.
As Gagan described his problem, it was about a “global” threshold of pending orders.



I have just thought about another solution that should work without any custom 
code. Converting “status” field to status_value int:
- "+1” for pending
- “-1” for success/failure
- “0” otherwise

Then running:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

Query on top of such stream. Conversion to integers could be made by using 
`CASE` expression. 
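
For example, the conversion plus aggregation could roughly look like this (a
sketch; the column names are illustrative and follow the changelog example
quoted below):

SELECT
  uid,
  SUM(CASE status
        WHEN 'pending' THEN 1
        WHEN 'success' THEN -1
        WHEN 'failed'  THEN -1
        ELSE 0
      END) AS pending_count
FROM changelog
GROUP BY uid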

One thing to note here is that probably all of the proposed solutions would 
work based on the order of the records, not based on the event_time.

Piotrek

> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
> 
> I am thinking of another approach instead of retract stream. Is it possible 
> to define a custom window to do this ? This window is defined for each order. 
> And then you just need to analyze the events in this window.
> 
> Piotr Nowojski <pi...@da-platform.com> wrote on Mon, Jan 21, 2019 at 8:44 PM:
> Hi,
> 
> There is a missing feature in Flink Table API/SQL of supporting retraction 
> streams as the input (or conversions from append stream to retraction stream) 
> at the moment. With that your problem would simplify to one simple `SELECT 
> uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing related
> work [1], so this might be supported in the next couple of months.
> 
> There might be a workaround at the moment that could work. I think you would 
> need to write your own custom `LAST_ROW(x)` aggregation function, which would 
> just return the value of the most recent aggregated row. With that you could 
> write a query like this:
> 
> SELECT 
>   uid, count(*) 
> FROM (
>   SELECT 
>   * 
>   FROM (
>   SELECT 
>   uid, LAST_ROW(status)
>   FROM
>   changelog
>   GROUP BY
>   uid, oid)
>   WHERE status = `pending`)
> GROUP BY
>   uid
> 
> Where `changelog` is an append only stream with the following content:
> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
> 
> 
> Besides that, you could also write your own relatively simple DataStream
> application to do the same thing.
> 
> I’m CC’ing Timo, maybe he will have another better idea.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8577 
> 
> 
>> On 18 Jan 2019, at 18:30, Gagan Agrawal wrote:
>> 
>> Hi,
>> I have a requirement and need to understand if same can be achieved with 
>> Flink retract stream. Let's say we have stream with 4 attributes userId, 
>> orderId, status, event_time where orderId is unique and hence any change in 
>> same orderId updates previous value as below
>> 
>> Changelog Event Stream
>> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>> 
>> Snapshot view at time t6 (as viewed in mysql)
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, success, t4
>> u4, o4, pending, t6
>> (Here rows at time t3 and t5 are deleted as they have been updated for 
>> respective order ids)
>> 
>> What I need is to maintain count of "Pending" orders against a user and if 
>> they go beyond configured threshold, then push that user and pending count 
>> to Kafka. Here there can be multiple updates to order status e.g Pending -> 
>> Success or Pending -> Failed. Also in some cases there may not be any change 
>> in status but we may still get a row (may be due to some other attribute 
>> update which we are not concerned about). So is it possible to have running 
>> count in flink as below at respective event times. Here Pending count is 
>> decreased from 2 to 1 for user u1 at t4 since one of it's order status was 
>> changed from Pending to Success. Similarly for user u2, at time t6, there 
>> was no change in running count as there was no change in status for order o4
>> 
>> t1 -> u1 : 1, u2 : 0
>> t2 -> u1 : 1, u2 : 0
>> t3 -> u1 : 2, u2 : 0
>> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
>> decreased for u1)
>> t5 -> u1 : 1, u2 : 1
>> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
>> 
>> As I understand may be retract stream can achieve this. However I am not 
>> sure how. Any samples around this would be of great help.
>> 
>> Gagan
>> 
> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Temporal tables not behaving as expected

2019-01-21 Thread Chris Miller

Hi all,

I'm new to Flink so am probably missing something simple. I'm using 
Flink 1.7.1 and am trying to use temporal table functions but am not 
getting the results I expect. With the example code below, I would 
expect 4 records to be output (one for each order), but instead I'm only 
seeing a (random) subset of these records (it varies on each run). To 
compound my confusion further, the CSV output often shows a different 
subset of results than those written to the console. I assume there's a 
race condition of some sort but I can't figure out where it is. Any 
ideas what I'm doing wrong?



import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class Test {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    DataStreamSource<Tuple2<String, Double>> rateStream =
        env.addSource(new DelayedSource<>(rateData, 1L));
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates =
        rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream =
        env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
        .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);

    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
    usdOrders.insertInto("csvSink");
    usdStream.addSink(new PrintSink());
    env.execute();
    System.out.println("Test completed at " + time());
  }

  public static String time() {
    return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
  }

  private static class DelayedSource<T> extends RichSourceFunction<T> {
    private final List<T> data;
    private final long initialDelay;
    private volatile boolean shutdown;

    private DelayedSource(List<T> data, long initialDelay) {
      this.data = data;
      this.initialDelay = initialDelay;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
      Iterator<T> iterator = data.iterator();
      Thread.sleep(initialDelay);
      while (!shutdown && iterator.hasNext()) {
        T next = iterator.next();
        System.out.println(time() + " - producing " + next);
        ctx.collect(next);
      }
    }

    @Override
    public void cancel() {
      shutdown = true;
    }
  }

  private static class PrintSink extends RichSinkFunction<Row> {
    @Override
    public void invoke(Row value, Context context) {
      Integer orderId = (Integer) value.getField(0);
      Double amount = (Double) value.getField(1);
      String currency = (String) value.getField(2);
      Double rate = (Double) value.getField(3);
      Double usdAmount = (Double) value.getField(4);
      System.out.println(time() + " - order " + orderId + " was 

[Flink 1.6] How to get current total number of processed events

2019-01-21 Thread Thanh-Nhan Vo
Hello all,

I have a question, please!
I'm using Flink 1.6 to process our data in streaming mode.
I wonder if, at a given event, there is a way to get the current total number of
processed events (before this event).
If possible, I want to get this total number of processed events as a value
state in a keyed stream.
It means that for a given key in the KeyedStream, I want to retrieve not only the
total number of processed events for this key but also the total number of
processed events across all keys.
Is there a way to do this in Flink 1.6?

Best regards,
Nhan



Kafka stream fed in batches throughout the day

2019-01-21 Thread Jonny Graham
We have a Kafka stream of events that we want to process with a Flink 
datastream process. However, the stream is populated by an upstream batch 
process that only executes every few hours. So the stream has very 'bursty' 
behaviour. We need a window based on event time to await the next events for 
the same key. Due to this batch population of the stream, these windows can 
remain open (with no event activity on the stream) for many hours. From what I 
understand we could indeed leave the Flink datastream process up and running 
all this time and the window would remain open. We could even use a savepoint 
and then stop the process and restart it (with the window state being restored) 
when we get the next batch and the events start appearing in the stream again.

One rationale for this mode of operation is that we have a future usecase where 
this stream will be populated in real-time and would behave like a normal 
stream.

Is that a best-practice approach for this scenario? Or should we be treating 
these batches as individual batches (Flink job that ends with the end of the 
batch) and manually handle the windowing that needs to cross multiple batches.

Thanks,
Jonny

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.



Re: Query on retract stream

2019-01-21 Thread Jeff Zhang
I am thinking of another approach instead of a retract stream. Is it possible
to define a custom window to do this? This window would be defined for each
order, and then you just need to analyze the events in this window.

Piotr Nowojski wrote on Mon, Jan 21, 2019 at 8:44 PM:

> Hi,
>
> There is a missing feature in Flink Table API/SQL of supporting retraction
> streams as the input (or conversions from append stream to retraction
> stream) at the moment. With that your problem would simplify to one simple
> `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing
> related work [1], so this might be supported in the next couple
> of months.
>
> There might be a workaround at the moment that could work. I think you would
> need to write your own custom `LAST_ROW(x)` aggregation function, which
> would just return the value of the most recent aggregated row. With that
> you could write a query like this:
>
> SELECT
> uid, count(*)
> FROM (
> SELECT
> *
> FROM (
> SELECT
> uid, LAST_ROW(status)
> FROM
> changelog
> GROUP BY
> uid, oid)
> WHERE status = `pending`)
> GROUP BY
> uid
>
> Where `changelog` is an append only stream with the following content:
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
>
>
> Besides that, you could also write your own relatively simple DataStream
> application to do the same thing.
>
> I’m CC’ing Timo, maybe he will have another better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
>
> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have running count in flink as below at respective event times. Here
> Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's
> order status was changed from Pending to Success. Similarly for user u2, at
> time t6, there was no change in running count as there was no change in
> status for order o4
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Re: Is there a way to find the age of an element in a Global window?

2019-01-21 Thread Manjusha Vuyyuru
Hi Kostas,

I have a similar scenario where I have to clear window elements upon
reaching some count, or clear the elements if they are older than one hour.
I'm using the below approach and just wanted to know if it's the right way:

 DataStream> out = mappedFields
.map(new CustomMapFunction())
.keyBy(0,1)
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new CustomCountTrigger()))
.evictor(TimeEvictor.of(Time.seconds(3600), true))
.apply(new CustomWindowFunction());

*TimeEvictor.of(Time.seconds(3600), true) - *evicting after window function
is evaluated.

Please help.

Thanks,
Manju


On Fri, Jan 18, 2019 at 8:28 PM Kumar Bolar, Harshith 
wrote:

> Thanks. That makes sense :)
>
>
>
> *From: *Kostas Kloudas 
> *Date: *Friday, 18 January 2019 at 8:25 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *"user@flink.apache.org" 
> *Subject: *[External] Re: Is there a way to find the age of an element in
> a Global window?
>
>
>
> Hi Harshith,
>
>
>
> The evictor has 2 methods:
>
> void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W
> window, EvictorContext evictorContext);

> void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W
> window, EvictorContext evictorContext);
>
>
>
> In the iterables, you have access to the elements and their timestamps,
> and the evictor context gives you access to the current watermark
>
> and current processing time.
>
>
>
> Based on this information, you can call remove on the iterator created by
> the iterable and clean up the elements that you want to remove.
>
> If you operate on event time, and you want to clean up based on processing
> time, then you can put a processFunction or a map before your window operator,
> put the System.currentTimeMillis in the record itself, and then use the evictor
> and the current processing time to clean up.
>
>
>
> I hope this helps,
>
> Kostas
>
>
>
>
>
> On Fri, Jan 18, 2019 at 9:25 AM Kumar Bolar, Harshith 
> wrote:
>
> Hi all,
>
>
>
> I'm using Global Windows for my application with a custom trigger and
> custom evictor based on some conditions. Now, I also want to evict those
> elements from the window that have stayed there for too long, let's say 30
> mins. How would I go about doing this? Is there a utility that Flink
> provides that lets me know what the age of an element in a window is?
>
>
>
> Thanks,
>
> Harshith
>
>
>
>


Re: Query on retract stream

2019-01-21 Thread Piotr Nowojski
Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction 
streams as the input (or conversions from append stream to retraction stream) 
at the moment. With that your problem would simplify to one simple `SELECT uid, 
count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related 
work [1], so this might be supported in the next couple of months.

There might be a workaround at the moment that could work. I think you would need 
to write your own custom `LAST_ROW(x)` aggregation function, which would just 
return the value of the most recent aggregated row. With that you could write a 
query like this:

SELECT 
uid, count(*) 
FROM (
SELECT 
* 
FROM (
SELECT 
uid, LAST_ROW(status)
FROM
changelog
GROUP BY
uid, oid)
WHERE status = `pending`)
GROUP BY
uid

Where `changelog` is an append only stream with the following content:

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6


Besides that, you could also write your own relatively simple DataStream
application to do the same thing.
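
As a rough sketch of that DataStream variant (assuming a simple OrderEvent POJO
with user, order, status and eventTime fields; all names are illustrative), a
KeyedProcessFunction keyed by user could track the last status per order and a
running pending count:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Assumed shape of the changelog events (illustrative).
public class OrderEvent {
    public String user;
    public String order;
    public String status;
    public long eventTime;
}

public class PendingCountFunction
        extends KeyedProcessFunction<String, OrderEvent, Tuple2<String, Long>> {

    // Last known status per order id, scoped to the current user key.
    private transient MapState<String, String> lastStatus;
    // Running number of pending orders for the current user key.
    private transient ValueState<Long> pendingCount;

    @Override
    public void open(Configuration parameters) {
        lastStatus = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastStatus", String.class, String.class));
        pendingCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingCount", Long.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        String previous = lastStatus.get(event.order);
        Long current = pendingCount.value();
        long count = current == null ? 0L : current;

        boolean wasPending = "pending".equals(previous);
        boolean isPending = "pending".equals(event.status);
        if (!wasPending && isPending) {
            count++;
        } else if (wasPending && !isPending) {
            count--;
        }

        lastStatus.put(event.order, event.status);
        pendingCount.update(count);
        // Emit the updated pending count for this user; a downstream filter could
        // compare it against the configured threshold before writing to Kafka.
        out.collect(Tuple2.of(event.user, count));
    }
}

// Usage (sketch): events.keyBy(e -> e.user).process(new PendingCountFunction());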

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
> 
> Hi,
> I have a requirement and need to understand if same can be achieved with 
> Flink retract stream. Let's say we have stream with 4 attributes userId, 
> orderId, status, event_time where orderId is unique and hence any change in 
> same orderId updates previous value as below
> 
> Changelog Event Stream
> 
> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
> 
> Snapshot view at time t6 (as viewed in mysql)
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for 
> respective order ids)
> 
> What I need is to maintain count of "Pending" orders against a user and if 
> they go beyond configured threshold, then push that user and pending count to 
> Kafka. Here there can be multiple updates to order status e.g Pending -> 
> Success or Pending -> Failed. Also in some cases there may not be any change 
> in status but we may still get a row (may be due to some other attribute 
> update which we are not concerned about). So is it possible to have running 
> count in flink as below at respective event times. Here Pending count is 
> decreased from 2 to 1 for user u1 at t4 since one of it's order status was 
> changed from Pending to Success. Similarly for user u2, at time t6, there was 
> no change in running count as there was no change in status for order o4
> 
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
> decreased for u1)
> t5 -> u1 : 1, u2 : 1
> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
> 
> As I understand may be retract stream can achieve this. However I am not sure 
> how. Any samples around this would be of great help.
> 
> Gagan
> 



Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

Yes, you can achieve this with the Flink Table API/SQL. However, you have to pay
attention to the following things:
1) Currently, Flink only ingests append streams. In order to ingest upsert
streams (streams with keys), you can use groupBy with a user-defined
LAST_VALUE aggregate function. For the implementation, you can refer to the MAX
AggregateFunction (MAX always returns the max value, while LAST_VALUE always
returns the latest value). The SQL may look like:

SELECT user, COUNT(*)
> FROM (
> SELECT order, LAST_VALUE(user), LAST_VALUE(status), LAST_VALUE(event_time)
> FROM SourceTable
> GROUP BY order
> )
> WHERE status = 'pending'
> GROUP BY user

You have to note that the query will be processed under processing time
instead of event time. But I think it would be fine for you, as the final
result will be right.
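
A minimal sketch of such a LAST_VALUE function for a String-typed column (the
class name and accumulator are illustrative):

import org.apache.flink.table.functions.AggregateFunction;

public class LastValue extends AggregateFunction<String, LastValue.Acc> {

    // Accumulator that only remembers the most recently seen value.
    public static class Acc {
        public String last;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public String getValue(Acc acc) {
        return acc.last;
    }

    // Called for every input row; simply overwrites the previous value.
    public void accumulate(Acc acc, String value) {
        acc.last = value;
    }
}

// Registered with e.g. tableEnv.registerFunction("LAST_VALUE", new LastValue());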

As for the upsert source, there is already a PR [1] on it, and it is under
review now.

2) You have to note that once you output results to Kafka according to a
configured threshold, the output record cannot be deleted anymore even if the
count value decreases, because Kafka doesn't support deleting messages. Also,
this issue [2] makes things worse. You can take a detailed look if you are
interested in it.

Best, Hequn

[1] https://github.com/apache/flink/pull/6787
[2] https://issues.apache.org/jira/browse/FLINK-9528


On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal 
wrote:

> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have running count in flink as below at respective event times. Here
> Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's
> order status was changed from Pending to Success. Similarly for user u2, at
> time t6, there was no change in running count as there was no change in
> status for order o4
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>


Re: NPE when using spring bean in custom input format

2019-01-21 Thread Piotr Nowojski
Hi,

You have to use `open()` method to handle initialisation of the things required 
by your code/operators. By the nature of the LocalEnvironment, the life cycle 
of the operators is different there compared to what happens when submitting a 
job to the real cluster. With remote environments your classes will be 
serialised, sent over the network and then deserialised and then `open()` 
methods will be called. In such setup, if you need to initialise some shared 
static resource, you also have to keep in mind that depending on the 
parallelism, number of the tasks & number of task managers, you will have to 
make sure that your static resource is initialised only once. You also should 
take care about de-initialisation of this resource & take into account what 
will happen if your code will crash with an exception. In that case your job 
might be resubmitted without restarting the TaskManagers.
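
For example, a minimal sketch of lazily initialising a shared Spring context
inside `open()` could look like this (the record type, package name and the
ServiceX API are assumptions based on the code quoted below):

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CustomInputFormat extends GenericInputFormat<Map<String, Object>> {

    // Shared per TaskManager JVM and initialised at most once, even when several
    // subtasks of this format run in the same JVM.
    private static volatile ApplicationContext context;

    private Iterator<Map<String, Object>> recordsIterator;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        if (context == null) {
            synchronized (CustomInputFormat.class) {
                if (context == null) {
                    // Scan only the service-related packages (package name is illustrative).
                    context = new AnnotationConfigApplicationContext("com.example.services");
                }
            }
        }
        ServiceX serviceX = context.getBean(ServiceX.class);
        recordsIterator = serviceX.getRecords(/* split-specific arguments */);
    }

    @Override
    public boolean reachedEnd() {
        return !recordsIterator.hasNext();
    }

    @Override
    public Map<String, Object> nextRecord(Map<String, Object> reuse) {
        return recordsIterator.next();
    }
}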

Piotrek

> On 18 Jan 2019, at 11:48, madan  wrote:
> 
> Suggestions please.
> 
> Thinking of options
> 1. Initializing the Spring application context in the 'open' method. Instead of
> loading the entire context, move service-related beans to one/multiple packages
> and scan only those packages. Requires code refactoring.
> 2. Direct database query - a direct query cannot be used since business logic
> is involved in fetching the records.
> 3. Write initially to CSV and do the transformation on the CSV. Last possible option.
> 
> Please share your thoughts.
> 
> Thank you.
> 
> On Wed, Jan 16, 2019 at 2:50 PM madan wrote:
> Hi,
> 
> Need help in the below scenario,
> 
> I have CustomInputFormat which loads the records using a bean,
> 
> public class CustomInputFormat extends GenericInputFormat {
>   private Iterator> recordsIterator;
>   @Override
> public void open(GenericInputSplit split) throws IOException {
>ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
> recordsIterator = serviceX.getRecords(..); 
>  }
> }
> The above input format works fine when using a Flink LocalEnvironment in a Spring
> application. The problem is when running Flink in cluster mode and trying to
> connect to it using RemoteEnvironment. Since the Spring application context will
> not be initialized, an NPE is thrown. Please suggest what the solution could be
> in this scenario.
> 
> 
> 
> -- 
> Thank you,
> Madan.
> 
> 
> -- 
> Thank you,
> Madan.