[jira] [Comment Edited] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198175#comment-16198175
 ] 

Ajay edited comment on FLINK-7782 at 10/10/17 5:18 AM:
---

Hi Kostas,

My stream is essentially unbounded: the producer runs in an endless loop and 
the pattern repeats, so there is a continuous stream of data. I have 
implemented an ascending timestamp extractor. However, I do see some issues 
with state. Here is a quick snapshot of the state size from the Flink job.

          End to End Duration   State Size   Buffered During Alignment
Minimum   9ms                   260 KB       0 B
Average   57ms                  988 KB       2.26 KB
Maximum   3s                    15.3 MB      271 KB


ID    Status   Acknowledged   Trigger Time   Latest Acknowledgement   Failure Time
514   Failed   11/12 (92%)    22:15:12       22:15:13                 22:15:15

End to End Duration   State Size   Buffered During Alignment   Failure Message
2s                    4.11 MB      0 B                          Checkpoint failed: Checkpoint Coordinator is suspending.

On the console where my Flink job was running, I saw the following message:
Size of the state is larger than the maximum permitted memory-backed state. 
Size=5621890, maxSize=5242880. Consider using a different state backend, like 
the File System State backend.
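
A minimal sketch of that suggestion, assuming a Flink 1.3-style job (the 
checkpoint URI is a placeholder):

{code}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep checkpoint state on a file system instead of the memory-backed
        // default, whose per-state size is capped (maxSize=5242880 above).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));  // placeholder URI
    }
}
{code}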



was (Author: ajkrishna):
Hi Kostas,

My stream is essentially unbounded: the producer runs in an endless loop and 
the pattern repeats, so there is a continuous stream of data. I have 
implemented an ascending timestamp extractor. However, I do see some issues 
with state. Here is a quick snapshot of the state size from the Flink job.

          End to End Duration   State Size   Buffered During Alignment
Minimum   9ms                   260 KB       0 B
Average   57ms                  988 KB       2.26 KB
Maximum   3s                    15.3 MB      271 KB


ID    Status   Acknowledged   Trigger Time   Latest Acknowledgement   Failure Time
514   Failed   11/12 (92%)    22:15:12       22:15:13                 22:15:15

End to End Duration   State Size   Buffered During Alignment   Failure Message
2s                    4.11 MB      0 B                          Checkpoint failed: Checkpoint Coordinator is suspending.



I have tried the same job with a parallelism of 1, and while there is 
significant lag in Kafka, the behavior of the CEP library is exactly the same.


> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using Flink version 1.3.2. Flink has a Kafka source. I am using 
> KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 
> TaskManagers & 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string, and when 
> parsed on the Flink side it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>         cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1);
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks whether the 7th field of the Tuple8 goes over 12 and then 
> over 16. The output of the pattern looks like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
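> For reference, a minimal sketch of what the two conditions could look like 
> (assumed implementations; the actual OverLowThreshold/OverHighThreshold 
> classes are not shown in this report):
> {code:java}
> import java.util.Date;
> 
> import org.apache.flink.api.java.tuple.Tuple8;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> 
> // Assumed sketch: match when the 7th tuple field (index f6) crosses a threshold.
> class OverLowThreshold extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         return event.f6 > 12.0f;  // low threshold
>     }
> }
> 
> class OverHighThreshold extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         return event.f6 > 16.0f;  // high threshold
>     }
> }
> {code}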
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a CSV 
> file with a delay of about 100 ms between two rows. The data is exactly the 
> same for all 100 CSV files except for home_id and the lat & long information. 
> The timestamp is incremented 

[jira] [Comment Edited] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198175#comment-16198175
 ] 

Ajay edited comment on FLINK-7782 at 10/10/17 5:15 AM:
---

Hi Kostas,

My stream is essentially unbounded: the producer runs in an endless loop and 
the pattern repeats, so there is a continuous stream of data. I have 
implemented an ascending timestamp extractor. However, I do see some issues 
with state. Here is a quick snapshot of the state size from the Flink job.

          End to End Duration   State Size   Buffered During Alignment
Minimum   9ms                   260 KB       0 B
Average   57ms                  988 KB       2.26 KB
Maximum   3s                    15.3 MB      271 KB


ID    Status   Acknowledged   Trigger Time   Latest Acknowledgement   Failure Time
514   Failed   11/12 (92%)    22:15:12       22:15:13                 22:15:15

End to End Duration   State Size   Buffered During Alignment   Failure Message
2s                    4.11 MB      0 B                          Checkpoint failed: Checkpoint Coordinator is suspending.



I have tried the same job with a parallelism of 1, and while there is 
significant lag in Kafka, the behavior of the CEP library is exactly the same.



was (Author: ajkrishna):
Hi Kostas,

My stream is essentially unbounded: the producer runs in an endless loop and 
the pattern repeats, so there is a continuous stream of data. Also, I was 
monitoring my Kafka manager and the Flink streaming dashboard. Kafka shows 
almost no lag and I see watermarks advancing in Flink. I have implemented an 
ascending timestamp extractor. Here is a quick snapshot of the state size from 
the Flink job.

          End to End Duration   State Size   Buffered During Alignment
Minimum   9ms                   260 KB       0 B
Average   18ms                  481 KB       1 B
Maximum   1s                    709 KB       437 B

I have tried the same job with a parallelism of 1, and while there is 
significant lag in Kafka, the behavior of the CEP library is exactly the same.


> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using Flink version 1.3.2. Flink has a Kafka source. I am using 
> KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 
> TaskManagers & 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string, and when 
> parsed on the Flink side it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>         cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1);
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks whether the 7th field of the Tuple8 goes over 12 and then 
> over 16. The output of the pattern looks like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a CSV 
> file with a delay of about 100 ms between two rows. The data is exactly the 
> same for all 100 CSV files except for home_id and the lat & long information. 
> The timestamp is incremented in steps of 1 sec. I start multiple processes to 
> simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before and after applying the pattern, and I see all home_ids before and 
> only a tiny subset 

[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198175#comment-16198175
 ] 

Ajay commented on FLINK-7782:
-

Hi Kostas,

My stream is essentially unbounded: the producer runs in an endless loop and 
the pattern repeats, so there is a continuous stream of data. Also, I was 
monitoring my Kafka manager and the Flink streaming dashboard. Kafka shows 
almost no lag and I see watermarks advancing in Flink. I have implemented an 
ascending timestamp extractor. Here is a quick snapshot of the state size from 
the Flink job.

          End to End Duration   State Size   Buffered During Alignment
Minimum   9ms                   260 KB       0 B
Average   18ms                  481 KB       1 B
Maximum   1s                    709 KB       437 B

I have tried the same job with a parallelism of 1, and while there is 
significant lag in Kafka, the behavior of the CEP library is exactly the same.


> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using Flink version 1.3.2. Flink has a Kafka source. I am using 
> KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 
> TaskManagers & 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string, and when 
> parsed on the Flink side it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>         cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1);
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks whether the 7th field of the Tuple8 goes over 12 and then 
> over 16. The output of the pattern looks like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a CSV 
> file with a delay of about 100 ms between two rows. The data is exactly the 
> same for all 100 CSV files except for home_id and the lat & long information. 
> The timestamp is incremented in steps of 1 sec. I start multiple processes to 
> simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before and after applying the pattern, and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> home_ids to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism

2017-10-09 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-7764:
--

Assignee: Xingcan Cui

> FlinkKafkaProducer010 does not accept name, uid, or parallelism
> ---
>
> Key: FLINK-7764
> URL: https://issues.apache.org/jira/browse/FLINK-7764
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> As [reported on the user 
> list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]:
> When I try to use the KafkaProducer with timestamps, it fails to set name, 
> uid, or parallelism. It uses the default values.
> {code}
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer = 
> FlinkKafkaProducer010
> .writeToKafkaWithTimestamps(stream, topicName, schema, props, 
> partitioner);
> producer.setFlushOnCheckpoint(flushOnCheckpoint);
> producer.name("foo")
> .uid("bar")
> .setParallelism(5);
> return producer;
> {code}
> As the operator name it shows "FlinKafkaProducer 0.10.x", with the typo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197978#comment-16197978
 ] 

ASF GitHub Bot commented on FLINK-6105:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4316
  
I think InterruptedException should be handled uniformly in 
HadoopInputFormatBase.java


> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
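> A minimal sketch of the suggested change (an assumption of what the fix could 
> look like, not the actual patch):
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   // Restore the interrupt flag and surface an InterruptedIOException so that
>   // callers can tell interruption apart from ordinary I/O failures.
>   Thread.currentThread().interrupt();
>   InterruptedIOException ioe = new InterruptedIOException("Could not get Splits.");
>   ioe.initCause(e);
>   throw ioe;
> }
> {code}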



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4316: [FLINK-6105] Use InterruptedIOException instead of IOExce...

2017-10-09 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4316
  
I think InterruptedException should be handled uniformly in 
HadoopInputFormatBase.java


---


[jira] [Updated] (FLINK-7049) TestingApplicationMaster keeps running after integration tests finish

2017-10-09 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7049:
--
Description: 
After integration tests finish, TestingApplicationMaster is still running.

Toward the end of 
flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log
 :
{code}
2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn   
- Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:49,681 ERROR 
org.apache.flink.shaded.org.apache.curator.ConnectionState- Authentication 
failed
2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn   
- Session 0x0 for server null, unexpected error, closing socket 
connection and attempting reconnect
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
  at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn   
- SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it.
2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn   
- Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:50,782 ERROR 
org.apache.flink.shaded.org.apache.curator.ConnectionState- Authentication 
failed
2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn   
- Session 0x0 for server null, unexpected error, closing socket 
connection and attempting reconnect
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
  at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
{code}

  was:
After integration tests finish, TestingApplicationMaster is still running.

Toward the end of 
flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log
 :

{code}
2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn   
- Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:49,681 ERROR 
org.apache.flink.shaded.org.apache.curator.ConnectionState- Authentication 
failed
2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn   
- Session 0x0 for server null, unexpected error, closing socket 
connection and attempting reconnect
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
  at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn   
- SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it.
2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn   
- Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:50,782 ERROR 
org.apache.flink.shaded.org.apache.curator.ConnectionState- Authentication 
failed
2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn   
- Session 0x0 for server null, unexpected error, closing socket 
connection and attempting reconnect
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
  at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
{code}


> TestingApplicationMaster keeps running after integration tests finish
> -
>
> 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197673#comment-16197673
 ] 

Fabian Hueske commented on FLINK-7548:
--

I've created a proposal for a reworked TableSource API that solves several 
issues. I've pushed a WIP branch to my repository: 
https://github.com/fhueske/flink/tree/tableWatermarks-extended

The main changes are as follows:
- The {{TableSource}} interface gets a new method {{getTableSchema(): 
TableSchema}} which returns the schema with all fields of the resulting table. 
For streaming tables, this also includes rowtime indicators. These fields have 
the corresponding {{TimeIndicatorTypeInformation}} types. This means that the 
{{TypeInformation}} returned by {{getResultType()}} no longer determines 
the schema and fields of the table. Hence, we need to perform a mapping from 
the physical input type ({{getResultType()}}) to the logical table schema 
({{getTableSchema()}}). By default, this mapping is done based on field names, 
i.e., a table schema field is mapped to the field of the same name in the 
physical input type. If we cannot find a physical field for a logical field, or 
if the types do not match, the {{TableSource}} is rejected. This default 
behavior resembles the current behavior. If fields should not be automatically 
mapped by name, we will allow specifying a manual index-based mapping, similar 
to the previous {{DefinedFieldNames}} interface (note: this is not yet included 
in the WIP branch).
- Processing time fields (i.e., fields in the table schema with type 
{{TimeIndicatorTypeInformation(false)}}) are automatically inserted into the 
schema during the initial conversion.
- A {{StreamTableSource}} with rowtime fields (i.e., fields in the table schema 
with type {{TimeIndicatorTypeInformation(true)}}) requires an additional 
interface {{DefinedRowtimeAttributes}}. The interface provides a 
{{RowtimeAttributeDescriptor}} for each rowtime field (right now only a single 
rowtime field is supported but we are prepared for more). The 
{{RowtimeAttributeDescriptor}} provides a {{TimestampExtractor}} which gives a 
{{RexNode}} expression to extract the timestamp field from the input type. A 
corresponding expression is code-gen'd into the initial conversion and executed 
by the table scan operator. Moreover, {{RowtimeAttributeDescriptor}} gives a 
{{WatermarkStrategy}} which is used to generate watermarks for a rowtime 
attribute. Watermarks are also generated by the table scan operator.
- We can provide built-in implementations for {{TimestampExtractor}} (right 
now only {{ExistingField}}, to convert a {{Long}} or {{Timestamp}} attribute 
into a rowtime attribute) and {{WatermarkStrategy}} (right now 
{{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}}). Additional 
implementations can be added in later PRs. A rough sketch of these interfaces 
follows this list.
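
For illustration, a rough sketch of the shapes these interfaces could take 
(assumed signatures following the description above, not the final API):

{code}
import java.util.List;

// Placeholder interfaces: stand-ins for the proposed extension points.
interface TimestampExtractor { /* yields a RexNode expression over the input type */ }
interface WatermarkStrategy  { /* e.g. AscendingWatermarks, BoundedOutOfOrderWatermarks */ }

// Describes one rowtime attribute of a StreamTableSource.
class RowtimeAttributeDescriptor {
    final String attributeName;
    final TimestampExtractor timestampExtractor;
    final WatermarkStrategy watermarkStrategy;

    RowtimeAttributeDescriptor(String name, TimestampExtractor extractor, WatermarkStrategy strategy) {
        this.attributeName = name;
        this.timestampExtractor = extractor;
        this.watermarkStrategy = strategy;
    }
}

// A StreamTableSource with rowtime fields would additionally implement this.
interface DefinedRowtimeAttributes {
    List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
{code}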

I think the proposal is a good solution because:
- the table schema is explicitly defined by the {{TableSource}} and not by the 
API internals as before (handling of time attributes is currently hidden). This 
will make {{CREATE TABLE}} DDL statements easier.
- timestamp extraction and watermark assignment can be configured by extensible 
interfaces without changing the API internals. The actual timestamp extraction 
and watermark generation happen in the table scan operator, so the 
{{TableSource}} does not have to deal with it.
- Timestamp extraction happens via {{RexNode}} expressions, so it is 
very versatile, and it should be possible to call UDFs if they have been 
registered before (need to check this, though).
- We can provide built-in implementations for the most common strategies. I 
think all requirements that we had listed in this issue before should be 
solvable with this design.
- Projection push-down can be enabled for table sources with timestamp 
attributes, which doesn't work right now (not part of the WIP branch).
- I could remove a few weird artifacts / workarounds of the current design that 
change the table schema internally.

Of course, there is also a downside:
- We are touching the {{TableSource}} interface and related interfaces. This is 
not a big deal for Flink's built-in sources (because there are not so many), 
but might be for users that have more {{TableSource}}s implemented.

Please let me know what you think [~jark], [~wheat9], [~twalthr], [~xccui], 
[~ykt836], and everybody else.


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> As discussed in FLINK-7446, currently the TableSource only support to define 
> rowtime field, but not support to extract watermarks from the rowtime field. 
> 

[jira] [Commented] (FLINK-7787) Remove guava dependency in the cassandra connector

2017-10-09 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197582#comment-16197582
 ] 

Chesnay Schepler commented on FLINK-7787:
-

We can't get rid of the Cassandra Guava dependency, since it is hard-coded into 
the Datastax driver API.

> Remove guava dependency in the cassandra connector
> --
>
> Key: FLINK-7787
> URL: https://issues.apache.org/jira/browse/FLINK-7787
> Project: Flink
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> As discovered in FLINK-6225, the Cassandra connector uses the future classes 
> in the Guava library. We can get rid of the dependency by using the 
> equivalent classes provided by Java 8.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7788) Allow port range for queryable state client proxy.

2017-10-09 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-7788:
-

 Summary: Allow port range for queryable state client proxy.
 Key: FLINK-7788
 URL: https://issues.apache.org/jira/browse/FLINK-7788
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.0


Currently, the newly introduced queryable state client proxy can only take one 
port as a parameter to bind to. In the case of multiple proxies running on one 
machine, this can result in port clashes and the inability to start the 
corresponding proxies. 

This issue proposes to allow the specification of a port range, so that if some 
ports in the range are occupied, the proxy can still pick from the remaining 
free ones.
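
For illustration, a minimal sketch of the intended bind behavior (an 
assumption, not the eventual implementation):

{code}
import java.io.IOException;
import java.net.ServerSocket;

final class PortRangeBinder {

    // Try each port in the configured range and bind to the first free one.
    static ServerSocket bindWithinRange(int firstPort, int lastPort) throws IOException {
        for (int port = firstPort; port <= lastPort; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port occupied; try the next one in the range.
            }
        }
        throw new IOException("No free port in range " + firstPort + "-" + lastPort);
    }
}
{code}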



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7787) Remove guava dependency in the cassandra connector

2017-10-09 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-7787:
-

 Summary: Remove guava dependency in the cassandra connector
 Key: FLINK-7787
 URL: https://issues.apache.org/jira/browse/FLINK-7787
 Project: Flink
  Issue Type: Bug
Reporter: Haohui Mai
Assignee: Haohui Mai


As discovered in FLINK-6225, the Cassandra connector uses the future classes in 
the Guava library. We can get rid of the dependency by using the equivalent 
classes provided by Java 8.
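
For illustration, a minimal sketch of such a migration at the driver boundary 
(a hypothetical adapter; the driver itself still returns Guava futures):

{code}
import java.util.concurrent.CompletableFuture;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

// Convert the driver's ListenableFuture once at the boundary, so the rest of
// the connector can work with Java 8's CompletableFuture.
final class GuavaFutureBridge {
    static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> future) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Futures.addCallback(future, new FutureCallback<T>() {
            @Override
            public void onSuccess(T value) {
                result.complete(value);
            }
            @Override
            public void onFailure(Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return result;
    }
}
{code}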



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197500#comment-16197500
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3748
  
Given we have dropped support for Java 7, it might make sense to just 
rewrite the class to use the CompletableFuture in Java 8.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-09 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3748
  
Given we have dropped support for Java 7, it might make sense to just 
rewrite the class to use the CompletableFuture in Java 8.


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197497#comment-16197497
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3748
  
When importing Guava, you have to explicitly add an exclusion to 
https://github.com/apache/flink/blob/master/tools/maven/suppressions.xml.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3748
  
When importing Guava, you have to explicitly add an exclusion to 
https://github.com/apache/flink/blob/master/tools/maven/suppressions.xml.


---


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197419#comment-16197419
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4239


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting an 
> "exactly-once" semantic. The API changes and the whole transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would (a sketch of this lifecycle follows the list)
> * upon creation, begin a transaction, store the transaction identifier in 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 
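> For illustration, a minimal sketch of that lifecycle (assumed names; not the 
> actual FlinkKafkaProducer011 code):
> {code}
> import java.util.ArrayDeque;
> import java.util.Deque;
> 
> // Hypothetical handle standing in for Kafka 0.11's transactional producer API.
> interface Txn {
>     void write(String record);
>     void flush();
>     void commit();
> }
> 
> // Checkpoint-aligned transaction lifecycle described above.
> abstract class TransactionalSinkSketch {
> 
>     private Txn current;
>     private final Deque<Txn> pending = new ArrayDeque<>();
> 
>     void open() {
>         current = beginTxn();  // upon creation, begin a transaction
>     }
> 
>     void invoke(String record) {
>         current.write(record);  // write all incoming data within the transaction
>     }
> 
>     void snapshotState() {
>         current.flush();        // flush and mark the transaction pending-commit
>         pending.add(current);
>         current = beginTxn();
>     }
> 
>     void notifyCheckpointComplete() {
>         while (!pending.isEmpty()) {
>             pending.poll().commit();  // commit the pending transactions
>         }
>     }
> 
>     abstract Txn beginTxn();
> }
> {code}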



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197420#comment-16197420
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Thanks :)


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting an 
> "exactly-once" semantic. The API changes and the whole transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would 
> * upon creation, begin a transaction, store the transaction identifier in 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-10-09 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4239


---


[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...

2017-10-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Thanks :)


---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197379#comment-16197379
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@fhueske Hi, zentol hasn't replied; can you help take a look? Thx



> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-09 Thread PangZhi
Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@fhueske Hi, zentol hasn't replied; can you help take a look? Thx



---


[jira] [Commented] (FLINK-7731) Trigger on GlobalWindow does not clean state completely

2017-10-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197372#comment-16197372
 ] 

Aljoscha Krettek commented on FLINK-7731:
-

No, FLINK-7700 is not related because we're not dealing with merging windows 
here. I looked at the code again and it seems that {{clear()}} is currently 
only called when the window is being cleaned up because of window expiration.

[~gerardg] Can you try manually calling {{clear()}} in the places where you 
return {{FIRE_AND_PURGE}}? This should solve the problem of growing state. 
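
For context, a minimal sketch of that suggestion in a custom count trigger on a 
{{GlobalWindow}} (assumed code, not the reporter's actual trigger):

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Clear the trigger's own state before returning FIRE_AND_PURGE, so that no
// per-key state lingers in the GlobalWindow.
class CountThenPurgeTrigger extends Trigger<Object, GlobalWindow> {

    private final long expected;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), BasicTypeInfo.LONG_TYPE_INFO);

    CountThenPurgeTrigger(long expected) {
        this.expected = expected;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= expected) {
            clear(window, ctx);  // manual clean-up, as suggested above
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
{code}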

> Trigger on GlobalWindow does not clean state completely
> ---
>
> Key: FLINK-7731
> URL: https://issues.apache.org/jira/browse/FLINK-7731
> Project: Flink
>  Issue Type: Bug
>  Components: Core, DataStream API
>Affects Versions: 1.3.2
>Reporter: Gerard Garcia
>Priority: Minor
>
> I have an operator that consists of:
> CoGroup Datastream -> GlobalWindow -> CustomTrigger -> Apply function
> The custom trigger fires and purges the elements after it has received the 
> expected number of elements (or when a timeout fires) from one of the streams 
> and the apply function merges the elements with the ones received from the 
> other stream. It appears that the state of the operator grows continuously so 
> it seems it never gets completely cleaned.
> There is a discussion in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clean-GlobalWidnow-state-td15613.html
>  that suggests that it may be a bug.
> This job reproduces the issue: 
> https://github.com/GerardGarcia/flink-global-window-growing-state



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197323#comment-16197323
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4742


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4742


---


[jira] [Closed] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6988.
---
Resolution: Fixed

Implemented in 2f651e9a69a9929ef154e7bf6fcba624b0e8b9a1 and surrounding commits.

> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting an 
> "exactly-once" semantic. The API changes and the whole transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would 
> * upon creation, begin a transaction, store the transaction identifier in 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197306#comment-16197306
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
Merged! 😃 

Could you please close this PR?


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting an 
> "exactly-once" semantic. The API changes and the whole transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would 
> * upon creation, begin a transaction, store the transaction identifier in 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...

2017-10-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
Merged! 😃 

Could you please close this PR?


---


[jira] [Commented] (FLINK-6219) Add a sorted state primitive

2017-10-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197278#comment-16197278
 ] 

Fabian Hueske commented on FLINK-6219:
--

Thanks [~phoenixjiangnan], I updated the linked issue. 
FLINK-6204 was closed in favor of the newer issue FLINK-7193.

> Add a sorted state primitive
> 
>
> Key: FLINK-6219
> URL: https://issues.apache.org/jira/browse/FLINK-6219
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Table API & SQL
>Reporter: sunjincheng
>
> When we implement the OVER window of 
> [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations],
> we notice that we need a state primitive which supports sorting, allows for 
> efficient insertion, traversal in order, and removal from the head. 
> For example, in an event-time OVER window we need to sort by time. If the data 
> is as follows:
> {code}
> (1L, 1, Hello)
> (2L, 2, Hello)
> (5L, 5, Hello)
> (4L, 4, Hello)
> {code}
> We insert the data in random order, like:
> {code}
> put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, 
> Hello)),
> {code}
> We deal with elements in time order:
> {code}
> process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, 
> Hello)),process((5L, 5, Hello))
> {code}
> Anyone is welcome to give feedback. What do you think, [~xiaogang.shi] 
> [~aljoscha] [~fhueske]?
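> For illustration, the required access pattern sketched with a plain JDK 
> TreeMap (not a Flink state primitive, just the behavior the primitive needs):
> {code}
> import java.util.Map;
> import java.util.TreeMap;
> 
> public class SortedStateSketch {
>     public static void main(String[] args) {
>         // Sorted buffer keyed by timestamp, filled in random insertion order.
>         TreeMap<Long, String> buffer = new TreeMap<>();
>         buffer.put(2L, "Hello");
>         buffer.put(1L, "Hello");
>         buffer.put(5L, "Hello");
>         buffer.put(4L, "Hello");
> 
>         // In-order traversal with removal from the head.
>         while (!buffer.isEmpty()) {
>             Map.Entry<Long, String> head = buffer.pollFirstEntry();
>             System.out.println(head.getKey());  // prints 1, 2, 4, 5
>         }
>     }
> }
> {code}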



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197245#comment-16197245
 ] 

ASF GitHub Bot commented on FLINK-7388:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4786#discussion_r143517965
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 ---
@@ -79,7 +79,6 @@ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception

	@Override
	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
--- End diff --

@aljoscha Thanks for pointing this out. Fixed!


> ProcessFunction.onTimer() sets processing time as timestamp
> ---
>
> Key: FLINK-7388
> URL: https://issues.apache.org/jira/browse/FLINK-7388
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> The {{ProcessFunction.onTimer()}} method sets the current processing time as 
> event-time timestamp when it is called from a processing time timer.
> I don't think this behavior is useful. Processing time timestamps won't be 
> aligned with watermarks and are not deterministic. The only reason would be 
> to have _some_ value in the timestamp field. However, the behavior is very 
> subtle and might not be noticed by users.
> IMO, it would be better to erase the timestamp. This will cause downstream 
> operators that rely on timestamps to fail and notify the users that the logic 
> they implemented was probably not what they intended to do.
> What do you think [~aljoscha]?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4786: [FLINK-7388][DataStream API] ProcessFunction.onTim...

2017-10-09 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4786#discussion_r143517965
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 ---
@@ -79,7 +79,6 @@ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception

	@Override
	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
--- End diff --

@aljoscha Thanks for pointing this out. Fixed!


---


[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197240#comment-16197240
 ] 

ASF GitHub Bot commented on FLINK-6615:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4787
  
I was looking into 
[FLINK-6615](https://issues.apache.org/jira/browse/FLINK-6615), and I think 
6615 is not a problem, because files may be created concurrently. But I found 
the current impl of `deleteDirectory()` to be cumbersome.

The new code also handles concurrent removals well, and I can add back 
`cleanDirectory()` though it's not used anywhere yet. I'm happy to keep the 
original impl if you prefer it. Which version do you like better?
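
For reference, a minimal sketch of a delete that tolerates concurrent file 
creation and removal (an illustration of the idea, not the actual FileUtils 
code):

{code}
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class QuietDelete {

    static void deleteDirectoryQuietly(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;  // already gone, e.g. removed concurrently
        }
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.deleteIfExists(file);  // ignore files someone else deleted first
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                try {
                    Files.deleteIfExists(d);
                } catch (DirectoryNotEmptyException e) {
                    // A file was created concurrently; leave the directory behind.
                }
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
{code}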


> tmp directory not cleaned up on shutdown
> 
>
> Key: FLINK-6615
> URL: https://issues.apache.org/jira/browse/FLINK-6615
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Bowen Li
>
> Steps to reproduce:
> 1) Stop task manager gracefully (kill -6 )
> 2) In the logs:
> {code}
> 2017-05-17 13:35:50,147 INFO  org.apache.zookeeper.ClientCnxn 
>   - EventThread shut down [main-EventThread]
> 2017-05-17 13:35:50,200 ERROR 
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - IOManager 
> failed to properly clean up temp file directory: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 
> [flink-akka.actor.default-dispatcher-2]
> java.nio.file.DirectoryNotEmptyException: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47
>   at 
> sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
>   at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>   at java.nio.file.Files.delete(Files.java:1126)
>   at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241)
>   at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> {code}
> Expected:
> * on shutdown delete non-empty directory anyway. 
> Notes:
> * after process terminated, I've checked 
> "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and 
> didn't find anything there. So it looks like timing issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4787: [FLINK-6615][core] simplify FileUtils

2017-10-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4787
  
I was looking into 
[FLINK-6615](https://issues.apache.org/jira/browse/FLINK-6615), and I think 
6615 is not a problem, because files may be created concurrently. But I found 
the current impl of `deleteDirectory()` to be cumbersome.

The new code also handles concurrent removals well, and I can add back 
`cleanDirectory()` though it's not used anywhere yet. I'm happy to keep the 
original impl if you prefer it. Which version do you like better?


---


[jira] [Comment Edited] (FLINK-6219) Add a sorted state primitive

2017-10-09 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197188#comment-16197188
 ] 

Bowen Li edited comment on FLINK-6219 at 10/9/17 4:18 PM:
--

[~fhueske] The new title is much better, since the discussion above concluded 
we don't need a dedicated sort backend. Besides, I think the dependency in 
'Issues Link' is set wrong. This ticket is shown as a dependency of FLINK-6204, 
but 6204 is already finished. I reopened this ticket. Maybe you can correct the 
dependencies of the issues, or add some explanation here?


was (Author: phoenixjiangnan):
[~fhueske] I think the dependency in 'Issues Link' is set wrong. This ticket 
is shown as a dependency of FLINK-6204, but 6204 is already finished. I 
reopened this ticket. Maybe you can correct the dependencies of the issues, or 
add some explanation here?

> Add a sorted state primitive
> 
>
> Key: FLINK-6219
> URL: https://issues.apache.org/jira/browse/FLINK-6219
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Table API & SQL
>Reporter: sunjincheng
>
> When we implement the OVER window of 
> [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations],
> we notice that we need a state primitive which supports sorting, allows for 
> efficient insertion, traversal in order, and removal from the head. 
> For example, in an event-time OVER window we need to sort by time. If the data 
> is as follows:
> {code}
> (1L, 1, Hello)
> (2L, 2, Hello)
> (5L, 5, Hello)
> (4L, 4, Hello)
> {code}
> We insert the data in random order, like:
> {code}
> put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, 
> Hello)),
> {code}
> We deal with elements in time order:
> {code}
> process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, 
> Hello)),process((5L, 5, Hello))
> {code}
> Anyone is welcome to give feedback. What do you think, [~xiaogang.shi] 
> [~aljoscha] [~fhueske]?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7786) JarActionHandler.tokenizeArguments removes all quotes from program arguments

2017-10-09 Thread Brice Bingman (JIRA)
Brice Bingman created FLINK-7786:


 Summary: JarActionHandler.tokenizeArguments removes all quotes 
from program arguments
 Key: FLINK-7786
 URL: https://issues.apache.org/jira/browse/FLINK-7786
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.0
Reporter: Brice Bingman
Priority: Minor


I would like to send JSON as an argument to my program, but when submitting it 
via the REST API, all the quotes in the JSON are gone, resulting in invalid 
JSON.

The JarActionHandler.tokenizeArguments should only remove the leading and 
trailing quotes of an argument, not all of the quotes.
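
A minimal sketch of the suggested behavior (an assumption of the fix, not the 
actual JarActionHandler code):

{code}
final class ArgumentQuotes {

    // Strip only one matching pair of leading/trailing quotes; inner quotes,
    // e.g. inside a JSON argument, stay intact.
    static String unquote(String arg) {
        if (arg.length() >= 2 && arg.startsWith("\"") && arg.endsWith("\"")) {
            return arg.substring(1, arg.length() - 1);
        }
        return arg;
    }
}
{code}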

Current workaround: replace the quotes in the JSON with an escape character and 
replace them back in the run method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4773: [FLINK-7761] [examples] Include shaded guava depen...

2017-10-09 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4773


---


[jira] [Closed] (FLINK-7761) Twitter example is not self-contained

2017-10-09 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7761.
---
Resolution: Fixed

1.4: 08bfdae684548c2e9edf189f29d892335fd27dd3

> Twitter example is not self-contained
> -
>
> Key: FLINK-7761
> URL: https://issues.apache.org/jira/browse/FLINK-7761
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The Twitter example jar is not self-contained as it excludes the shaded guava 
> dependency from the twitter connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7761) Twitter example is not self-contained

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197215#comment-16197215
 ] 

ASF GitHub Bot commented on FLINK-7761:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4773


> Twitter example is not self-contained
> -
>
> Key: FLINK-7761
> URL: https://issues.apache.org/jira/browse/FLINK-7761
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The Twitter example jar is not self-contained as it excludes the shaded guava 
> dependency from the twitter connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7785) DispatcherTest failure

2017-10-09 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7785:
---

 Summary: DispatcherTest failure
 Key: FLINK-7785
 URL: https://issues.apache.org/jira/browse/FLINK-7785
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, Tests
Affects Versions: 1.4.0
Reporter: Chesnay Schepler
 Fix For: 1.4.0


https://s3.amazonaws.com/archive.travis-ci.org/jobs/285598045
{code}
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.539 sec <<< 
FAILURE! - in org.apache.flink.runtime.dispatcher.DispatcherTest
testLeaderElection(org.apache.flink.runtime.dispatcher.DispatcherTest)  Time 
elapsed: 0.016 sec  <<< ERROR!
akka.actor.InvalidActorNameException: actor name [dispatcher] is not unique!
at 
akka.actor.dungeon.ChildrenContainer$TerminatingChildrenContainer.reserve(ChildrenContainer.scala:192)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:208)
at org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:89)
at 
org.apache.flink.runtime.rpc.FencedRpcEndpoint.(FencedRpcEndpoint.java:44)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.(Dispatcher.java:102)
at 
org.apache.flink.runtime.dispatcher.DispatcherTest$TestingDispatcher.(DispatcherTest.java:207)
at 
org.apache.flink.runtime.dispatcher.DispatcherTest.testLeaderElection(DispatcherTest.java:171)
{code}
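
The exception means a second actor was registered under the name {{dispatcher}} while the same ActorSystem still held (or was still releasing) that name. A minimal sketch of the underlying Akka behaviour, assuming Akka 2.5's Java API (illustrative only, not the test code):

{code:java}
import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Props;

public class ActorNameClash {

	// Trivial actor that ignores all messages.
	static class NoOpActor extends AbstractActor {
		@Override
		public Receive createReceive() {
			return receiveBuilder().build();
		}
	}

	public static void main(String[] args) {
		ActorSystem system = ActorSystem.create("test");
		system.actorOf(Props.create(NoOpActor.class), "dispatcher");
		try {
			// A second top-level actor with the same name fails immediately.
			system.actorOf(Props.create(NoOpActor.class), "dispatcher");
		} catch (InvalidActorNameException e) {
			System.out.println(e.getMessage()); // actor name [dispatcher] is not unique!
		} finally {
			system.terminate();
		}
	}
}
{code}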





[jira] [Commented] (FLINK-6219) Add a sorted state primitive

2017-10-09 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197188#comment-16197188
 ] 

Bowen Li commented on FLINK-6219:
-

[~fhueske] I think the dependency in 'Issue Links' is set incorrectly. This 
ticket is shown as a dependency of FLINK-6204, but FLINK-6204 is already 
finished. I have reopened this ticket. Could you correct the issue 
dependencies, or add some explanation here?

> Add a sorted state primitive
> 
>
> Key: FLINK-6219
> URL: https://issues.apache.org/jira/browse/FLINK-6219
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Table API & SQL
>Reporter: sunjincheng
>
> While implementing the OVER window of 
> [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations],
> we noticed that we need a state primitive which supports sorting, allows for 
> efficient insertion and traversal in order, and supports removal from the head. 
> For example, in an event-time OVER window we need to sort by time. Suppose the 
> data are as follows:
> {code}
> (1L, 1, Hello)
> (2L, 2, Hello)
> (5L, 5, Hello)
> (4L, 4, Hello)
> {code}
> The data are inserted in random order, e.g.:
> {code}
> put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, 
> Hello)),
> {code}
> but must be processed in time order:
> {code}
> process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, 
> Hello)),process((5L, 5, Hello))
> {code}
> Any feedback is welcome. What do you think? [~xiaogang.shi] 
> [~aljoscha] [~fhueske] 
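
For illustration, a minimal in-memory sketch of the required access pattern, using a {{TreeMap}} keyed by timestamp as a stand-in (this is not a state-backend implementation):

{code:java}
import java.util.Map;
import java.util.TreeMap;

public class SortedStateSketch {
	// timestamp -> payload; a real primitive would live in a state backend.
	private final TreeMap<Long, String> buffer = new TreeMap<>();

	// Efficient insertion at an arbitrary position.
	public void put(long timestamp, String value) {
		buffer.put(timestamp, value);
	}

	// Traversal in order, combined with removal from the head.
	public void processInOrder() {
		Map.Entry<Long, String> head;
		while ((head = buffer.pollFirstEntry()) != null) {
			System.out.println("process((" + head.getKey() + "L, " + head.getValue() + "))");
		}
	}

	public static void main(String[] args) {
		SortedStateSketch sketch = new SortedStateSketch();
		sketch.put(2L, "Hello");
		sketch.put(1L, "Hello");
		sketch.put(5L, "Hello");
		sketch.put(4L, "Hello");
		sketch.processInOrder(); // processes 1, 2, 4, 5
	}
}
{code}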





[jira] [Reopened] (FLINK-6219) Add a sorted state primitive

2017-10-09 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reopened FLINK-6219:
-






[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197180#comment-16197180
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143508371
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,8 +33,7 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

This is a valid objection which I share. I'll remove this change and set 
the map of `TaskCheckpointStatistics` to an empty map in case we want to 
leave the details out.
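
In code, the suggested fallback might look like this sketch ({{TaskDetails}}, {{includeDetails}} and {{computeDetails}} are illustrative stand-ins, not identifiers from the PR):

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyDetailsFallback {
	// Stand-in for TaskCheckpointStatistics.
	static class TaskDetails { }

	// Return real per-task details when requested, otherwise an empty map
	// rather than null, so deserialization never hits a missing property.
	static Map<String, TaskDetails> taskDetails(boolean includeDetails) {
		if (!includeDetails) {
			return Collections.emptyMap();
		}
		Map<String, TaskDetails> details = new HashMap<>();
		details.put("vertex-1", new TaskDetails()); // hypothetical content
		return details;
	}

	public static void main(String[] args) {
		System.out.println(taskDetails(false).size()); // 0
		System.out.println(taskDetails(true).size());  // 1
	}
}
{code}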


> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.





[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197181#comment-16197181
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4777#discussion_r143480344
  
--- Diff: flink-core/pom.xml ---
@@ -77,6 +88,12 @@ under the License.

org.apache.commons
commons-compress
+   
+   
--- End diff --

`avro` is pulling `xz` as well

```
[INFO] +- org.apache.avro:avro:jar:1.8.2:compile
[INFO] |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
[INFO] |  \- org.tukaani:xz:jar:1.5:compile
```


> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>










[jira] [Closed] (FLINK-7750) Strange behaviour in savepoints

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7750.
---
Resolution: Not A Problem

> Strange behaviour in savepoints
> ---
>
> Key: FLINK-7750
> URL: https://issues.apache.org/jira/browse/FLINK-7750
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.2
>Reporter: Razvan
>Priority: Blocker
>
> I recently upgraded from 1.2.0, and savepoint creation behaves strangely. 
> Whenever I try to create a savepoint with a specified directory, Apache Flink 
> creates a folder on the active JobManager (even if I trigger savepoint 
> creation from a different JobManager) which contains only _metadata, and 
> another folder on the TaskManager where the job is running which contains the 
> actual savepoint.
> Obviously, if I try to restore it says it can't find the savepoint.
> This worked in 1.2.0.





[jira] [Commented] (FLINK-7750) Strange behaviour in savepoints

2017-10-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197175#comment-16197175
 ] 

Aljoscha Krettek commented on FLINK-7750:
-

If the data was not split when using Flink 1.2.x, that was only a side effect of 
an optimisation in the state backend that allowed it to skip storing state in a 
file system and instead send it to the JobManager.

I'm afraid what you are observing is the expected behaviour.






[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197170#comment-16197170
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143505801
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;
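
The {{@JsonTypeInfo}}/{{@JsonSubTypes}} annotations quoted above make Jackson write an {{@class}} discriminator so REST clients can deserialize each response into the right statistics subtype. A self-contained sketch of the same pattern (the types below are illustrative, not Flink's):

{code:java}
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicStats {

	@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
	@JsonSubTypes({
		@JsonSubTypes.Type(value = Completed.class, name = "completed"),
		@JsonSubTypes.Type(value = Failed.class, name = "failed")})
	static class Stats {
		public long id;
	}

	static class Completed extends Stats {
		public String externalPath;
	}

	static class Failed extends Stats {
		public String failureMessage;
	}

	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();

		Completed completed = new Completed();
		completed.id = 514;
		completed.externalPath = "hdfs:///checkpoints/514";

		// Serializes with the discriminator:
		// {"@class":"completed","id":514,"externalPath":"hdfs:///checkpoints/514"}
		String json = mapper.writeValueAsString(completed);
		System.out.println(json);

		// The discriminator routes deserialization back to Completed.
		Stats restored = mapper.readValue(json, Stats.class);
		System.out.println(restored.getClass().getSimpleName()); // Completed
	}
}
{code}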



[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197163#comment-16197163
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143504804
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;

[jira] [Updated] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7495:

Component/s: (was: State Backends, Checkpointing)
 DataStream API

> AbstractUdfStreamOperator#initializeState() should be called in 
> AsyncWaitOperator#initializeState()
> ---
>
> Key: FLINK-7495
> URL: https://issues.apache.org/jira/browse/FLINK-7495
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: Fang Yong
>Priority: Minor
>
> {code}
> recoveredStreamElements = context
>   .getOperatorStateStore()
>   .getListState(new ListStateDescriptor<>(STATE_NAME, 
> inStreamElementSerializer));
> {code}
> A call to AbstractUdfStreamOperator#initializeState() should be added at the 
> beginning of AsyncWaitOperator#initializeState().
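
A sketch of the proposed fix in {{AsyncWaitOperator#initializeState()}} (the override below is illustrative, not the actual patch):

{code:java}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
	// Proposed: restore the user function's state first via
	// AbstractUdfStreamOperator#initializeState(), then recover the
	// buffered stream elements from the operator state store.
	super.initializeState(context);

	recoveredStreamElements = context
		.getOperatorStateStore()
		.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
{code}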





[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197158#comment-16197158
 ] 

ASF GitHub Bot commented on FLINK-7388:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4786#discussion_r143502884
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 ---
@@ -79,7 +79,6 @@ public void onEventTime(InternalTimer 
timer) throws Exception
 
@Override
public void onProcessingTime(InternalTimer timer) 
throws Exception {
-   collector.setAbsoluteTimestamp(timer.getTimestamp());
--- End diff --

This should call `eraseTimestamp()` because we might still have a timestamp 
set from processing some previous elements. Same for the other occurrences in 
the code.


> ProcessFunction.onTimer() sets processing time as timestamp
> ---
>
> Key: FLINK-7388
> URL: https://issues.apache.org/jira/browse/FLINK-7388
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> The {{ProcessFunction.onTimer()}} method sets the current processing time as 
> the event-time timestamp when it is called from a processing-time timer.
> I don't think this behavior is useful. Processing-time timestamps won't be 
> aligned with watermarks and are not deterministic. The only reason would be 
> to have _some_ value in the timestamp field. However, the behavior is very 
> subtle and might not be noticed by users.
> IMO, it would be better to erase the timestamp. This will cause downstream 
> operators that rely on timestamps to fail, notifying users that the logic 
> they implemented was probably not what they intended.
> What do you think [~aljoscha]?
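
A sketch of what the review comment on PR #4786 above asks for in {{KeyedProcessOperator#onProcessingTime()}} (generic parameters and the elided user-function call are illustrative, not the actual patch):

{code:java}
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
	// Erase any timestamp still set from a previously processed element,
	// instead of stamping output with non-deterministic processing time.
	collector.eraseTimestamp();

	// ... then invoke the user function's onTimer() as before ...
}
{code}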








[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197156#comment-16197156
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143502870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;
+

[jira] [Commented] (FLINK-7167) Job can't set checkpoint meta directory to cover the system setting state.checkpoints.dir

2017-10-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197144#comment-16197144
 ] 

Aljoscha Krettek commented on FLINK-7167:
-

Thanks for pointing this out, [~phoenixjiangnan]. There is actually a (very 
outdated) PR that also tries to tackle this issue: 
https://github.com/apache/flink/pull/3522. I think the solution we will go for 
in the end is to get rid of this configuration parameter; that should make 
things easier.

> Job can't set checkpoint meta directory to cover the system setting 
> state.checkpoints.dir 
> --
>
> Key: FLINK-7167
> URL: https://issues.apache.org/jira/browse/FLINK-7167
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.1
>Reporter: yuqi
>Assignee: Bowen Li
>
> If we want to recover a failed job using a checkpoint, then, because all job 
> checkpoint metadata currently live in the same directory without any 
> job-specific identification, we have to traverse every file in the directory 
> to find the data of this job. This is rather troublesome, so being able to set 
> this configuration at the job level is preferable.
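
For comparison, the checkpoint *data* location can already be chosen per job through the state backend; only the checkpoint *meta* directory ({{state.checkpoints.dir}}) is cluster-wide. A sketch of the existing job-level knob in the 1.3.x API (the HDFS path is illustrative):

{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobLevelCheckpointDir {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Job-level state backend with its own checkpoint data directory.
		// The externalized checkpoint *metadata* still goes to the
		// cluster-wide state.checkpoints.dir, which is what this ticket
		// wants to make overridable per job.
		env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/my-job"));
		env.enableCheckpointing(60_000L);

		// ... build and execute the job ...
	}
}
{code}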





[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197137#comment-16197137
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143500886
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;


[jira] [Closed] (FLINK-5510) Replace Scala Future with FlinkFuture in QueryableStateClient

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5510.
---
Resolution: Duplicate

With the queryable state client rework of FLINK-7770, this issue becomes 
obsolete, since FLINK-7770 also removes the Scala Futures in favour of Java 
Futures.

> Replace Scala Future with FlinkFuture in QueryableStateClient
> -
>
> Key: FLINK-5510
> URL: https://issues.apache.org/jira/browse/FLINK-5510
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Priority: Minor
>
> The entry point for queryable state users is the {{QueryableStateClient}} 
> which returns query results via Scala Futures. Since merging the initial 
> version of QueryableState we have introduced the FlinkFuture wrapper type in 
> order to not expose our Scala dependency via the API.
> Since APIs tend to stick around longer than expected, it might be worthwhile 
> to port the exposed QueryableStateClient interface to use the FlinkFuture. 
> Early users can still get the Scala Future via FlinkFuture#getScalaFuture().





[jira] [Commented] (FLINK-7736) Fix some of the alerts raised by lgtm.com

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197132#comment-16197132
 ] 

ASF GitHub Bot commented on FLINK-7736:
---

Github user 1m2c3t4 commented on the issue:

https://github.com/apache/flink/pull/4784
  
All review comments addressed


> Fix some of the alerts raised by lgtm.com
> -
>
> Key: FLINK-7736
> URL: https://issues.apache.org/jira/browse/FLINK-7736
> Project: Flink
>  Issue Type: Improvement
>Reporter: Malcolm Taylor
>Assignee: Malcolm Taylor
>
> lgtm.com has identified a number of issues giving scope for improvement in 
> the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list]
> This issue is to address some of the simpler ones. Some of these are clear 
> bugs, such as off-by-one errors; others are areas where the code could be made 
> clearer, such as the use of a variable name that shadows another variable.
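
Illustrative (non-Flink) examples of the two alert categories mentioned above:

{code:java}
public class LgtmAlertExamples {

	// Off-by-one: '<=' would walk past the last index and throw
	// ArrayIndexOutOfBoundsException; the fix is '<'.
	static int sum(int[] values) {
		int total = 0;
		for (int i = 0; i < values.length; i++) { // was: i <= values.length
			total += values[i];
		}
		return total;
	}

	private int count;

	// Variable shadowing: a parameter named 'count' would hide the field;
	// renaming it makes the assignment unambiguous.
	void setCount(int newCount) { // was: setCount(int count)
		this.count = newCount;
	}
}
{code}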







[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197131#comment-16197131
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143499239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

2017-10-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143499239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+   public static final String FIELD_NAME_ID = "id";
+
+   public static final String FIELD_NAME_STATUS = "status";
+
+   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+   public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+   public static final String FIELD_NAME_TASKS = "tasks";
+
+   @JsonProperty(FIELD_NAME_ID)
+   private final long id;
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final CheckpointStatsStatus status;
+
+   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+   private final boolean savepoint;
+
+   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+   private final long triggerTimestamp;
+
+   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+   private final long latestAckTimestamp;
+
+   @JsonProperty(FIELD_NAME_STATE_SIZE)
+   private final long stateSize;
+
+   @JsonProperty(FIELD_NAME_DURATION)
+   private final long duration;
+
+   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+   private final long alignmentBuffered;
+
+   @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+   private final int numSubtasks;
+
+   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+   private final int numAckSubtasks;
+
+   @JsonProperty(FIELD_NAME_TASKS)
  

[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197108#comment-16197108
 ] 

Kostas Kloudas commented on FLINK-7782:
---

Hi [~ajkrishna], as seen in the JIRA that you posted 
(https://issues.apache.org/jira/browse/FLINK-7606), the problem there was that 
there were no more elements in the stream, so the watermark was not advancing. 
This led to the elements being buffered inside Flink, waiting for the 
watermark to advance so that their "window" could expire.

Can this be the case for you as well? You can monitor the size of your state, or 
you can write your own timestamp extractor and print every watermark you send. 
With a parallelism of 1 you can see whether the watermark is greater than the 
timestamp of your last element, or not (which would mean that your data is buffered).
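
For reference, a minimal sketch of such a watermark-printing assigner 
(assuming a hypothetical POJO {{MyEvent}} with a {{getTimestamp()}} accessor; 
adapt it to your Tuple8):

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event type standing in for the Tuple8 used in the job.
class MyEvent {
    private final long timestamp;
    MyEvent(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

public class DebugWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Mirrors the ascending-timestamps behavior: the watermark trails the
        // highest seen timestamp by 1 ms. Printing it shows whether event
        // time actually advances past your last element.
        Watermark wm = new Watermark(
            maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1);
        System.out.println("Emitting watermark: " + wm.getTimestamp());
        return wm;
    }
}
{code}

If the printed watermark never passes the timestamp of your last element, your 
data is still buffered inside the CEP operator.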

> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a kafka source. I am using 
> KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the flink dashboard, I see that I have 2 
> Taskmanagers & 4 Task slots
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> {code:java}
> DataStream> 
> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern, ?> cep1 =
> 
> Pattern.>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a 
> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
> data is exactly the same for all 100 of the csv files except for home_id and 
> the lat & long information. The timestamp is incremented by a step of 1 sec. 
> I start multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before applying the pattern and after and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> homeid to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197099#comment-16197099
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143489310
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,8 +33,7 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

Do note that we must be on the lookout for requests that use primitive 
fields, as Jackson will default them to 0 if they are missing, which will cause 
misleading error messages.
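
For illustration, a minimal, self-contained sketch of that behavior 
({{SampleRequest}} and its field are made up, not an actual Flink message type):

{code:java}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PrimitiveDefaultDemo {

    // Hypothetical request type with a primitive creator property.
    static class SampleRequest {
        final long checkpointId;

        @JsonCreator
        SampleRequest(@JsonProperty("checkpoint_id") long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // With FAIL_ON_MISSING_CREATOR_PROPERTIES disabled, the missing
        // "checkpoint_id" is silently filled with 0 instead of failing.
        SampleRequest request = mapper.readValue("{}", SampleRequest.class);
        System.out.println(request.checkpointId); // prints 0
    }
}
{code}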


> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

2017-10-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4763#discussion_r143489310
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,8 +33,7 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

Do note that we must be on the lookout for requests that use primitive 
fields, as Jackson will default them to 0 if they are missing, which will cause 
misleading error messages.


---


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-10-09 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197094#comment-16197094
 ] 

Kostas Kloudas commented on FLINK-7606:
---

I do not think that this would solve the problem. 
In event time, the fact that an element has a timestamp X does not mean that 
event time has also advanced past that time. 
The watermark is what signals that time has advanced. So if the 
watermark does not advance, we cannot be sure that we can close a window. 

Only if you know your data, _e.g._ that timestamps are in order, can you 
assume that. 
But in this case you could probably write your own timestamp extractor that, 
after a timeout, sends a watermark with the "correct" timestamp.
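
For example, a hedged sketch of such an extractor (assuming in-order 
timestamps; {{MyEvent}} and {{getTimestamp()}} are hypothetical stand-ins, and 
the wall clock is used to detect idleness):

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event type; only a getTimestamp() accessor is assumed.
class MyEvent {
    private final long timestamp;
    MyEvent(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

public class IdleTimeoutWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long idleTimeoutMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastElementWallClock = System.currentTimeMillis();

    public IdleTimeoutWatermarkAssigner(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        lastElementWallClock = System.currentTimeMillis();
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE); // nothing seen yet
        }
        if (System.currentTimeMillis() - lastElementWallClock > idleTimeoutMs) {
            // No elements for a while: assume none with earlier timestamps
            // will arrive, and let the watermark catch up so that buffered
            // windows/patterns can fire.
            return new Watermark(maxTimestamp);
        }
        return new Watermark(maxTimestamp - 1);
    }
}
{code}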

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: Schermata 2017-09-27 alle 00.35.53.png, heap-dump1.png, 
> heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows up continuously without free the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows up (I suppose that the state 
> of NFA is updated) but it is not cleaned also after the 5 minutes of time 
> window defined in "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-09 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143487841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function name of other non-equi conditions
+  * @param genJoinFuncCode the function code of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and emitting it (as 
part of a joined result).
+* Only reasonable for row time join.
+*
+* @return the maximum delay for the outputs
+*/
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, 

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197091#comment-16197091
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143487841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function name of other non-equi conditions
+  * @param genJoinFuncCode the function code of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and 

[jira] [Created] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-09 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7784:
---

 Summary: Don't fail TwoPhaseCommitSinkFunction when failing to 
commit
 Key: FLINK-7784
 URL: https://issues.apache.org/jira/browse/FLINK-7784
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.4.0
Reporter: Aljoscha Krettek
Priority: Blocker


Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either 
when doing it via the completed-checkpoint notification or when trying to 
commit after restoring from a failure). This means that the job will go into an 
infinite recovery loop because we will always keep failing.

In some cases it might be better to ignore those failures and keep on 
processing, and this should be the default. We can provide an option that allows 
failing the sink on failing commits.
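
A hedged sketch of the proposed default behavior ({{LenientCommitter}} and its 
flag are illustrative, not the actual {{TwoPhaseCommitSinkFunction}} API):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LenientCommitter {

    private static final Logger LOG = LoggerFactory.getLogger(LenientCommitter.class);

    // The proposed opt-in switch: restore the old fail-fast behavior on demand.
    private final boolean failOnCommitError;

    public LenientCommitter(boolean failOnCommitError) {
        this.failOnCommitError = failOnCommitError;
    }

    public void commit(Runnable commitAction) {
        try {
            commitAction.run();
        } catch (RuntimeException e) {
            if (failOnCommitError) {
                throw e; // old behavior: fail the sink and trigger recovery
            }
            // proposed default: log and keep processing instead of entering
            // an infinite recovery loop
            LOG.warn("Ignoring failed commit to avoid a recovery loop", e);
        }
    }
}
{code}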



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7783) Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7783:

Summary: Don't always remove checkpoints in 
ZooKeeperCompletedCheckpointStore#recover()  (was: Don't always remove 
checkpoints ZooKeeperCompletedCheckpointStore#recover())

> Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
> --
>
> Key: FLINK-7783
> URL: https://issues.apache.org/jira/browse/FLINK-7783
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, we always delete checkpoint handles if they (or the data from the 
> DFS) cannot be read: 
> https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180
> This can lead to problems in case the DFS is temporarily not available, i.e. 
> we could inadvertently delete all checkpoints even though they are still 
> valid.
> A user reported this problem on the mailing list: 
> https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7783) Don't always remove checkpoints ZooKeeperCompletedCheckpointStore#recover()

2017-10-09 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7783:
---

 Summary: Don't always remove checkpoints 
ZooKeeperCompletedCheckpointStore#recover()
 Key: FLINK-7783
 URL: https://issues.apache.org/jira/browse/FLINK-7783
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.4.0, 1.3.3


Currently, we always delete checkpoint handles if they (or the data from the 
DFS) cannot be read: 
https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180

This can lead to problems in case the DFS is temporarily not available, i.e. we 
could inadvertently delete all checkpoints even though they are still valid.

A user reported this problem on the mailing list: 
https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E
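
A hedged sketch of the proposed recovery behavior ({{Handle}}, {{Checkpoint}} 
and {{readCheckpoint()}} are illustrative stand-ins, not Flink's actual types):

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LenientCheckpointRecovery {

    interface Handle { }
    interface Checkpoint { }

    Checkpoint readCheckpoint(Handle h) throws Exception {
        throw new Exception("DFS temporarily unavailable"); // placeholder
    }

    List<Checkpoint> recover(List<Handle> handlesInZooKeeper) {
        List<Checkpoint> recovered = new ArrayList<>();
        for (Handle handle : handlesInZooKeeper) {
            try {
                recovered.add(readCheckpoint(handle));
            } catch (Exception e) {
                // Old behavior: remove the handle here, which can discard a
                // still-valid checkpoint when the DFS is only briefly down.
                // Proposed: keep the handle so a later recovery can retry it.
                System.err.println("Could not read checkpoint; keeping handle: " + e);
            }
        }
        return recovered;
    }
}
{code}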



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197028#comment-16197028
 ] 

Ajay commented on FLINK-7782:
-


Here is an example of a simple event I am trying to detect. The first and last 
lines are the interesting points/events. The CEP library is not able to detect 
a pattern like this. 

4> (96,Sat Sep 30 22:30:25 UTC 2017,complex 
event,Low,32.781082,-117.01864,12.0,20.0)

4> (96,Sat Sep 30 22:30:26 UTC 2017,complex 
event,High,32.781082,-117.01864,0.0235,20.0)

4> (96,Sat Sep 30 22:30:27 UTC 2017,complex 
event,High,32.781082,-117.01864,0.02319611,20.0)

4> (96,Sat Sep 30 22:30:28 UTC 2017,complex 
event,Medium,32.781082,-117.01864,0.023357224,20.0)

4> (96,Sat Sep 30 22:30:29 UTC 2017,complex 
event,Low,32.781082,-117.01864,0.060904443,20.0)

4> (96,Sat Sep 30 22:30:30 UTC 2017,complex 
event,Medium,32.781082,-117.01864,0.100115,20.0)

4> (96,Sat Sep 30 22:30:31 UTC 2017,complex 
event,High,32.781082,-117.01864,0.12398389,20.0)

4> (96,Sat Sep 30 22:30:32 UTC 2017,complex 
event,Medium,32.781082,-117.01864,0.15611167,20.0)

4> (96,Sat Sep 30 22:30:33 UTC 2017,complex 
event,Low,32.781082,-117.01864,0.15817556,20.0)

4> (96,Sat Sep 30 22:30:34 UTC 2017,complex 
event,Low,32.781082,-117.01864,0.09934334,20.0)

4> (96,Sat Sep 30 22:30:35 UTC 2017,complex 
event,High,32.781082,-117.01864,16.0,20.0)



Notes about this experiment.

1. Only one kafka partition and just one topic

2. Flink env parallelism set to 4 and I am using AscendingTimestampExtractor on 
KafkaSource09.

3. In the data above, the first element is the id that I use for keyBy

4. I started 4 Kafka producers in parallel with a random delay between them

5. Each producer sends 1 rows from a csv at an average of 18 seconds. Of 
the data from the 4 producers, the events for only one were detected. 

6. Looking at the log files, I print the stream and see all 4 ids, where each 
id is associated with one process number. In the data above, 96 is only 
associated with process 4. In this case there is just one partition in Kafka. If I 
were to increase the number of partitions, each id would be spread across multiple 
processes.

7. I had run another test with a different set of 4 ids just before the one 
I've presented above, and I expected to see 148 events for the 4 ids; I saw all 
of them being captured. I did not change anything as far as delays in the 
producer.

> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a kafka source. I am using 
> KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the flink dashboard, I see that I have 2 
> Taskmanagers & 4 Task slots
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> {code:java}
> DataStream> 
> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern, ?> cep1 =
> 
> Pattern.>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a 
> csv file with a delay of about 100 ms between 2 rows 

[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197027#comment-16197027
 ] 

Ajay commented on FLINK-7782:
-

1. Using an env parallelism of 1 performed similarly, with the additional problem 
that there was significant lag in the kafka topic.
2. I removed the additional keyBy(0), but that did not change anything.
3. I also tried checking only for the "start"-only pattern, and it was exactly 
the same: I saw one of the homes going through but the 3 others just getting 
dropped. 
4. I also tried slowing down the rate from 5000/second into Kafka to about 
1000/second, but I see similar results. 

> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a kafka source. I am using 
> KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the flink dashboard, I see that I have 2 
> Taskmanagers & 4 Task slots
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> {code:java}
> DataStream> 
> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern, ?> cep1 =
> 
> Pattern.>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a 
> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
> data is exactly the same for all 100 of the csv files except for home_id and 
> the lat & long information. The timestamp is incremented by a step of 1 sec. 
> I start multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before applying the pattern and after and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> homeid to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197024#comment-16197024
 ] 

ASF GitHub Bot commented on FLINK-7739:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4749
  
Users shouldn't be able to call `shutdown()`, and if they are, they 
shouldn't be using it (it would be incorrect). `shutdown()` was a protected 
method and shouldn't be used by users. Unfortunately, in many places in 
`flink-tests` it was used instead of the public `stop()`, because protected 
methods are visible inside the same package and it was really confusing which 
one should be used. 

Ideally it would be great to have another visibility level like "only 
visible to the children", but there is no such thing :/ Thus I renamed 
`shutdown()` to make a clear distinction from the public `stop()`. 
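
To make the pitfall concrete, a made-up illustration (class and method names 
are not from the Flink codebase):

{code:java}
// In Java, "protected" also grants package access, so test code in the same
// package can accidentally call the internal method directly.
public class TestResource {

    /** Public entry point that callers are supposed to use. */
    public void stop() {
        shutdownResources();
    }

    /** Intended only for subclasses; renaming it away from "shutdown()"
     *  makes the distinction from stop() obvious at call sites. */
    protected void shutdownResources() {
        // release sockets, threads, temp files, ...
    }
}
{code}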


> Improve Kafka*ITCase tests stability
> 
>
> Key: FLINK-7739
> URL: https://issues.apache.org/jira/browse/FLINK-7739
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197023#comment-16197023
 ] 

Ajay commented on FLINK-7782:
-

Related to FLINK-7606

> Flink CEP not recognizing pattern
> -
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
>  Issue Type: Bug
>Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a kafka source. I am using 
> KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM 
> running Ubuntu 16.04. From the flink dashboard, I see that I have 2 
> Taskmanagers & 4 Task slots
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. The parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> {code:java}
> DataStream> 
> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
> 
> Pattern, ?> cep1 =
> 
> Pattern.>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a 
> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
> data is exactly the same for all 100 of the csv files except for home_id and 
> the lat & long information. The timestamp is incremented by a step of 1 sec. 
> I start multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before applying the pattern and after and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> homeid to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..."...

2017-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4647


---


[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197025#comment-16197025
 ] 

ASF GitHub Bot commented on FLINK-7575:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4647


> Dashboard jobs/tasks metrics display 0 when metrics are not yet available
> -
>
> Key: FLINK-7575
> URL: https://issues.apache.org/jira/browse/FLINK-7575
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.3.2
>Reporter: James Lafa
>Assignee: James Lafa
>Priority: Minor
> Fix For: 1.4.0
>
>
> The web frontend is currently displaying "0" when a metric is not available 
> yet (ex: records-in/out, bytes-in/out). 
> 0 is misleading and it's preferable to display no value while the value is 
> still unknown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4749: [FLINK-7739][tests] Properly shutdown resources in tests

2017-10-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4749
  
Users shouldn't be able to call `shutdown()`, and if they are, they 
shouldn't be using it (it would be incorrect). `shutdown()` was a protected 
method and shouldn't be used by users. Unfortunately, in many places in 
`flink-tests` it was used instead of the public `stop()`, because protected 
methods are visible inside the same package and it was really confusing which 
one should be used. 

Ideally it would be great to have another visibility level like "only 
visible to the children", but there is no such thing :/ Thus I renamed 
`shutdown()` to make a clear distinction from the public `stop()`. 


---


[jira] [Commented] (FLINK-7761) Twitter example is not self-contained

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197022#comment-16197022
 ] 

ASF GitHub Bot commented on FLINK-7761:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4773
  
merging.


> Twitter example is not self-contained
> -
>
> Key: FLINK-7761
> URL: https://issues.apache.org/jira/browse/FLINK-7761
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The Twitter example jar is not self-contained as it excludes the shaded guava 
> dependency from the twitter connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available

2017-10-09 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7575.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

1.4: c7968e9c2ad6601f9f38b673ead5bc71611a6fbd

> Dashboard jobs/tasks metrics display 0 when metrics are not yet available
> -
>
> Key: FLINK-7575
> URL: https://issues.apache.org/jira/browse/FLINK-7575
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.3.2
>Reporter: James Lafa
>Assignee: James Lafa
>Priority: Minor
> Fix For: 1.4.0
>
>
> The web frontend is currently displaying "0" when a metric is not available 
> yet (ex: records-in/out, bytes-in/out). 
> 0 is misleading and it's preferable to display no value while the value is 
> still unknown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4773: [FLINK-7761] [examples] Include shaded guava dependency i...

2017-10-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4773
  
merging.


---


[jira] [Created] (FLINK-7782) Flink CEP not recognizing pattern

2017-10-09 Thread Ajay (JIRA)
Ajay created FLINK-7782:
---

 Summary: Flink CEP not recognizing pattern
 Key: FLINK-7782
 URL: https://issues.apache.org/jira/browse/FLINK-7782
 Project: Flink
  Issue Type: Bug
Reporter: Ajay


I am using flink version 1.3.2. Flink has a kafka source. I am using 
KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM running 
Ubuntu 16.04. From the flink dashboard, I see that I have 2 Taskmanagers & 4 
Task slots

What I observe is the following. The input to Kafka is a json string and when 
parsed on the flink side, it looks like this


{code:java}
(101,Sun Sep 24 23:18:53 UTC 2017,complex 
event,High,37.75142,-122.39458,12.0,20.0)
{code}

I use a Tuple8 to capture the parsed data. The first field is home_id. The time 
characteristic is set to EventTime and I have an AscendingTimestampExtractor 
using the timestamp field. The parallelism for the execution environment is 
set to 4. I have a rather simple event that I am trying to capture


{code:java}
DataStream> 
cepMapByHomeId = cepMap.keyBy(0);

//cepMapByHomeId.print();

Pattern, 
?> cep1 =

Pattern.>begin("start")
.where(new OverLowThreshold())
.followedBy("end")
.where(new OverHighThreshold());


PatternStream> patternStream = CEP.pattern(cepMapByHomeId, cep1);


DataStream> alerts = patternStream.select(new PackageCapturedEvents());


{code}
The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
16. The output of the pattern is like this


{code:java}
(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
event,Non-event,37.75837,-122.41467)
{code}



On the Kafka producer side, I am trying to send simulated data for around 100 
homes, so the home_id would go from 0-100 and the input is keyed by home_id. I 
have about 10 partitions in kafka. The producer just loops going through a csv 
file with a delay of about 100 ms between 2 rows of the csv file. The data is 
exactly the same for all 100 of the csv files except for home_id and the lat & 
long information. The timestamp is incremented by a step of 1 sec. I start 
multiple processes to simulate data from different homes.

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input data. 
I barely see the events for about 4-5 of the home_id values. I do a print 
before applying the pattern and after and I see all home_ids before and only a 
tiny subset after. Since the data is exactly the same, I expect all homeid to 
be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4787: [FLINK-6615][core] simplify FileUtils

2017-10-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4787
  
Is there any problem with the current implementation?
The current implementation was carefully done to gracefully handle 
concurrent removals and to allow picking whether to clean directories with or 
without files.
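
For reference, an illustrative sketch (not the Flink implementation) of a 
deletion routine that tolerates concurrent removals:

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

public class TolerantDelete {

    public static void deleteDirectory(Path dir) throws IOException {
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                // deleteIfExists ignores files already removed concurrently
                Files.deleteIfExists(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc)
                    throws IOException {
                try {
                    Files.deleteIfExists(d);
                } catch (DirectoryNotEmptyException e) {
                    // a concurrent writer re-created a file; the caller may retry
                }
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
{code}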


---


[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197009#comment-16197009
 ] 

ASF GitHub Bot commented on FLINK-6615:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4787
  
Is there any problem with the current implementation?
The current implementation was carefully done to gracefully handle 
concurrent removals and to allow picking whether to clean directories with or 
without files.


> tmp directory not cleaned up on shutdown
> 
>
> Key: FLINK-6615
> URL: https://issues.apache.org/jira/browse/FLINK-6615
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Bowen Li
>
> Steps to reproduce:
> 1) Stop task manager gracefully (kill -6 )
> 2) In the logs:
> {code}
> 2017-05-17 13:35:50,147 INFO  org.apache.zookeeper.ClientCnxn 
>   - EventThread shut down [main-EventThread]
> 2017-05-17 13:35:50,200 ERROR 
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - IOManager 
> failed to properly clean up temp file directory: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 
> [flink-akka.actor.default-dispatcher-2]
> java.nio.file.DirectoryNotEmptyException: 
> /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47
>   at 
> sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
>   at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>   at java.nio.file.Files.delete(Files.java:1126)
>   at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241)
>   at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
> {code}
> Expected:
> * on shutdown delete non-empty directory anyway. 
> Notes:
> * after the process terminated, I checked the 
> "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and 
> didn't find anything there. So it looks like a timing issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196946#comment-16196946
 ] 

ASF GitHub Bot commented on FLINK-5005:
---

Github user DieBauer commented on the issue:

https://github.com/apache/flink/pull/3703
  
Hi, I'm sorry for the late reaction. I haven't found the time to work on 
this anymore (also, priorities shifted...).

Therefore this pull request is stale (it could still be used as a 
reference).

I think the main challenge is in serialising the Java 8 lambdas. And 
dropping the support for Scala 2.10 and Java 7 certainly helps in taming the 
pom.xml profiles. 

I will close this pull request so as not to keep hopes up.

> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>Assignee: Aljoscha Krettek
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196947#comment-16196947
 ] 

ASF GitHub Bot commented on FLINK-5005:
---

Github user DieBauer closed the pull request at:

https://github.com/apache/flink/pull/3703


> Remove Scala 2.10 support; add Scala 2.12 support
> -
>
> Key: FLINK-5005
> URL: https://issues.apache.org/jira/browse/FLINK-5005
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Reporter: Andrew Roberts
>Assignee: Aljoscha Krettek
>
> Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and 
> offers many compile-time and runtime speed improvements. It would be great to 
> get artifacts up on maven central to allow Flink users to migrate to Scala 
> 2.12.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts

2017-10-09 Thread DieBauer
Github user DieBauer closed the pull request at:

https://github.com/apache/flink/pull/3703


---


[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts

2017-10-09 Thread DieBauer
Github user DieBauer commented on the issue:

https://github.com/apache/flink/pull/3703
  
Hi, I'm sorry for the late reaction. I haven't found the time to work on 
this anymore (also, priorities shifted...).

Therefore this pull request is stale (it could still be used as a 
reference).

I think the main challenge is in serialising the Java 8 lambdas. And 
dropping the support for Scala 2.10 and Java 7 certainly helps in taming the 
pom.xml profiles. 

I will close this pull request so as not to keep hopes up.


---


[jira] [Created] (FLINK-7781) Support simple on-demand metrics aggregation

2017-10-09 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7781:
---

 Summary: Support simple on-demand metrics aggregation
 Key: FLINK-7781
 URL: https://issues.apache.org/jira/browse/FLINK-7781
 Project: Flink
  Issue Type: Improvement
  Components: Metrics, REST
Affects Versions: 1.4.0
Reporter: Chesnay Schepler
 Fix For: 1.4.0


We should support aggregations (min, max, avg, sum) of metrics in the REST API. 
This is primarily about aggregating across subtasks, for example the number of 
incoming records across all subtasks.

This is useful for simple use-cases where a dedicated metrics backend is 
overkill, and will allow us to provide better metrics in the web UI (since we 
can expose these aggregated as well).

I propose to add a new query parameter "agg=[min,max,avg,sum]". As a start this 
parameter should only be used for task metrics. (This is simply the main 
use-case I have in mind.)

The aggregation should (naturally) only work for numeric metrics.

We will need a HashSet of metrics that exist for the subtasks of a given task; it 
has to be updated in {{MetricStore#add}}.

All task metrics are stored as either
# {{<subtask_index>.<metric_name>}} or
# {{<subtask_index>.<operator_name>.<metric_name>}}.

If a user sends a request {{get=mymetric,agg=sum}}, only the metrics of the 
first kind are to be considered. Similarly, given a request 
{{get=myoperator.mymetric,agg=sum}} only metrics of the second kind are to be 
considered.

Ideally, the name of the aggregated metric (i.e. the original name without 
subtask index) is also contained in the list of available metrics.
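
A hypothetical sketch of the proposed aggregation semantics (not Flink code; 
the per-subtask values are assumed to have been collected from the MetricStore 
beforehand):

{code:java}
import java.util.Collection;
import java.util.DoubleSummaryStatistics;

public class MetricAggregation {

    // Folds the per-subtask values of one metric according to the "agg"
    // query parameter; only numeric metrics are meaningful here.
    public static double aggregate(Collection<Double> subtaskValues, String agg) {
        DoubleSummaryStatistics stats = subtaskValues.stream()
            .mapToDouble(Double::doubleValue)
            .summaryStatistics();
        switch (agg) {
            case "min": return stats.getMin();
            case "max": return stats.getMax();
            case "avg": return stats.getAverage();
            case "sum": return stats.getSum();
            default:
                throw new IllegalArgumentException("Unknown aggregation: " + agg);
        }
    }
}
{code}

For a request like {{get=mymetric,agg=sum}}, this would be invoked as 
{{aggregate(valuesForMymetricAcrossSubtasks, "sum")}}.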



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5316) Make the GenericWriteAheadSink backwards compatible.

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5316.
---
Resolution: Won't Fix

Closing for now since there was no noise about this.

> Make the GenericWriteAheadSink backwards compatible.
> 
>
> Key: FLINK-5316
> URL: https://issues.apache.org/jira/browse/FLINK-5316
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5235) Don't automatically add HBase classpath to Flink classpath

2017-10-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5235:

Summary: Don't automatically add HBase classpath to Flink classpath  (was: 
HBase classpath automatically added to Flink classpath)

> Don't automatically add HBase classpath to Flink classpath
> --
>
> Key: FLINK-5235
> URL: https://issues.apache.org/jira/browse/FLINK-5235
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Maximilian Michels
>
> If the {{HBASE_CONF_DIR}} is set, the startup script adds the output of 
> {{hbase classpath}} to the classpath. This is a huge list of dependencies 
> which is very likely to clash with Flink dependencies.
> For example, on my machine:
> {noformat}
> 

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196863#comment-16196863
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143448956
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and 
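
To make the bound bookkeeping above concrete: for a predicate such as {{L.time BETWEEN R.time - 4 AND R.time + 2}}, X = -4 and Y = 2, so {{leftRelativeSize}} = 4 and {{rightRelativeSize}} = 2. A minimal sketch of the resulting arithmetic (the bound values are illustrative, not taken from the PR):

{code:java}
public class JoinBoundsExample {
    public static void main(String[] args) {
        // Illustrative bounds for: L.time BETWEEN R.time - 4 AND R.time + 2
        long leftLowerBound = -4; // X in the criteria
        long leftUpperBound = 2;  // Y in the criteria

        long leftRelativeSize = -leftLowerBound; // 4
        long rightRelativeSize = leftUpperBound; // 2

        // same formula as in the class above
        long cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2; // 3

        // a right-stream row with timestamp 100 can join left rows in [96, 102]
        long rightTime = 100;
        System.out.println("joinable left range: ["
                + (rightTime + leftLowerBound) + ", " + (rightTime + leftUpperBound)
                + "], cleanupDelay=" + cleanupDelay);
    }
}
{code}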

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-09 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143448956
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and emitting it (as 
part of a joined result).
+* Only reasonable for row time join.
+*
+* @return the maximum delay for the outputs
+*/
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, 

[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196860#comment-16196860
 ] 

ASF GitHub Bot commented on FLINK-7575:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4647
  
merging.


> Dashboard jobs/tasks metrics display 0 when metrics are not yet available
> -
>
> Key: FLINK-7575
> URL: https://issues.apache.org/jira/browse/FLINK-7575
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.3.2
>Reporter: James Lafa
>Assignee: James Lafa
>Priority: Minor
>
> The web frontend is currently displaying "0" when a metric is not available 
> yet (ex: records-in/out, bytes-in/out). 
> 0 is misleading and it's preferable to display no value while the value is 
> still unknown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...

2017-10-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4647
  
merging.


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-09 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143445293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and emitting it (as 
part of a joined result).
+* Only reasonable for row time join.
+*
+* @return the maximum delay for the outputs
+*/
+  def getMaxOutputDelay: Long = Math.max(leftRelativeSize, 

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196845#comment-16196845
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143445293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and 

[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-10-09 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196831#comment-16196831
 ] 

Kostas Kloudas commented on FLINK-7756:
---

Hi [~shashank734], thanks a lot for reporting it; I will look into it.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>
> When I try to use the RocksDBStateBackend on my staging cluster (which uses 
> HDFS as its file system), it crashes. But when I use the FsStateBackend on 
> staging (also with HDFS as the file system), it works fine.
> Locally, with the local file system, both setups work fine.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> 
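
For reference, switching between the two backends contrasted above is a one-line change on the execution environment; a minimal Flink 1.3 sketch (the HDFS checkpoint path is a placeholder):

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        // RocksDB backend with incremental checkpoints (the failing setup)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Alternative: the file system backend the reporter says works
        // env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}
{code}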

[jira] [Created] (FLINK-7780) Integrate savepoint command into REST client

2017-10-09 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7780:
---

 Summary: Integrate savepoint command into REST client
 Key: FLINK-7780
 URL: https://issues.apache.org/jira/browse/FLINK-7780
 Project: Flink
  Issue Type: Sub-task
  Components: Client, REST
Affects Versions: 1.4.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Priority: Blocker
 Fix For: 1.4.0






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

