[jira] [Comment Edited] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198175#comment-16198175 ] Ajay edited comment on FLINK-7782 at 10/10/17 5:18 AM:
---
Hi Kostas, my stream is pretty much unbounded (the producer runs in an endless loop and the pattern repeats), so there is a continuous stream of data. I have implemented an ascending timestamp extractor. However, I do see some issues with state. Here is a quick snapshot of the checkpoint statistics from the Flink job:

|| ||End to End Duration||State Size||Buffered During Alignment||
|Minimum|9ms|260 KB|0 B|
|Average|57ms|988 KB|2.26 KB|
|Maximum|3s|15.3 MB|271 KB|

||ID||Status||Acknowledged||Trigger Time||Latest Acknowledgement||Failure Time||End to End Duration||State Size||Buffered During Alignment||Failure Message||
|514|Failed|11/12 (92%)|22:15:12|22:15:13|22:15:15|2s|4.11 MB|0 B|Checkpoint failed: Checkpoint Coordinator is suspending.|

On the console where my Flink job was running I saw the following message:
{code}
Size of the state is larger than the maximum permitted memory-backed state. Size=5621890 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
{code}
I have tried the same job with a parallelism of 1 and, while there is significant lag in Kafka, the behavior from the CEP library is exactly the same.

> Flink CEP not recognizing pattern
> ---------------------------------
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
> Issue Type: Bug
> Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 TaskManagers & 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string and, when parsed on the Flink side, it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime and I have an AscendingTimestampExtractor using the timestamp field. Parallelism for the execution environment is set to 4. I have a rather simple event that I am trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>     Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>         .where(new OverLowThreshold())
>         .followedBy("end")
>         .where(new OverHighThreshold());
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the Tuple8 goes over 12 and then over 16. The output of the pattern is like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id goes from 0-100 and the input is keyed by home_id. I have about 10 partitions in Kafka. The producer just loops through a CSV file with a delay of about 100 ms between two rows. The data is exactly the same for all 100 CSV files except for the home_id and the lat & long information. The timestamp is incremented by a step of 1 sec. I start multiple processes to simulate data from different homes.
>
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input data. I barely see the events for about 4-5 of the home_id values. I do a print before applying the pattern and after, and I see all home_ids before and only a tiny subset after. Since the data is exactly the same, I expect all home_ids to be captured and written to my sink.
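The console error quoted above comes from the memory-backed state backend's size cap, which defaults to 5 MB per checkpointed state handle. As a self-contained illustration of that arithmetic (class and constant names here are made up for the sketch, not Flink's internals), the reported state of 5621890 bytes exceeds the 5242880-byte default:

```java
import java.io.IOException;

public class MemoryStateSizeCheck {
    // 5 MB default cap, matching the maxSize=5242880 in the reported error.
    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    // Throws the same kind of error message as in the report when the
    // checkpointed state outgrows the memory-backed limit.
    static void checkSize(long size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException("Size of the state is larger than the maximum permitted "
                    + "memory-backed state. Size=" + size + " , maxSize=" + maxSize);
        }
    }

    public static void main(String[] args) {
        try {
            checkSize(5621890L, DEFAULT_MAX_STATE_SIZE); // size from the report
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the error suggests a different backend: the file-system and RocksDB backends have no such per-handle memory cap.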
[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198175#comment-16198175 ] Ajay commented on FLINK-7782:
---
Hi Kostas, my stream is pretty much unbounded (the producer runs in an endless loop and the pattern repeats), so there is a continuous stream of data. Also, I was monitoring my Kafka manager and the Flink streaming dashboard: Kafka shows almost no lag and I see watermarks advancing in Flink. I have implemented an ascending timestamp extractor. Here is a quick snapshot of the size of state from the Flink job:

|| ||End to End Duration||State Size||Buffered During Alignment||
|Minimum|9ms|260 KB|0 B|
|Average|18ms|481 KB|1 B|
|Maximum|1s|709 KB|437 B|

I have tried the same job with a parallelism of 1 and, while there is significant lag in Kafka, the behavior from the CEP library is exactly the same.

> Flink CEP not recognizing pattern
> ---------------------------------
>
> Key: FLINK-7782
> URL: https://issues.apache.org/jira/browse/FLINK-7782
> Project: Flink
> Issue Type: Bug
> Reporter: Ajay
>
> I am using flink version 1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running Flink on a 3-node AWS cluster with 8G of RAM running Ubuntu 16.04. From the Flink dashboard, I see that I have 2 TaskManagers & 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string and, when parsed on the Flink side, it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime and I have an AscendingTimestampExtractor using the timestamp field. Parallelism for the execution environment is set to 4. I have a rather simple event that I am trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>     Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>         .where(new OverLowThreshold())
>         .followedBy("end")
>         .where(new OverHighThreshold());
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1);
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks if the 7th field in the Tuple8 goes over 12 and then over 16. The output of the pattern is like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id goes from 0-100 and the input is keyed by home_id. I have about 10 partitions in Kafka. The producer just loops through a CSV file with a delay of about 100 ms between two rows. The data is exactly the same for all 100 CSV files except for the home_id and the lat & long information. The timestamp is incremented by a step of 1 sec. I start multiple processes to simulate data from different homes.
>
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input data. I barely see the events for about 4-5 of the home_id values. I do a print before applying the pattern and after, and I see all home_ids before and only a tiny subset after. Since the data is exactly the same, I expect all home_ids to be captured and written to my sink.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism
[ https://issues.apache.org/jira/browse/FLINK-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui reassigned FLINK-7764:
---
Assignee: Xingcan Cui

> FlinkKafkaProducer010 does not accept name, uid, or parallelism
> ---------------------------------------------------------------
>
> Key: FLINK-7764
> URL: https://issues.apache.org/jira/browse/FLINK-7764
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
>
> As [reported on the user list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]:
> When I try to use KafkaProducer with timestamps, it fails to set name, uid, or parallelism. It uses default values.
> {code}
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer =
>     FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topicName, schema, props, partitioner);
> producer.setFlushOnCheckpoint(flushOnCheckpoint);
> producer.name("foo")
>     .uid("bar")
>     .setParallelism(5);
> return producer;
> {code}
> As operator name it shows "FlinKafkaProducer 0.10.x" (with the typo).
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
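The symptom described above is typical of a configuration wrapper that keeps settings locally instead of forwarding them to the operator it wraps. A self-contained sketch of the delegation behavior the reporter expects (all class names here are illustrative stand-ins, not Flink's actual types):

```java
import java.util.HashMap;
import java.util.Map;

public class ForwardingConfig {
    // Stand-in for the underlying sink operator that actually owns the settings.
    static class Operator {
        final Map<String, Object> settings = new HashMap<>();
    }

    // Stand-in for a configuration object like FlinkKafkaProducer010Configuration:
    // each fluent setter must delegate to the wrapped operator, not a local copy.
    static class OperatorWrapper {
        private final Operator wrapped;

        OperatorWrapper(Operator wrapped) {
            this.wrapped = wrapped;
        }

        OperatorWrapper name(String n) {
            wrapped.settings.put("name", n);
            return this;
        }

        OperatorWrapper uid(String u) {
            wrapped.settings.put("uid", u);
            return this;
        }

        OperatorWrapper setParallelism(int p) {
            wrapped.settings.put("parallelism", p);
            return this;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        // Mirrors the call chain from the bug report.
        new OperatorWrapper(op).name("foo").uid("bar").setParallelism(5);
        // The settings land on the wrapped operator rather than being dropped.
        System.out.println(op.settings);
    }
}
```

In the reported bug the wrapper presumably fails to forward these calls, so the operator keeps its defaults (and the misspelled default name "FlinKafkaProducer 0.10.x").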
[jira] [Commented] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197978#comment-16197978 ] ASF GitHub Bot commented on FLINK-6105:
---
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4316

I think InterruptedException should be handled uniformly in HadoopInputFormatBase.java

> Properly handle InterruptedException in HadoopInputFormatBase
> -------------------------------------------------------------
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Ted Yu
> Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException instead of IOException.
> The following example is from HadoopInputFormatBase:
> {code}
> try {
>     splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>     throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4316: [FLINK-6105] Use InterruptedIOException instead of IOExce...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4316 I think InterruptedException should be handled uniformly in HadoopInputFormatBase.java ---
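The change the issue asks for can be sketched without any Hadoop or Flink dependency: `java.io.InterruptedIOException` is a stdlib subclass of `IOException`, so callers catching `IOException` still work, while interruption stays distinguishable. The `Callable` below stands in for `mapreduceInputFormat.getSplits(jobContext)`; the helper name is made up for the sketch:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

public class InterruptAwareSplits {
    // Wraps a split computation; on interruption it restores the thread's
    // interrupt flag and throws InterruptedIOException (an IOException subclass)
    // instead of a plain IOException.
    static <T> T getSplits(Callable<T> splitSource) throws IOException {
        try {
            return splitSource.call();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            InterruptedIOException ioe = new InterruptedIOException("Could not get Splits.");
            ioe.initCause(e);
            throw ioe;
        } catch (Exception e) {
            throw new IOException("Could not get Splits.", e);
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> splits = getSplits(() -> Arrays.asList("split-0", "split-1"));
        System.out.println(splits);
    }
}
```

Restoring the interrupt flag before rethrowing is the part that makes the handling uniform: downstream code can still observe that the thread was interrupted.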
[jira] [Updated] (FLINK-7049) TestingApplicationMaster keeps running after integration tests finish
[ https://issues.apache.org/jira/browse/FLINK-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7049:
---
Description:
After integration tests finish, TestingApplicationMaster is still running. Toward the end of flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log :
{code}
2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:49,681 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Authentication failed
2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
2017-06-29 22:09:50,782 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Authentication failed
2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
{code}

> TestingApplicationMaster keeps running after integration tests finish
> ---------------------------------------------------------------------
>
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197673#comment-16197673 ] Fabian Hueske commented on FLINK-7548:
---
I've created a proposal for a reworked TableSource API that solves several issues. I've pushed a WIP branch to my repository: https://github.com/fhueske/flink/tree/tableWatermarks-extended

The main changes are as follows:
- The {{TableSource}} interface gets a new method {{getTableSchema(): TableSchema}} which returns the schema with all fields of the resulting table. For streaming tables, this also includes rowtime indicators; these fields have the corresponding {{TimeIndicatorTypeInformation}} types. This means that the {{TypeInformation}} returned by {{getResultType()}} no longer determines the schema and fields of the table. Hence, we need to perform a mapping from the physical input type ({{getResultType()}}) to the logical table schema ({{getTableSchema()}}). By default, this mapping is done based on field names, i.e., a table schema field is mapped to the field of the same name in the physical input type. If we cannot find a physical field for a logical field, or if the types do not match, the {{TableSource}} is rejected. This default behavior resembles the current behavior. If fields should not be automatically mapped by name, we will allow specifying a manual index-based mapping, similar to the previous {{DefinedFieldNames}} interface (note: this is not yet included in the WIP branch).
- Processing time fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(false)}}) are automatically inserted into the schema during the initial conversion.
- A {{StreamTableSource}} with rowtime fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(true)}}) requires an additional interface {{DefinedRowtimeAttributes}}. The interface provides a {{RowtimeAttributeDescriptor}} for each rowtime field (right now only a single rowtime field is supported, but we are prepared for more). The {{RowtimeAttributeDescriptor}} provides a {{TimestampExtractor}} which gives a {{RexNode}} expression to extract the timestamp field from the input type. A corresponding expression is code-gen'd into the initial conversion and executed by the table scan operator. Moreover, {{RowtimeAttributeDescriptor}} gives a {{WatermarkStrategy}} which is used to generate watermarks for a rowtime attribute. Watermarks are also generated by the table scan operator.
- We can provide built-in implementations for {{TimestampExtractor}} (right now only {{ExistingField}}, which converts a {{Long}} or {{Timestamp}} attribute into a rowtime attribute) and {{WatermarkStrategy}} (right now {{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}}). Additional implementations can be added in later PRs.

I think the proposal is a good solution because:
- The table schema is explicitly defined by the {{TableSource}} and not by the API internals as before (handling of time attributes is currently hidden). This will make {{CREATE TABLE}} DDL statements easier.
- Timestamp extraction and watermark assignment can be configured via extensible interfaces without changing the API internals. The actual timestamp extraction and watermark generation happen in the table scan operator, so the {{TableSource}} does not have to deal with it.
- Timestamp extraction happens via {{RexNode}} expressions, so it is very versatile, and it should be possible to call UDFs if these have been registered before (need to check this though).
- We can provide built-in implementations for the most common strategies. I think all requirements that we had listed in this issue before should be solvable with this design.
- Projection push-down can be enabled for table sources with timestamp attributes, which doesn't work right now (not part of the WIP branch).
- I could remove a few weird artifacts / workarounds of the current design that change the table schema internally.

Of course, there is also a downside:
- We are touching the {{TableSource}} interface and related interfaces. This is not a big deal for Flink's built-in sources (because there are not so many), but it might be for users who have more {{TableSource}}s implemented.

Please let me know what you think [~jark], [~wheat9], [~twalthr], [~xccui], [~ykt836], and everybody else.

> Support watermark generation for TableSource
> --------------------------------------------
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
>
> As discussed in FLINK-7446, currently the TableSource only supports defining a rowtime field, but does not support extracting watermarks from the rowtime field.
>
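The default name-based mapping from the proposal (logical schema field → same-named physical field, rejecting the source on a missing field or a type mismatch) can be sketched independently of the WIP branch. The helper below is hypothetical; it only illustrates the matching rule, using `Class` objects in place of Flink's `TypeInformation`:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class NameBasedFieldMapping {
    // For each logical schema field, find the index of the physical field with
    // the same name; reject the source if the field is missing or types differ.
    static int[] mapByName(LinkedHashMap<String, Class<?>> logicalSchema,
                           LinkedHashMap<String, Class<?>> physicalType) {
        int[] mapping = new int[logicalSchema.size()];
        int i = 0;
        for (Map.Entry<String, Class<?>> field : logicalSchema.entrySet()) {
            int physicalIndex = indexOf(physicalType, field.getKey());
            if (physicalIndex < 0) {
                throw new IllegalArgumentException("No physical field for logical field " + field.getKey());
            }
            if (!physicalType.get(field.getKey()).equals(field.getValue())) {
                throw new IllegalArgumentException("Type mismatch for field " + field.getKey());
            }
            mapping[i++] = physicalIndex;
        }
        return mapping;
    }

    private static int indexOf(LinkedHashMap<String, Class<?>> type, String name) {
        int idx = 0;
        for (String key : type.keySet()) {
            if (key.equals(name)) {
                return idx;
            }
            idx++;
        }
        return -1;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Class<?>> physical = new LinkedHashMap<>();
        physical.put("id", Long.class);
        physical.put("name", String.class);
        physical.put("ts", Long.class);
        LinkedHashMap<String, Class<?>> logical = new LinkedHashMap<>();
        logical.put("ts", Long.class); // e.g. a rowtime attribute backed by physical "ts"
        logical.put("id", Long.class);
        System.out.println(Arrays.toString(mapByName(logical, physical))); // [2, 0]
    }
}
```

A manual index-based mapping would simply replace `indexOf` with user-supplied indices, which is what the proposal reserves for sources whose fields should not match by name.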
[jira] [Commented] (FLINK-7787) Remove guava dependency in the cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197582#comment-16197582 ] Chesnay Schepler commented on FLINK-7787: - we can't get rid of the cassandra guava dependency since it is hard-coded into the datastax driver API. > Remove guava dependency in the cassandra connector > -- > > Key: FLINK-7787 > URL: https://issues.apache.org/jira/browse/FLINK-7787 > Project: Flink > Issue Type: Bug >Reporter: Haohui Mai >Assignee: Haohui Mai > > As discovered in FLINK-6225, the cassandra connector uses the future classes > in the guava library. We can get rid of the dependency by using the > equivalent classes provided by Java 8. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7788) Allow port range for queryable state client proxy.
Kostas Kloudas created FLINK-7788: - Summary: Allow port range for queryable state client proxy. Key: FLINK-7788 URL: https://issues.apache.org/jira/browse/FLINK-7788 Project: Flink Issue Type: Improvement Components: Queryable State Affects Versions: 1.4.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.4.0 Currently the newly introduced queryable state client proxy can only take one port as a parameter to bind to. In case of multiple proxies running on one machine, this can result in port clashes and inability to start the corresponding proxies. This issue proposes to allow the specification of a port range, so that if some ports in the range are occupied, the proxy can still pick from the remaining free ones. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
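The behavior the issue asks for, independent of the queryable state code itself, is "try each port in a range and bind to the first free one." A minimal self-contained sketch using a plain `ServerSocket` (the class name and range below are illustrative, not Flink configuration values):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortRangeBinder {
    // Tries each port in [start, end] and returns a socket bound to the first
    // free one, so co-located proxies skip ports that are already taken.
    static ServerSocket bindInRange(int start, int end) throws IOException {
        for (int port = start; port <= end; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException occupied) {
                // Port in use; try the next one in the range.
            }
        }
        throw new IOException("No free port in range " + start + "-" + end);
    }

    public static void main(String[] args) throws IOException {
        // Two "proxies" on the same machine each get their own port.
        try (ServerSocket first = bindInRange(53100, 53110);
             ServerSocket second = bindInRange(53100, 53110)) {
            System.out.println(first.getLocalPort() + " and " + second.getLocalPort());
        }
    }
}
```

A production version would also want to distinguish "port in use" from other bind failures (e.g. insufficient privileges), but the iteration logic is the core of the proposal.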
[jira] [Created] (FLINK-7787) Remove guava dependency in the cassandra connector
Haohui Mai created FLINK-7787: - Summary: Remove guava dependency in the cassandra connector Key: FLINK-7787 URL: https://issues.apache.org/jira/browse/FLINK-7787 Project: Flink Issue Type: Bug Reporter: Haohui Mai Assignee: Haohui Mai As discovered in FLINK-6225, the cassandra connector uses the future classes in the guava library. We can get rid of the dependency by using the equivalent classes provided by Java 8. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197500#comment-16197500 ] ASF GitHub Bot commented on FLINK-6225: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3748 Given we have dropped the support for Java 7, it might make sense to just rewrite the class to use the completablefuture in java 8. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3748 Given we have dropped the support for Java 7, it might make sense to just rewrite the class to use the completablefuture in java 8. ---
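The rewrite suggested above maps directly onto the JDK: Guava's `Futures.addCallback(future, callback)` pattern becomes `CompletableFuture.whenComplete`, with no third-party dependency. A small sketch (the `send` method is a made-up stand-in for an async write, not the Cassandra driver's API):

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureCallback {
    // Stand-in for an asynchronous write that completes with an acknowledgement.
    static CompletableFuture<String> send(String record) {
        return CompletableFuture.supplyAsync(() -> "acked:" + record);
    }

    public static void main(String[] args) {
        // whenComplete plays the role of Guava's FutureCallback: one handler
        // for success, one for failure, attached without blocking.
        send("row-1").whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("send failed: " + error);
            } else {
                System.out.println(result);
            }
        }).join(); // join only so the demo waits for the handler to run
    }
}
```

The catch noted in FLINK-7787 still applies: this removes Flink's own use of Guava futures, but the DataStax driver continues to return Guava `ListenableFuture`s from its API.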
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197497#comment-16197497 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3748 When importing guava you have to explicitly add an exclusion to https://github.com/apache/flink/blob/master/tools/maven/suppressions.xml. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3748 When importing guava you have to explicitly add an exclusion to https://github.com/apache/flink/blob/master/tools/maven/suppressions.xml. ---
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197419#comment-16197419 ] ASF GitHub Bot commented on FLINK-6988: --- Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4239 > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. New > FlinkKafkaProducer011 would > * upon creation begin transaction, store transaction identifiers into the > state and would write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, it would flush the data and write in state > information that current transaction is pending to be committed > * on `notifyCheckpointComplete` we would commit this pending transaction > * in case of crash between `snapshotState` and `notifyCheckpointComplete` we > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197420#comment-16197420 ] ASF GitHub Bot commented on FLINK-6988: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Thanks :) > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. New > FlinkKafkaProducer011 would > * upon creation begin transaction, store transaction identifiers into the > state and would write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, it would flush the data and write in state > information that current transaction is pending to be committed > * on `notifyCheckpointComplete` we would commit this pending transaction > * in case of crash between `snapshotState` and `notifyCheckpointComplete` we > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
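The transactional protocol described in the issue (begin a transaction, mark it pending on `snapshotState`, commit on `notifyCheckpointComplete`, and either abort or re-commit after a crash in between) can be modeled as a small state machine. This is a toy model of the described lifecycle, not FlinkKafkaProducer011's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    enum TxnState { OPEN, PENDING, COMMITTED, ABORTED }

    // Toy transaction following the lifecycle from the issue description.
    static class Transaction {
        TxnState state = TxnState.OPEN;
        final List<String> buffered = new ArrayList<>();

        void write(String record) {
            buffered.add(record); // all incoming data written under the transaction
        }

        void snapshotState() {
            state = TxnState.PENDING; // flush and record "pending to be committed"
        }

        void notifyCheckpointComplete() {
            if (state == TxnState.PENDING) {
                state = TxnState.COMMITTED; // checkpoint confirmed: commit
            }
        }

        // Crash between snapshotState and notifyCheckpointComplete: restore and
        // commit if every participant saved the snapshot, otherwise abort.
        void recoverAfterCrash(boolean snapshotSucceededEverywhere) {
            state = snapshotSucceededEverywhere ? TxnState.COMMITTED : TxnState.ABORTED;
        }
    }

    public static void main(String[] args) {
        Transaction txn = new Transaction();
        txn.write("event-1");
        txn.snapshotState();
        txn.notifyCheckpointComplete();
        System.out.println(txn.state); // COMMITTED
    }
}
```

The exactly-once guarantee comes from never exposing buffered records to consumers until the commit step, which is why the pending state must survive failures.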
[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...
Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4239 ---
[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Thanks :) ---
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197379#comment-16197379 ] ASF GitHub Bot commented on FLINK-6225: --- Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @fhueske Hi zentol hasn't replied can you help to take a look. thx > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink
Github user PangZhi commented on the issue: https://github.com/apache/flink/pull/3748 @fhueske Hi, zentol hasn't replied. Could you help take a look? Thanks ---
[jira] [Commented] (FLINK-7731) Trigger on GlobalWindow does not clean state completely
[ https://issues.apache.org/jira/browse/FLINK-7731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197372#comment-16197372 ] Aljoscha Krettek commented on FLINK-7731: - No, FLINK-7700 is not related because we're not dealing with merging windows here. I looked at the code again and it seems that {{clear()}} is currently only called when the window is being cleaned up because of window expiration. [~gerardg] Can you try manually calling clear in places where you return {{FIRE_AND_PURGE}}? This should solve the problem of growing state. > Trigger on GlobalWindow does not clean state completely > --- > > Key: FLINK-7731 > URL: https://issues.apache.org/jira/browse/FLINK-7731 > Project: Flink > Issue Type: Bug > Components: Core, DataStream API >Affects Versions: 1.3.2 >Reporter: Gerard Garcia >Priority: Minor > > I have an operator that consists of: > CoGroup Datastream -> GlobalWindow -> CustomTrigger -> Apply function > The custom trigger fires and purges the elements after it has received the > expected number of elements (or when a timeout fires) from one of the streams > and the apply function merges the elements with the ones received from the > other stream. It appears that the state of the operator grows continuously so > it seems it never gets completely cleaned. > There is a discussion in > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clean-GlobalWidnow-state-td15613.html > that suggests that it may be a bug. > This job reproduces the issue: > https://github.com/GerardGarcia/flink-global-window-growing-state -- This message was sent by Atlassian JIRA (v6.4.14#64029)
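The workaround suggested above (explicitly clearing trigger state before returning FIRE_AND_PURGE, since clear() is otherwise only invoked on window expiration and GlobalWindows never expire) can be sketched in a self-contained way. A plain HashMap stands in for Flink's per-key partitioned trigger state; the class and method names below are illustrative, not the real Trigger API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Self-contained sketch of the suggested workaround: the trigger clears its
 * own per-key state before returning FIRE_AND_PURGE, instead of waiting for
 * a window-expiration clear() call that never comes for GlobalWindows.
 */
public class FireAndPurgeSketch {

    enum TriggerResult { CONTINUE, FIRE_AND_PURGE }

    static class CountTrigger {
        final int maxCount;
        // Stand-in for Flink's partitioned trigger state, keyed by window key.
        final Map<String, Integer> countState = new HashMap<>();

        CountTrigger(int maxCount) { this.maxCount = maxCount; }

        TriggerResult onElement(String key) {
            int count = countState.merge(key, 1, Integer::sum);
            if (count >= maxCount) {
                clear(key);                      // the manual clear() call
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        /** Without this call the per-key entry would linger forever. */
        void clear(String key) { countState.remove(key); }
    }

    public static void main(String[] args) {
        CountTrigger trigger = new CountTrigger(2);
        System.out.println(trigger.onElement("home-1")); // CONTINUE
        System.out.println(trigger.onElement("home-1")); // FIRE_AND_PURGE
        System.out.println(trigger.countState);          // {} - state cleared
    }
}
```

The point of the sketch is only the ordering: state is released in the same call that decides to fire and purge, so nothing accumulates across firings.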
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197323#comment-16197323 ] ASF GitHub Bot commented on FLINK-7072: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4742 > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
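The operations listed in the issue above can be sketched with the JDK's built-in com.sun.net.httpserver, stubbing only the job-listing route. Everything here (class name, path, payload) is an illustrative assumption, not Flink's actual REST endpoint or its response schema.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

/** Minimal sketch of a session-mode REST endpoint exposing GET /jobs. */
public class MiniRestEndpoint {

    static HttpServer start() throws IOException {
        // Port 0: let the OS pick a free port, as a test-friendly default.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/jobs", exchange -> {
            // A real endpoint would query the dispatcher for running jobs;
            // the body here is hard-coded for illustration.
            byte[] body = "{\"jobs\":[]}".getBytes("UTF-8");
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) { os.write(body); }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = start();
        java.net.URL url = new java.net.URL(
            "http://localhost:" + server.getAddress().getPort() + "/jobs");
        try (java.io.InputStream in = url.openStream()) {
            System.out.println(new String(in.readAllBytes(), "UTF-8"));
        }
        server.stop(0);
    }
}
```

Submit (POST), job status, and leader lookup would be further contexts on the same server; in per-job mode the submit route would simply return an error.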
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4742 ---
[jira] [Closed] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6988. --- Resolution: Fixed Implemented in 2f651e9a69a9929ef154e7bf6fcba624b0e8b9a1 and surrounding commits. > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. New > FlinkKafkaProducer011 would > * upon creation begin transaction, store transaction identifiers into the > state and would write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, it would flush the data and write in state > information that current transaction is pending to be committed > * on `notifyCheckpointComplete` we would commit this pending transaction > * in case of crash between `snapshotState` and `notifyCheckpointComplete` we > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
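The bullet points in the issue description amount to a two-phase commit driven by Flink's checkpoint callbacks: pre-commit on snapshotState, commit on notifyCheckpointComplete, and commit-or-abort of a pending transaction on recovery. A minimal self-contained sketch of that protocol follows; MockBroker and TwoPhaseCommitSink are illustrative stand-ins, not the real FlinkKafkaProducer011 or Kafka transaction APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Self-contained sketch of the two-phase-commit sink protocol. */
public class TwoPhaseCommitSketch {

    /** Stand-in for a broker with transactional writes. */
    static class MockBroker {
        final List<String> committed = new ArrayList<>();
        final List<String> buffered = new ArrayList<>();

        void write(String record) { buffered.add(record); }  // inside the txn
        void commit() { committed.addAll(buffered); buffered.clear(); }
        void abort() { buffered.clear(); }
    }

    static class TwoPhaseCommitSink {
        final MockBroker broker;
        boolean pendingCommit;  // would be written to Flink state in reality

        TwoPhaseCommitSink(MockBroker broker) {
            // Upon creation, begin a transaction; all records go through it.
            this.broker = broker;
        }

        void invoke(String record) { broker.write(record); }

        /** Phase 1: flush and mark the transaction as pending in state. */
        void snapshotState() { pendingCommit = true; }

        /** Phase 2: commit once the whole checkpoint completed. */
        void notifyCheckpointComplete() {
            if (pendingCommit) { broker.commit(); pendingCommit = false; }
        }

        /** Crash between the two phases: commit if the snapshot succeeded. */
        void restore(boolean snapshotSucceeded) {
            if (snapshotSucceeded) { broker.commit(); } else { broker.abort(); }
        }
    }

    public static void main(String[] args) {
        MockBroker broker = new MockBroker();
        TwoPhaseCommitSink sink = new TwoPhaseCommitSink(broker);
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState();             // records are flushed, not yet visible
        sink.notifyCheckpointComplete();  // now they become visible
        System.out.println(broker.committed); // prints [a, b]
    }
}
```

The key property is that records only become visible after notifyCheckpointComplete, so a crash between the two phases never exposes data that the checkpoint did not cover.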
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197306#comment-16197306 ] ASF GitHub Bot commented on FLINK-6988: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 Merged! Could you please close this PR? > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Fix For: 1.4.0 > > > Kafka 0.11 (it will be released very soon) add supports for transactions. > Thanks to that, Flink might be able to implement Kafka sink supporting > "exactly-once" semantic. API changes and whole transactions support is > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic implementation of existing BucketingSink. New > FlinkKafkaProducer011 would > * upon creation begin transaction, store transaction identifiers into the > state and would write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, it would flush the data and write in state > information that current transaction is pending to be committed > * on `notifyCheckpointComplete` we would commit this pending transaction > * in case of crash between `snapshotState` and `notifyCheckpointComplete` we > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 Merged! Could you please close this PR? ---
[jira] [Commented] (FLINK-6219) Add a sorted state primitive
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197278#comment-16197278 ] Fabian Hueske commented on FLINK-6219: -- Thanks [~phoenixjiangnan], I updated the linked issue. FLINK-6204 was closed in favor of the newer issue FLINK-7193. > Add a sorted state primitive > > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implement the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations] > We notice that we need a state primitive which supports sorting, allows for > efficient insertion, traversal in order, and removal from the head. > For example: In event-time OVER window, we need to sort by time,If the datas > as follow: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > We randomly insert the datas, just like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > We deal with elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Welcome anyone to give feedback,And what do you think? [~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
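The semantics the issue asks for (insertion in arbitrary order, traversal in timestamp order, removal from the head) map naturally onto an ordered map. A heap-based sketch using a TreeMap keyed by event time is shown below; it illustrates the requested primitive only and is not a Flink state backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of the requested sorted-state semantics, keyed by event time. */
public class SortedBufferSketch {

    private final TreeMap<Long, String> buffer = new TreeMap<>();

    /** Efficient insertion in arbitrary order: O(log n). */
    void put(long timestamp, String value) { buffer.put(timestamp, value); }

    /** Traversal in timestamp order. */
    List<String> inOrder() { return new ArrayList<>(buffer.values()); }

    /** Removal from the head, e.g. when processing up to a watermark. */
    Map.Entry<Long, String> pollFirst() { return buffer.pollFirstEntry(); }

    public static void main(String[] args) {
        SortedBufferSketch buffer = new SortedBufferSketch();
        // Insert in random order, as in the issue's example.
        buffer.put(2L, "Hello-2");
        buffer.put(1L, "Hello-1");
        buffer.put(5L, "Hello-5");
        buffer.put(4L, "Hello-4");
        System.out.println(buffer.inOrder()); // [Hello-1, Hello-2, Hello-4, Hello-5]
    }
}
```

A real state primitive would additionally need to support multiple values per timestamp and a serialized, backend-managed layout (e.g. exploiting RocksDB's key ordering), which this heap sketch deliberately ignores.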
[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp
[ https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197245#comment-16197245 ] ASF GitHub Bot commented on FLINK-7388: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4786#discussion_r143517965 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java --- @@ -79,7 +79,6 @@ public void onEventTime(InternalTimertimer) throws Exception @Override public void onProcessingTime(InternalTimer timer) throws Exception { - collector.setAbsoluteTimestamp(timer.getTimestamp()); --- End diff -- @aljoscha Thanks for pointing this out. Fixed! > ProcessFunction.onTimer() sets processing time as timestamp > --- > > Key: FLINK-7388 > URL: https://issues.apache.org/jira/browse/FLINK-7388 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0, 1.3.2 >Reporter: Fabian Hueske >Assignee: Bowen Li > Fix For: 1.4.0 > > > The {{ProcessFunction.onTimer()}} method sets the current processing time as > event-time timestamp when it is called from a processing time timer. > I don't think this behavior is useful. Processing time timestamps won't be > aligned with watermarks and are not deterministic. The only reason would be > to have _some_ value in the timestamp field. However, the behavior is very > subtle and might not be noticed by users. > IMO, it would be better to erase the timestamp. This will cause downstream > operator that rely on timestamps to fail and notify the users that the logic > they implemented was probably not what they intended to do. > What do you think [~aljoscha]? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4786: [FLINK-7388][DataStream API] ProcessFunction.onTim...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4786#discussion_r143517965 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java --- @@ -79,7 +79,6 @@ public void onEventTime(InternalTimertimer) throws Exception @Override public void onProcessingTime(InternalTimer timer) throws Exception { - collector.setAbsoluteTimestamp(timer.getTimestamp()); --- End diff -- @aljoscha Thanks for pointing this out. Fixed! ---
[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown
[ https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197240#comment-16197240 ] ASF GitHub Bot commented on FLINK-6615: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4787 I was looking into [FLINK-6615](https://issues.apache.org/jira/browse/FLINK-6615), and I think 6615 is not a problem due to files may be created concurrently. But I found the current impl of `deleteDirectory()` to be cumbersome. The new code also handles concurrent removals well, and I can add back `cleanDirectory()` though it's not used anywhere yet. I'm happy to keep the original impl if you prefer it. Which version do you like better? > tmp directory not cleaned up on shutdown > > > Key: FLINK-6615 > URL: https://issues.apache.org/jira/browse/FLINK-6615 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0 >Reporter: Andrey >Assignee: Bowen Li > > Steps to reproduce: > 1) Stop task manager gracefully (kill -6 ) > 2) In the logs: > {code} > 2017-05-17 13:35:50,147 INFO org.apache.zookeeper.ClientCnxn > - EventThread shut down [main-EventThread] > 2017-05-17 13:35:50,200 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > [flink-akka.actor.default-dispatcher-2] > java.nio.file.DirectoryNotEmptyException: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > at > sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:477) > {code} > Expected: > * on shutdown delete non-empty directory anyway. > Notes: > * after process terminated, I've checked > "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and > didn't find anything there. So it looks like timing issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4787: [FLINK-6615][core] simplify FileUtils
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4787 I was looking into [FLINK-6615](https://issues.apache.org/jira/browse/FLINK-6615), and I think 6615 is not actually a bug, since files may be created concurrently while the directory is being deleted. However, I found the current impl of `deleteDirectory()` cumbersome. The new code also handles concurrent removals well, and I can add back `cleanDirectory()`, though it's not used anywhere yet. I'm happy to keep the original impl if you prefer it. Which version do you like better? ---
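The DirectoryNotEmptyException in the stack trace above is the classic symptom of deleting a directory while another thread is still writing into it. One way to tolerate that race, sketched here with only java.nio.file (this is not Flink's actual FileUtils implementation, and the retry bound is an arbitrary choice):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Sketch: recursive delete that retries when writers race with the cleanup. */
public class RetryingDelete {

    static void deleteDirectory(Path dir, int maxAttempts) throws IOException {
        for (int attempt = 1; ; attempt++) {
            try {
                if (!Files.exists(dir)) {
                    return; // someone else finished the job
                }
                try (Stream<Path> walk = Files.walk(dir)) {
                    // sorted() materializes the whole listing before we start
                    // deleting; reverse order deletes children before parents.
                    for (Path p : (Iterable<Path>) walk.sorted(Comparator.reverseOrder())::iterator) {
                        Files.deleteIfExists(p);
                    }
                }
                return;
            } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                // a concurrent writer added or removed files mid-delete; retry
                if (attempt >= maxAttempts) { throw e; }
            } catch (UncheckedIOException e) {
                // walk() surfaces races during traversal as unchecked IO errors
                if (attempt >= maxAttempts) { throw e.getCause(); }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("cleanup-sketch");
        Files.createDirectories(dir.resolve("sub"));
        Files.write(dir.resolve("sub").resolve("data.bin"), new byte[]{1, 2, 3});
        deleteDirectory(dir, 3);
        System.out.println(Files.exists(dir)); // false
    }
}
```

Bounded retries keep shutdown from spinning forever if a writer never stops; after the last attempt the original exception propagates, matching the log line in the report.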
[jira] [Comment Edited] (FLINK-6219) Add a sorted state primitive
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197188#comment-16197188 ] Bowen Li edited comment on FLINK-6219 at 10/9/17 4:18 PM: -- [~fhueske] The new title is much better since the discussion above said we don't need a dedicated sort backend. Besides, I think the dependency in 'Issues Link' is set wrong. This ticket shows to be a dependency of FLINK-6204 but 6204 is already finished. I reopened this ticket. Maybe you can correct the dependencies of issues, or add some explanation here? was (Author: phoenixjiangnan): [~fhueske] I think the dependency in 'Issues Link' is set wrong. This ticket shows to be a dependency of FLINK-6204 but 6204 is already finished. I reopened this ticket. Maybe you can correct the dependencies of issues, or add some explanation here? > Add a sorted state primitive > > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implement the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations] > We notice that we need a state primitive which supports sorting, allows for > efficient insertion, traversal in order, and removal from the head. > For example: In event-time OVER window, we need to sort by time,If the datas > as follow: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > We randomly insert the datas, just like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > We deal with elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Welcome anyone to give feedback,And what do you think? 
[~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7786) JarActionHandler.tokenizeArguments removes all quotes from program arguments
Brice Bingman created FLINK-7786: Summary: JarActionHandler.tokenizeArguments removes all quotes from program arguments Key: FLINK-7786 URL: https://issues.apache.org/jira/browse/FLINK-7786 Project: Flink Issue Type: Bug Affects Versions: 1.3.0 Reporter: Brice Bingman Priority: Minor I would like to send json as an argument to my program, but when submitting it via the REST API, all the quotes in the json are gone, resulting in invalid json. The JarActionHandler.tokenizeArguments should only remove the leading and trailing quotes of an argument, not all of the quotes. Current workaround: Replace the quotes in json with an escape character and replace them back in the run method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
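The fix the reporter proposes (strip only a single surrounding quote pair, never embedded quotes) can be sketched as follows. This is illustrative code, not a patch to JarActionHandler, and the method names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative tokenizer that keeps quotes inside arguments (e.g. JSON). */
public class ArgTokenizer {

    static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (char c : args.trim().toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes; // quote toggles whitespace grouping...
                cur.append(c);        // ...but stays part of the token for now
            } else if (Character.isWhitespace(c) && !inQuotes) {
                if (cur.length() > 0) {
                    tokens.add(stripSurroundingQuotes(cur.toString()));
                    cur.setLength(0);
                }
            } else {
                cur.append(c);
            }
        }
        if (cur.length() > 0) {
            tokens.add(stripSurroundingQuotes(cur.toString()));
        }
        return tokens;
    }

    /** Remove only one leading and one trailing quote, never inner ones. */
    static String stripSurroundingQuotes(String token) {
        if (token.length() >= 2 && token.startsWith("\"") && token.endsWith("\"")) {
            return token.substring(1, token.length() - 1);
        }
        return token;
    }

    public static void main(String[] args) {
        // A JSON program argument survives with its inner quotes intact:
        System.out.println(tokenize("run --json \"{\"k\": 1}\""));
        // prints [run, --json, {"k": 1}]
    }
}
```

With this approach the reporter's workaround (escaping quotes on the client and un-escaping them in the job's run method) would no longer be needed.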
[GitHub] flink pull request #4773: [FLINK-7761] [examples] Include shaded guava depen...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4773 ---
[jira] [Closed] (FLINK-7761) Twitter example is not self-contained
[ https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7761. --- Resolution: Fixed 1.4: 08bfdae684548c2e9edf189f29d892335fd27dd3 > Twitter example is not self-contained > - > > Key: FLINK-7761 > URL: https://issues.apache.org/jira/browse/FLINK-7761 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The Twitter example jar is not self-contained as it excludes the shaded guava > dependency from the twitter connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7761) Twitter example is not self-contained
[ https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197215#comment-16197215 ] ASF GitHub Bot commented on FLINK-7761: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4773 > Twitter example is not self-contained > - > > Key: FLINK-7761 > URL: https://issues.apache.org/jira/browse/FLINK-7761 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The Twitter example jar is not self-contained as it excludes the shaded guava > dependency from the twitter connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7785) DispatcherTest failure
Chesnay Schepler created FLINK-7785: --- Summary: DispatcherTest failure Key: FLINK-7785 URL: https://issues.apache.org/jira/browse/FLINK-7785 Project: Flink Issue Type: Bug Components: Distributed Coordination, Tests Affects Versions: 1.4.0 Reporter: Chesnay Schepler Fix For: 1.4.0 https://s3.amazonaws.com/archive.travis-ci.org/jobs/285598045 {code} Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.539 sec <<< FAILURE! - in org.apache.flink.runtime.dispatcher.DispatcherTest testLeaderElection(org.apache.flink.runtime.dispatcher.DispatcherTest) Time elapsed: 0.016 sec <<< ERROR! akka.actor.InvalidActorNameException: actor name [dispatcher] is not unique! at akka.actor.dungeon.ChildrenContainer$TerminatingChildrenContainer.reserve(ChildrenContainer.scala:192) at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76) at akka.actor.ActorCell.reserveChild(ActorCell.scala:369) at akka.actor.dungeon.Children$class.makeChild(Children.scala:201) at akka.actor.dungeon.Children$class.attachChild(Children.scala:41) at akka.actor.ActorCell.attachChild(ActorCell.scala:369) at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553) at org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:208) at org.apache.flink.runtime.rpc.RpcEndpoint.(RpcEndpoint.java:89) at org.apache.flink.runtime.rpc.FencedRpcEndpoint.(FencedRpcEndpoint.java:44) at org.apache.flink.runtime.dispatcher.Dispatcher.(Dispatcher.java:102) at org.apache.flink.runtime.dispatcher.DispatcherTest$TestingDispatcher.(DispatcherTest.java:207) at org.apache.flink.runtime.dispatcher.DispatcherTest.testLeaderElection(DispatcherTest.java:171) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6219) Add a sorted state primitive
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197188#comment-16197188 ] Bowen Li commented on FLINK-6219: - [~fhueske] I think the dependency in 'Issues Link' is set wrong. This ticket shows to be a dependency of FLINK-6204 but 6204 is already finished. I reopened this ticket. Maybe you can correct the dependencies of issues, or add some explanation here? > Add a sorted state primitive > > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implement the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations] > We notice that we need a state primitive which supports sorting, allows for > efficient insertion, traversal in order, and removal from the head. > For example: In event-time OVER window, we need to sort by time,If the datas > as follow: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > We randomly insert the datas, just like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > We deal with elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Welcome anyone to give feedback,And what do you think? [~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-6219) Add a sorted state primitive
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reopened FLINK-6219: - > Add a sorted state primitive > > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implement the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations] > We notice that we need a state primitive which supports sorting, allows for > efficient insertion, traversal in order, and removal from the head. > For example: In event-time OVER window, we need to sort by time,If the datas > as follow: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > We randomly insert the datas, just like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > We deal with elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Welcome anyone to give feedback,And what do you think? [~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197180#comment-16197180 ] ASF GitHub Bot commented on FLINK-7709: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4763#discussion_r143508371 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,8 +33,7 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); --- End diff -- This is a valid objection which I share. I'll remove this change and set the map of `TaskCheckpointStatistics` to an empty map in case that we want to leave the details out. > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197181#comment-16197181 ] ASF GitHub Bot commented on FLINK-7739: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4777#discussion_r143480344 --- Diff: flink-core/pom.xml --- @@ -77,6 +88,12 @@ under the License. org.apache.commons commons-compress + + --- End diff -- `avro` is pulling `xz` as well ``` [INFO] +- org.apache.avro:avro:jar:1.8.2:compile [INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile [INFO] | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile [INFO] | +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile [INFO] | \- org.tukaani:xz:jar:1.5:compile ``` > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4777: [FLINK-7739] Enable dependency convergence
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4777#discussion_r143480344 --- Diff: flink-core/pom.xml --- @@ -77,6 +88,12 @@ under the License. org.apache.commons commons-compress + + --- End diff -- `avro` is pulling `xz` as well ``` [INFO] +- org.apache.avro:avro:jar:1.8.2:compile [INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile [INFO] | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile [INFO] | +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile [INFO] | \- org.tukaani:xz:jar:1.5:compile ``` ---
[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4763#discussion_r143508371 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,8 +33,7 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); --- End diff -- This is a valid objection which I share. I'll remove this change and set the map of `TaskCheckpointStatistics` to an empty map in case that we want to leave the details out. ---
[jira] [Closed] (FLINK-7750) Strange behaviour in savepoints
[ https://issues.apache.org/jira/browse/FLINK-7750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7750. --- Resolution: Not A Problem > Strange behaviour in savepoints > --- > > Key: FLINK-7750 > URL: https://issues.apache.org/jira/browse/FLINK-7750 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.2 >Reporter: Razvan >Priority: Blocker > > I recently upgraded from 1.2.0 and savepoint creration behaves strange. > Whenever I try to create a savepoint with specified directory Apache Flink > creates a folder on the active JobManager (even if I trigger savepoint > creation from a different JobManager) which contains only _metadata. And > another folder on the TaskManager where the job is running which contains the > actual savepoint. > Obviously if I try to restore it says it can't find the savepoint. > This worked in 1.2.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7750) Strange behaviour in savepoints
[ https://issues.apache.org/jira/browse/FLINK-7750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197175#comment-16197175 ] Aljoscha Krettek commented on FLINK-7750: - If the data was not split when using Flink 1.2.x this was only a side-effect of an optimisation in the state backend that would allow it to not store state in a file system and instead send it to the job manager. I'm afraid what you are observing is the expected behaviour. > Strange behaviour in savepoints > --- > > Key: FLINK-7750 > URL: https://issues.apache.org/jira/browse/FLINK-7750 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.2 >Reporter: Razvan >Priority: Blocker > > I recently upgraded from 1.2.0 and savepoint creration behaves strange. > Whenever I try to create a savepoint with specified directory Apache Flink > creates a folder on the active JobManager (even if I trigger savepoint > creation from a different JobManager) which contains only _metadata. And > another folder on the TaskManager where the job is running which contains the > actual savepoint. > Obviously if I try to restore it says it can't find the savepoint. > This worked in 1.2.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197170#comment-16197170 ] ASF GitHub Bot commented on FLINK-7709:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4763#discussion_r143505801

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+	@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+	@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+	public static final String FIELD_NAME_ID = "id";
+
+	public static final String FIELD_NAME_STATUS = "status";
+
+	public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+	public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
+
+	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+
+	public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+	public static final String FIELD_NAME_TASKS = "tasks";
+
+	@JsonProperty(FIELD_NAME_ID)
+	private final long id;
+
+	@JsonProperty(FIELD_NAME_STATUS)
+	private final CheckpointStatsStatus status;
+
+	@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+	private final boolean savepoint;
+
+	@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+	private final long triggerTimestamp;
+
+	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+	private final long latestAckTimestamp;
+
+	@JsonProperty(FIELD_NAME_STATE_SIZE)
+	private final long stateSize;
+
+	@JsonProperty(FIELD_NAME_DURATION)
+	private final long duration;
+
+	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+	private final long alignmentBuffered;
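The `@JsonTypeInfo`/`@JsonSubTypes` annotations quoted in the diff make Jackson write a `@class` discriminator property into the JSON and use it to pick the concrete subtype on deserialization. A hand-rolled sketch of that mechanism (no Jackson on the classpath; the classes below are illustrative stand-ins, not the real REST message types):

```java
// Sketch of the subtype dispatch that @JsonTypeInfo/@JsonSubTypes declare:
// the value of the "@class" discriminator selects which subclass to build.
class TypeInfoSketch {
    static class CheckpointStatistics { }
    static class CompletedCheckpointStatistics extends CheckpointStatistics { }
    static class FailedCheckpointStatistics extends CheckpointStatistics { }

    // mirrors the name -> subtype mapping declared in @JsonSubTypes
    static CheckpointStatistics fromDiscriminator(String name) {
        switch (name) {
            case "completed": return new CompletedCheckpointStatistics();
            case "failed": return new FailedCheckpointStatistics();
            default: throw new IllegalArgumentException("unknown subtype: " + name);
        }
    }

    public static void main(String[] args) {
        // prints the concrete class chosen for each discriminator value
        System.out.println(fromDiscriminator("completed").getClass().getSimpleName());
        System.out.println(fromDiscriminator("failed").getClass().getSimpleName());
    }
}
```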
[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4763#discussion_r143504804

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
[jira] [Updated] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()
[ https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7495:
Component/s: (was: State Backends, Checkpointing) DataStream API

> AbstractUdfStreamOperator#initializeState() should be called in
> AsyncWaitOperator#initializeState()
> ---
>
> Key: FLINK-7495
> URL: https://issues.apache.org/jira/browse/FLINK-7495
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Ted Yu
> Assignee: Fang Yong
> Priority: Minor
>
> {code}
> recoveredStreamElements = context
>     .getOperatorStateStore()
>     .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
> {code}
> A call to AbstractUdfStreamOperator#initializeState() should be added at the beginning.
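The fix the issue asks for is the call-super pattern: the subclass must invoke the base class's initializeState() before reading its own operator state, otherwise the user function's checkpointed state is never restored. A minimal sketch with stand-in classes (not Flink's actual operator hierarchy):

```java
import java.util.ArrayList;
import java.util.List;

class InitializeStateSketch {
    static final List<String> calls = new ArrayList<>();

    // stand-in for AbstractUdfStreamOperator
    static class BaseOperator {
        void initializeState() {
            calls.add("base"); // would restore the user function's state here
        }
    }

    // stand-in for AsyncWaitOperator with the fix applied
    static class AsyncOperator extends BaseOperator {
        @Override
        void initializeState() {
            super.initializeState();              // the call FLINK-7495 says was missing
            calls.add("recoveredStreamElements"); // then restore the operator's own list state
        }
    }

    public static void main(String[] args) {
        new AsyncOperator().initializeState();
        System.out.println(calls); // [base, recoveredStreamElements]
    }
}
```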
[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp
[ https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197158#comment-16197158 ] ASF GitHub Bot commented on FLINK-7388:
---
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4786#discussion_r143502884

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
@@ -79,7 +79,6 @@ public void onEventTime(InternalTimer timer) throws Exception
 @Override
 public void onProcessingTime(InternalTimer timer) throws Exception {
-    collector.setAbsoluteTimestamp(timer.getTimestamp());
--- End diff --

This should call `eraseTimestamp()` because we might still have a timestamp set from processing some previous elements. Same for the other occurrences in the code.

> ProcessFunction.onTimer() sets processing time as timestamp
> ---
>
> Key: FLINK-7388
> URL: https://issues.apache.org/jira/browse/FLINK-7388
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Fabian Hueske
> Assignee: Bowen Li
> Fix For: 1.4.0
>
> The {{ProcessFunction.onTimer()}} method sets the current processing time as
> the event-time timestamp when it is called from a processing-time timer.
> I don't think this behavior is useful. Processing-time timestamps won't be
> aligned with watermarks and are not deterministic. The only reason would be
> to have _some_ value in the timestamp field. However, the behavior is very
> subtle and might not be noticed by users.
> IMO, it would be better to erase the timestamp. This will cause downstream
> operators that rely on timestamps to fail and notify users that the logic
> they implemented was probably not what they intended.
> What do you think [~aljoscha]?
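The review comment's point can be illustrated with a stand-in collector (this is a sketch, not Flink's TimestampedCollector): without the erase, a processing-time timer firing after a timestamped element would emit output that still carries a stale event-time timestamp.

```java
class EraseTimestampSketch {
    // stand-in for a collector that may carry a timestamp, or none
    static class Collector {
        Long timestamp; // null means "no timestamp set"
        void setAbsoluteTimestamp(long ts) { timestamp = ts; }
        void eraseTimestamp() { timestamp = null; }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        collector.setAbsoluteTimestamp(42L); // left over from a previous element
        // onProcessingTime: per the review, erase the timestamp rather than set
        // the timer's processing-time value, so downstream operators fail fast
        // instead of silently mixing processing time into event time
        collector.eraseTimestamp();
        System.out.println(collector.timestamp); // null
    }
}
```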
[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4763#discussion_r143502870

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
[jira] [Commented] (FLINK-7167) Job can't set checkpoint meta directory to cover the system setting state.checkpoints.dir
[ https://issues.apache.org/jira/browse/FLINK-7167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197144#comment-16197144 ] Aljoscha Krettek commented on FLINK-7167:
---
Thanks for pointing this out, [~phoenixjiangnan]. There is actually a (very outdated) PR that also tries to tackle this issue: https://github.com/apache/flink/pull/3522. I think the solution we will go for in the end is to get rid of this configuration parameter; this should make things easier.

> Job can't set checkpoint meta directory to cover the system setting
> state.checkpoints.dir
> ---
>
> Key: FLINK-7167
> URL: https://issues.apache.org/jira/browse/FLINK-7167
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.1
> Reporter: yuqi
> Assignee: Bowen Li
>
> If we want to recover a failed job using a checkpoint, then since all job
> checkpoint metadata are in the same directory and have no specific
> identification, we have to traverse every file in the directory to find the
> data for this job. This is rather troublesome, so setting this configuration
> at the job level is preferable.
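Until a per-job override exists, one workaround in the spirit of the report is to derive a job-specific directory under the shared base and hand it to the state backend when building the job. `buildCheckpointPath` below is a hypothetical helper; the commented-out state backend call marks where the resulting URI would be used at job setup time:

```java
class CheckpointPathSketch {
    // hypothetical helper: one subdirectory per job, so recovery does not
    // have to scan a shared flat directory for this job's metadata
    static String buildCheckpointPath(String baseDir, String jobName) {
        return (baseDir.endsWith("/") ? baseDir.substring(0, baseDir.length() - 1) : baseDir)
                + "/" + jobName;
    }

    public static void main(String[] args) {
        String uri = buildCheckpointPath("hdfs:///flink/checkpoints/", "my-cep-job");
        // env.setStateBackend(new FsStateBackend(uri));  // at job setup time
        System.out.println(uri); // hdfs:///flink/checkpoints/my-cep-job
    }
}
```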
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197137#comment-16197137 ] ASF GitHub Bot commented on FLINK-7709:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4763#discussion_r143500886

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---
[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...
[jira] [Closed] (FLINK-5510) Replace Scala Future with FlinkFuture in QueryableStateClient
[ https://issues.apache.org/jira/browse/FLINK-5510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-5510. --- Resolution: Duplicate With the QS client reworking of FLINK-7770 this issue becomes obsolete since FLINK-7770 also removes the Scala Futures in favour of Java Futures. > Replace Scala Future with FlinkFuture in QueryableStateClient > - > > Key: FLINK-5510 > URL: https://issues.apache.org/jira/browse/FLINK-5510 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Reporter: Ufuk Celebi >Priority: Minor > > The entry point for queryable state users is the {{QueryableStateClient}} > which returns query results via Scala Futures. Since merging the initial > version of QueryableState we have introduced the FlinkFuture wrapper type in > order to not expose our Scala dependency via the API. > Since APIs tend to stick around longer than expected, it might be worthwhile > to port the exposed QueryableStateClient interface to use the FlinkFuture. > Early users can still get the Scala Future via FlinkFuture#getScalaFuture(). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7736) Fix some of the alerts raised by lgtm.com
[ https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197132#comment-16197132 ] ASF GitHub Bot commented on FLINK-7736: --- Github user 1m2c3t4 commented on the issue: https://github.com/apache/flink/pull/4784 All review comments addressed > Fix some of the alerts raised by lgtm.com > - > > Key: FLINK-7736 > URL: https://issues.apache.org/jira/browse/FLINK-7736 > Project: Flink > Issue Type: Improvement >Reporter: Malcolm Taylor >Assignee: Malcolm Taylor > > lgtm.com has identified a number of issues giving scope for improvement in > the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list] > This issue is to address some of the simpler ones. Some of these are quite > clear bugs such as off-by-one errors. Others are areas where the code might > be made clearer, such as use of a variable name which shadows another > variable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197131#comment-16197131 ] ASF GitHub Bot commented on FLINK-7709: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4763#discussion_r143499239 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Statistics for a checkpoint. 
+ */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"), + @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")}) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_STATUS = "status"; + + public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; + + public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + public static final String FIELD_NAME_TASKS = "tasks"; + + @JsonProperty(FIELD_NAME_ID) + private final long id; + + @JsonProperty(FIELD_NAME_STATUS) + private final CheckpointStatsStatus status; + + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) + private final boolean savepoint; + + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) + private final long triggerTimestamp; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered;
[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197108#comment-16197108 ] Kostas Kloudas commented on FLINK-7782: --- Hi [~ajkrishna], as seen in the JIRA that you posted (https://issues.apache.org/jira/browse/FLINK-7606), the problem there was that there were no more elements in the stream, so the watermark was not advancing. This led to the elements being buffered inside Flink, waiting for the watermark to advance so that their "window" expires. Could this also be the case for you? You can monitor the size of your state, or you can write your own timestamp extractor and print every watermark you send. With a parallelism of 1 you can see whether the watermark is greater than that of your last element or not (the latter would mean that your data is buffered). > Flink CEP not recognizing pattern > - > > Key: FLINK-7782 > URL: https://issues.apache.org/jira/browse/FLINK-7782 > Project: Flink > Issue Type: Bug >Reporter: Ajay > > I am using flink version 1.3.2. Flink has a kafka source. I am using > KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM > running Ubuntu 16.04. From the flink dashboard, I see that I have 2 > Taskmanagers & 4 Task slots > What I observe is the following. The input to Kafka is a json string and when > parsed on the flink side, it looks like this > {code:java} > (101,Sun Sep 24 23:18:53 UTC 2017,complex > event,High,37.75142,-122.39458,12.0,20.0) > {code} > I use a Tuple8 to capture the parsed data. The first field is home_id. The > time characteristic is set to EventTime and I have an > AscendingTimestampExtractor using the timestamp field. The parallelism for > the execution environment is set to 4. I have a rather simple event that I am > trying to capture > {code:java} > DataStream> > cepMapByHomeId = cepMap.keyBy(0); > //cepMapByHomeId.print(); > > Pattern , ?> cep1 = > > Pattern. 
>begin("start") > .where(new OverLowThreshold()) > .followedBy("end") > .where(new OverHighThreshold()); > PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1); > DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents()); > {code} > The pattern checks if the 7th field in the tuple8 goes over 12 and then over > 16. The output of the pattern is like this > {code:java} > (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex > event,Non-event,37.75837,-122.41467) > {code} > On the Kafka producer side, I am trying to send simulated data for around 100 > homes, so the home_id would go from 0-100 and the input is keyed by home_id. > I have about 10 partitions in kafka. The producer just loops going through a > csv file with a delay of about 100 ms between 2 rows of the csv file. The > data is exactly the same for all 100 of the csv files except for home_id and > the lat & long information. The timestamp is incremented by a step of 1 sec. > I start multiple processes to simulate data from different homes. > THE PROBLEM: > Flink completely misses capturing events for a large subset of the input > data. I barely see the events for about 4-5 of the home_id values. I do a > print before applying the pattern and after, and I see all home_ids before and > only a tiny subset after. Since the data is exactly the same, I expect all > home_ids to be captured and written to my sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
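A minimal sketch, in plain Java with no Flink dependency (class and method names are mine, not Flink API), of why an ascending-timestamp setup can stall a CEP pattern: the watermark trails the largest timestamp seen so far, so once a key or partition stops receiving elements, event time stops advancing and buffered partial matches are neither emitted nor expired.

```java
// Mimics the watermark behaviour of an ascending-timestamp extractor:
// watermark = (max timestamp seen) - 1. Not Flink code, just the logic.
public class AscendingWatermarkSketch {
    private long maxSeenTimestamp = Long.MIN_VALUE;

    // Called once per element with its extracted event timestamp.
    public void onElement(long elementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, elementTimestamp);
    }

    // The current watermark trails the largest timestamp seen by 1 ms.
    public long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch wm = new AscendingWatermarkSketch();
        wm.onElement(1_000L);
        wm.onElement(2_000L);
        // Without new elements the watermark stays here no matter how much
        // wall-clock time passes, so a pending "followedBy" match whose
        // window closes after 1_999 is never completed or cleaned up.
        System.out.println(wm.currentWatermark()); // prints 1999
    }
}
```

Printing this value per partition, as Kostas suggests, makes a stalled watermark immediately visible.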
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197099#comment-16197099 ] ASF GitHub Bot commented on FLINK-7709: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4763#discussion_r143489310 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,8 +33,7 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); --- End diff -- Do note that we must be on the look-out for requests that use primitive fields, as jackson will default them to 0 if they are missing, which will cause misleading error messages. > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197094#comment-16197094 ] Kostas Kloudas commented on FLINK-7606: --- I do not think that this would solve the problem. In event time, the fact that an element has a timestamp X does not mean that event time has also advanced past that time. The watermark is what signals that time has advanced. So if the watermark does not advance, we cannot be sure that we can close a window. Only if you know your data, _e.g._ that timestamps are in order, can you assume that. But in that case you could probably write your own timestamp extractor that, after a timeout, sends a watermark with the "correct" timestamp. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: Schermata 2017-09-27 alle 00.35.53.png, heap-dump1.png, > heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows continuously without freeing heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come within a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows (I suppose that the state > of the NFA is updated) but it is not cleaned up even after the 5-minute time > window defined in the "within" clause. 
> If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
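The idle-timeout idea Kostas mentions can be sketched in plain Java (no Flink types; the class, the injected wall-clock parameters, and the 5-second timeout are illustrative assumptions, not Flink API): once no element has arrived for the timeout, the watermark jumps to the last seen timestamp so that pending "within" windows can expire and their state be cleaned up.

```java
// Sketch of a periodic watermark generator with an idle timeout.
// Wall-clock time is passed in explicitly to keep the logic testable.
public class IdleTimeoutWatermarkSketch {
    private final long idleTimeoutMillis;
    private long lastElementTimestamp = Long.MIN_VALUE;
    private long lastActivityWallClock;

    public IdleTimeoutWatermarkSketch(long idleTimeoutMillis, long nowWallClock) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityWallClock = nowWallClock;
    }

    // Called for every element: remember its timestamp and note activity.
    public void onElement(long elementTimestamp, long nowWallClock) {
        lastElementTimestamp = Math.max(lastElementTimestamp, elementTimestamp);
        lastActivityWallClock = nowWallClock;
    }

    // Called periodically to produce the watermark to emit.
    public long currentWatermark(long nowWallClock) {
        if (lastElementTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        if (nowWallClock - lastActivityWallClock >= idleTimeoutMillis) {
            // Stream is idle: advance the watermark to cover the last element.
            return lastElementTimestamp;
        }
        // Normal case: trail the last element, like an ascending extractor.
        return lastElementTimestamp - 1;
    }

    public static void main(String[] args) {
        IdleTimeoutWatermarkSketch wm = new IdleTimeoutWatermarkSketch(5_000L, 0L);
        wm.onElement(1_000L, 100L);
        System.out.println(wm.currentWatermark(200L));   // not idle yet: 999
        System.out.println(wm.currentWatermark(6_000L)); // idle: 1000
    }
}
```

The trade-off of this workaround: if an element with timestamp <= 1000 does arrive after the timeout, it is late with respect to the already-emitted watermark.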
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143487841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
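As a worked example of the bound arithmetic in the reviewed class (the concrete values X = -5000 and Y = 2000 are made up for illustration): for the criterion "L.time between R.time + X and R.time + Y", left rows stay relevant for -X ms, right rows for Y ms, and the cleanup timer is delayed by their average.

```java
// Reproduces the derived fields of the reviewed TimeBoundedStreamInnerJoin
// for an example criterion where L may lag R by up to 5 s (X = -5000)
// or lead it by up to 2 s (Y = 2000).
public class JoinBoundsSketch {
    public static void main(String[] args) {
        long leftLowerBound = -5_000L; // X in the criterion
        long leftUpperBound = 2_000L;  // Y in the criterion

        // How long rows from each side can still produce join results.
        long leftRelativeSize = -leftLowerBound;  // 5000 ms
        long rightRelativeSize = leftUpperBound;  // 2000 ms

        // The reviewed code defers state cleanup by the average of the two.
        long cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2; // 3500 ms

        System.out.println(leftRelativeSize);
        System.out.println(rightRelativeSize);
        System.out.println(cleanupDelay);
    }
}
```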
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197091#comment-16197091 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143487841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function name of other non-equi conditions + * @param genJoinFuncCode the function code of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper:
CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and
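The bound handling quoted in the diff above can be illustrated in isolation. For the criterion "L.time between R.time + X and R.time + Y" (with X = leftLowerBound and Y = leftUpperBound), a left row and a right row join exactly when the check below holds; leftRelativeSize = -X and rightRelativeSize = Y then describe how long rows must be cached relative to the other stream. This is a standalone sketch with illustrative names, not the PR's code.

```java
/**
 * Membership check for the time-bounded join window in the quoted diff.
 * Criterion: R.time + X <= L.time <= R.time + Y
 * (X = leftLowerBound, Y = leftUpperBound; names are illustrative).
 */
class JoinWindow {
    static boolean joins(long leftTime, long rightTime, long leftLowerBound, long leftUpperBound) {
        // Equivalent to: L.time - Y <= R.time <= L.time - X
        return rightTime + leftLowerBound <= leftTime
                && leftTime <= rightTime + leftUpperBound;
    }
}
```

For example, with X = -5 and Y = 3, a left row at time 10 joins right rows with timestamps in [7, 15].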
[jira] [Created] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
Aljoscha Krettek created FLINK-7784: --- Summary: Don't fail TwoPhaseCommitSinkFunction when failing to commit Key: FLINK-7784 URL: https://issues.apache.org/jira/browse/FLINK-7784 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.4.0 Reporter: Aljoscha Krettek Priority: Blocker Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either when doing it via the completed checkpoint notification or when trying to commit after restoring after failure). This means that the job will go into an infinite recovery loop because we will always keep failing. In some cases it might be better to ignore those failures and keep on processing and this should be the default. We can provide an option that allows failing the sink on failing commits. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
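The behavior proposed in FLINK-7784 can be sketched as follows: instead of failing the sink (and restarting the job) whenever a commit fails, optionally log and drop the failed transaction. All names here (`PendingCommitter`, `failOnCommitError`) are illustrative, not the Flink API; the sketch only demonstrates the two policies described in the issue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical sketch of the FLINK-7784 proposal: on checkpoint-complete,
 * commit pending transactions; a failed commit either fails the job
 * (current behavior) or is counted and skipped (proposed default).
 */
class PendingCommitter {
    interface Transaction { void commit() throws Exception; }

    private final boolean failOnCommitError;
    private final Queue<Transaction> pending = new ArrayDeque<>();
    private int ignoredFailures = 0;

    PendingCommitter(boolean failOnCommitError) {
        this.failOnCommitError = failOnCommitError;
    }

    void add(Transaction t) { pending.add(t); }

    /** Called on the completed-checkpoint notification. */
    void commitPending() {
        while (!pending.isEmpty()) {
            Transaction t = pending.poll();
            try {
                t.commit();
            } catch (Exception e) {
                if (failOnCommitError) {
                    // current behavior: surface the failure, job restarts
                    throw new RuntimeException("commit failed", e);
                }
                ignoredFailures++; // proposed default: log and keep processing
            }
        }
    }

    int getIgnoredFailures() { return ignoredFailures; }
}
```

With `failOnCommitError = true` the first failing commit would throw and the job would keep restarting into the same failure, which is the infinite recovery loop the issue describes.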
[jira] [Updated] (FLINK-7783) Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
[ https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7783: Summary: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover() (was: Don't always remove checkpoints ZooKeeperCompletedCheckpointStore#recover()) > Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover() > -- > > Key: FLINK-7783 > URL: https://issues.apache.org/jira/browse/FLINK-7783 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, we always delete checkpoint handles if they (or the data from the > DFS) cannot be read: > https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180 > This can lead to problems in case the DFS is temporarily not available, i.e. > we could inadvertently > delete all checkpoints even though they are still valid. > A user reported this problem on the mailing list: > https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7783) Don't always remove checkpoints ZooKeeperCompletedCheckpointStore#recover()
Aljoscha Krettek created FLINK-7783: --- Summary: Don't always remove checkpoints ZooKeeperCompletedCheckpointStore#recover() Key: FLINK-7783 URL: https://issues.apache.org/jira/browse/FLINK-7783 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Affects Versions: 1.3.2, 1.4.0 Reporter: Aljoscha Krettek Priority: Blocker Fix For: 1.4.0, 1.3.3 Currently, we always delete checkpoint handles if they (or the data from the DFS) cannot be read: https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180 This can lead to problems in case the DFS is temporarily not available, i.e. we could inadvertently delete all checkpoints even though they are still valid. A user reported this problem on the mailing list: https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197028#comment-16197028 ] Ajay commented on FLINK-7782: - Here is an example of a simple event I am trying to detect. The first and last lines are the interesting points/events. The CEP library is not able to detect something like that. 4> (96,Sat Sep 30 22:30:25 UTC 2017,complex event,Low,32.781082,-117.01864,12.0,20.0) 4> (96,Sat Sep 30 22:30:26 UTC 2017,complex event,High,32.781082,-117.01864,0.0235,20.0) 4> (96,Sat Sep 30 22:30:27 UTC 2017,complex event,High,32.781082,-117.01864,0.02319611,20.0) 4> (96,Sat Sep 30 22:30:28 UTC 2017,complex event,Medium,32.781082,-117.01864,0.023357224,20.0) 4> (96,Sat Sep 30 22:30:29 UTC 2017,complex event,Low,32.781082,-117.01864,0.060904443,20.0) 4> (96,Sat Sep 30 22:30:30 UTC 2017,complex event,Medium,32.781082,-117.01864,0.100115,20.0) 4> (96,Sat Sep 30 22:30:31 UTC 2017,complex event,High,32.781082,-117.01864,0.12398389,20.0) 4> (96,Sat Sep 30 22:30:32 UTC 2017,complex event,Medium,32.781082,-117.01864,0.15611167,20.0) 4> (96,Sat Sep 30 22:30:33 UTC 2017,complex event,Low,32.781082,-117.01864,0.15817556,20.0) 4> (96,Sat Sep 30 22:30:34 UTC 2017,complex event,Low,32.781082,-117.01864,0.09934334,20.0) 4> (96,Sat Sep 30 22:30:35 UTC 2017,complex event,High,32.781082,-117.01864,16.0,20.0) Notes about this experiment. 1. Only one kafka partition and just one topic 2. Flink env parallelism set to 4 and I am using AscendingTimestampExtractor on KafkaSource09. 3. In the data above, the first element is the id that I use for keyBy 4. I started 4 Kafka producers in parallel with a random delay between them 5. Each producer sends 1 rows from a csv at an average of 18 seconds. Of the data from 4 producers, the events for only one were detected. 6. Looking at the log files, I print on the stream and see all 4 lines where each id is associated with one process number. In the above data 96 is only associated with 4. 
In this case there is just one partition in Kafka. If I were to increase the number of partitions, each id is spread across multiple processes. 7. I had run another test with a different set of 4 ids just before the one I've presented above and I expected to see 148 events for 4 ids and I saw all of them being captured. I did not change anything as far as delays in the producer. > Flink CEP not recognizing pattern > - > > Key: FLINK-7782 > URL: https://issues.apache.org/jira/browse/FLINK-7782 > Project: Flink > Issue Type: Bug >Reporter: Ajay > > I am using flink version 1.3.2. Flink has a kafka source. I am using > KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM > running Ubuntu 16.04. From the flink dashboard, I see that I have 2 > Taskmanagers & 4 Task slots > What I observe is the following. The input to Kafka is a json string and when > parsed on the flink side, it looks like this > {code:java} > (101,Sun Sep 24 23:18:53 UTC 2017,complex > event,High,37.75142,-122.39458,12.0,20.0) > {code} > I use a Tuple8 to capture the parsed data. The first field is home_id. The > time characteristic is set to EventTime and I have an > AscendingTimestampExtractor using the timestamp field. I have parallelism for > the execution environment is set to 4. I have a rather simple event that I am > trying to capture > {code:java} > DataStream> > cepMapByHomeId = cepMap.keyBy(0); > //cepMapByHomeId.print(); > > Pattern , ?> cep1 = > > Pattern. >begin("start") > .where(new OverLowThreshold()) > .followedBy("end") > .where(new OverHighThreshold()); > PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1); > DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents()); > {code} > The pattern checks if the 7th field in the tuple8 goes over 12 and then over > 16. 
The output of the pattern is like this > {code:java} > (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex > event,Non-event,37.75837,-122.41467) > {code} > On the Kafka producer side, I am trying to send simulated data for around 100 > homes, so the home_id would go from 0-100 and the input is keyed by home_id. > I have about 10 partitions in kafka. The producer just loops going through a > csv file with a delay of about 100 ms between 2 rows
[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197027#comment-16197027 ] Ajay commented on FLINK-7782: - 1. Using an env parallelism of 1 performed similarly, with the additional problem that there was significant lag in the kafka topic 2. I removed the additional keyBy(0) but that did not change anything 3. I also tried only to check for the start only pattern and it was exactly the same where I saw one of the homes going through but 3 others just getting dropped. 4. I also tried slowing down the rate from 5000/second into Kafka to about 1000/second but I see similar results. > Flink CEP not recognizing pattern > - > > Key: FLINK-7782 > URL: https://issues.apache.org/jira/browse/FLINK-7782 > Project: Flink > Issue Type: Bug >Reporter: Ajay > > I am using flink version 1.3.2. Flink has a kafka source. I am using > KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM > running Ubuntu 16.04. From the flink dashboard, I see that I have 2 > Taskmanagers & 4 Task slots > What I observe is the following. The input to Kafka is a json string and when > parsed on the flink side, it looks like this > {code:java} > (101,Sun Sep 24 23:18:53 UTC 2017,complex > event,High,37.75142,-122.39458,12.0,20.0) > {code} > I use a Tuple8 to capture the parsed data. The first field is home_id. The > time characteristic is set to EventTime and I have an > AscendingTimestampExtractor using the timestamp field. I have parallelism for > the execution environment is set to 4. I have a rather simple event that I am > trying to capture > {code:java} > DataStream> > cepMapByHomeId = cepMap.keyBy(0); > //cepMapByHomeId.print(); > > Pattern , ?> cep1 = > > Pattern. 
>begin("start") > .where(new OverLowThreshold()) > .followedBy("end") > .where(new OverHighThreshold()); > PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1); > DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents()); > {code} > The pattern checks if the 7th field in the tuple8 goes over 12 and then over > 16. The output of the pattern is like this > {code:java} > (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex > event,Non-event,37.75837,-122.41467) > {code} > On the Kafka producer side, I am trying to send simulated data for around 100 > homes, so the home_id would go from 0-100 and the input is keyed by home_id. > I have about 10 partitions in kafka. The producer just loops going through a > csv file with a delay of about 100 ms between 2 rows of the csv file. The > data is exactly the same for all 100 of the csv files except for home_id and > the lat & long information. The timestamp is incremented by a step of 1 sec. > I start multiple processes to simulate data from different homes. > THE PROBLEM: > Flink completely misses capturing events for a large subset of the input > data. I barely see the events for about 4-5 of the home_id values. I do a > print before applying the pattern and after and I see all home_ids before and > only a tiny subset after. Since the data is exactly the same, I expect all > home_ids to be captured and written to my sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
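The quoted CEP snippets lost their generic type parameters to HTML escaping, so the exact Tuple8 signature is not recoverable here. The two-step pattern being described ("start" when a value goes over the low threshold, "end" when a later value goes over the high threshold, per key) can be sketched without any Flink dependency; the thresholds 12 and 16 come from the issue text, everything else is an assumed simplification of the reporter's conditions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (no Flink) of the two-step threshold pattern from the
 * report: per key (home_id), a value over LOW (the "start" condition)
 * followed, possibly with unrelated events in between, by a value over
 * HIGH (the "end" condition) produces a match.
 */
class TwoStepThresholdDetector {
    private static final double LOW = 12.0, HIGH = 16.0;

    // keys for which a "start" (over-low) event has been seen
    private final Map<Integer, Boolean> started = new HashMap<>();
    private final List<Integer> matches = new ArrayList<>();

    /** Feed one (home_id, value) event, in event-time order per key. */
    void onEvent(int homeId, double value) {
        if (Boolean.TRUE.equals(started.get(homeId)) && value > HIGH) {
            matches.add(homeId);        // "end" condition completes the pattern
            started.put(homeId, false); // reset, like a fresh pattern instance
        } else if (value > LOW) {
            started.put(homeId, true);  // "start" condition
        }
    }

    List<Integer> getMatches() { return matches; }
}
```

In this simplified model every key that receives both an over-low and a later over-high value matches, which is the behavior the reporter expects from all 100 home_ids.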
[jira] [Commented] (FLINK-7739) Improve Kafka*ITCase tests stability
[ https://issues.apache.org/jira/browse/FLINK-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197024#comment-16197024 ] ASF GitHub Bot commented on FLINK-7739: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4749 Users shouldn't be able to call `shutdown()` and if they are, they shouldn't be using it (it would be incorrect). `shutdown()` was a protected method and shouldn't be used by the users. Unfortunately in many places in `flink-tests` it was used instead of the public `stop()`, because protected methods are visible inside the same package and it was really confusing which one should be used. Ideally it would be great to have another visibility level like "only visible to the children", but there is no such thing :/ Thus I renamed `shutdown()` to make a clear distinction from the public `stop()`. > Improve Kafka*ITCase tests stability > > > Key: FLINK-7739 > URL: https://issues.apache.org/jira/browse/FLINK-7739 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7782) Flink CEP not recognizing pattern
[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197023#comment-16197023 ] Ajay commented on FLINK-7782: - Related to FLINK-7606 > Flink CEP not recognizing pattern > - > > Key: FLINK-7782 > URL: https://issues.apache.org/jira/browse/FLINK-7782 > Project: Flink > Issue Type: Bug >Reporter: Ajay > > I am using flink version 1.3.2. Flink has a kafka source. I am using > KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM > running Ubuntu 16.04. From the flink dashboard, I see that I have 2 > Taskmanagers & 4 Task slots > What I observe is the following. The input to Kafka is a json string and when > parsed on the flink side, it looks like this > {code:java} > (101,Sun Sep 24 23:18:53 UTC 2017,complex > event,High,37.75142,-122.39458,12.0,20.0) > {code} > I use a Tuple8 to capture the parsed data. The first field is home_id. The > time characteristic is set to EventTime and I have an > AscendingTimestampExtractor using the timestamp field. I have parallelism for > the execution environment is set to 4. I have a rather simple event that I am > trying to capture > {code:java} > DataStream> > cepMapByHomeId = cepMap.keyBy(0); > //cepMapByHomeId.print(); > > Pattern , ?> cep1 = > > Pattern. >begin("start") > .where(new OverLowThreshold()) > .followedBy("end") > .where(new OverHighThreshold()); > PatternStream Float, Float>> patternStream = CEP.pattern(cepMapByHomeId, cep1); > DataStream Float>> alerts = patternStream.select(new PackageCapturedEvents()); > {code} > The pattern checks if the 7th field in the tuple8 goes over 12 and then over > 16. The output of the pattern is like this > {code:java} > (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex > event,Non-event,37.75837,-122.41467) > {code} > On the Kafka producer side, I am trying to send simulated data for around 100 > homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a > csv file with a delay of about 100 ms between 2 rows of the csv file. The > data is exactly the same for all 100 of the csv files except for home_id and > the lat & long information. The timestamp is incremented by a step of 1 sec. > I start multiple processes to simulate data from different homes. > THE PROBLEM: > Flink completely misses capturing events for a large subset of the input > data. I barely see the events for about 4-5 of the home_id values. I do a > print before applying the pattern and after and I see all home_ids before and > only a tiny subset after. Since the data is exactly the same, I expect all > home_ids to be captured and written to my sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..."...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4647 ---
[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available
[ https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197025#comment-16197025 ] ASF GitHub Bot commented on FLINK-7575: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4647 > Dashboard jobs/tasks metrics display 0 when metrics are not yet available > - > > Key: FLINK-7575 > URL: https://issues.apache.org/jira/browse/FLINK-7575 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.2 >Reporter: James Lafa >Assignee: James Lafa >Priority: Minor > Fix For: 1.4.0 > > > The web frontend is currently displaying "0" when a metric is not available > yet (ex: records-in/out, bytes-in/out). > 0 is misleading and it's preferable to display no value while the value is > still unknown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4749: [FLINK-7739][tests] Properly shutdown resources in tests
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4749 Users shouldn't be able to call `shutdown()` and if they are, they shouldn't be using it (it would be incorrect). `shutdown()` was a protected method and shouldn't be used by the users. Unfortunately in many places in `flink-tests` it was used instead of the public `stop()`, because protected methods are visible inside the same package and it was really confusing which one should be used. Ideally it would be great to have another visibility level like "only visible to the children", but there is no such thing :/ Thus I renamed `shutdown()` to make a clear distinction from the public `stop()`. ---
[jira] [Commented] (FLINK-7761) Twitter example is not self-contained
[ https://issues.apache.org/jira/browse/FLINK-7761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197022#comment-16197022 ] ASF GitHub Bot commented on FLINK-7761: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4773 merging. > Twitter example is not self-contained > - > > Key: FLINK-7761 > URL: https://issues.apache.org/jira/browse/FLINK-7761 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The Twitter example jar is not self-contained as it excludes the shaded guava > dependency from the twitter connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available
[ https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7575. --- Resolution: Fixed Fix Version/s: 1.4.0 1.4: c7968e9c2ad6601f9f38b673ead5bc71611a6fbd > Dashboard jobs/tasks metrics display 0 when metrics are not yet available > - > > Key: FLINK-7575 > URL: https://issues.apache.org/jira/browse/FLINK-7575 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.2 >Reporter: James Lafa >Assignee: James Lafa >Priority: Minor > Fix For: 1.4.0 > > > The web frontend is currently displaying "0" when a metric is not available > yet (ex: records-in/out, bytes-in/out). > 0 is misleading and it's preferable to display no value while the value is > still unknown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4773: [FLINK-7761] [examples] Include shaded guava dependency i...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4773 merging. ---
[jira] [Created] (FLINK-7782) Flink CEP not recognizing pattern
Ajay created FLINK-7782: --- Summary: Flink CEP not recognizing pattern Key: FLINK-7782 URL: https://issues.apache.org/jira/browse/FLINK-7782 Project: Flink Issue Type: Bug Reporter: Ajay I am using flink version 1.3.2. Flink has a kafka source. I am using KafkaSource9. I am running Flink on a 3 node AWS cluster with 8G of RAM running Ubuntu 16.04. From the flink dashboard, I see that I have 2 Taskmanagers & 4 Task slots What I observe is the following. The input to Kafka is a json string and when parsed on the flink side, it looks like this {code:java} (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0) {code} I use a Tuple8 to capture the parsed data. The first field is home_id. The time characteristic is set to EventTime and I have an AscendingTimestampExtractor using the timestamp field. I have parallelism for the execution environment is set to 4. I have a rather simple event that I am trying to capture {code:java} DataStream> cepMapByHomeId = cepMap.keyBy(0); //cepMapByHomeId.print(); Pattern , ?> cep1 = Pattern. >begin("start") .where(new OverLowThreshold()) .followedBy("end") .where(new OverHighThreshold()); PatternStream > patternStream = CEP.pattern(cepMapByHomeId, cep1); DataStream > alerts = patternStream.select(new PackageCapturedEvents()); {code} The pattern checks if the 7th field in the tuple8 goes over 12 and then over 16. The output of the pattern is like this {code:java} (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467) {code} On the Kafka producer side, I am trying to send simulated data for around 100 homes, so the home_id would go from 0-100 and the input is keyed by home_id. I have about 10 partitions in kafka. The producer just loops going through a csv file with a delay of about 100 ms between 2 rows of the csv file. The data is exactly the same for all 100 of the csv files except for home_id and the lat & long information. 
The timestamp is incremented by a step of 1 sec. I start multiple processes to simulate data from different homes. THE PROBLEM: Flink completely misses capturing events for a large subset of the input data. I barely see the events for about 4-5 of the home_id values. I do a print before applying the pattern and after and I see all home_ids before and only a tiny subset after. Since the data is exactly the same, I expect all home_ids to be captured and written to my sink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
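One commonly discussed cause for this symptom (and a reason the reporter links FLINK-7606) is event-time progress, not the pattern itself: a downstream event-time operator takes its watermark as the minimum over all input channels, so one slow or idle Kafka partition holds back event time for every key and the CEP operator never emits. The following is a simulation of that merge rule only, not Flink code.

```java
import java.util.Arrays;

/**
 * Simulation of watermark merging across parallel input channels:
 * the operator-wide watermark is the minimum of the per-channel
 * watermarks, so one channel that never advances stalls all keys.
 */
class WatermarkMerger {
    private final long[] channelWatermarks;

    WatermarkMerger(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** A channel advances its watermark; returns the operator-wide watermark. */
    long advance(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With 10 Kafka partitions and only some of them receiving data, the channels that see no records keep the merged watermark at its initial value, which matches the observed "most keys never produce a match" behavior.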
[GitHub] flink issue #4787: [FLINK-6615][core] simplify FileUtils
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4787 Is there any problem with the current implementation? The current implementation was carefully done to gracefully handle concurrent removals and allow to pick whether to clean directories with or without files. ---
[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown
[ https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197009#comment-16197009 ] ASF GitHub Bot commented on FLINK-6615: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4787 Is there any problem with the current implementation? The current implementation was carefully done to gracefully handle concurrent removals and allow to pick whether to clean directories with or without files. > tmp directory not cleaned up on shutdown > > > Key: FLINK-6615 > URL: https://issues.apache.org/jira/browse/FLINK-6615 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0 >Reporter: Andrey >Assignee: Bowen Li > > Steps to reproduce: > 1) Stop task manager gracefully (kill -6 ) > 2) In the logs: > {code} > 2017-05-17 13:35:50,147 INFO org.apache.zookeeper.ClientCnxn > - EventThread shut down [main-EventThread] > 2017-05-17 13:35:50,200 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > [flink-akka.actor.default-dispatcher-2] > java.nio.file.DirectoryNotEmptyException: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > at > sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:477) > {code} > Expected: > * on shutdown delete non-empty 
directory anyway. > Notes: > * after process terminated, I've checked > "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and > didn't find anything there. So it looks like timing issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
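The race described in FLINK-6615, where `Files.delete` on the directory throws `DirectoryNotEmptyException` because another thread is still writing to it, can be tolerated by retrying the whole traversal a bounded number of times. This is an illustration of the "delete the non-empty directory anyway" expectation, not the actual `FileUtils` implementation; `deleteWithRetry` and `makeSampleDir` are names invented for the sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch of a cleanup that tolerates concurrent writers by retrying. */
class TempDirCleaner {
    /** Delete dir recursively; retry when a concurrent change races us. */
    static boolean deleteWithRetry(Path dir, int attempts) {
        for (int i = 0; i < attempts; i++) {
            if (!Files.exists(dir)) {
                return true; // someone else finished the job
            }
            try {
                List<Path> entries;
                try (Stream<Path> walk = Files.walk(dir)) {
                    // children first, the directory itself last
                    entries = walk.sorted(Comparator.reverseOrder())
                                  .collect(Collectors.toList());
                }
                for (Path p : entries) {
                    Files.deleteIfExists(p);
                }
            } catch (DirectoryNotEmptyException | NoSuchFileException e) {
                // a file appeared (or vanished) mid-traversal; retry
            } catch (IOException e) {
                return false;
            }
        }
        return !Files.exists(dir);
    }

    /** Demo helper: a temp directory containing one file. */
    static Path makeSampleDir() {
        try {
            Path dir = Files.createTempDirectory("flink-io-demo-");
            Files.createFile(dir.resolve("spill.tmp"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```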
[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196946#comment-16196946 ] ASF GitHub Bot commented on FLINK-5005: --- Github user DieBauer commented on the issue: https://github.com/apache/flink/pull/3703 Hi, I'm sorry for the late reaction. I haven't found the time to work on this anymore (also priorities shifted... ) Therefore this pull request is stale. (it still could be used as a reference). I think the main challenge is in serialising the java8 lambdas. And dropping the support for scala 2.10 and Java7 certainly helps in taming the pom.xml profiles. I will close this pull request to not keep the hopes up. > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts >Assignee: Aljoscha Krettek > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196947#comment-16196947 ] ASF GitHub Bot commented on FLINK-5005: --- Github user DieBauer closed the pull request at: https://github.com/apache/flink/pull/3703 > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts >Assignee: Aljoscha Krettek > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts
Github user DieBauer closed the pull request at: https://github.com/apache/flink/pull/3703 ---
[GitHub] flink issue #3703: [FLINK-5005] WIP: publish scala 2.12 artifacts
Github user DieBauer commented on the issue: https://github.com/apache/flink/pull/3703 Hi, I'm sorry for the late reaction. I haven't found the time to work on this anymore (also priorities shifted... ) Therefore this pull request is stale. (it still could be used as a reference). I think the main challenge is in serialising the java8 lambdas. And dropping the support for scala 2.10 and Java7 certainly helps in taming the pom.xml profiles. I will close this pull request to not keep the hopes up. ---
[jira] [Created] (FLINK-7781) Support simple on-demand metrics aggregation
Chesnay Schepler created FLINK-7781: --- Summary: Support simple on-demand metrics aggregation Key: FLINK-7781 URL: https://issues.apache.org/jira/browse/FLINK-7781 Project: Flink Issue Type: Improvement Components: Metrics, REST Affects Versions: 1.4.0 Reporter: Chesnay Schepler Fix For: 1.4.0 We should support aggregations (min, max, avg, sum) of metrics in the REST API. This is primarily about aggregating across subtasks, for example the number of incoming records across all subtasks. This is useful for simple use-cases where a dedicated metrics backend is overkill, and will allow us to provide better metrics in the web UI (since we can expose these aggregated as well). I propose to add a new query parameter "agg=[min,max,avg,sum]". As a start this parameter should only be used for task metrics. (This is simply the main use-case i have in mind) The aggregation should (naturally) only work for numeric metrics. We will need a HashSet of metrics that exist for subtasks of a given tasks that has to be updated in {{MetricStore#add}}. All task metrics are either stored as # {{.}} or # {{..}}. If a user sends a request {{get=mymetric,agg=sum}}, only the metrics of the first kind are to be considered. Similarly, given a request {{get=myoperator.mymetric,agg=sum}} only metrics of the second kind are to be considered. Ideally, the name of the aggregated metric (i.e. the original name without subtask index) is also contained in the list of available metrics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
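The aggregation semantics proposed above (agg=[min,max,avg,sum] over the values one metric reports across subtasks) can be sketched in a few lines. The class and method names are illustrative, not the eventual Flink REST implementation.

```java
import java.util.Collection;

/**
 * Sketch of the proposed REST aggregation: combine the per-subtask values
 * of a single numeric metric with min, max, avg, or sum.
 */
class MetricAggregator {
    static double aggregate(String agg, Collection<Double> subtaskValues) {
        if (subtaskValues.isEmpty()) {
            throw new IllegalArgumentException("no subtask reported this metric");
        }
        double sum = 0, min = Double.MAX_VALUE, max = -Double.MAX_VALUE;
        for (double v : subtaskValues) {
            sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        switch (agg) {
            case "sum": return sum;
            case "min": return min;
            case "max": return max;
            case "avg": return sum / subtaskValues.size();
            default: throw new IllegalArgumentException("unknown aggregation: " + agg);
        }
    }
}
```

For example, if three subtasks report 10, 30, and 20 incoming records, agg=sum would return 60 and agg=avg would return 20.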
[jira] [Closed] (FLINK-5316) Make the GenericWriteAheadSink backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-5316. --- Resolution: Won't Fix Closing for now since there was no noise about this. > Make the GenericWriteAheadSink backwards compatible. > > > Key: FLINK-5316 > URL: https://issues.apache.org/jira/browse/FLINK-5316 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.2.0 >Reporter: Kostas Kloudas >
[jira] [Updated] (FLINK-5235) Don't automatically add HBase classpath to Flink classpath
[ https://issues.apache.org/jira/browse/FLINK-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5235: Summary: Don't automatically add HBase classpath to Flink classpath (was: HBase classpath automatically added to Flink classpath) > Don't automatically add HBase classpath to Flink classpath > -- > > Key: FLINK-5235 > URL: https://issues.apache.org/jira/browse/FLINK-5235 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Maximilian Michels > > If the {{HBASE_CONF_DIR}} is set, the startup script adds the output of > {{hbase classpath}} to the classpath. This is a huge list of dependencies > which is very likely to clash with Flink dependencies. > For example, on my machine: > {noformat} >
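The behavior the issue wants removed can be sketched as follows. This is an illustrative stand-in, not the actual Flink startup-script logic: the `hbase` function below stubs the real HBase launcher (which prints a very long colon-separated jar list), and all paths are made up:

```shell
#!/bin/sh
# Stub standing in for the real `hbase` command; the genuine output is a
# huge dependency list, including HBase's own versions of common libraries.
hbase() { echo "/opt/hbase/lib/hbase-client.jar:/opt/hbase/lib/guava-12.0.jar"; }

FLINK_CLASSPATH="/opt/flink/lib/flink-dist.jar"
HBASE_CONF_DIR="/etc/hbase/conf"   # pretend the env var is set

if [ -n "$HBASE_CONF_DIR" ]; then
  # This unconditional append is the problem: every HBase jar (e.g. its
  # Guava) lands on Flink's classpath and can clash with Flink's versions.
  FLINK_CLASSPATH="$FLINK_CLASSPATH:$(hbase classpath)"
fi
echo "$FLINK_CLASSPATH"
```

The clash risk is visible even in this toy version: a pinned third-party jar from HBase ends up after (or before) Flink's own, and class resolution order decides which wins.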
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196863#comment-16196863 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143448956 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: 
CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and
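The time criterion quoted in the diff's class comment ("L.time between R.time + X and R.time + Y") can be illustrated with a standalone membership check. This is an illustrative sketch, not Flink code; leftLowerBound and leftUpperBound play the roles of X and Y from the constructor above:

```java
// Sketch of the join-window test implied by
// "L.time between R.time + X and R.time + Y".
public final class JoinWindowSketch {
    // true iff a left row at leftTime may join a right row at rightTime
    static boolean inWindow(long leftTime, long rightTime,
                            long leftLowerBound, long leftUpperBound) {
        return leftTime >= rightTime + leftLowerBound
            && leftTime <= rightTime + leftUpperBound;
    }

    public static void main(String[] args) {
        // Example bounds: left rows may lag right rows by up to 5 time units
        // (X = -5) or lead them by up to 10 (Y = 10).
        long x = -5, y = 10;
        System.out.println(inWindow(100, 103, x, y)); // 100 in [98, 113]  -> true
        System.out.println(inWindow(100, 120, x, y)); // 100 in [115, 130] -> false
    }
}
```

Note how a negative X makes the window straddle the right row's timestamp, which is why the class stores leftRelativeSize as the negated lower bound.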
[jira] [Commented] (FLINK-7575) Dashboard jobs/tasks metrics display 0 when metrics are not yet available
[ https://issues.apache.org/jira/browse/FLINK-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196860#comment-16196860 ] ASF GitHub Bot commented on FLINK-7575: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4647 merging. > Dashboard jobs/tasks metrics display 0 when metrics are not yet available > - > > Key: FLINK-7575 > URL: https://issues.apache.org/jira/browse/FLINK-7575 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.2 >Reporter: James Lafa >Assignee: James Lafa >Priority: Minor > > The web frontend is currently displaying "0" when a metric is not available > yet (ex: records-in/out, bytes-in/out). > 0 is misleading and it's preferable to display no value while the value is > still unknown.
[GitHub] flink issue #4647: [FLINK-7575] [WEB-DASHBOARD] Display "Fetching..." instea...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4647 merging. ---
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r143445293 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.Types +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. 
+ * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 
0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + private var leftExpirationTime: Long = 0L; + private var rightExpirationTime: Long = 0L; + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + + // for delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +*/ + def getMaxOutputDelay: Long = Math.max(leftRelativeSize,
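The delayed-cleanup arithmetic quoted in this diff can be checked in isolation. The following standalone sketch (not Flink code) reproduces just the field definitions shown above — leftRelativeSize = -leftLowerBound, rightRelativeSize = leftUpperBound, cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2:

```java
// Sketch reproducing the cleanup-delay computation from the quoted diff.
public final class CleanupDelaySketch {
    static long cleanupDelay(long leftLowerBound, long leftUpperBound) {
        long leftRelativeSize = -leftLowerBound;   // how far back the left window reaches
        long rightRelativeSize = leftUpperBound;   // how far forward it reaches
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    public static void main(String[] args) {
        // For "L.time between R.time - 5000 and R.time + 10000":
        // leftRelativeSize = 5000, rightRelativeSize = 10000 -> delay 7500
        System.out.println(cleanupDelay(-5000, 10000)); // 7500
    }
}
```

Halving the total window width spaces cleanup timers out so that expired cache entries are purged in batches rather than one timer per row, at the cost of state lingering up to half a window longer than strictly necessary.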
[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196831#comment-16196831 ] Kostas Kloudas commented on FLINK-7756: --- Hi [~shashank734], thanks a lot for reporting it, I will look into it. > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > - > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Bug > Components: CEP, State Backends, Checkpointing, Streaming >Affects Versions: 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend >Reporter: Shashank Agarwal > > When i try to use RocksDBStateBackend on my staging cluster (which is using > HDFS as file system) it crashes. But When i use FsStateBackend on staging > (which is using HDFS as file system) it is working fine. > On local with local file system it's working fine in both cases. > Please check attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). 
> 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at >
[jira] [Created] (FLINK-7780) Integrate savepoint command into REST client
Chesnay Schepler created FLINK-7780: --- Summary: Integrate savepoint command into REST client Key: FLINK-7780 URL: https://issues.apache.org/jira/browse/FLINK-7780 Project: Flink Issue Type: Sub-task Components: Client, REST Affects Versions: 1.4.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Priority: Blocker Fix For: 1.4.0