Re: Timers and state

2018-03-05 Thread Xingcan Cui
Hi Alberto,

an ultimate solution for your problem would be a map state with ordered keys 
(like a TreeMap), but unfortunately, this is still a WIP feature. 

For now, maybe you could "eagerly remove" the outdated values (with 
`iterator.remove()`) when iterating the map state in the process function, or 
split the key space of your map state into static bins, so that you can 
calculate a set of outdated keys before removing them.
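
To illustrate the first idea, here is a rough sketch of the eager removal
inside processElement() (TimedValue is a made-up value type that carries its
own expiration timestamp; mapState and ctx come from the surrounding keyed
ProcessFunction):

// iterate the map state and eagerly drop outdated entries
Iterator<Map.Entry<String, TimedValue>> it = mapState.iterator();
while (it.hasNext()) {
    Map.Entry<String, TimedValue> entry = it.next();
    // TimedValue#expirationTime is a hypothetical field
    if (entry.getValue().expirationTime <= ctx.timerService().currentWatermark()) {
        it.remove(); // removes the entry from the map state itself
    }
}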

Hope that helps.

Best,
Xingcan

> On 5 Mar 2018, at 4:19 PM, Alberto Mancini  wrote:
> 
>  



Timers and state

2018-03-05 Thread Alberto Mancini
Hello,
in a Flink application we have a keyed operator that keeps a map state
(MapState). Some of the elements in the state need a timeout, so we
use a Timer.
When the timer fires, the state is scoped to the key as expected, but we
would also like to pass the expiring map-key to onTimer (or have it
available there).
Our current approach is to keep the expected timer timestamp in the map
values and visit the whole map to select the correct key, but this
seems overkill.
Is there an established better approach that we are not seeing?

Thanks,
   Alberto.


Using time window with SQL nested query

2018-03-05 Thread 杨力
I tried to write a nested query with a HOP window in a streaming
environment.

Table `source` consists of 3 columns: a, b, and timestamp.

SELECT a FROM (SELECT a, COUNT(*) FROM source GROUP BY HOP(timestamp, ...,
...), a, b) GROUP BY HOP(timestamp, ..., ...), a HAVING ...

Flink throws an exception: "Column 'timestamp' not found in any
table".

Then I tried "SELECT HOP_END(timestamp, ..., ...) AS timestamp, a,
COUNT(*)" in the inner query, getting an exception: "Window can only be
defined over a time attribute column."

Can I make the rowtime attribute propagate to the outer query, just like
chaining windows in the DataStream API?

Regards,
Bill


cep code

2018-03-05 Thread aitozi
Hi,

I am reading the flink-cep source code, based on release-1.3.2. I can't
understand this method in NFACompiler; can anyone help me with it?

private List<Tuple2<IterativeCondition<T>, String>> getCurrentNotCondition() {
    List<Tuple2<IterativeCondition<T>, String>> notConditions = new ArrayList<>();

    Pattern<T, ? extends T> previousPattern = currentPattern;
    while (previousPattern.getPrevious() != null && (
            previousPattern.getPrevious().getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL) ||
            previousPattern.getPrevious().getQuantifier().getConsumingStrategy() ==
                Quantifier.ConsumingStrategy.NOT_FOLLOW)) {

        previousPattern = previousPattern.getPrevious();

        if (previousPattern.getQuantifier().getConsumingStrategy() ==
                Quantifier.ConsumingStrategy.NOT_FOLLOW) {
            final IterativeCondition<T> notCondition =
                (IterativeCondition<T>) previousPattern.getCondition();
            notConditions.add(Tuple2.of(notCondition, previousPattern.getName()));
        }
    }
    return notConditions;
}

The while condition accepts a previous pattern that either
"hasProperty(Quantifier.QuantifierProperty.OPTIONAL)" or has the
"Quantifier.ConsumingStrategy.NOT_FOLLOW" strategy, but it only adds the
condition to notConditions when it is NOT_FOLLOW. Is there something wrong?






Re: cep code

2018-03-05 Thread Dawid Wysakowicz
Hi,

It is a very low-level detail of the CEP library that should be transparent 
to the end user.

However, just to clarify it a bit: this is expected. The purpose of this 
function is to create an optional path that bypasses all optional states. 
NOT_FOLLOW is treated as part of the optional path, but in contrast to OPTIONAL, 
its condition must be taken into account in the combined condition. Hope it 
helps.
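
To make that a bit more tangible, a hedged example (not from your code; Event
and the conditions are made up): in the pattern below, when the state for "end"
is compiled, getCurrentNotCondition() walks back over "not" (which has the
NOT_FOLLOW consuming strategy) and collects only its condition into
notConditions, so it can be merged into the combined condition of "end".

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "start".equals(e.getType());
            }
        })
        .notFollowedBy("not") // NOT_FOLLOW consuming strategy
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "forbidden".equals(e.getType());
            }
        })
        .followedBy("end")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "end".equals(e.getType());
            }
        });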

Regards,
Dawid

> On 5 Mar 2018, at 10:42, aitozi  wrote:
> 
> Hi,
> 
> I am reading the flink-cep source code, based on release-1.3.2. I can't
> understand this method in NFACompiler; can anyone help me with it?
> 
> The while condition accepts a previous pattern that either
> "hasProperty(Quantifier.QuantifierProperty.OPTIONAL)" or has the
> "Quantifier.ConsumingStrategy.NOT_FOLLOW" strategy, but it only adds the
> condition to notConditions when it is NOT_FOLLOW. Is there something wrong?





Simple CEP examples with full Maven project

2018-03-05 Thread Esa Heikkinen
Hi

I have tried to learn CEP, but for some reason it seems to be a little 
difficult. It looks very complex.

Do there exist some simple (Scala) CEP examples with full Maven projects? 
I have only found the TaxiRide example from data Artisans [1].

For example, which variables, classes and functions are mandatory in the body 
of a correct CEP application, and in which order?

Should the select() function always be at the end of the CEP application?

Are there differences between the types of input streams used for CEP and 
those not used for CEP?

I found a "simple CEP" project, https://github.com/haoch/flink-siddhi. Is it 
recommended to use?

[1] 
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/datastream_scala/cep/LongRides.scala

Best, Esa


Re: CsvTableSource Types.TIMESTAMP

2018-03-05 Thread Timo Walther

Hi,

SQL_TIMESTAMP is the same type. A couple of months ago it was decided to 
rename this property so that the name TIMESTAMP can be used for timestamps 
with timezone support in the future.
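
For reference, a minimal sketch with a CsvTableSource (Flink 1.4, Java; path
and schema are made up; in Scala you would write Types.SQL_TIMESTAMP without
parentheses):

import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.CsvTableSource;

CsvTableSource source = CsvTableSource.builder()
        .path("/tmp/events.csv")            // placeholder path
        .field("name", Types.STRING())
        .field("ts", Types.SQL_TIMESTAMP()) // the renamed timestamp type
        .build();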


Regards,
Timo


On 3/5/18 at 2:10 PM, Esa Heikkinen wrote:


I have tried to get the following example to work, but with no success yet.

https://flink.apache.org/news/2017/03/29/table-sql-api-update.html

Error: value TIMESTAMP is not a member of object 
org.apache.flink.table.api.Types


What could be the problem?

Which imports should I use?

Or should I use SQL_TIMESTAMP instead? Is it the same?

Best, Esa





Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Stefan Richter
Hi,

quick question: what is your exact checkpointing configuration? In particular, 
what is your value for the maximum parallel checkpoints and the minimum time 
interval to wait between two checkpoints?

Best,
Stefan

> On 05.03.2018 at 06:34, Tony Wei wrote:
> 
> Hi all,
> 
> Last weekend, my flink job's checkpoints started failing because of timeout. I 
> have no idea what happened, but I collected some information about my cluster 
> and job. Hope someone can give me advice or hints about the problem that I 
> encountered.
> 
> My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has 4 
> cores. These machines are ec2 spot instances. The job's parallelism is set as 
> 32, using rocksdb as state backend and s3 presto as checkpoint file system.
> The state's size is nearly 15gb and still grows day-by-day. Normally, it 
> takes 1.5 mins to finish the whole checkpoint process. The timeout 
> configuration is set as 10 mins.
> 
> 
> 
> As the picture shows, not every checkpoint subtask failed due to timeout, 
> but every machine has at some point failed for all of its subtasks during the 
> last weekend. Some machines recovered by themselves and some machines 
> recovered after I restarted them.
> 
> I recorded logs, stack traces and snapshots of the machines' status (CPU, IO, 
> Network, etc.) for both a good and a bad machine. If there is a need for more 
> information, please let me know. Thanks in advance.
> 
> Best Regards,
> Tony Wei.
> 



CsvTableSource Types.TIMESTAMP

2018-03-05 Thread Esa Heikkinen
I have tried to get the following example to work, but with no success yet.

https://flink.apache.org/news/2017/03/29/table-sql-api-update.html

Error: value TIMESTAMP is not a member of object 
org.apache.flink.table.api.Types

What could be the problem?

Which imports should I use?

Or should I use SQL_TIMESTAMP instead? Is it the same?

Best, Esa


Re: Using time window with SQL nested query

2018-03-05 Thread Timo Walther

Hi Bill,

you can use HOP_ROWTIME()/HOP_PROCTIME() to propagate the time attribute 
to the outer query. See also [1] for an example.
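
A rough sketch of your query with the time attribute propagated (the intervals,
the HAVING clause, and the rename of the rowtime column to `ts` are
placeholders):

// Java Table API; `source` must be registered with a rowtime attribute `ts`
Table result = tableEnv.sqlQuery(
    "SELECT a FROM (" +
    "  SELECT a, b, COUNT(*) AS cnt, " +
    "    HOP_ROWTIME(ts, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) AS ts " +
    "  FROM source " +
    "  GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE), a, b" +
    ") GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE), a " +
    "HAVING COUNT(*) > 1");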


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#selecting-group-window-start-and-end-timestamps




On 3/5/18 at 11:26 AM, 杨力 wrote:
I tried to write a nested query with a HOP window in a streaming 
environment.

Table `source` consists of 3 columns: a, b, and timestamp.

SELECT a FROM (SELECT a, COUNT(*) FROM source GROUP BY HOP(timestamp, 
..., ...), a, b) GROUP BY HOP(timestamp, ..., ...), a HAVING ...

Flink throws an exception: "Column 'timestamp' not found in any 
table".

Then I tried "SELECT HOP_END(timestamp, ..., ...) AS timestamp, a, 
COUNT(*)" in the inner query, getting an exception: "Window can only 
be defined over a time attribute column."

Can I make the rowtime attribute propagate to the outer query, just 
like chaining windows in the DataStream API?

Regards,
Bill





Re: Table API Compilation Error in Flink

2018-03-05 Thread Timo Walther

Hi Nagananda,

could you show us your entire pom.xml?

From what I see, it seems that you are using the wrong 
StreamTableEnvironment. First you need to decide whether you want to program 
in Scala or Java. Depending on that, you can add the dependencies as 
described in [1].


There are two environments specialized for Java and Scala development 
(org.apache.flink.table.api.scala.StreamTableEnvironment OR 
org.apache.flink.table.api.java.StreamTableEnvironment).
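
For a pure Java program, a minimal sketch looks like this (assuming the
dependencies from [1] are in place):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// returns the Java variant because env is the Java StreamExecutionEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);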


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/#setup


On 3/5/18 at 8:39 AM, Nagananda M wrote:


Hi Xingcan,

I have included the portion below in my pom.xml, but the same error 
is still coming.






<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.4.0</version>
</dependency>



Regards,

Nagananda M

From: Xingcan Cui [mailto:xingc...@gmail.com]
Sent: Monday, March 05, 2018 1:01 PM
To: Nagananda M
Cc: user@flink.apache.org
Subject: Re: Table API Compilation Error in Flink

Hi Nagananda,

adding `flink-streaming-scala_${scala version}` to your maven 
dependency would solve this.


Best,

Xingcan



On 5 Mar 2018, at 2:21 PM, Nagananda M > wrote:


Hi All,

I am trying to compile a sample program in Apache Flink using 
TableEnvironment and am facing a compilation issue with the addition 
of the line below.


"StreamTableEnvironment tableEnv = 
StreamTableEnvironment.getTableEnvironment(env);"


The error is as below

"cannot access org.apache.flink.api.scala.ExecutionEnvironment"

I am compiling the code using Maven. Can anyone help with this?

Cheers,

Nagananda M












Re: Rocksdb in production

2018-03-05 Thread Jayant Ameta
Oh! Somehow I missed, while reading the documentation, that RocksDB is
embedded in Flink.

Also, irrespective of the state backend being filesystem or rocksdb, I'll have
to set up a shared filesystem (HDFS, S3, etc.). Is my understanding correct?


Jayant Ameta

On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:

> Hi,
> RocksDB is an embedded key-value store that is internally used by Flink.
> There is no need to set up a RocksDB database or service yourself. All of
> that is done by Flink.
> As a Flink user who uses the RocksDB state backend, you won't get in touch
> with RocksDB itself.
>
> Besides that, RocksDB is developed by Facebook [1] and is a fairly active
> project.
>
> Best, Fabian
>
> [1] https://github.com/facebook/rocksdb
>
> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>
>> Hi,
>> I wanted to know how's the online support and resources for RocksDB? I
>> want to use RocksDB as the state backend, but I'm not sure how active the
>> community is. Can anyone vouch for it?
>>
>
>


Re: Timers and state

2018-03-05 Thread Fabian Hueske
Hi Alberto,

You can also add another MapState. Its key is a timestamp and
its value is the map key that you want to discard.
When onTimer() is called, you look up the timestamp in that MapState
and remove the returned key from the original MapState.
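
A minimal sketch of this idea (Event, the field names, and the timeout are
illustrative assumptions, not from your job):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// applied on a keyed stream; Event is a hypothetical POJO with
// String mapKey and String payload fields
public class TimeoutFunction extends ProcessFunction<Event, Event> {

    private MapState<String, String> values;     // map-key -> value
    private MapState<Long, String> expirations;  // timer timestamp -> map-key

    @Override
    public void open(Configuration parameters) {
        values = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("values", String.class, String.class));
        expirations = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("expirations", Long.class, String.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {
        long timeout = ctx.timerService().currentProcessingTime() + 60_000L;
        values.put(event.mapKey, event.payload);
        expirations.put(timeout, event.mapKey);
        ctx.timerService().registerProcessingTimeTimer(timeout);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out)
            throws Exception {
        // the expiring map-key is now directly available
        String expiredKey = expirations.get(timestamp);
        if (expiredKey != null) {
            values.remove(expiredKey);
            expirations.remove(timestamp);
        }
    }
}

(A production version would also handle two entries expiring at exactly the
same timestamp, e.g. by storing a list of keys per timestamp.)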

Best, Fabian

2018-03-05 0:48 GMT-08:00 Xingcan Cui :

> Hi Alberto,
>
> an ultimate solution for your problem would be a map state with ordered
> keys (like a TreeMap), but unfortunately, this is still a WIP feature.
>
> For now, maybe you could "eagerly remove” the outdated value (with
> `iterator.remove()`) when iterating the map state in the process function
> or split the key space for your map state into static bins, thus you could
> calculate a set of outdated keys before removing them.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On 5 Mar 2018, at 4:19 PM, Alberto Mancini  wrote:
>
>
>
>
>


Re: Rocksdb in production

2018-03-05 Thread Fabian Hueske
Hi,
RocksDB is an embedded key-value store that is internally used by Flink.
There is no need to set up a RocksDB database or service yourself. All of
that is done by Flink.
As a Flink user who uses the RocksDB state backend, you won't get in touch
with RocksDB itself.
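
The only RocksDB-specific steps are adding the flink-statebackend-rocksdb
dependency and picking the backend; a minimal sketch (the checkpoint URI is a
placeholder, and the constructor declares IOException):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the URI must point to a filesystem that all TaskManagers can reach
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));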

Besides that, RocksDB is developed by Facebook [1] and is a fairly active
project.

Best, Fabian

[1] https://github.com/facebook/rocksdb

2018-03-05 3:57 GMT-08:00 Jayant Ameta :

> Hi,
> I wanted to know how's the online support and resources for RocksDB? I
> want to use RocksDB as the state backend, but I'm not sure how active the
> community is. Can anyone vouch for it?
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Stefan Richter
Hi,

thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack 
trace, you can see that many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. It seems like the implementation of some features, like 
checkpoint timeouts and not failing tasks on checkpointing problems, overlooked 
that we also need to properly communicate checkpoint cancellation to all tasks, 
which was not needed before.

Best,
Stefan

> On 05.03.2018 at 14:42, Tony Wei wrote:
> 
> Hi Stefan,
> 
> Here is my checkpointing configuration.
> 
> Checkpointing Mode: Exactly Once
> Interval: 20m 0s
> Timeout: 10m 0s
> Minimum Pause Between Checkpoints: 0ms
> Maximum Concurrent Checkpoints: 1
> Persist Checkpoints Externally: Enabled (delete on cancellation)
>
> Best Regards,
> Tony Wei
> 
> 2018-03-05 21:30 GMT+08:00 Stefan Richter  >:
> Hi,
> 
> quick question: what is your exact checkpointing configuration? In 
> particular, what is your value for the maximum parallel checkpoints and the 
> minimum time interval to wait between two checkpoints?
> 
> Best,
> Stefan
> 
> > On 05.03.2018 at 06:34, Tony Wei wrote:
> >
> > Hi all,
> >
> > Last weekend, my flink job's checkpoint start failing because of timeout. I 
> > have no idea what happened, but I collect some informations about my 
> > cluster and job. Hope someone can give me advices or hints about the 
> > problem that I encountered.
> >
> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has 4 
> > cores. These machines are ec2 spot instances. The job's parallelism is set 
> > as 32, using rocksdb as state backend and s3 presto as checkpoint file 
> > system.
> > The state's size is nearly 15gb and still grows day-by-day. Normally, It 
> > takes 1.5 mins to finish the whole checkpoint process. The timeout 
> > configuration is set as 10 mins.
> >
> > 
> >
> > As the picture shows, not each subtask of checkpoint broke caused by 
> > timeout, but each machine has ever broken for all its subtasks during last 
> > weekend. Some machines recovered by themselves and some machines recovered 
> > after I restarted them.
> >
> > I record logs, stack trace and snapshot for machine's status (CPU, IO, 
> > Network, etc.) for both good and bad machine. If there is a need for more 
> > informations, please let me know. Thanks in advance.
> >
> > Best Regards,
> > Tony Wei.
> > 
> 
> 



Kafka offset auto-commit stops after timeout

2018-03-05 Thread Edward
We have noticed that the Kafka offset auto-commit functionality seems to stop
working after it encounters a timeout. It appears in the logs like this:

2018-03-04 07:02:54,779 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator kafka06:9092 (id: 2147483641 rack: null) dead for group
consumergroup01
2018-03-04 07:02:54,780 WARN 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic01-24=OffsetAndMetadata{offset=153237895,
metadata=''}} failed for group consumergroup01: Offset commit failed with a
retriable exception. You should retry committing offsets. The underlying
error was: The request timed out.

After this message is logged, no more offsets are committed by the job until
it is restarted (and if the flink process ends abnormally, the offsets never
get committed).

This is using Flink 1.4.0 which uses kafka-clients 0.11.0.2. We are using
the default kafka client settings for enable.auto.commit (true) and
auto.commit.interval.ms (5000). We are not using Flink checkpointing, so the
kafka client offset commit mode is OffsetCommitMode.KAFKA_PERIODIC (not
OffsetCommitMode.ON_CHECKPOINTS).

I'm wondering if others have encountered this?

And if so, does enabling checkpointing resolve the issue, because
Kafka09Fetcher.doCommitInternalOffsetsToKafka is called from the Flink code?
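
For reference, a rough sketch of the checkpoint-based alternative (the
interval is a placeholder; topic, broker, and group reuse the names from the
logs above):

env.enableCheckpointing(60_000L); // switches the commit mode to ON_CHECKPOINTS

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka06:9092");
props.setProperty("group.id", "consumergroup01");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("topic01", new SimpleStringSchema(), props);
consumer.setCommitOffsetsOnCheckpoints(true); // the default
env.addSource(consumer);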





Re: Rocksdb in production

2018-03-05 Thread Fabian Hueske
Yes, that is correct.

2018-03-05 8:57 GMT-08:00 Jayant Ameta :

> Oh! Somehow I missed while reading the documentation that RocksDB is
> embedded in Flink.
>
> Also, irrespective of state backend being filesystem or rocksdb, I'll have
> to setup a shared filesystem (HDFS, S3, etc). Is my understanding correct?
>
>
> Jayant Ameta
>
> On Mon, Mar 5, 2018 at 9:51 PM, Fabian Hueske  wrote:
>
>> Hi,
>> RocksDB is an embedded key-value store that is internally used by Flink.
>> There is no need to set up a RocksDB database or service yourself. All of
>> that is done by Flink.
>> As a Flink user who uses the RocksDB state backend, you won't get in
>> touch with RocksDB itself.
>>
>> Besides that, RocksDB is developed by Facebook [1] and is a fairly active
>> project.
>>
>> Best, Fabian
>>
>> [1] https://github.com/facebook/rocksdb
>>
>> 2018-03-05 3:57 GMT-08:00 Jayant Ameta :
>>
>>> Hi,
>>> I wanted to know how's the online support and resources for RocksDB? I
>>> want to use RocksDB as the state backend, but I'm not sure how active the
>>> community is. Can anyone vouch for it?
>>>
>>
>>
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

Here is my checkpointing configuration.

Checkpointing Mode: Exactly Once
Interval: 20m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 0ms
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (delete on cancellation)
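
In code, this corresponds roughly to the following (a sketch; the values mirror
the table above):

env.enableCheckpointing(20 * 60 * 1000L); // interval: 20m, exactly-once by default
CheckpointConfig conf = env.getCheckpointConfig();
conf.setCheckpointTimeout(10 * 60 * 1000L); // timeout: 10m
conf.setMinPauseBetweenCheckpoints(0L);     // minimum pause: 0ms
conf.setMaxConcurrentCheckpoints(1);        // maximum concurrent: 1
conf.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);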
Best Regards,
Tony Wei

2018-03-05 21:30 GMT+08:00 Stefan Richter :

> Hi,
>
> quick question: what is your exact checkpointing configuration? In
> particular, what is your value for the maximum parallel checkpoints and the
> minimum time interval to wait between two checkpoints?
>
> Best,
> Stefan
>
> > On 05.03.2018 at 06:34, Tony Wei wrote:
> >
> > Hi all,
> >
> > Last weekend, my flink job's checkpoint start failing because of
> timeout. I have no idea what happened, but I collect some informations
> about my cluster and job. Hope someone can give me advices or hints about
> the problem that I encountered.
> >
> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has
> 4 cores. These machines are ec2 spot instances. The job's parallelism is
> set as 32, using rocksdb as state backend and s3 presto as checkpoint file
> system.
> > The state's size is nearly 15gb and still grows day-by-day. Normally, It
> takes 1.5 mins to finish the whole checkpoint process. The timeout
> configuration is set as 10 mins.
> >
> > 
> >
> > As the picture shows, not each subtask of checkpoint broke caused by
> timeout, but each machine has ever broken for all its subtasks during last
> weekend. Some machines recovered by themselves and some machines recovered
> after I restarted them.
> >
> > I record logs, stack trace and snapshot for machine's status (CPU, IO,
> Network, etc.) for both good and bad machine. If there is a need for more
> informations, please let me know. Thanks in advance.
> >
> > Best Regards,
> > Tony Wei.
> >  tm_log.log>
>
>


Re: Delta iteration not spilling to disk

2018-03-05 Thread santoshg
Hi Joshua,

I am running into a similar problem. Can you explain your solution a bit
more? A code snippet would help.

Thanks






Re: cep code

2018-03-05 Thread aitozi
Then what is the STOP state in the NFA? I haven't seen this state in the event
pattern matching paper. Will each NOT pattern be transformed to a STOP state?





Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Stefan,

I see. That explains why the load on the machines grew. However, I think
it is not the root cause that led to these consecutive checkpoint timeouts.
As I said in my first mail, the checkpointing process usually took 1.5
mins to upload states, and this operator and the kafka consumer are the only
two operators that have states in my pipeline. In the best case, I should
never encounter a timeout problem that is only caused by lots of pending
checkpointing threads that have already timed out. Am I right?

Since these logs and stack traces were taken nearly 3 hours after the
first checkpoint timeout, I'm afraid that we couldn't actually find out the
root cause of the first checkpoint timeout. Because we are preparing to
put this pipeline into production, I was wondering if you could help me
find out where the root cause lies: bad machines, or s3, or the
flink-s3-presto package, or the flink checkpointing thread. It will be great
if we can find it out from the information I provided, or a
hypothesis based on your experience is welcome as well. The most important
thing is that I have to decide whether I need to change my persistence
filesystem or use another s3 filesystem package, because the last
thing I want to see is the checkpoint timeout happening very often.

Thank you very much for all your advice.

Best Regards,
Tony Wei

2018-03-06 1:07 GMT+08:00 Stefan Richter :

> Hi,
>
> thanks for all the info. I had a look into the problem and opened
> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
> stack trace, you can see many checkpointing threads are running on your TM
> for checkpoints that have already timed out, and I think this cascades and
> slows down everything. Seems like the implementation of some features like
> checkpoint timeouts and not failing tasks from checkpointing problems
> overlooked that we also require to properly communicate that checkpoint
> cancellation to all task, which was not needed before.
>
> Best,
> Stefan
>
>
> On 05.03.2018 at 14:42, Tony Wei wrote:
>
> Hi Stefan,
>
> Here is my checkpointing configuration.
>
> Checkpointing Mode Exactly Once
> Interval 20m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 0ms
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (delete on cancellation)
> Best Regards,
> Tony Wei
>
> 2018-03-05 21:30 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> quick question: what is your exact checkpointing configuration? In
>> particular, what is your value for the maximum parallel checkpoints and the
>> minimum time interval to wait between two checkpoints?
>>
>> Best,
>> Stefan
>>
>> > On 05.03.2018 at 06:34, Tony Wei wrote:
>> >
>> > Hi all,
>> >
>> > Last weekend, my flink job's checkpoint start failing because of
>> timeout. I have no idea what happened, but I collect some informations
>> about my cluster and job. Hope someone can give me advices or hints about
>> the problem that I encountered.
>> >
>> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each has
>> 4 cores. These machines are ec2 spot instances. The job's parallelism is
>> set as 32, using rocksdb as state backend and s3 presto as checkpoint file
>> system.
>> > The state's size is nearly 15gb and still grows day-by-day. Normally,
>> It takes 1.5 mins to finish the whole checkpoint process. The timeout
>> configuration is set as 10 mins.
>> >
>> > 
>> >
>> > As the picture shows, not each subtask of checkpoint broke caused by
>> timeout, but each machine has ever broken for all its subtasks during last
>> weekend. Some machines recovered by themselves and some machines recovered
>> after I restarted them.
>> >
>> > I record logs, stack trace and snapshot for machine's status (CPU, IO,
>> Network, etc.) for both good and bad machine. If there is a need for more
>> informations, please let me know. Thanks in advance.
>> >
>> > Best Regards,
>> > Tony Wei.
>> > > log.log>
>>
>>
>
>


Re: Using time window with SQL nested query

2018-03-05 Thread 杨力
Thanks. It works. I missed it while reading the documentation.

Timo Walther wrote on Monday, 5 March 2018 at 9:20 PM:

> Hi Bill,
>
> you can use HOP_ROWTIME()/HOP_PROCTIME() to propagate the time attribute
> to the outer query. See also [1] for an example.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#selecting-group-window-start-and-end-timestamps
>
>
>
On 3/5/18 at 11:26 AM, 杨力 wrote:
>
>
>


Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

Thanks for your suggestion. "incremental checkpoint" is what I will try out
next, and I know it will give better performance. However, it might not
solve this issue completely, because as I said, the average end-to-end
latency of checkpointing is currently less than 1.5 mins, and that is far
from my timeout configuration. I believe "incremental checkpoint" will
reduce the latency and make this issue occur seldom, but I can't
promise it won't happen again if I have bigger state growth in the future.
Am I right?

Best Regards,
Tony Wei

2018-03-06 10:55 GMT+08:00 周思华 :

> Hi Tony,
>
> Sorry for jumping in. One thing I want to remind you of is that, from the log
> you provided, it looks like you are using "full checkpoint". This means that
> the state data that needs to be checkpointed and transferred to s3 will grow
> over time, and even for the first checkpoint its performance is slower than an
> incremental checkpoint (because it needs to iterate over all the records from
> rocksdb using the RocksDBMergeIterator). Maybe you can try out "incremental
> checkpoint"; it could help you get better performance.
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 10:34,Tony Wei
>  wrote:
>
> Hi Stefan,
>
> I see. That explains why the loading of machines grew up. However, I think
> it is not the root cause that led to these consecutive checkpoint timeout.
> As I said in my first mail, the checkpointing progress usually took 1.5
> mins to upload states, and this operator and kafka consumer are only two
> operators that have states in my pipeline. In the best case, I should never
> encounter the timeout problem that only caused by lots of pending
> checkpointing threads that have already timed out. Am I right?
>
> Since these logging and stack trace was taken after nearly 3 hours from
> the first checkpoint timeout, I'm afraid that we couldn't actually find out
> the root cause for the first checkpoint timeout. Because we are preparing
> to make this pipeline go on production, I was wondering if you could help
> me find out where the root cause happened: bad machines or s3 or
> flink-s3-presto packages or flink checkpointing thread. It will be great if
> we can find it out from those informations the I provided, or a
> hypothesis based on your experience is welcome as well. The most important
> thing is that I have to decide whether I need to change my persistence
> filesystem or use another s3 filesystem package, because it is the last
> thing I want to see that the checkpoint timeout happened very often.
>
> Thank you very much for all your advices.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 1:07 GMT+08:00 Stefan Richter :
>
>> Hi,
>>
>> thanks for all the info. I had a look into the problem and opened
>> https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your
>> stack trace, you can see many checkpointing threads are running on your TM
>> for checkpoints that have already timed out, and I think this cascades and
>> slows down everything. Seems like the implementation of some features like
>> checkpoint timeouts and not failing tasks from checkpointing problems
>> overlooked that we also require to properly communicate that checkpoint
>> cancellation to all task, which was not needed before.
>>
>> Best,
>> Stefan
>>
>>
>> On 05.03.2018 at 14:42, Tony Wei wrote:
>>
>> Hi Stefan,
>>
>> Here is my checkpointing configuration.
>>
>> Checkpointing Mode Exactly Once
>> Interval 20m 0s
>> Timeout 10m 0s
>> Minimum Pause Between Checkpoints 0ms
>> Maximum Concurrent Checkpoints 1
>> Persist Checkpoints Externally Enabled (delete on cancellation)
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-05 21:30 GMT+08:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> quick question: what is your exact checkpointing configuration? In
>>> particular, what is your value for the maximum parallel checkpoints and the
>>> minimum time interval to wait between two checkpoints?
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 05.03.2018 at 06:34, Tony Wei wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Last weekend, my flink job's checkpoint start failing because of
>>> timeout. I have no idea what happened, but I collect some informations
>>> about my cluster and job. Hope someone can give me advices or hints about
>>> the problem that I encountered.
>>> >
>>> > My cluster version is flink-release-1.4.0. Cluster has 10 TMs, each
>>> has 4 cores. These machines are ec2 spot instances. The job's parallelism
>>> is set as 32, using rocksdb as state backend and s3 presto as checkpoint
>>> file system.
>>> > The state's size is nearly 15gb and still grows day-by-day. Normally,
>>> It takes 1.5 mins to finish the whole checkpoint process. The timeout
>>> configuration is set as 10 mins.
>>> >
>>> > 
>>> >
>>> > As the picture shows, not each subtask of checkpoint broke caused by

Re: Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Hi Sihua,

You are right. The incremental checkpoint might relieve the machines from high
cpu load and make the bad machines recover quickly, but I was wondering
why the first checkpoint failed by timeout. You can see that when the bad
machine recovered, the cpu load for each checkpoint was not so high,
although there were still peaks during each checkpoint. I think the
high cpu load, which might be caused by those timed-out checkpointing
threads, is not the root cause. I will use the incremental checkpoint
eventually, but I will decide whether to change my persistence filesystem
after we find out the root cause, or stop the investigation and draw a
conclusion in this mailing thread. What do you think?

Best Regards,
Tony Wei

2018-03-06 15:13 GMT+08:00 周思华 :

> Hi Tony,
>
> Sorry for missing the factor of cpu. I found that the "bad tm"'s cpu load
> is so much higher than the "good tm"'s, so I think maybe it is also a reason
> that could lead to timeout. Since you were using "full checkpoint", it needs
> to iterate over all the records in the RocksDB with some `if` checks; when
> the state is huge this is cpu costly. Let me try to explain the full
> checkpoint a bit more; it contains two parts.
>
> Part 1. Take a snapshot of the RocksDB. (This maps to the "Checkpoint
> Duration (sync)" on the checkpoint detail page.)
>
> Part 2. Loop over the records of the snapshot, along with some `if` checks to
> ensure that the data is sent to s3 in the order of the key groups. (This maps
> to the "Checkpoint Duration (async)".)
>
> So part 2 can be cpu costly and network costly: if the CPU load is too
> high, then sending data will slow down, because both happen in a single loop.
> If cpu is the reason, this phenomenon will disappear if you use incremental
> checkpoints, because they almost only send data to s3. All in all, for now
> trying out the incremental checkpoint is the best thing to do, I think.
>
> Best Regards,
> Sihua Zhou
>
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 14:45,Tony Wei
>  wrote:
>
> Sent to the wrong mailing list. Forward it to the correct one.
>
> -- Forwarded message --
> From: Tony Wei 
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 , Stefan Richter  >
> Cc: "user-subscr...@flink.apache.org" 
>
>
> Hi Sihua,
>
> Thanks a lot. I will try to find out the problem from machines'
> environment. If you and Stefan have any new suggestions or thoughts, please
> advise me. Thank you !
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 14:34 GMT+08:00 周思华 :
>
>> Hi Tony,
>>
>> I think the two things you mentioned can both lead to a bad network. But
>> from my side, I think it is more likely that the unstable network env
>> causes the poor network performance itself, because as far as I know
>> I can't find a reason why flink would slow down the network so
>> much (and even if it does, the effect should not be that large).
>>
>> Maybe Stefan could tell us more about that. ;)
>>
>> Best Regards,
>> Sihua Zhou
>>
>> Sent from NetEase Mail Master
>>
>> On 03/6/2018 14:04,Tony Wei
>>  wrote:
>>
>> Hi Sihua,
>>
>>
>>> Hi Tony,
>>>
>>> About to your question: average end to end latency of checkpoint is less
>>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>>> determined byt the max end to end latency (the slowest one), a checkpoint
>>> truly completed only after all task's checkpoint have completed.
>>>
>>
> Sorry for my poor expression. What I mean is the average duration of
> "completed" checkpoints, so I guess there are some problems that make some
> checkpoint subtasks take very long, even more than 10 mins.
>>
>>
>>>
>>> About to the problem: after a second look at the info you privode, we
>>> can found from the checkpoint detail picture that there is one task which
>>> cost 4m20s to transfer it snapshot (about 482M) to s3 and there are 4
>>> others tasks didn't complete the checkpoint yet. And from the
>>> bad_tm_pic.png vs good_tm_pic.png, we can found that on "bad tm" the
>>> network performance is far less than the "good tm" (-15 MB vs -50MB). So I
>>> guss the network is a problem, sometimes it failed to send 500M data to s3
>>> in 10 minutes. (maybe you need to check whether the network env is stable)
>>>
>>
> That is what I am concerned about, because I can't determine whether a stuck
> checkpoint makes network performance worse, or worse network performance
> makes the checkpoint stuck.
> Although I provided one "bad machine" and one "good machine", that
> doesn't mean the bad machine is always bad and the good machine is always
> good. See the attachments.
>> All of my TMs met this problem at least once from last weekend until now.
>> Some machines recovered by themselves and some recovered after I restarted
>> 

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread 周思华


Hi Tony,


About your question: an average end-to-end checkpoint latency of less than 
1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is 
determined by the max end-to-end latency (the slowest one); a checkpoint truly 
completes only after all tasks' checkpoints have completed.


About the problem: after a second look at the info you provided, we can see 
from the checkpoint detail picture that there is one task which took 4m20s to 
transfer its snapshot (about 482M) to s3, and there are 4 other tasks that 
didn't complete the checkpoint yet. And from bad_tm_pic.png vs good_tm_pic.png, 
we can see that on the "bad tm" the network performance is far below the "good 
tm" (~15 MB vs ~50 MB). So I guess the network is a problem; sometimes it fails 
to send 500M of data to s3 in 10 minutes. (Maybe you need to check whether the 
network env is stable.)


About the solution: I think incremental checkpoints can help you a lot; they 
will only send the new data on each checkpoint. But you are right: if the 
incremental state size is larger than 500M, it might cause the timeout problem 
again (because of the bad network performance).
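
A minimal sketch of enabling it (the URI is a placeholder; the boolean flag
turns on incremental checkpoints, and the constructor declares IOException):

// requires the flink-statebackend-rocksdb dependency
env.setStateBackend(
        new RocksDBStateBackend("s3://my-bucket/checkpoints", true));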


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 13:02,Tony Wei wrote:
Hi Sihua,


Thanks for your suggestion. "incremental checkpoint" is what I will try out 
next and I know it will give a better performance. However, it might not solve 
this issue completely, because as I said, the average end to end latency of 
checkpointing is less than 1.5 mins currently, and it is far from my timeout 
configuration. I believe "incremental checkpoint" will reduce the latency and 
make this issue might occur seldom, but I can't promise it won't happen again 
if I have bigger states growth in the future. Am I right?


Best Regards,
Tony Wei 


2018-03-06 10:55 GMT+08:00 周思华 :

Hi Tony,


Sorry for jump into, one thing I want to remind is that from the log you 
provided it looks like you are using "full checkpoint", this means that the 
state data that need to be checkpointed and transvered to s3 will grow over 
time, and even for the first checkpoint it performance is slower that 
incremental checkpoint (because it need to iterate all the record from the 
rocksdb using the RocksDBMergeIterator). Maybe you can try out "incremental 
checkpoint", it could help you got a better performance.


Best Regards,
Sihua Zhou


Sent from NetEase Mail Master


On 03/6/2018 10:34,Tony Wei wrote:
Hi Stefan,


I see. That explains why the loading of machines grew up. However, I think it 
is not the root cause that led to these consecutive checkpoint timeout. As I 
said in my first mail, the checkpointing progress usually took 1.5 mins to 
upload states, and this operator and kafka consumer are only two operators that 
have states in my pipeline. In the best case, I should never encounter the 
timeout problem that only caused by lots of pending checkpointing threads that 
have already timed out. Am I right?


Since these logging and stack trace was taken after nearly 3 hours from the 
first checkpoint timeout, I'm afraid that we couldn't actually find out the 
root cause for the first checkpoint timeout. Because we are preparing to make 
this pipeline go on production, I was wondering if you could help me find out 
where the root cause happened: bad machines or s3 or flink-s3-presto packages 
or flink checkpointing thread. It will be great if we can find it out from 
those informations the I provided, or a hypothesis based on your experience is 
welcome as well. The most important thing is that I have to decide whether I 
need to change my persistence filesystem or use another s3 filesystem package, 
because it is the last thing I want to see that the checkpoint timeout happened 
very often.


Thank you very much for all your advices.


Best Regards,
Tony Wei


2018-03-06 1:07 GMT+08:00 Stefan Richter :

Hi,


thanks for all the info. I had a look into the problem and opened 
https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack 
trace, you can see many checkpointing threads are running on your TM for 
checkpoints that have already timed out, and I think this cascades and slows 
down everything. Seems like the implementation of some features like checkpoint 
timeouts and not failing tasks from checkpointing problems overlooked that we 
also require to properly communicate that checkpoint cancellation to all task, 
which was not needed before.


Best,
Stefan




On 05.03.2018 at 14:42, Tony Wei wrote:


Hi Stefan,


Here is my checkpointing configuration.



| Checkpointing Mode | Exactly Once |
| Interval | 20m 0s |
| Timeout | 10m 0s |
| Minimum Pause Between Checkpoints | 0ms |
| Maximum Concurrent Checkpoints | 1 |
| Persist Checkpoints Externally | Enabled (delete on cancellation) |
Best Regards,

Tony Wei


2018-03-05 21:30 GMT+08:00 Stefan Richter 

Visual CEP pattern & State transformation

2018-03-05 Thread aitozi
Hi, 

When I read the CEP code, I feel that it is a bit difficult to
understand the whole transformation. I have an idea: we could have a drawing
of the pattern and state transformation, like the DAG plan picture in the web
UI. Maybe it could show the "ignore, take, proceed" edges
described in "Efficient Pattern Matching over Event Streams".

What's your idea? @kl0u @dianfu @dawidwys





Fwd: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-03-05 Thread Tony Wei
Sent to the wrong mailing list. Forward it to the correct one.

-- Forwarded message --
From: Tony Wei 
Date: 2018-03-06 14:43 GMT+08:00
Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
To: 周思华 , Stefan Richter 
Cc: "user-subscr...@flink.apache.org" 


Hi Sihua,

Thanks a lot. I will try to find out the problem from machines'
environment. If you and Stefan have any new suggestions or thoughts, please
advise me. Thank you !

Best Regards,
Tony Wei

2018-03-06 14:34 GMT+08:00 周思华 :

> Hi Tony,
>
> I think the two things you mentioned can both lead to a bad network. But
> from my side, I think it more likely that it is the unstable network env
> that cause the poor network performance itself, because as far as I know
> I can't found out the reason that the flink would slow down the network so
> much (even It does, the effect should not be that so much).
>
> Maybe stefan could tell more about that. ;)
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 14:04,Tony Wei
>  wrote:
>
> Hi Sihua,
>
>
>> Hi Tony,
>>
>> About to your question: average end to end latency of checkpoint is less
>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>> determined byt the max end to end latency (the slowest one), a checkpoint
>> truly completed only after all task's checkpoint have completed.
>>
>
> Sorry for my poor expression. What I mean is the average duration of
> "completed" checkpoints, so I guess there are some problems that make some
> subtasks of checkpoint take so long, even more than 10 mins.
>
>
>>
>> About to the problem: after a second look at the info you privode, we can
>> found from the checkpoint detail picture that there is one task which cost
>> 4m20s to transfer it snapshot (about 482M) to s3 and there are 4 others
>> tasks didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can found that on "bad tm" the network performance is
>> far less than the "good tm" (-15 MB vs -50MB). So I guss the network is a
>> problem, sometimes it failed to send 500M data to s3 in 10 minutes. (maybe
>> you need to check whether the network env is stable)
>>
>
> That is what I concerned. Because I can't determine if checkpoint is stuck
> makes network performance worse or network performance got worse makes
> checkpoint stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't
> mean bad machine is always bad and good machine is always good. See the
> attachments.
> All of my TMs met this problem at least once from last weekend until now.
> Some machines recovered by themselves and some recovered after I restarted
> them.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 13:41 GMT+08:00 周思华 :
>
>>
>> Hi Tony,
>>
>> About to your question: average end to end latency of checkpoint is less
>> than 1.5 mins, doesn't means that checkpoint won't timeout. indeed, it
>> determined byt the max end to end latency (the slowest one), a checkpoint
>> truly completed only after all task's checkpoint have completed.
>>
>> About to the problem: after a second look at the info you privode, we can
>> found from the checkpoint detail picture that there is one task which cost
>> 4m20s to transfer it snapshot (about 482M) to s3 and there are 4 others
>> tasks didn't complete the checkpoint yet. And from the bad_tm_pic.png vs
>> good_tm_pic.png, we can found that on "bad tm" the network performance is
>> far less than the "good tm" (-15 MB vs -50MB). So I guss the network is a
>> problem, sometimes it failed to send 500M data to s3 in 10 minutes. (maybe
>> you need to check whether the network env is stable)
>>
>> About the solution: I think incremental checkpoint can help you a lot, it
>> will only send the new data each checkpoint, but you are right if the
>> increment state size is huger than 500M, it might cause the timeout problem
>> again (because of the bad network performance).
>>
>> Best Regards,
>> Sihua Zhou
>>
>> Sent from NetEase Mail Master
>>
>> On 03/6/2018 13:02,Tony Wei
>>  wrote:
>>
>> Hi Sihua,
>>
>> Thanks for your suggestion. "incremental checkpoint" is what I will try
>> out next and I know it will give a better performance. However, it might
>> not solve this issue completely, because as I said, the average end to end
>> latency of checkpointing is less than 1.5 mins currently, and it is far
>> from my timeout configuration. I believe "incremental checkpoint" will
>> reduce the latency and make this issue might occur seldom, but I can't
>> promise it won't happen again if I have bigger states growth in the future.
>> Am I right?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-06 10:55 GMT+08:00 周思华 :
>>
>>> Hi Tony,
>>>
>>> Sorry for jump into, one thing I