Re: Move files read by flink

2018-03-08 Thread Jörn Franke
Why don’t you let your flink job move them once it’s done?
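For illustration, a rough sketch of that approach, assuming the job is a periodically triggered DataSet (batch) job and the folder is on a locally accessible path; the paths and class name below are made up, not from the original post:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class ReadAndArchive {
    public static void main(String[] args) throws Exception {
        Path input = Paths.get("/files");
        Path archive = Paths.get("/files-archive");

        // snapshot the files that exist now -- these are the ones this run will read
        List<Path> toArchive;
        try (Stream<Path> s = Files.list(input)) {
            toArchive = s.filter(Files::isRegularFile).collect(Collectors.toList());
        }

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile(input.toString())
           .filter(line -> !line.isEmpty())   // real parsing / transformations go here
           .writeAsText("/files-out/result", FileSystem.WriteMode.OVERWRITE);

        env.execute("read csv files");        // blocks until the job has finished

        // move only the files that were part of this run; files pushed during
        // the run stay in place for the next run
        Files.createDirectories(archive);
        for (Path f : toArchive) {
            Files.move(f, archive.resolve(f.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}

For a continuously running streaming job that monitors the folder, moving files from the outside (e.g. via cron) will always race with in-flight reads, which is what produces the file-not-found errors.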

> On 9. Mar 2018, at 03:12, flinkuser101  wrote:
> 
> I am reading files from a folder, say:
> 
> /files/*
> 
> Files are pushed into that folder. 
> 
> /files/file1_2018_03_09.csv
> /files/file2_2018_03_09.csv
> 
> Flink is reading files from the folder fine, but as the number of files grows, how
> do I move the files into another folder? Currently I am using a cron job to
> move files every 10 minutes, but I get an error in Flink that the file is not found.
> Is there any way to move only the files that are not currently being read by Flink?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dynamic CEP https://issues.apache.org/jira/browse/FLINK-7129?subTaskView=all

2018-03-08 Thread Dawid Wysakowicz
Hi,

Kostas is right; unfortunately I had to stop the work because we were missing 
BroadcastState. I hope I will get back to this feature soon and finish it for 
1.6.

> On 8 Mar 2018, at 17:28, Vishal Santoshi  wrote:
> 
> Perfect.  Thanks.
> 
> On Thu, Mar 8, 2018 at 10:41 AM, Kostas Kloudas  
> wrote:
> Hi Vishal,
> 
> Dawid (cc’ed) who was working on that stopped because in the past Flink
> did not support broadcast state.
> 
> This is now added (in the master) and the implementation of FLINK-7129
> will continue hopefully soon.
> 
> Cheers,
> Kostas
> 
> > On Mar 8, 2018, at 4:09 PM, Vishal Santoshi  
> > wrote:
> >
> > Hello Fabian,
> >
> > What about https://issues.apache.org/jira/browse/FLINK-7129 ? Do 
> > you folks intend to conclude this ticket too ?
> >
> > On Thu, Mar 8, 2018 at 1:08 AM, Fabian Hueske  wrote:
> > We hope to pick up FLIP-20 after Flink 1.5.0 has been released.
> >
> > 2018-03-07 22:05 GMT-08:00 Shailesh Jain :
> > In addition to making the addition of patterns dynamic, any updates on FLIP 
> > 20 ?
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> >
> > On Thu, Mar 8, 2018 at 12:23 AM, Vishal Santoshi 
> >  wrote:
> > I see https://github.com/dawidwys/flink/tree/cep-dynamic-nfa is almost 
> > there.
> >
> > On Wed, Mar 7, 2018 at 1:34 PM, Vishal Santoshi  
> > wrote:
> > What is the state of this ticket ? Is CEP invested in supporting dynamic 
> > patterns that could potentially be where patterns can be added/disabled 
> > through a control stream ?
> >
> >
> >
> >
> 
> 





Re: Emulate Tumbling window in Event Time Space

2018-03-08 Thread Xingcan Cui
Hi Dhruv,

There's no need to implement the window logic with the low-level 
`ProcessFunction` yourself. Flink provides built-in window operators, and 
you just need to implement a `WindowFunction` for your per-window processing [1].

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#window-functions
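For illustration, a minimal sketch of such a job; the record type (key, timestampMillis), the key, and the out-of-orderness bound are made-up placeholders:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TumblingEventTimeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // toy (key, timestampMillis) source; replace with the real input
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 61000L));

        events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> e) {
                        return e.f1;
                    }
                })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            // the default EventTimeTrigger fires once the watermark passes the
            // window end, so no custom trigger is needed
            .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window,
                                  Iterable<Tuple2<String, Long>> in, Collector<String> out) {
                    long count = 0;
                    for (Tuple2<String, Long> ignored : in) {
                        count++;
                    }
                    out.collect(key + " window [" + window.getStart() + ", " + window.getEnd() + "): " + count);
                }
            })
            .print();

        env.execute("tumbling event-time window example");
    }
}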
 


> On 9 Mar 2018, at 1:51 PM, Dhruv Kumar  wrote:
> 
> Hi
> 
> I was trying to emulate a tumbling window in event time space. Here is the link to my code.
> I am using the process function to do the custom processing which I want to 
> do within every window. I am having an issue with how to emit results at the 
> end of every window, since my watermark only gets emitted at every incoming 
> event (an incoming event will mostly not coincide with the end time of any 
> window). It seems like I need to add a trigger somewhere that fires at the end 
> of every window. Could anyone here help me? Sorry if I am not clear on 
> anything; I am quite new to Flink. 
> 
> Thanks
> Dhruv



Emulate Tumbling window in Event Time Space

2018-03-08 Thread Dhruv Kumar
Hi

I was trying to emulate a tumbling window in event time space. Here is the link 
to my code.
I am using the process function to do the custom processing which I want to do 
within every window. I am having an issue with how to emit results at the end of 
every window, since my watermark only gets emitted at every incoming event 
(an incoming event will mostly not coincide with the end time of any window). 
It seems like I need to add a trigger somewhere that fires at the end of every 
window. Could anyone here help me? Sorry if I am not clear on anything; I am 
quite new to Flink. 

Thanks
Dhruv

Move files read by flink

2018-03-08 Thread flinkuser101
I am reading files from a folder, say:

/files/*

Files are pushed into that folder. 

/files/file1_2018_03_09.csv
/files/file2_2018_03_09.csv

Flink is reading files from the folder fine, but as the number of files grows, how
do I move the files into another folder? Currently I am using a cron job to
move files every 10 minutes, but I get an error in Flink that the file is not found.
Is there any way to move only the files that are not currently being read by Flink?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


UUIDs generated by Flink SQL

2018-03-08 Thread Gregory Fee
Hello, from what I understand in the documentation it appears there is no
way to assign UUIDs to operators added to the DAG by Flink SQL. Is my
understanding correct?

I'd very much like to be able to assign UUIDs to those operators. I want to
run a program using some Flink SQL, create a save point, and then run
another program with slightly different structure that picks up from that
save point. The suggested way of making something like that work in the
documentation is to assign UUIDs, but that doesn't seem possible if I'm using
Flink SQL. Any advice?

On a related note, I'm wondering what happens if I have a stateful program
using Flink SQL and I want to update my Flink binaries. If the query plan
ends up changing based on that upgrade does it mean that the load of the
save point is going to fail?

Thanks!

-- 
*Gregory Fee*
Engineer
425.830.4734 <+14258304734>


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-08 Thread Yan Zhou [FDS Science]
Hi Xingcan, Timo,

Thanks for the information.
I am going to convert the result table to a DataStream and follow the logic of 
TimeBoundedStreamInnerJoin to do the timed-window join. Should I do this? Is 
there any concern from a performance or stability perspective?

Best
Yan



From: Xingcan Cui 
Sent: Thursday, March 8, 2018 8:21:42 AM
To: Timo Walther
Cc: user; Yan Zhou [FDS Science]
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT 
keyword will be implemented with an aggregation, which outputs a retract stream 
[2]. In that situation, all the time-related fields will be materialized as if 
they were common fields (with the timestamp type). Currently, due to the 
semantics problem, the time-windowed join cannot be performed on retract 
streams. But you could try non-windowed join [3] after we fix this.

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On 8 Mar 2018, at 8:59 PM, Timo Walther wrote:

Hi Xingcan,

thanks for looking into this. This definitely seems to be a bug. Maybe in the 
org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we should 
create an issue for it.

Regards,
Timo


On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
Hi Xingcan,

Thanks for your help. Attached is a sample code that can reproduce the problem.
When I was writing the sample code, if I remove the `distinct` keyword in 
select clause, the AssertionError doesn't occur.

String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
order by eventTs rows between 100 preceding and current row) as cnt1 from 
myTable";

Best
Yan

From: xccui-foxmail 
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError 
on rowtime column

Hi Yan,

I’d like to look into this. Can you share more about your queries and the full 
stack trace?

Thank,
Xingcan

On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] wrote:

Hi experts,
I am using the Flink Table API to join two tables, which are datastreams underneath. 
However, I got an assertion error of "java.lang.AssertionError: mismatched type 
$1 TIMESTAMP(3)" on the rowtime column. Below are more details:

There is only one Kafka data source, which is then converted to a Table and 
registered. One existing column is set as the rowtime (event time) attribute. Two 
over-window aggregation queries are run against the table and two tables are 
created as results. Everything works great so far.
However, when time-window joining the two result tables with the inherited rowtime, 
Calcite throws the "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" 
AssertionError. Can someone let me know what the possible cause is? FYI, I 
renamed the rowtime column for one of the result tables.


DataStream dataStream = env.addSource(kafkaConsumer);

Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);

tableEnv.registerTable(tableName, table);

Table left = tableEnv.sqlQuery("select id, eventTime,count (*) over ...  from 
...");

Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, 
count (*) over ...  from ...");

left.join(right).where("id = r_id && eventTime === r_event_time")

.addSink(...); // here calcite throw exception: java.lang.AssertionError: 
mismatched type $1 TIMESTAMP(3)

source table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_1 table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_2 table
 |-- rid: Long
 |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
 |-- ...


Best
Yan






Re: Event time join

2018-03-08 Thread Vishal Santoshi
Yep.  I think this leads to a general question that may not be pertinent
to https://github.com/apache/flink/pull/5342.  How do we throttle a source
if the held-back data gets unreasonably large? I know that is in
itself a broader question, but delayed watermarks of a slow stream accentuate
the issue. I am curious how credit-based back pressure handling
plays into this, or is that outside the realm of this discussion? And is
credit-based back pressure handling in the 1.5 release?

On Thu, Mar 8, 2018 at 12:23 PM, Fabian Hueske  wrote:

> The join would not cause backpressure but rather put all events that
> cannot be processed yet into state to process them later.
> So this works well if the data that is provided by the streams is roughly
> aligned by event time.
>
> 2018-03-08 9:04 GMT-08:00 Vishal Santoshi :
>
>> Aah we have it here https://docs.google.com/d
>> ocument/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#
>> heading=h.bgl260hr56g6
>>
>> On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This is very interesting.  I would imagine that there will be high back
>>> pressure on the LEFT source effectively throttling it but as is the current
>>> state that is likely effect other pipelines as the free o/p buffer on the
>>> source side and and i/p buffers on the consumer side start blocking and get
>>> exhausted for all other pipes. I am very interested in how holding back the
>>> busy source does not create a pathological  issue where that source is
>>> forever held back. Is there a FLIP for it ?
>>>
>>> On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Gytis,

 Flink does currently not support holding back individual streams, for
 example it is not possible to align streams on (offset) event-time.

 However, the Flink community is working on a windowed join for the
 DataStream API, that only holds the relevant tail of the stream as state.
 If your join condition is +/- 5 minutes, then the join would store the
 last five minutes of both streams as state. Here's an implementation of the
 operator [1] that is close to be merged and will be available in Flink
 1.6.0.
 Flink's SQL support (and Table API) support this join type since
 version 1.4.0 [2].

 Best, Fabian

 [1] https://github.com/apache/flink/pull/5342
 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
 dev/table/sql.html#joins

 2018-03-08 1:02 GMT-08:00 Gytis Žilinskas :

> Hi,
>
> we're considering flink for a couple of our projects. I'm doing a
> trial implementation for one of them. So far, I like a lot of things,
> however there are a couple of issues that I can't figure out how to
> resolve. Not sure if it's me misunderstanding the tool, or flink just
> doesn't have a capability to do it.
>
> We want to do an event time join on two big kafka streams. Both of
> them might experience some issues on the other end and be delayed.
> Additionally, while both are big, one (let's call it stream A) is
> significantly larger than stream B.
>
> We also know, that the join window is around 5min. That is, given some
> key K in stream B, if there is a counterpart in stream A, it's going
> to be +/5 5min in event time.
>
> Since stream A is especially heavy and it's unfeasable to keep hours
> of it in memory, I would imagine an ideal solution where we read both
> streams from Kafka. We always make sure that stream B is ahead by
> 10min, that is, if stream A is currently ahead in watermarks, we stall
> it and consume stream B until it catches up. Once the stream are
> alligned in event time (with the 10min delay window) we run them both
> through join.
>
> The problem is, that I find a mechanism to implement that in flink. If
> I try to do a CoProcessFunction then it just consumes both streams at
> the same time, ingests a lot of messages from stream A, runs out of
> memory and dies.
>
> Any ideas on how this could be solved?
>
> (here's a thread with a very similar problem from some time ago
> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
> ble.com/synchronizing-two-streams-td6830.html)
>
> Regards,
> Gytis
>


>>>
>>
>


Re: Event time join

2018-03-08 Thread Fabian Hueske
The join would not cause backpressure but rather put all events that cannot
be processed yet into state to process them later.
So this works well if the data that is provided by the streams is roughly
aligned by event time.

2018-03-08 9:04 GMT-08:00 Vishal Santoshi :

> Aah we have it here https://docs.google.com/document/d/16GMH5VM6JJiWj_
> N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6
>
> On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> This is very interesting.  I would imagine that there will be high back
>> pressure on the LEFT source effectively throttling it but as is the current
>> state that is likely effect other pipelines as the free o/p buffer on the
>> source side and and i/p buffers on the consumer side start blocking and get
>> exhausted for all other pipes. I am very interested in how holding back the
>> busy source does not create a pathological  issue where that source is
>> forever held back. Is there a FLIP for it ?
>>
>> On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske  wrote:
>>
>>> Hi Gytis,
>>>
>>> Flink does currently not support holding back individual streams, for
>>> example it is not possible to align streams on (offset) event-time.
>>>
>>> However, the Flink community is working on a windowed join for the
>>> DataStream API, that only holds the relevant tail of the stream as state.
>>> If your join condition is +/- 5 minutes, then the join would store the
>>> last five minutes of both streams as state. Here's an implementation of the
>>> operator [1] that is close to be merged and will be available in Flink
>>> 1.6.0.
>>> Flink's SQL support (and Table API) support this join type since version
>>> 1.4.0 [2].
>>>
>>> Best, Fabian
>>>
>>> [1] https://github.com/apache/flink/pull/5342
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/table/sql.html#joins
>>>
>>> 2018-03-08 1:02 GMT-08:00 Gytis Žilinskas :
>>>
 Hi,

 we're considering flink for a couple of our projects. I'm doing a
 trial implementation for one of them. So far, I like a lot of things,
 however there are a couple of issues that I can't figure out how to
 resolve. Not sure if it's me misunderstanding the tool, or flink just
 doesn't have a capability to do it.

 We want to do an event time join on two big kafka streams. Both of
 them might experience some issues on the other end and be delayed.
 Additionally, while both are big, one (let's call it stream A) is
 significantly larger than stream B.

 We also know, that the join window is around 5min. That is, given some
 key K in stream B, if there is a counterpart in stream A, it's going
 to be +/5 5min in event time.

 Since stream A is especially heavy and it's unfeasable to keep hours
 of it in memory, I would imagine an ideal solution where we read both
 streams from Kafka. We always make sure that stream B is ahead by
 10min, that is, if stream A is currently ahead in watermarks, we stall
 it and consume stream B until it catches up. Once the stream are
 alligned in event time (with the 10min delay window) we run them both
 through join.

 The problem is, that I find a mechanism to implement that in flink. If
 I try to do a CoProcessFunction then it just consumes both streams at
 the same time, ingests a lot of messages from stream A, runs out of
 memory and dies.

 Any ideas on how this could be solved?

 (here's a thread with a very similar problem from some time ago
 http://apache-flink-user-mailing-list-archive.2336050.n4.nab
 ble.com/synchronizing-two-streams-td6830.html)

 Regards,
 Gytis

>>>
>>>
>>
>


Re: Event time join

2018-03-08 Thread Vishal Santoshi
Aah we have it here
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi 
wrote:

> This is very interesting.  I would imagine that there will be high back
> pressure on the LEFT source effectively throttling it but as is the current
> state that is likely effect other pipelines as the free o/p buffer on the
> source side and and i/p buffers on the consumer side start blocking and get
> exhausted for all other pipes. I am very interested in how holding back the
> busy source does not create a pathological  issue where that source is
> forever held back. Is there a FLIP for it ?
>
> On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske  wrote:
>
>> Hi Gytis,
>>
>> Flink does currently not support holding back individual streams, for
>> example it is not possible to align streams on (offset) event-time.
>>
>> However, the Flink community is working on a windowed join for the
>> DataStream API, that only holds the relevant tail of the stream as state.
>> If your join condition is +/- 5 minutes, then the join would store the
>> last five minutes of both streams as state. Here's an implementation of the
>> operator [1] that is close to be merged and will be available in Flink
>> 1.6.0.
>> Flink's SQL support (and Table API) support this join type since version
>> 1.4.0 [2].
>>
>> Best, Fabian
>>
>> [1] https://github.com/apache/flink/pull/5342
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/table/sql.html#joins
>>
>> 2018-03-08 1:02 GMT-08:00 Gytis Žilinskas :
>>
>>> Hi,
>>>
>>> we're considering flink for a couple of our projects. I'm doing a
>>> trial implementation for one of them. So far, I like a lot of things,
>>> however there are a couple of issues that I can't figure out how to
>>> resolve. Not sure if it's me misunderstanding the tool, or flink just
>>> doesn't have a capability to do it.
>>>
>>> We want to do an event time join on two big kafka streams. Both of
>>> them might experience some issues on the other end and be delayed.
>>> Additionally, while both are big, one (let's call it stream A) is
>>> significantly larger than stream B.
>>>
>>> We also know, that the join window is around 5min. That is, given some
>>> key K in stream B, if there is a counterpart in stream A, it's going
>>> to be +/5 5min in event time.
>>>
>>> Since stream A is especially heavy and it's unfeasable to keep hours
>>> of it in memory, I would imagine an ideal solution where we read both
>>> streams from Kafka. We always make sure that stream B is ahead by
>>> 10min, that is, if stream A is currently ahead in watermarks, we stall
>>> it and consume stream B until it catches up. Once the stream are
>>> alligned in event time (with the 10min delay window) we run them both
>>> through join.
>>>
>>> The problem is, that I find a mechanism to implement that in flink. If
>>> I try to do a CoProcessFunction then it just consumes both streams at
>>> the same time, ingests a lot of messages from stream A, runs out of
>>> memory and dies.
>>>
>>> Any ideas on how this could be solved?
>>>
>>> (here's a thread with a very similar problem from some time ago
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/synchronizing-two-streams-td6830.html)
>>>
>>> Regards,
>>> Gytis
>>>
>>
>>
>


Re: Event time join

2018-03-08 Thread Vishal Santoshi
This is very interesting.  I would imagine that there will be high back
pressure on the LEFT source, effectively throttling it, but as things currently
stand that is likely to affect other pipelines, as the free o/p buffers on the
source side and i/p buffers on the consumer side start blocking and get
exhausted for all other pipes. I am very interested in how holding back the
busy source does not create a pathological issue where that source is
forever held back. Is there a FLIP for it?

On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske  wrote:

> Hi Gytis,
>
> Flink does currently not support holding back individual streams, for
> example it is not possible to align streams on (offset) event-time.
>
> However, the Flink community is working on a windowed join for the
> DataStream API, that only holds the relevant tail of the stream as state.
> If your join condition is +/- 5 minutes, then the join would store the last
> five minutes of both streams as state. Here's an implementation of the
> operator [1] that is close to be merged and will be available in Flink
> 1.6.0.
> Flink's SQL support (and Table API) support this join type since version
> 1.4.0 [2].
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/pull/5342
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/table/sql.html#joins
>
> 2018-03-08 1:02 GMT-08:00 Gytis Žilinskas :
>
>> Hi,
>>
>> we're considering flink for a couple of our projects. I'm doing a
>> trial implementation for one of them. So far, I like a lot of things,
>> however there are a couple of issues that I can't figure out how to
>> resolve. Not sure if it's me misunderstanding the tool, or flink just
>> doesn't have a capability to do it.
>>
>> We want to do an event time join on two big kafka streams. Both of
>> them might experience some issues on the other end and be delayed.
>> Additionally, while both are big, one (let's call it stream A) is
>> significantly larger than stream B.
>>
>> We also know, that the join window is around 5min. That is, given some
>> key K in stream B, if there is a counterpart in stream A, it's going
>> to be +/5 5min in event time.
>>
>> Since stream A is especially heavy and it's unfeasable to keep hours
>> of it in memory, I would imagine an ideal solution where we read both
>> streams from Kafka. We always make sure that stream B is ahead by
>> 10min, that is, if stream A is currently ahead in watermarks, we stall
>> it and consume stream B until it catches up. Once the stream are
>> alligned in event time (with the 10min delay window) we run them both
>> through join.
>>
>> The problem is, that I find a mechanism to implement that in flink. If
>> I try to do a CoProcessFunction then it just consumes both streams at
>> the same time, ingests a lot of messages from stream A, runs out of
>> memory and dies.
>>
>> Any ideas on how this could be solved?
>>
>> (here's a thread with a very similar problem from some time ago
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/synchronizing-two-streams-td6830.html)
>>
>> Regards,
>> Gytis
>>
>
>


Re: Event time join

2018-03-08 Thread Fabian Hueske
Hi Gytis,

Flink currently does not support holding back individual streams; for
example, it is not possible to align streams on (offset) event-time.

However, the Flink community is working on a windowed join for the
DataStream API that only holds the relevant tail of the stream as state.
If your join condition is +/- 5 minutes, then the join would store the last
five minutes of both streams as state. Here's an implementation of the
operator [1] that is close to be merged and will be available in Flink
1.6.0.
Flink's SQL support (and Table API) support this join type since version
1.4.0 [2].

Best, Fabian

[1] https://github.com/apache/flink/pull/5342
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins
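For reference, a time-windowed join in SQL roughly looks like the following (sketch only; the table and column names are made up, and both tables are assumed to be registered with rowtime attributes):

Table joined = tableEnv.sqlQuery(
    "SELECT a.id, a.v, b.v " +
    "FROM StreamA AS a, StreamB AS b " +
    "WHERE a.id = b.id AND " +
    "  a.rowtime BETWEEN b.rowtime - INTERVAL '5' MINUTE " +
    "                AND b.rowtime + INTERVAL '5' MINUTE");

With this form the operator only needs to keep roughly the last five minutes of each stream as state, which matches the requirement described below.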

2018-03-08 1:02 GMT-08:00 Gytis Žilinskas :

> Hi,
>
> we're considering flink for a couple of our projects. I'm doing a
> trial implementation for one of them. So far, I like a lot of things,
> however there are a couple of issues that I can't figure out how to
> resolve. Not sure if it's me misunderstanding the tool, or flink just
> doesn't have a capability to do it.
>
> We want to do an event time join on two big kafka streams. Both of
> them might experience some issues on the other end and be delayed.
> Additionally, while both are big, one (let's call it stream A) is
> significantly larger than stream B.
>
> We also know, that the join window is around 5min. That is, given some
> key K in stream B, if there is a counterpart in stream A, it's going
> to be +/5 5min in event time.
>
> Since stream A is especially heavy and it's unfeasable to keep hours
> of it in memory, I would imagine an ideal solution where we read both
> streams from Kafka. We always make sure that stream B is ahead by
> 10min, that is, if stream A is currently ahead in watermarks, we stall
> it and consume stream B until it catches up. Once the stream are
> alligned in event time (with the 10min delay window) we run them both
> through join.
>
> The problem is, that I find a mechanism to implement that in flink. If
> I try to do a CoProcessFunction then it just consumes both streams at
> the same time, ingests a lot of messages from stream A, runs out of
> memory and dies.
>
> Any ideas on how this could be solved?
>
> (here's a thread with a very similar problem from some time ago
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/synchronizing-two-streams-td6830.html)
>
> Regards,
> Gytis
>


Re: Dynamic CEP https://issues.apache.org/jira/browse/FLINK-7129?subTaskView=all

2018-03-08 Thread Vishal Santoshi
Perfect.  Thanks.

On Thu, Mar 8, 2018 at 10:41 AM, Kostas Kloudas  wrote:

> Hi Vishal,
>
> Dawid (cc’ed) who was working on that stopped because in the past Flink
> did not support broadcast state.
>
> This is now added (in the master) and the implementation of FLINK-7129
> will continue hopefully soon.
>
> Cheers,
> Kostas
>
> > On Mar 8, 2018, at 4:09 PM, Vishal Santoshi 
> wrote:
> >
> > Hello Fabian,
> >
> > What about https://issues.apache.org/jira/browse/FLINK-7129 ?
> Do you folks intend to conclude this ticket too ?
> >
> > On Thu, Mar 8, 2018 at 1:08 AM, Fabian Hueske  wrote:
> > We hope to pick up FLIP-20 after Flink 1.5.0 has been released.
> >
> > 2018-03-07 22:05 GMT-08:00 Shailesh Jain :
> > In addition to making the addition of patterns dynamic, any updates on
> FLIP 20 ?
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 20%3A+Integration+of+SQL+and+CEP
> >
> > On Thu, Mar 8, 2018 at 12:23 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> > I see https://github.com/dawidwys/flink/tree/cep-dynamic-nfa is almost
> there.
> >
> > On Wed, Mar 7, 2018 at 1:34 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
> > What is the state of this ticket ? Is CEP invested in supporting dynamic
> patterns that could potentially be where patterns can be added/disabled
> through a control stream ?
> >
> >
> >
> >
>
>


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-08 Thread Xingcan Cui
Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT 
keyword will be implemented with an aggregation, which outputs a retract stream 
[2]. In that situation, all the time-related fields will be materialized as if 
they were common fields (with the timestamp type). Currently, due to the 
semantics problem, the time-windowed join cannot be performed on retract 
streams. But you could try non-windowed join [3] after we fix this.

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
 

[3] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 


> On 8 Mar 2018, at 8:59 PM, Timo Walther  wrote:
> 
> Hi Xingcan,
> 
> thanks for looking into this. This definitely seems to be a bug. Maybe in the 
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we 
> should create an issue for it.
> 
> Regards,
> Timo
> 
> 
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>> Hi Xingcan,
>> 
>> Thanks for your help. Attached is a sample code that can reproduce the 
>> problem.
>> When I was writing the sample code, if I remove the `distinct` keyword in 
>> select clause, the AssertionError doesn't occur.
>> 
>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
>> order by eventTs rows between 100 preceding and current row) as cnt1 from 
>> myTable";
>> 
>> Best
>> Yan
>> From: xccui-foxmail  
>> Sent: Wednesday, March 7, 2018 8:10 PM
>> To: Yan Zhou [FDS Science]
>> Cc: user@flink.apache.org 
>> Subject: Re: flink sql timed-window join throw "mismatched type" 
>> AssertionError on rowtime column
>>  
>> Hi Yan,
>> 
>> I’d like to look into this. Can you share more about your queries and the 
>> full stack trace?
>> 
>> Thank,
>> Xingcan
>> 
>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] wrote:
>>> 
>>> Hi experts, 
>>> I am using flink table api to join two tables, which are datastream 
>>> underneath. However, I got an assertion error of "java.lang.AssertionError: 
>>> mismatched type $1 TIMESTAMP(3)" on rowtime column. Below is more details:
>>> 
>>> There in only one kafka data source, which is then converted to Table and 
>>> registered. One existed column is set as rowtime(event time) attribute. Two 
>>> over-window aggregation queries are run against the table and two tables 
>>> are created as results. Everything works great so far.
>>> However when timed-window joining two result tables with inherented 
>>> rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1 
>>> TIMESTAMP(3)" AssertionError. Can someone let me know what is the possible 
>>> cause? F.Y.I., I rename the rowtime column for one of the result table.  
>>> 
>>> DataStream dataStream = env.addSource(kafkaConsumer);
>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>> tableEnv.registerTable(tableName, table);
>>> Table left = tableEnv.sqlQuery("select id, eventTime,count (*) over ...  
>>> from ...");
>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as 
>>> r_event_time, count (*) over ...  from ...");
>>> left.join(right).where("id = r_id && eventTime === r_event_time)
>>> .addSink(...); // here calcite throw exception: java.lang.AssertionError: 
>>> mismatched type $1 TIMESTAMP(3) 
>>> 
>>> source table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_1 table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_2 table
>>>  |-- rid: Long
>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>> 
>>> 
>>> Best
>>> Yan
>> 
> 
> 



Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-08 Thread Hequn Cheng
Hi Yan,

This is a bug in Flink. As a workaround, you can cast eventTime to another
basic SQL type (for example, cast eventTime as varchar).
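Applied to the sql1 query quoted further down in this thread, the workaround would look roughly like this (untested sketch; after the cast the field is no longer a time attribute, so the later join becomes a regular join):

String sql1 = "select distinct id, cast(eventTs as varchar) as eventTsStr, " +
    "count(*) over (partition by id order by eventTs rows between 100 preceding and current row) as cnt1 " +
    "from myTable";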

@Timo and @Xingcan, I think we have to materialize time indicators in
the conditions of LogicalFilter. I created an issue and we can have
more discussion there [1].

[1] https://issues.apache.org/jira/browse/FLINK-8898

Best, Hequn

On Thu, Mar 8, 2018 at 8:59 PM, Timo Walther  wrote:

> Hi Xingcan,
>
> thanks for looking into this. This definitely seems to be a bug. Maybe in
> the org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case
> we should create an issue for it.
>
> Regards,
> Timo
>
>
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>
> Hi Xingcan,
>
>
> Thanks for your help. Attached is a sample code that can reproduce the
> problem.
>
> When I was writing the sample code, if I remove the `distinct` keyword in
> select clause, the AssertionError doesn't occur.
>
>
> String sql1 = "select distinct id, eventTs, count(*) over (partition by
> id order by eventTs rows between 100 preceding and current row) as cnt1
> from myTable";
>
>
> Best
> Yan
> --
> From: xccui-foxmail
> Sent: Wednesday, March 7, 2018 8:10 PM
> To: Yan Zhou [FDS Science]
> Cc: user@flink.apache.org
> Subject: Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan,
>
> I’d like to look into this. Can you share more about your queries and the
> full stack trace?
>
> Thank,
> Xingcan
>
> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] 
> wrote:
>
> Hi experts,
> I am using flink table api to join two tables, which are datastream
> underneath. However, I got an assertion error of "java.lang.AssertionError:
> mismatched type $1 TIMESTAMP(3)" on rowtime column. Below is more details:
>
> There in only one kafka data source, which is then converted to Table and
> registered. One existed column is set as rowtime(event time) attribute. Two
> over-window aggregation queries are run against the table and two tables
> are created as results. Everything works great so far.
> However when timed-window joining two result tables with inherented
> rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1
> TIMESTAMP(3)" AssertionError. Can someone let me know what is the
> possible cause? F.Y.I., I rename the rowtime column for one of the result
> table.
>
> DataStream dataStream = env.addSource(kafkaConsumer);
>
> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>
> tableEnv.registerTable(tableName, table);
>
> Table left = tableEnv.sqlQuery("select id, eventTime, count (*) over
> ...  from ...");
>
> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as
> r_event_time, count (*) over ...  from ...");
>
> left.join(right).where("id = r_id && eventTime === r_event_time")
>
> .addSink(...); // here calcite throws exception: java.lang.AssertionError:
> mismatched type $1 TIMESTAMP(3)
>
> source table
>  |-- id: Long
>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>  |-- ...
>
> result_1 table
>  |-- id: Long
>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>  |-- ...
>
> result_2 table
>  |-- rid: Long
>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>  |-- ...
>
>
> Best
> Yan
>
>
>
>


Re: Dynamic CEP https://issues.apache.org/jira/browse/FLINK-7129?subTaskView=all

2018-03-08 Thread Kostas Kloudas
Hi Vishal,

Dawid (cc'ed), who was working on that, stopped because in the past Flink 
did not support broadcast state.

This has now been added (in master), and the implementation of FLINK-7129 
will hopefully continue soon.

Cheers,
Kostas
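For anyone curious what the broadcast-state building block looks like: below is a minimal, untested sketch against the API on current master (1.5-SNAPSHOT), with a plain string "rule" stream standing in for real CEP patterns. It is not the FLINK-7129 implementation itself, and the sources/ports are placeholders.

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastRulesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9000);  // data stream
        DataStream<String> rules  = env.socketTextStream("localhost", 9001);  // control stream

        final MapStateDescriptor<String, String> rulesDesc = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        BroadcastStream<String> ruleBroadcast = rules.broadcast(rulesDesc);

        events.connect(ruleBroadcast)
              .process(new BroadcastProcessFunction<String, String, String>() {

                  @Override
                  public void processBroadcastElement(String rule, Context ctx,
                                                      Collector<String> out) throws Exception {
                      // every parallel instance receives the rule and stores it
                      ctx.getBroadcastState(rulesDesc).put(rule, rule);
                  }

                  @Override
                  public void processElement(String event, ReadOnlyContext ctx,
                                             Collector<String> out) throws Exception {
                      // match the event against the currently known rules
                      for (Map.Entry<String, String> rule :
                              ctx.getBroadcastState(rulesDesc).immutableEntries()) {
                          if (event.contains(rule.getKey())) {
                              out.collect(rule.getKey() + " matched: " + event);
                          }
                      }
                  }
              })
              .print();

        env.execute("broadcast rules sketch");
    }
}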

> On Mar 8, 2018, at 4:09 PM, Vishal Santoshi  wrote:
> 
> Hello Fabian, 
> 
> What about https://issues.apache.org/jira/browse/FLINK-7129 ? Do you 
> folks intend to conclude this ticket too ? 
> 
> On Thu, Mar 8, 2018 at 1:08 AM, Fabian Hueske  wrote:
> We hope to pick up FLIP-20 after Flink 1.5.0 has been released.
> 
> 2018-03-07 22:05 GMT-08:00 Shailesh Jain :
> In addition to making the addition of patterns dynamic, any updates on FLIP 
> 20 ? 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> 
> On Thu, Mar 8, 2018 at 12:23 AM, Vishal Santoshi  
> wrote:
> I see https://github.com/dawidwys/flink/tree/cep-dynamic-nfa is almost there. 
> 
> On Wed, Mar 7, 2018 at 1:34 PM, Vishal Santoshi  
> wrote:
> What is the state of this ticket ? Is CEP invested in supporting dynamic 
> patterns that could potentially be where patterns can be added/disabled 
> through a control stream ? 
> 
> 
> 
> 



Re: Dynamic CEP https://issues.apache.org/jira/browse/FLINK-7129?subTaskView=all

2018-03-08 Thread Vishal Santoshi
Hello Fabian,

What about https://issues.apache.org/jira/browse/FLINK-7129 ? Do
you folks intend to conclude this ticket too ?

On Thu, Mar 8, 2018 at 1:08 AM, Fabian Hueske  wrote:

> We hope to pick up FLIP-20 after Flink 1.5.0 has been released.
>
> 2018-03-07 22:05 GMT-08:00 Shailesh Jain :
>
>> In addition to making the addition of patterns dynamic, any updates on
>> FLIP 20 ?
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%
>> 3A+Integration+of+SQL+and+CEP
>>
>> On Thu, Mar 8, 2018 at 12:23 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I see https://github.com/dawidwys/flink/tree/cep-dynamic-nfa is almost
>>> there.
>>>
>>> On Wed, Mar 7, 2018 at 1:34 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 What is the state of this ticket ? Is CEP invested in supporting
 dynamic patterns that could potentially be where patterns can be
 added/disabled through a control stream ?

>>>
>>>
>>
>


Re: Job is be cancelled, but the stdout log still prints

2018-03-08 Thread sundy
I got it. That’s really a big problem.

Thank you very much

> On 8 Mar 2018, at 21:03, kedar mhaswade  wrote:
> 
> Also, in addition to what Gary said, if you take Flink completely out of 
> picture and wrote a simple Java class with a main method and the static block 
> (!) which does some long running task like getLiveInfo(), then chances are 
> that your class will make the JVM hang!
> 
> Basically what you are doing is start a bunch of threads (which are perhaps 
> non-daemon by default) and leave them running. Since there is at least one 
> non-daemon thread that is running, the JVM is not allowed to shut down, 
> causing the hang.
> 
> Regards,
> Kedar



Re: Job is be cancelled, but the stdout log still prints

2018-03-08 Thread kedar mhaswade
Also, in addition to what Gary said, if you take Flink completely out of the
picture and write a simple Java class with a main method and the static
block (!) which does some long-running task like getLiveInfo(), then
chances are that your class will make the JVM hang!

Basically, what you are doing is starting a bunch of threads (which are perhaps
non-daemon by default) and leaving them running. Since there is at least one
non-daemon thread that is running, the JVM is not allowed to shut down,
causing the hang.

Regards,
Kedar


On Thu, Mar 8, 2018 at 3:15 AM, Gary Yao  wrote:

> Hi,
>
> You are not shutting down the ScheduledExecutorService [1], which means
> that
> after job cancelation the thread will continue running getLiveInfo(). The
> user
> code class loader, and your classes won't be garbage collected. You should
> use
> the RichFunction#close callback to shutdown your thread pool [2].
>
> Best,
> Gary
>
> [1] https://stackoverflow.com/questions/10504172/how-to-
> shutdown-an-executorservice
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/api_concepts.html#rich-functions
>
>
> On Thu, Mar 8, 2018 at 3:11 AM, sundy <543950...@qq.com> wrote:
>
>>
>> Hi:
>>
>> I faced a problem, the taskmanagers in 3 nodes are still running, I make
>> sure that all job are cancelled,  but I could see that stdout logs are
>> still printing all the way. The job's parallelism is 6.
>>
>> I wrote a scheduled pool like this
>>
>> static {
>>   Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
>> @Override
>> public void run() {
>>   try {
>> getLiveInfo();
>>   } catch (Exception e) {
>> e.printStackTrace();
>>   }
>> }
>>   }, 0, 60, TimeUnit.SECONDS);
>> }
>>
>> Is that the static methods will still be running in the taskmanagers even
>> if the job is cancelled? That’s weird.
>>
>
>


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

2018-03-08 Thread Timo Walther

Hi Xingcan,

thanks for looking into this. This definitely seems to be a bug. Maybe 
in the org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any 
case we should create an issue for it.


Regards,
Timo


On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:


Hi Xingcan,


Thanks for your help. Attached is a sample code that can reproduce the 
problem.


When I was writing the sample code, if I remove the `distinct` keyword 
in select clause, the AssertionError doesn't occur.



String sql1 = "select distinct id, eventTs, count(*) over
(partition by id order by eventTs rows between 100 preceding and
current row) as cnt1 from myTable";


Best
Yan

From: xccui-foxmail
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql timed-window join throw "mismatched type" 
AssertionError on rowtime column

Hi Yan,

I’d like to look into this. Can you share more about your queries and 
the full stack trace?


Thank,
Xingcan

On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] wrote:


Hi experts,
I am using the Flink Table API to join two tables, which are datastreams 
underneath. However, I got an assertion error 
of "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on the 
rowtime column. Below are more details:


There is only one Kafka data source, which is then converted to a Table 
and registered. One existing column is set as the rowtime (event time) 
attribute. Two over-window aggregation queries are run against the 
table and two tables are created as results. Everything works great 
so far.
However, when time-window joining the two result tables with the inherited 
rowtime, Calcite throws the "java.lang.AssertionError: mismatched type 
$1 TIMESTAMP(3)" AssertionError. Can someone let me know what the 
possible cause is? FYI, I renamed the rowtime column for one of the 
result tables.


DataStream dataStream = env.addSource(kafkaConsumer);

Table table = tableEnv.fromDataStream(dataStream, "col1", "col2",
...);

tableEnv.registerTable(tableName, table);

Table left = tableEnv.sqlQuery("select id, eventTime, count (*)
over ...  from ...");

Table right = tableEnv.sqlQuery("select id as r_id, eventTime as
r_event_time, count (*) over ...  from ...");

left.join(right).where("id = r_id && eventTime === r_event_time")

.addSink(...); // here calcite throws exception:
java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)

source table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...
result_1 table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...
result_2 table
 |-- rid: Long
 |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
 |-- ...


Best
Yan






Re: Table Api and CSV builder

2018-03-08 Thread Timo Walther

Hi Karim,

The CsvTableSource and its builder are currently not able to specify 
event time or processing time. I'm sure this will change in the near 
future. Until then, I would recommend either extending it yourself or using 
the DataStream API first to do the parsing and watermarking, and then 
converting the stream into a table.


This issue covers your case: 
https://issues.apache.org/jira/browse/FLINK-8536


Regards,
Timo
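A minimal sketch of the DataStream-first approach (untested; the line layout, field names, and out-of-orderness bound below are assumptions, not from the original question):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CsvEventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // assumed line layout: someValue,2018-03-08,13,45  (date, hour, minute in 3 fields)
        DataStream<Tuple2<String, Long>> parsed = env
            .readTextFile("/path/to/data.csv")
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String line) {
                    String[] f = line.split(",");
                    // combine the three timestamp fields into epoch millis
                    long ts = java.sql.Timestamp
                            .valueOf(f[1] + " " + f[2] + ":" + f[3] + ":00").getTime();
                    return Tuple2.of(f[0], ts);
                }
            });

        DataStream<Tuple2<String, Long>> withTs = parsed.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> e) {
                    return e.f1;
                }
            });

        // append a rowtime attribute derived from the record timestamps assigned above
        Table t = tEnv.fromDataStream(withTs, "v1, tsMillis, rowtime.rowtime");
        tEnv.registerTable("myCsv", t);

        // placeholder sink so the sketch runs end to end
        tEnv.toAppendStream(t, Row.class).print();
        env.execute("csv with event time");
    }
}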



On 3/7/18 at 4:23 PM, Stefan Richter wrote:

Hi,

I think you just need to specify a custom watermark extractor that 
constructs the watermark from the 3 fields, as described here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html.


Best,
Stefan

On 07.03.2018 at 00:52, Karim Amer wrote:


Hi there

I have a CSV file with the timestamp deconstructed into 3 fields, and 
I was wondering what is the best way to specify that those 3 fields 
are the event time? Should I extend CsvTableSource and do the 
preprocessing, or can CsvTableSource.builder() handle it? Or is there 
a better way in general to tackle this obstacle?


Thanks







Re: Flink UI not responding on Yarn + Flink

2018-03-08 Thread Gary Yao
Hi Samar,

Can you share the JobManager and TaskManager logs returned by:

  yarn logs -applicationId ?

Is your browser rendering a blank page, or does the HTTP request not finish?
Can you show the output of one of the following commands:

  curl -v http://host:port
  curl -v http://host:port/jobs

Best,
Gary

On Wed, Mar 7, 2018 at 5:51 PM, samar kumar 
wrote:

> Hi Users,
>  I am currently trying to run Flink with YARN. I have written a program
> that will launch Flink jobs on YARN. It works perfectly fine with a local
> YARN cluster. When I run it on a remote YARN cluster, it does not work as
> expected. As I am reading and writing data from Kafka, I do not see any
> data on the destination Kafka topic. The YARN log does print the
> transformation I want to apply. I am not able to debug the issue because
> there is no error in the YARN logs. Also, the Flink application UI seems to
> be blank and does not give the usual Flink job list. Please help me debug
> and fix the issue.
>
> Running Flink 1.3.2 with yarn 2.6
> Thanks,
> Samar
>


Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-08 Thread Stephan Ewen
Double checking: The "deserialize(byte[] message)" already receives an
additional byte[] with too many bytes?

I wonder if this might be an issue in Kafka then, or in the specific way
Kafka is configured.

On Wed, Mar 7, 2018 at 5:40 PM, Philip Doctor 
wrote:

> Hi Stephan,
>
> Sorry for the slow response.
>
>
>
> I added some logging inside of my DeserializationSchema’s
> `deserialize(byte[] message)` method.
>
>
>
> I see the extra bytes appearing in that method.
>
>
>
> If there’s another place I should add logging, please let me know and I’m
> happy to do so.
>
>
>
> Additionally (and this is weird), I write all my messages to the DB, so I
> was looking for what messages didn’t make it (i.e. input message 1->10,000
> which of those isn’t in the DB).  Turns out all 10k are in the DB.  I’m not
> sure if that indicates this message is read and then retried, or what.  I
> would have guessed that somehow extra data got written to my topic, but
> the Kafka tool tells me otherwise.  So from my application’s perspective it just
> looks like I get extra garbage data every now and then.
>
>
>
> This is actually a big relief, I toss out the garbage data and keep
> rolling.
>
>
>
> I hope this helps, thank you.
>
>
>
>
>
>
>
> From: Stephan Ewen
> Date: Thursday, March 1, 2018 at 9:26 AM
> To: "user@flink.apache.org"
> Cc: Philip Doctor
> Subject: Re: Flink Kafka reads too many bytes .... Very rarely
>
>
>
> Can you specify exactly where you have that excess of data?
>
>
>
> Flink uses basically Kafka's standard consumer and passes byte[]
> unmodified to the DeserializationSchema. Can you help us check whether the 
> "too
> many bytes" happens already before or after the DeserializationSchema?
>
>
>
>   - If the "too many bytes" already arrive at the DeserializationSchema,
> then we should dig into the way that Kafka's consumer is configured
>
>
>
>   - If the "too many bytes" appears after the  DeserializationSchema, then
> we should look into the DeserializationSchema, for example whether it is
> stateful, accidentally shared across threads, etc.
>
>
>
>
>
>
>
> On Thu, Mar 1, 2018 at 11:08 AM, Fabian Hueske  wrote:
>
> Hi Phil,
>
> I've created a JIRA ticket for the problem that you described and linked
> it to this thread: FLINK-8820.
>
> Thank you, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8820
>
>
>
> 2018-02-28 5:13 GMT+01:00 Philip Doctor :
>
>
> - The fact that I seem to get all of my data is currently leading me
>   to discard and ignore this error
>
>
>
> Please ignore this statement, I typed this email as I was testing a
> theory, I meant to delete this line.  This is still a very real issue for
> me.  I was looking to try a work around tomorrow, I saw that the Kafka 11
> consumer supported transactions for exactly once processing, I was going to
> read about that and see if I could somehow fail a read that I couldn’t
> deserialize and try again, and if that might make a difference (can I just
> retry this ?).  I’m not sure how that’ll go.  If you’ve got an idea for a
> work around, I’d be all ears too.
>
>
>
>
>
> From: Philip Doctor
> Date: Tuesday, February 27, 2018 at 10:02 PM
> To: "Tzu-Li (Gordon) Tai", Fabian Hueske
> Cc: "user@flink.apache.org"
> Subject: Re: Flink Kafka reads too many bytes .... Very rarely
>
>
>
> Honestly this has been a very frustrating issue to dig into.  The fact
> that I seem to get all of my data is currently leading me to discard and
> ignore this error; it’s rare and Flink still seems to work, but something is
> very hard to debug here, and despite some confusing observations, most of my
> evidence suggests that this originates in the Flink Kafka consumer.
>
>
>
>
>


Re: Flink is looking for Kafka topic "n/a"

2018-03-08 Thread Nico Kruber
I think I found a code path (race between threads) that may lead to two
markers being in the list.

I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.


Nico

On 07/03/18 10:09, Mu Kong wrote:
> Hi Gordon,
> 
> Thanks for your response.
> I think I've misspoken about the failure after "n/a" exception.
> The behavior after this exception would be:
> 
> switched from RUNNING to CANCELING
> switched from CANCELING to CANCELED
> Try to restart or fail the job "X" () if no
> longer possible.
> switched from state FAILING to RESTARTING
> Restarting the job "X" ()
> Recovering checkpoints from ZooKeeper
> Found 1 checkpoints in ZooKeeper
> Trying to retrieve checkpoint 1091
> Restoring from latest valid checkpoint: Checkpoint 1091 @
>  for 
> switched from CREATED to SCHEDULED
> switched from SCHEDULED to DEPLOYING
> switched from DEPLOYING to RUNNING
> (several check pointings)
> switched from RUNNING to FAILED
> TimerException{java.io.EOFException:Premature EOF: no length prefix
> available}
>         at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException: Premature EOF: no length prefix available
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> 
> Since there are several successful checkpoints after the restart, I think
> the later failure might be something else.
> Also, could you please share more information about the MARKER in the
> code? Like which piece of code should I look for.
> 
> And thanks for the suggestion to let me upgrade the flink to 1.3.2
> 
> Best regards,
> Mu
> 
> 
> On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai  > wrote:
> 
> Hi Mu,
> 
> You mentioned that the job stopped after the "n/a" topic error, but
> the job
> failed to recover.
> What exception did you encounter in the restart executions? Was it
> the same
> error?
> This would verify if we actually should be removing more than one of
> these
> special MARKER partition states.
> 
> On the other hand, if I recall correctly, the Kafka consumer had a
> severe
> bug in 1.3.0 which could lead to potential duplicate data, which was
> fixed
> in 1.3.2. Though I don't think it is related to the error you
> encountered, I
> strongly recommend that you use 1.3.2 instead.
> 
> Cheers,
> Gordon
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
> 
> 





Re: Job is be cancelled, but the stdout log still prints

2018-03-08 Thread Gary Yao
Hi,

You are not shutting down the ScheduledExecutorService [1], which means that
after job cancelation the thread will continue running getLiveInfo(). The
user code class loader and your classes won't be garbage collected. You should
use the RichFunction#close callback to shut down your thread pool [2].

Best,
Gary

[1]
https://stackoverflow.com/questions/10504172/how-to-shutdown-an-executorservice
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#rich-functions
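A minimal sketch of what that could look like (the function type and the getLiveInfo() body are placeholders, not the original code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LiveInfoMapper extends RichMapFunction<String, String> {

    private transient ScheduledExecutorService scheduler;

    @Override
    public void open(Configuration parameters) {
        // start the background task when the operator starts ...
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::getLiveInfo, 0, 60, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        // ... and stop it when the job is cancelled or finishes, so the thread
        // and the user-code classloader can actually be released
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    @Override
    public String map(String value) {
        return value;
    }

    private void getLiveInfo() {
        // whatever the original static block did goes here
    }
}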


On Thu, Mar 8, 2018 at 3:11 AM, sundy <543950...@qq.com> wrote:

>
> Hi:
>
> I faced a problem: the taskmanagers on 3 nodes are still running, and I made
> sure that all jobs are cancelled, but I can see that stdout logs are
> still being printed all the while. The job's parallelism is 6.
>
> I wrote a scheduled pool like this
>
> static {
>   Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
>   try {
> getLiveInfo();
>   } catch (Exception e) {
> e.printStackTrace();
>   }
> }
>   }, 0, 60, TimeUnit.SECONDS);
> }
>
> Will the static methods still be running in the taskmanagers even
> if the job is cancelled? That’s weird.
>


Event time join

2018-03-08 Thread Gytis Žilinskas
Hi,

we're considering flink for a couple of our projects. I'm doing a
trial implementation for one of them. So far, I like a lot of things,
however there are a couple of issues that I can't figure out how to
resolve. Not sure if it's me misunderstanding the tool, or flink just
doesn't have a capability to do it.

We want to do an event time join on two big kafka streams. Both of
them might experience some issues on the other end and be delayed.
Additionally, while both are big, one (let's call it stream A) is
significantly larger than stream B.

We also know that the join window is around 5 min. That is, given some
key K in stream B, if there is a counterpart in stream A, it's going
to be +/- 5 min away in event time.

Since stream A is especially heavy and it's unfeasible to keep hours
of it in memory, I would imagine an ideal solution where we read both
streams from Kafka. We always make sure that stream B is ahead by
10 min, that is, if stream A is currently ahead in watermarks, we stall
it and consume stream B until it catches up. Once the streams are
aligned in event time (with the 10 min delay window) we run them both
through the join.

The problem is that I can't find a mechanism to implement that in Flink. If
I try to do a CoProcessFunction then it just consumes both streams at
the same time, ingests a lot of messages from stream A, runs out of
memory and dies.

Any ideas on how this could be solved?

(here's a thread with a very similar problem from some time ago
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html)

Regards,
Gytis