[GitHub] flink pull request #5677: [hotfix] update doc of InternalTimerService.regist...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5677

[hotfix] update doc of InternalTimerService.registerEventTimeTimer()

## What is the purpose of the change

Update the doc of InternalTimerService.registerEventTimeTimer().

## Brief change log

Update the doc of InternalTimerService.registerEventTimeTimer().

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

None.

## Documentation

None.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5677.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5677

commit 410601674a532f01268acd37c9c043b39d9ae6b1
Author: Bowen Li
Date: 2018-03-10T07:35:15Z

    [hotfix] update doc of InternalTimerService.registerEventTimeTimer()

---
[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394069#comment-16394069 ]

Hequn Cheng edited comment on FLINK-8690 at 3/10/18 6:53 AM:
------------------------------------------------------------

Renaming the current {{FlinkLogicalAggregateConverter}} to {{FlinkLogicalAggregateDataSetConverter}} brings no change for DataSet. Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.

was (Author: hequn8128):
Renaming the current FlinkLogicalAggregateConverter to FlinkLogicalAggregateDataSetConverter brings no change for DataSet. Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow
> distinct agg on DataStream
> --------------------------------------------------------------------------
>
>          Key: FLINK-8690
>          URL: https://issues.apache.org/jira/browse/FLINK-8690
>      Project: Flink
>   Issue Type: Sub-task
>     Reporter: Rong Rong
>     Assignee: Rong Rong
>     Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on
> datastream as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394069#comment-16394069 ]

Hequn Cheng commented on FLINK-8690:
------------------------------------

Renaming the current FlinkLogicalAggregateConverter to FlinkLogicalAggregateDataSetConverter brings no change for DataSet. Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.
[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394069#comment-16394069 ]

Hequn Cheng edited comment on FLINK-8690 at 3/10/18 6:53 AM:
------------------------------------------------------------

Renaming the current {{FlinkLogicalAggregateConverter}} to {{FlinkLogicalAggregateDataSetConverter}} brings no change for DataSet. Calcite can only find a plan generated by {{AggregateExpandDistinctAggregatesRule}}.

was (Author: hequn8128):
Renaming the current {{FlinkLogicalAggregateConverter}} to {{FlinkLogicalAggregateDataSetConverter}} brings no change for DataSet. Calcite can only find a plan generated by AggregateExpandDistinctAggregatesRule.
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394068#comment-16394068 ]

Rong Rong commented on FLINK-8690:
----------------------------------

You are right, I hadn't thought about that. I will verify the plan cost. However, I am less worried about the DataStream side, as the Calcite rule will generate 2 operations that require a state backend while using MapView will generate only 1. My worry is on the DataSet side: as per my initial implementation of splitting FlinkLogicalAggregate into 2, both of them ignore the plan generated by {{AggregateExpandDistinctAggregatesRule}}.
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394064#comment-16394064 ]

Hequn Cheng commented on FLINK-8690:
------------------------------------

No need to worry about the {{AggregateExpandDistinctAggregatesRule}}: the cost of the plan it generates will be bigger than the cost of the plan generated by {{FlinkLogicalAggregateDataStreamConverter}}, and Calcite will choose the best plan (i.e., the plan with the smallest cost).
[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394046#comment-16394046 ]

Rong Rong edited comment on FLINK-8690 at 3/10/18 5:54 AM:
----------------------------------------------------------

That should resolve our problem partially. The real reason why I introduced another node before the logical plan is that **AggregateExpandDistinctAggregatesRule** is Calcite-specific and applies globally, regardless of whether it is on DataSet or DataStream, and that is the rule we want to avoid applying in the DataStream API. Basically it converts

{code:java}
COUNT(DISTINCT f1)
{code}

into

{code:java}
COUNT(DIST_f1) FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split them and then unite them. Another possibility is to introduce a logical-plan RuleSet based on whether it's on stream or batch, but that seems to disagree with the purpose of a "logical" plan.

was (Author: walterddr):
That should resolve our problem partially. The real reason why I introduced another node before the logical plan is that **AggregateExpandDistinctAggregatesRule** is Calcite-specific and applies globally, regardless of whether it is on DataSet or DataStream, and that is the rule we want to avoid applying in the DataStream API. Basically it converts

{code:java}
COUNT(DISTINCT f1)
{code}

into

{code:java}
COUNT(DIST_f1) FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split them and then unite them.
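As a standalone illustration (plain Java, not Flink or Calcite code), the expansion described above, computing COUNT(DISTINCT f1) via an inner GROUP BY f1 and an outer count, can be sketched as:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctCountRewrite {

    // COUNT(DISTINCT f1) computed directly.
    static long countDistinctDirect(List<String> f1) {
        return f1.stream().distinct().count();
    }

    // The AggregateExpandDistinctAggregatesRule-style rewrite:
    // inner aggregate SELECT f1 AS DIST_f1 ... GROUP BY f1,
    // then COUNT(DIST_f1) over the grouped keys.
    static long countDistinctRewritten(List<String> f1) {
        List<String> distF1 = f1.stream()
                .collect(Collectors.groupingBy(v -> v)) // GROUP BY f1
                .keySet().stream()                      // SELECT f1 AS DIST_f1
                .collect(Collectors.toList());
        return distF1.size();                           // COUNT(DIST_f1)
    }

    public static void main(String[] args) {
        List<String> f1 = Arrays.asList("a", "b", "a", "c", "b");
        System.out.println(countDistinctDirect(f1));    // 3
        System.out.println(countDistinctRewritten(f1)); // 3
    }
}
```

Both forms produce the same result; the discussion in this thread is about which plan the optimizer should be allowed to pick on DataSet versus DataStream, and at what cost.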
[jira] [Comment Edited] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394046#comment-16394046 ]

Rong Rong edited comment on FLINK-8690 at 3/10/18 5:52 AM:
----------------------------------------------------------

That should resolve our problem partially. The real reason why I introduced another node before the logical plan is that **AggregateExpandDistinctAggregatesRule** is Calcite-specific and applies globally, regardless of whether it is on DataSet or DataStream, and that is the rule we want to avoid applying in the DataStream API. Basically it converts

{code:java}
COUNT(DISTINCT f1)
{code}

into

{code:java}
COUNT(DIST_f1) FROM (SELECT f1 AS DIST_f1 GROUP BY f1)
{code}

There's always a way to unite these two operators together using FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split them and then unite them.

was (Author: walterddr):
That should resolve our problem partially. The real reason why I introduced another node before the logical plan is that **AggregateExpandDistinctAggregatesRule** is Calcite-specific and applies globally, regardless of whether it is on DataSet or DataStream, and that is the rule we want to avoid applying in the DataStream API. Basically it converts

{code:java}
COUNT(DISTINCT f1)
{code}

into

{code:java}
COUNT(DIST_A) FROM (SELECT A AS DIST_A GROUP BY A)
{code}

There's always a way to unite these two operators together using FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split them and then unite them.
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394046#comment-16394046 ]

Rong Rong commented on FLINK-8690:
----------------------------------

That should resolve our problem partially. The real reason why I introduced another node before the logical plan is that **AggregateExpandDistinctAggregatesRule** is Calcite-specific and applies globally, regardless of whether it is on DataSet or DataStream, and that is the rule we want to avoid applying in the DataStream API. Basically it converts

{code:java}
COUNT(DISTINCT f1)
{code}

into

{code:java}
COUNT(DIST_A) FROM (SELECT A AS DIST_A GROUP BY A)
{code}

There's always a way to unite these two operators together using FlinkLogicalAggregateConverter, but it seems counter-intuitive to first split them and then unite them.
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394042#comment-16394042 ]

Rong Rong commented on FLINK-8863:
----------------------------------

Sounds good. We can deal with that in the future. Thanks for the quick reply.

> Add user-defined function support in SQL Client
> -----------------------------------------------
>
>          Key: FLINK-8863
>          URL: https://issues.apache.org/jira/browse/FLINK-8863
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API & SQL
>     Reporter: Timo Walther
>     Assignee: Xingcan Cui
>     Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the
> implementation plan mentioned in
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client.
> For now, we limit the registration to classes that implement
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that
> are implemented in SQL are not part of this issue.
> I would suggest to introduce a {{functions}} top-level property. The
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
>     from: class                  <-- optional, default: class
>     class: org.my.MyScalarFunction
>     constructor:                 <-- optional, needed for certain types of functions
>       - 42.0
>       - class: org.my.Class      <-- possibility to create objects via properties
>         constructor:
>           - 1
>           - true
>           - false
>           - "whatever"
>           - type: INT
>             value: 1
> {code}
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394012#comment-16394012 ]

ASF GitHub Bot commented on FLINK-8655:
---------------------------------------

Github user Bekreth commented on the issue: https://github.com/apache/flink/pull/5538

Previously, I was trying to leverage the current annotation parsers. It would be possible to pass a keyspace more directly into AnnotationParser in the shaded Datastax component, but I'm against this as it requires maintaining a version of Datastax that differs from the open-source version. It could be possible to replace the entire annotation with reflection instead of just editing one property, but that still requires some futzing with reflection that I think would be best to avoid. Another option could be to extend the necessary Datastax classes within the CassandraPojoSink to facilitate the alterations necessary to pass in the keyspace dynamically. I'm thinking this is the most favorable option.

> Add a default keyspace to CassandraSink
> ---------------------------------------
>
>              Key: FLINK-8655
>              URL: https://issues.apache.org/jira/browse/FLINK-8655
>          Project: Flink
>       Issue Type: Improvement
>       Components: Cassandra Connector
> Affects Versions: 1.4.0
>         Reporter: Christopher Hughes
>         Priority: Minor
>           Labels: features
>          Fix For: 1.6.0
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to
> provide keyspace information on the desired POJOs using Datastax annotations.
> This allows various POJOs to be written to multiple keyspaces while sinking
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single
> Cassandra instance differentiated by keyspace names. I propose adding a
> `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a
> definitive keyspace would attempt to be loaded to the provided default.
[GitHub] flink issue #5538: [FLINK-8655] [DataSink] Added default keyspace to Cassand...
Github user Bekreth commented on the issue: https://github.com/apache/flink/pull/5538

Previously, I was trying to leverage the current annotation parsers. It would be possible to pass a keyspace more directly into AnnotationParser in the shaded Datastax component, but I'm against this as it requires maintaining a version of Datastax that differs from the open-source version. It could be possible to replace the entire annotation with reflection instead of just editing one property, but that still requires some futzing with reflection that I think would be best to avoid. Another option could be to extend the necessary Datastax classes within the CassandraPojoSink to facilitate the alterations necessary to pass in the keyspace dynamically. I'm thinking this is the most favorable option.

---
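To make the proposal concrete, a plausible fallback order for the proposed `defaultKeyspace(String keyspace)` could be sketched in plain Java as follows. This is a hypothetical illustration, not Flink or Datastax code; the method name and resolution order are assumptions based on the issue description:

```java
public class KeyspaceResolution {

    // Hypothetical resolution order: a keyspace from the POJO's Datastax
    // @Table annotation wins, otherwise the ClusterBuilder-level default
    // applies, otherwise fail fast.
    static String resolveKeyspace(String annotatedKeyspace, String defaultKeyspace) {
        if (annotatedKeyspace != null && !annotatedKeyspace.isEmpty()) {
            return annotatedKeyspace;
        }
        if (defaultKeyspace != null && !defaultKeyspace.isEmpty()) {
            return defaultKeyspace;
        }
        throw new IllegalStateException("No keyspace configured for POJO");
    }

    public static void main(String[] args) {
        System.out.println(resolveKeyspace("prod_ks", "dev_ks")); // prod_ks
        System.out.println(resolveKeyspace("", "dev_ks"));        // dev_ks
    }
}
```

With such a fallback, annotated POJOs keep their explicit keyspaces while unannotated ones land in the shared per-environment default, which is the runtime flexibility the issue asks for.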
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394002#comment-16394002 ]

Hequn Cheng commented on FLINK-8690:
------------------------------------

Hi, we don't have to create a new logical RelNode. As I said in the design doc, we can rename the current FlinkLogicalAggregateConverter to FlinkLogicalAggregateDataSetConverter and add another FlinkLogicalAggregateDataStreamConverter which supports distinct aggregates. Both FlinkLogicalAggregateDataSetConverter and FlinkLogicalAggregateDataStreamConverter convert a LogicalAggregate to a FlinkLogicalAggregate. This may be neater. What do you think?
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393990#comment-16393990 ]

Xingcan Cui commented on FLINK-8863:
------------------------------------

Yes, I agree that there may be conflicts among different JARs specified with the {{-jar}} parameters. Maybe we can choose a policy with coarse-grained lib resolving first and make it fine-grained (e.g., specifying different JARs for different uses) in the future.
[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393985#comment-16393985 ]

ASF GitHub Bot commented on FLINK-8854:
---------------------------------------

Github user xccui commented on the issue: https://github.com/apache/flink/pull/5662

Thanks for the explanation, @twalthr! I'll update the PR and resolve the conflicts caused.

> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -------------------------------------------------------------
>
>              Key: FLINK-8854
>              URL: https://issues.apache.org/jira/browse/FLINK-8854
>          Project: Flink
>       Issue Type: Bug
>       Components: Table API & SQL
> Affects Versions: 1.5.0
>         Reporter: Fabian Hueske
>         Assignee: Timo Walther
>         Priority: Blocker
>          Fix For: 1.5.0, 1.6.0
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not
> correct.
> It should not only include all fields of the table schema, but also all
> fields of the format schema (mapped to themselves). Otherwise, it is not
> possible to use a timestamp extractor on a field that is not in the table schema.
> For example this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
>     schema:
>       - name: rideId
>         type: LONG
>       - name: rowTime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: "from-field"
>             from: "rideTime"
>           watermarks:
>             type: "periodic-bounded"
>             delay: "6"
>     connector:
>
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.
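The behavior the issue asks for can be sketched in plain Java. This is not Flink's actual `SchemaValidator` signature; the method and parameter names below are illustrative. The key point from the issue is that every format-schema field is added to the mapping as an identity entry, so a timestamp extractor may reference a format-only field such as "rideTime":

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FieldMappingSketch {

    // tableFieldToSourceField: table-schema fields, possibly renamed
    // (e.g. rowTime -> rideTime via the "from-field" rowtime definition).
    // formatFields: all fields declared in the format schema.
    static Map<String, String> deriveFieldMapping(
            Map<String, String> tableFieldToSourceField, Set<String> formatFields) {
        Map<String, String> mapping = new HashMap<>(tableFieldToSourceField);
        for (String f : formatFields) {
            mapping.putIfAbsent(f, f); // format fields map to themselves
        }
        return mapping;
    }
}
```

For the TaxiRides example above, the mapping would then contain rideId -> rideId, rowTime -> rideTime, and also rideTime -> rideTime, so the "from-field" extractor no longer fails.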
[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5662 Thanks for the explanation, @twalthr! I'll update the PR and resolve the conflicts caused. ---
[jira] [Commented] (FLINK-8895) Job failed when one kafka broker shutdown
[ https://issues.apache.org/jira/browse/FLINK-8895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393973#comment-16393973 ]

godfrey johnson commented on FLINK-8895:
----------------------------------------

[~StephanEwen] After the partition leader is changed, the consumer can get all the records by connecting to the other brokers. So, is it possible to avoid the Flink job failure by filtering out the failed broker?

> Job failed when one kafka broker shutdown
> -----------------------------------------
>
>              Key: FLINK-8895
>              URL: https://issues.apache.org/jira/browse/FLINK-8895
>          Project: Flink
>       Issue Type: Bug
>       Components: Kafka Connector
> Affects Versions: 1.2.1
>         Reporter: godfrey johnson
>         Priority: Major
>
> I used a FlinkKafkaConsumer08 to get records from Kafka, but the job failed
> when a broker shut down.
>
> I want to know whether it is possible to filter out the failed broker and get
> the records from the other brokers, which would need modifying Flink's
> source code.
>
> And I get the following error:
> {code:java}
> java.net.SocketTimeoutException
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     at kafka.utils.Utils$.read(Utils.scala:380)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47)
>     at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:220)
> {code}
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393966#comment-16393966 ]

Rong Rong commented on FLINK-8863:
----------------------------------

Thanks [~xccui] for the quick reply. I am assuming users would have to provide the JAR files when launching the SQL client. One question I have is what happens if multiple versions of the same class are found, or multiple function signature lists are found in several different JARs. This can clearly be avoided if we only allow one UDF JAR file to search functions from. Another question is related to our use case in FLINK-7373, where we have dynamic UDF / JAR file declaration in SQL itself, so I was wondering if there can be an optional JAR-file field to specify which JAR file a function should be loaded from. But this use case could also be categorized as "functions that are implemented in SQL" as per [~twalthr]'s description of the JIRA. What do you think?
[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351986#comment-16351986 ]

Ted Yu edited comment on FLINK-5486 at 3/10/18 1:57 AM:
--------------------------------------------------------

Can this get more review, please?

was (Author: yuzhih...@gmail.com):
Can this get more review, please?

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> --------------------------------------------------------------------
>
>          Key: FLINK-5486
>          URL: https://issues.apache.org/jira/browse/FLINK-5486
>      Project: Flink
>   Issue Type: Bug
>   Components: Streaming Connectors
>     Reporter: Ted Yu
>     Assignee: mingleizhang
>     Priority: Major
>      Fix For: 1.3.3
>
> Here is related code:
> {code}
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
> synchronized (bucketState.pendingFilesPerCheckpoint) {
>     bucketState.pendingFilesPerCheckpoint.clear();
> }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside
> the synchronization block. Otherwise, during the processing of
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be
> cleared.
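A standalone sketch of the fix the issue proposes follows. The field and method names mirror BucketingSink, but this is not Flink code: the map stands in for the restored bucket state, and the point is that the whole read-then-clear sequence holds the map's lock, so no entry can be cleared while it is still being processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreSyncSketch {

    private final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> handled = new ArrayList<>();

    void addPending(long checkpointId, List<String> files) {
        synchronized (pendingFilesPerCheckpoint) {
            pendingFilesPerCheckpoint.put(checkpointId, files);
        }
    }

    // Proposed shape: processing AND clearing happen under one lock,
    // instead of only the clear() being synchronized.
    void handleRestoredBucketState() {
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pending) {
        pending.values().forEach(handled::addAll);
    }

    int pendingSize() {
        synchronized (pendingFilesPerCheckpoint) {
            return pendingFilesPerCheckpoint.size();
        }
    }
}
```

With the original code, another thread could acquire the lock and clear the map between the processing call and the synchronized block; widening the critical section removes that window.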
[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393960#comment-16393960 ]

ASF GitHub Bot commented on FLINK-6924:
---------------------------------------

Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5638#discussion_r173607312

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }

+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

nvm, I think it might be too weird to write something like `base.log(antilogarithm)` in the Table API.

> ADD LOG(X) supported in TableAPI
> --------------------------------
>
>              Key: FLINK-6924
>              URL: https://issues.apache.org/jira/browse/FLINK-6924
>          Project: Flink
>       Issue Type: Sub-task
>       Components: Table API & SQL
> Affects Versions: 1.4.0
>         Reporter: sunjincheng
>         Assignee: zjuwangg
>         Priority: Major
>           Labels: starter
>
> See FLINK-6891 for detail.
[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5638#discussion_r173607312

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }

+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+    Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+    new Log(antilogarithm)
--- End diff --

nvm, I think it might be too weird to write something like `base.log(antilogarithm)` in the Table API.

---
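The semantics of the two overloads under discussion, `log(base, antilogarithm)` and the one-argument form, can be expressed in plain Java via the change-of-base identity. This is a sketch of the numeric behavior only, not the Table API expression itself; treating the one-argument form as the natural logarithm is an assumption drawn from the diff's use of `Log(antilogarithm)`:

```java
public class LogSemantics {

    // LOG(base, antilogarithm) via the change-of-base identity.
    static double log(double base, double antilogarithm) {
        return Math.log(antilogarithm) / Math.log(base);
    }

    // One-argument form, assumed here to be the natural logarithm.
    static double log(double antilogarithm) {
        return Math.log(antilogarithm);
    }

    public static void main(String[] args) {
        System.out.println(log(2.0, 8.0)); // approximately 3.0
        System.out.println(log(Math.E));   // approximately 1.0
    }
}
```

The review comment's point stands either way: a prefix-style `log(base, x)` reads naturally, whereas a postfix `base.log(x)` would invert the usual reading of which argument is the base.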
[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393956#comment-16393956 ] yanxiaobin edited comment on FLINK-8794 at 3/10/18 1:53 AM: About : 1.What I described above is that there will be such a situation when there is no failure in this job. I think I've found the problem. I found through log that filesystem's rename method has been executed without any exception, but the filename hasn't changed, so I think it should be S3's problem. This should not be a problem with Flink. was (Author: backlight): About : 1.What I described above is that there will be such a situation when there is no failure in this job. I found through log that filesystem's rename method has been executed without any exception, but the filename hasn't changed, so I think it should be S3's problem. This should not be a problem with Flink. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393956#comment-16393956 ] yanxiaobin commented on FLINK-8794: --- About : 1.What I described above is that there will be such a situation when there is no failure in this job. I found through log that filesystem's rename method has been executed without any exception, but the filename hasn't changed, so I think it should be S3's problem. This should not be a problem with Flink. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393957#comment-16393957 ] Xingcan Cui commented on FLINK-8863: Hi [~walterddr], commonly, all the required classes are supposed to be found in the provided JAR files, including the UDFs. Do you have some other ideas for that? > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for > certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create > objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8794: -- Issue Type: Improvement (was: Bug) > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393949#comment-16393949 ] ASF GitHub Bot commented on FLINK-6924: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5638#discussion_r173606447 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1130,4 +1130,13 @@ object concat_ws { } } +object log { + def apply(base: Expression, antilogarithm: Expression): Expression = { +Log(base, antilogarithm) + } + def apply(antilogarithm: Expression): Expression = { +new Log(antilogarithm) --- End diff -- Is it just `new Ln(antilogarithm)`, we might be able to restructure them together? > ADD LOG(X) supported in TableAPI > > > Key: FLINK-6924 > URL: https://issues.apache.org/jira/browse/FLINK-6924 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: zjuwangg >Priority: Major > Labels: starter > > See FLINK-6891 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5638#discussion_r173606447 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1130,4 +1130,13 @@ object concat_ws { } } +object log { + def apply(base: Expression, antilogarithm: Expression): Expression = { +Log(base, antilogarithm) + } + def apply(antilogarithm: Expression): Expression = { +new Log(antilogarithm) --- End diff -- Is it just `new Ln(antilogarithm)`, we might be able to restructure them together? ---
[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393942#comment-16393942 ] Rong Rong commented on FLINK-8863: -- Hi [~twalthr], in the task description, there's no specification regarding where the UDF is loaded from; According to FLIP-24, seems like there's only MyToolBox.jar is described as containing UDF. Are we going to always assume UDFs are contained in a pre-defined JAR file? > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest to introduce a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for > certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create > objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
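The declarative `functions` config in the issue description implies constructing function objects reflectively from a class name and constructor arguments. A minimal sketch of that mechanism, with hypothetical names (`UdfLoader`, `instantiate`) that are not the actual SQL Client API:

```java
import java.util.List;

// Minimal sketch of building an object from a config entry like
// "class: org.my.MyScalarFunction" + "constructor: [...]". In the real
// client this would resolve classes via the user-provided JARs' classloader.
public class UdfLoader {
    static Object instantiate(String className, List<Object> ctorArgs) {
        try {
            // derive the constructor signature from the declared argument values
            Class<?>[] types = ctorArgs.stream()
                    .map(Object::getClass)
                    .toArray(Class<?>[]::new);
            return Class.forName(className)
                    .getConstructor(types)
                    .newInstance(ctorArgs.toArray());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot construct " + className, e);
        }
    }
}
```

Matching constructors by the runtime classes of the declared values is a simplification; a real implementation would also need the nested-object and typed-literal cases shown in the YAML example.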
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393756#comment-16393756 ] ASF GitHub Bot commented on FLINK-6206: --- Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5399 Just to be clear, we would revert all the changes, except the ones for `flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java`, right? > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...
Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5399 Just to be clear, we would revert all the changes, except the ones for `flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java`, right? ---
[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393725#comment-16393725 ] ASF GitHub Bot commented on FLINK-: --- Github user kailashhd commented on the issue: https://github.com/apache/flink/pull/5663 Currently in flink connector we are depending only on aws-sdk-kinesis and not on aws-java-sdk-bundle and also don't depend on kinesisvideo. So by default the dependency on kinesisvideo is not included in the connector which means we don't have to exclude any dependencies. I also verified that there is no unwanted netty dependencies by running mvn dependency:tree. The only instance of netty is this: `[INFO] | +- org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:provided` in accordance to the value in flink-parent pom. > Upgrade AWS SDK in flink-connector-kinesis > -- > > Key: FLINK- > URL: https://issues.apache.org/jira/browse/FLINK- > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of > this version upgrade for KCL and KPL versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...
Github user kailashhd commented on the issue: https://github.com/apache/flink/pull/5663 Currently in flink connector we are depending only on aws-sdk-kinesis and not on aws-java-sdk-bundle and also don't depend on kinesisvideo. So by default the dependency on kinesisvideo is not included in the connector which means we don't have to exclude any dependencies. I also verified that there is no unwanted netty dependencies by running mvn dependency:tree. The only instance of netty is this: `[INFO] | +- org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:provided` in accordance to the value in flink-parent pom. ---
[jira] [Updated] (FLINK-8913) RocksDB state backend crashes in alpine image
[ https://issues.apache.org/jira/browse/FLINK-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joshua Griffith updated FLINK-8913: --- Summary: RocksDB state backend crashes in alpine image (was: RocksDB state backend crashes in hadoop28-scala_2.11-alpine image) > RocksDB state backend crashes in alpine image > - > > Key: FLINK-8913 > URL: https://issues.apache.org/jira/browse/FLINK-8913 > Project: Flink > Issue Type: Bug > Components: Docker, State Backends, Checkpointing >Affects Versions: 1.4.1 > Environment: {{~> minikube version}} > {{minikube version: v0.25.0}}{{}}{{~> minikube config view}} > {{- cpus: 4}} > {{- kubernetes-version: v1.8.0}} > {{- memory: 8192}} > {{- vm-driver: hyperkit}} > {{- WantReportError: true}} >Reporter: Joshua Griffith >Priority: Major > > Running the word count example jar with the RocksDB state backend enabled > (via job manager configuration) with the Flink hadoop28-scala_2.11-alpine > image crashes with the following error: > > {{2018-03-09 21:09:12,928 INFO}}{{2018-03-09 21:09:12,892 INFO > org.apache.flink.runtime.taskmanager.Task - Source: Collection Source -> Flat > Map (1/1) (38365cd6326de6df92b72d1acbda0f72) switched from RUNNING to > FINISHED.}} > {{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - > Freeing task resources for Source: Collection Source -> Flat Map (1/1) > (38365cd6326de6df92b72d1acbda0f72).}} > {{2018-03-09 21:09:12,894 INFO org.apache.flink.runtime.taskmanager.Task - > Ensuring all FileSystem streams are closed for task Source: Collection Source > -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) [FINISHED]}} > {{2018-03-09 21:09:12,897 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and > sending final execution state FINISHED to JobManager for task Source: > Collection Source -> Flat Map (38365cd6326de6df92b72d1acbda0f72)}} > {{2018-03-09 21:09:12,902 INFO > org.apache.flink.contrib.streaming.state.RocksDBStateBackend - 
Successfully > loaded RocksDB native > library}} {{org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot.}} > {{#}} > {{# A fatal error has been detected by the Java Runtime Environment:}} > {{#}} > {{# SIGSEGV (0xb) at pc=0x001432b6, pid=1, tid=0x7fc4036e1ae8}} > {{#}} > {{# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build > 1.8.0_151-b12)}} > {{# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 > compressed oops)}} > {{# Derivative: IcedTea 3.6.0}} > {{# Distribution: Custom build (Tue Nov 21 11:22:36 GMT 2017)}} > {{# Problematic frame:}} > {{# C 0x001432b6}} > {{#}} > {{# Core dump written. Default location: /opt/flink/core or core.1}} > {{#}} > {{# An error report file with more information is saved as:}} > {{# /opt/flink/hs_err_pid1.log}} > {{#}} > {{# If you would like to submit a bug report, please include}} > {{# instructions on how to reproduce the bug and visit:}} > {{# http://icedtea.classpath.org/bugzilla}} > > Switching to the Debian image fixes this issue. I imagine it has to do with > alpine's alternative libc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8913) RocksDB state backend crashes in hadoop28-scala_2.11-alpine image
Joshua Griffith created FLINK-8913: -- Summary: RocksDB state backend crashes in hadoop28-scala_2.11-alpine image Key: FLINK-8913 URL: https://issues.apache.org/jira/browse/FLINK-8913 Project: Flink Issue Type: Bug Components: Docker, State Backends, Checkpointing Affects Versions: 1.4.1 Environment: {{~> minikube version}} {{minikube version: v0.25.0}}{{}}{{~> minikube config view}} {{- cpus: 4}} {{- kubernetes-version: v1.8.0}} {{- memory: 8192}} {{- vm-driver: hyperkit}} {{- WantReportError: true}} Reporter: Joshua Griffith Running the word count example jar with the RocksDB state backend enabled (via job manager configuration) with the Flink hadoop28-scala_2.11-alpine image crashes with the following error: {{2018-03-09 21:09:12,928 INFO}}{{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - Source: Collection Source -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) switched from RUNNING to FINISHED.}} {{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Collection Source -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72).}} {{2018-03-09 21:09:12,894 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Collection Source -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) [FINISHED]}} {{2018-03-09 21:09:12,897 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source -> Flat Map (38365cd6326de6df92b72d1acbda0f72)}} {{2018-03-09 21:09:12,902 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully loaded RocksDB native library}} {{org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Initializing RocksDB keyed state backend from snapshot.}} {{#}} {{# A fatal error has been detected by the Java Runtime Environment:}} {{#}} {{# SIGSEGV (0xb) at pc=0x001432b6, pid=1, tid=0x7fc4036e1ae8}}
{{#}} {{# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 1.8.0_151-b12)}} {{# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 compressed oops)}} {{# Derivative: IcedTea 3.6.0}} {{# Distribution: Custom build (Tue Nov 21 11:22:36 GMT 2017)}} {{# Problematic frame:}} {{# C 0x001432b6}} {{#}} {{# Core dump written. Default location: /opt/flink/core or core.1}} {{#}} {{# An error report file with more information is saved as:}} {{# /opt/flink/hs_err_pid1.log}} {{#}} {{# If you would like to submit a bug report, please include}} {{# instructions on how to reproduce the bug and visit:}} {{# http://icedtea.classpath.org/bugzilla}} Switching to the Debian image fixes this issue. I imagine it has to do with alpine's alternative libc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393392#comment-16393392 ] ASF GitHub Bot commented on FLINK-6206: --- Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5399 Sure, that makes sense! > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...
Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5399 Sure, that makes sense! ---
[jira] [Closed] (FLINK-8786) SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false when switching from spillable to spilled
[ https://issues.apache.org/jira/browse/FLINK-8786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8786. --- > SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false > when switching from spillable to spilled > - > > Key: FLINK-8786 > URL: https://issues.apache.org/jira/browse/FLINK-8786 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > When processing the last in-memory buffer in > {{SpillableSubpartitionView#getNextBuffer}}, we always set the > {{isMoreAvailable}} flag of the returned {{BufferAndBacklog}} to {{false}} > irrespective of what may be in the spill writer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8786) SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false when switching from spillable to spilled
[ https://issues.apache.org/jira/browse/FLINK-8786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8786. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in - 1.5.0 via 835adcc373ce169f202055e9b4f9dc3fb9123772 and d1a969f7ad018ef44f40f974eb49ba004494fcdf - 1.6.0 via 112c54fb07e2a3f33322ba99a9d59c1a8dbc and c19df9ff670c06aeb381339c244bbd22fe13fd4d > SpillableSubpartitionView#getNextBuffer always sets isMoreAvailable to false > when switching from spillable to spilled > - > > Key: FLINK-8786 > URL: https://issues.apache.org/jira/browse/FLINK-8786 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > When processing the last in-memory buffer in > {{SpillableSubpartitionView#getNextBuffer}}, we always set the > {{isMoreAvailable}} flag of the returned {{BufferAndBacklog}} to {{false}} > irrespective of what may be in the spill writer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393374#comment-16393374 ] ASF GitHub Bot commented on FLINK-8755: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5581 > SpilledSubpartitionView wrongly relys on the backlog for determining whether > more data is available > --- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8755. --- > SpilledSubpartitionView wrongly relys on the backlog for determining whether > more data is available > --- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8755. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in - 1.5.0 via 5c7457aa2aece89b77e9a9402cc514f90e083a69 and d1a969f7ad018ef44f40f974eb49ba004494fcdf - 1.6.0 via 18b75e32bb8f4f155f729574b2d377459104471e and c19df9ff670c06aeb381339c244bbd22fe13fd4d > SpilledSubpartitionView wrongly relys on the backlog for determining whether > more data is available > --- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
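The FLINK-8755 bug above can be illustrated with a toy model. The backlog counts only data buffers, so using `newBacklog > 0` as `isMoreAvailable` wrongly reports `false` when only events remain in the queue; checking the queue itself avoids this. This is a simplified sketch, not the Flink types: here `true` marks an event and `false` a data buffer.

```java
import java.util.Deque;

// Toy model of the FLINK-8755 availability bug. The backlog counts only
// data buffers, so a backlog-based check misses trailing events.
public class AvailabilitySketch {
    static boolean moreAvailableViaBacklog(Deque<Boolean> queue) {
        long dataBuffers = queue.stream().filter(isEvent -> !isEvent).count();
        return dataBuffers > 0; // wrong: ignores events still in the queue
    }

    static boolean moreAvailableViaQueue(Deque<Boolean> queue) {
        return !queue.isEmpty(); // correct: an event is still something to consume
    }
}
```

With a queue holding a single event, the backlog-based check reports nothing available while the queue-based check correctly reports more data.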
[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5581 ---
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393357#comment-16393357 ] ASF GitHub Bot commented on FLINK-6206: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5399 Do you think it would be a good approach to promote that to ERROR logging on the TaskManager (who actually encountered the exception) and leave the JobManager at INFO (who only handles a regular failure/recovery cycle)? > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5399 Do you think it would be a good approach to promote that to ERROR logging on the TaskManager (who actually encountered the exception) and leave the JobManager at INFO (who only handles a regular failure/recovery cycle)? ---
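The split suggested in the comment above can be sketched as a small level-selection helper: failure transitions carrying a cause are logged at ERROR on the TaskManager (which actually hit the exception), while ordinary lifecycle transitions stay at INFO. This is an illustrative sketch, not the actual `Task.java` change.

```java
// Sketch of the logging-level split discussed in FLINK-6206 (hypothetical
// helper, not the actual Flink code).
public class TransitionLogger {
    enum ExecutionState { CREATED, RUNNING, FINISHED, FAILED }

    static String levelFor(ExecutionState newState, Throwable cause) {
        // a transition into FAILED with an attached cause is an error;
        // everything else is routine lifecycle logging
        return (newState == ExecutionState.FAILED && cause != null) ? "ERROR" : "INFO";
    }
}
```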
[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393327#comment-16393327 ] ASF GitHub Bot commented on FLINK-: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5663 Is your testing Flink job both reading from and writing to Kinesis, aka both KCL and KPL are tested? If so, +1 > Upgrade AWS SDK in flink-connector-kinesis > -- > > Key: FLINK- > URL: https://issues.apache.org/jira/browse/FLINK- > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of > this version upgrade for KCL and KPL versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5663 Is your testing Flink job both reading from and writing to Kinesis, aka both KCL and KPL are tested? If so, +1 ---
[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value
[ https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393314#comment-16393314 ] ASF GitHub Bot commented on FLINK-8364: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5356 hmmm I think you are right, this actually might be a non-issue in the first place > Add iterator() to ListState which returns empty iterator when it has no value > - > > Key: FLINK-8364 > URL: https://issues.apache.org/jira/browse/FLINK-8364 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Add iterator() to ListState which returns empty iterator when it has no value -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()
[ https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393312#comment-16393312 ] ASF GitHub Bot commented on FLINK-8515: --- Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/5365 > update RocksDBMapState to replace deprecated remove() with delete() > --- > > Key: FLINK-8515 > URL: https://issues.apache.org/jira/browse/FLINK-8515 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.5.0 > > > Currently in RocksDBMapState: > {code:java} > @Override > public void remove(UK userKey) throws IOException, RocksDBException { > byte[] rawKeyBytes = > serializeUserKeyWithCurrentKeyAndNamespace(userKey); > backend.db.remove(columnFamily, writeOptions, rawKeyBytes); > } > {code} > remove() is actually deprecated. Should be replaced with delete() -- This message was sent by Atlassian JIRA (v7.6.3#76005)
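For the record, the fix is mechanical: `backend.db.remove(columnFamily, writeOptions, rawKeyBytes)` becomes `backend.db.delete(columnFamily, writeOptions, rawKeyBytes)` with identical arguments. The deprecate-and-delegate pattern behind such renames can be sketched with a toy store (a hypothetical `KvStore`, not the RocksDB JNI API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key-value store mirroring the RocksDB JNI rename:
// remove() is kept only as a deprecated alias for delete().
class KvStore {
    private final Map<String, byte[]> data = new HashMap<>();

    void put(String key, byte[] value) { data.put(key, value); }

    void delete(String key) { data.remove(key); }

    /** @deprecated use {@link #delete(String)} instead. */
    @Deprecated
    void remove(String key) { delete(key); }

    boolean contains(String key) { return data.containsKey(key); }
}
```

Delegating the deprecated name to the new one keeps old callers working while the codebase migrates, which is exactly the situation the PR cleaned up on the caller side.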
[jira] [Assigned] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError
[ https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-8897: --- Assignee: Timo Walther > Rowtime materialization causes "mismatched type" AssertionError > --- > > Key: FLINK-8897 > URL: https://issues.apache.org/jira/browse/FLINK-8897 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Xingcan Cui >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0 > > > As raised in [this > thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E], > the query created by the following code will throw a calcite "mismatch type" > ({{Timestamp(3)}} and {{TimeIndicator}}) exception. > {code:java} > String sql1 = "select id, eventTs as t1, count(*) over (partition by id order > by eventTs rows between 100 preceding and current row) as cnt1 from myTable1"; > String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over > (partition by id order by eventTs rows between 50 preceding and current row) > as cnt2 from myTable2"; > Table left = tableEnv.sqlQuery(sql1); > Table right = tableEnv.sqlQuery(sql2); > left.join(right).where("id === r_id && t1 === t2").select("id, > t1").writeToSink(...) > {code} > The logical plan is as follows. 
> {code} > LogicalProject(id=[$0], t1=[$1]) > LogicalFilter(condition=[AND(=($0, $3), =($1, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > {code} > That is because the rowtime field after an aggregation will be > materialized, while the {{RexInputRef}} type for the filter's operands ({{t1 > === t2}}) is still {{TimeIndicator}}. The two types should be unified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8903) Error calculation based on rolling window in table API and SQL API
[ https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8903. Resolution: Not A Problem Hi [~lilizhao], processing time is inherently non-deterministic, i.e., different executions produce different results. You have to use event time if you want more accurate results. Closing this issue as "Not a Problem" > Error calculation based on rolling window in table API and SQL API > -- > > Key: FLINK-8903 > URL: https://issues.apache.org/jira/browse/FLINK-8903 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.1 >Reporter: lilizhao >Priority: Critical > Fix For: 1.5.0 > > Attachments: TableAndSQLTest.java > > > Error calculation based on rolling window in table API and SQL API > The variance of the calculation is equal to the average. > 1 The test code is detailed in the appendix > 2 The test data are as follows > 1 li > 100 li > 3 The Table API test result as follows > (true,50.5,101.0,50.5,li,2018-03-09 09:11:00.0) > 4 The SQL API test result as follows > (true,50.5,2,101.0,50.5,li,2018-03-09 09:21:00.0) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393157#comment-16393157 ] ASF GitHub Bot commented on FLINK-6206: --- Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/5399 The reason I proposed the change is that any unexpected behavior, even when you can recover from it, can be indicative of a potential bug/misconfiguration. INFO logs are by far noisier than errors, which means it is tough to identify problems unless you log them with ERROR (or at least WARN). I'd say we at least demote this to WARN. When one is analyzing logs, trying to find leads to current problems, it's common to filter out all INFO logs (otherwise it's mayhem); so anything that caused an exception to be thrown should be highlighted. > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
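The PR under discussion changes which level the transition log line uses. The decision logic is tiny; a self-contained sketch (plain strings instead of an SLF4J logger; names here are illustrative, not Flink's):

```java
// Sketch of level selection for task state transitions: transitions that
// carry a failure cause are logged at ERROR (or WARN, per the "at least
// demote to WARN" suggestion), while normal transitions stay at INFO.
class TransitionLogging {
    static String levelFor(Throwable cause, boolean demoteToWarn) {
        if (cause == null) {
            return "INFO";                           // normal life-cycle transition
        }
        return demoteToWarn ? "WARN" : "ERROR";      // exceptional transition
    }

    static String format(String task, String from, String to, Throwable cause) {
        String line = levelFor(cause, false) + " " + task
                + " switched from " + from + " to " + to + ".";
        return cause == null ? line : line + " Cause: " + cause;
    }
}
```

The trade-off debated in the thread is only about which branch the cause-present case takes; the INFO branch is uncontroversial.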
[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393140#comment-16393140 ] ASF GitHub Bot commented on FLINK-8910: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5676 CC @aljoscha > Introduce automated end-to-end test for local recovery (including sticky > scheduling) > > > Key: FLINK-8910 > URL: https://issues.apache.org/jira/browse/FLINK-8910 > Project: Flink > Issue Type: Test > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > We should have an automated end-to-end test that can run nightly to check > that sticky allocation and local recovery work as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393139#comment-16393139 ] ASF GitHub Bot commented on FLINK-8910: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/5676 [FLINK-8910][tests] Automated end-to-end test for local recovery and sticky scheduling This PR adds an automated end-to-end test for the local recovery feature, which also includes sticky allocation. We expose the allocation id through the (not public) `StreamingRuntimeContext`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink automated-test-finish Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5676.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5676 commit 7de3d9d8ddaa84684e8c757285201387ad556ef2 Author: Stefan Richter Date: 2018-03-08T18:20:32Z [FLINK-8910][tests] Expose allocation id through runtime ctx commit d8e740484a340f20277b9ed1e2d22b9d96897937 Author: Stefan Richter Date: 2018-03-06T09:35:44Z [FLINK-8910][tests] Automated test for local recovery (including sticky allocation) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8740: Priority: Critical (was: Blocker) > Job-level metrics lost during job re-submission in HA mode > -- > > Key: FLINK-8740 > URL: https://issues.apache.org/jira/browse/FLINK-8740 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Joshua DeWald >Priority: Critical > Fix For: 1.5.0 > > > When Flink is running in high-availability mode and a leader re-election picks > the same JobManager, the job is unable to register its job-level metrics due > to a name collision. > This may occur even if a different JobManager is elected, but since it is the > local JobManagerMetricGroup that emits the error, that is unlikely to be the > case. > > *Expected Behavior* > When a job is forced to re-submit due to JobManager re-election, job-level > metrics should be available in the new instance of the job (uptime, > checkpoint size, checkpoint duration, etc.) > *Actual Behavior* > When the job gets re-submitted, it is unable to register job-level metrics due > to a collision in the JobManagerMetricGroup, which leads to a situation where, > even though the job is running, the metrics around checkpoints and uptime are > not available > *Steps to reproduce* > # Start up Flink in HA mode using ZooKeeper, a single node is fine > # Submit a job to the cluster > # Stop and restart ZooKeeper > # In the JobManager logs you will see the following errors: > # > {noformat} > 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - > Name collision: Group already contains a Metric with the name > 'totalNumberOfCheckpoints'. Metric will not be reported > 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - > Name collision: Group already contains a Metric with the name > 'numberOfInProgressCheckpoints'. 
Metric will not be reported > 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - > Name collision: Group already contains a Metric with the name > 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat} > *Proposed Solution* > I suspect that there may be other related issues than just the metrics, but a > code change that seems to fix the issue is to remove the existing registered > job metrics during recovery: > {code:java} > if (isRecovery) { >log.info(s"Removing metrics for $jobId, new will be added during recover") >jobManagerMetricGroup.removeJob(jobId) > }{code} > I'd be happy to submit this in a PR to open up the discussion, if that is > acceptable, but I am not sure about the consequences of not closing the > previous JMMG, or of perhaps simply not re-registering job-level metrics > during recovery. Doing this would seem to entail informing lower levels about > the recovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
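The proposed fix, expressed as a self-contained sketch (a toy metric registry, not Flink's actual `JobManagerMetricGroup`): a second registration under the same name is dropped, mirroring the "Name collision ... Metric will not be reported" warning, and removing the job's group before re-registration avoids the collision on recovery.

```java
import java.util.HashMap;
import java.util.Map;

// Toy per-job metric registry: a second register() under the same name is
// silently dropped, mirroring the name-collision warning in the logs.
class JobMetrics {
    private final Map<String, Map<String, Long>> byJob = new HashMap<>();

    boolean register(String jobId, String metric, long value) {
        Map<String, Long> job = byJob.computeIfAbsent(jobId, k -> new HashMap<>());
        if (job.containsKey(metric)) {
            return false; // collision: metric will not be reported
        }
        job.put(metric, value);
        return true;
    }

    // The recovery-time cleanup the reporter proposes: drop the stale group
    // so the re-submitted job can register its metrics afresh.
    void removeJob(String jobId) { byJob.remove(jobId); }
}
```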
[jira] [Updated] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode
[ https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8740: Priority: Blocker (was: Critical) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393051#comment-16393051 ] ASF GitHub Bot commented on FLINK-8357: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5371 Two more questions about this: 1. There is code in the shell scripts that rotates the log files each time you start/stop the cluster, with .0/.1/.2/etc. suffixes on the log files. Have you tested whether that still works with the changed setup? 2. I think rolling by time is something that users expect more commonly than rolling by size. What do you think here? > enable rolling in default log settings > -- > > Key: FLINK-8357 > URL: https://issues.apache.org/jira/browse/FLINK-8357 > Project: Flink > Issue Type: Improvement > Components: Logging >Reporter: Xu Mingmin >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The release packages use {{org.apache.log4j.FileAppender}} for log4j and > {{ch.qos.logback.core.FileAppender}} for logback, which can result in very > large log files. > For most cases, if not all, we need to enable rotation in a production > cluster, and I suppose it's a good idea to make rotation the default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
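For context, the size-vs-time question maps onto the two standard log4j 1.x appenders. A sketch of what a size-based default could look like in `log4j.properties` (illustrative values, not necessarily what the PR ships):

```properties
log4j.rootLogger=INFO, file

# Roll the log file by size instead of letting it grow unbounded.
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${log.file}
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

Time-based rolling, as Stephan suggests, would instead use `org.apache.log4j.DailyRollingFileAppender` with a `DatePattern`; logback's counterparts are `TimeBasedRollingPolicy` and `SizeBasedTriggeringPolicy`.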
[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393028#comment-16393028 ] ASF GitHub Bot commented on FLINK-8755: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5581 Thanks for the patch. Looks good, well tested and reviewed (thanks, @pnowojski). Merging this... > SpilledSubpartitionView wrongly relies on the backlog for determining whether > more data is available > --- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
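The bug can be reproduced with a toy model of the spilled subpartition: if "more available" is derived from the buffer backlog, which counts only data buffers, a trailing event makes the flag wrongly false. Deriving it from queue emptiness fixes this. Class and method names here are illustrative, not Flink's actual network-stack classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy subpartition: the queue holds both data buffers and events, but only
// data buffers are counted in the backlog.
class SpilledQueue {
    static final class Entry {
        final String payload;
        final boolean isEvent;
        Entry(String payload, boolean isEvent) { this.payload = payload; this.isEvent = isEvent; }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private int backlog; // counts data buffers only

    void add(Entry e) {
        queue.add(e);
        if (!e.isEvent) backlog++;
    }

    /** Returns {next entry, "more available" flag}. */
    Object[] getNextBuffer(boolean fixed) {
        Entry current = queue.poll();
        if (current != null && !current.isEvent) backlog--;
        boolean moreAvailable = fixed
                ? !queue.isEmpty()   // fixed: look at the actual queue
                : backlog > 0;       // buggy: backlog ignores queued events
        return new Object[]{current, moreAvailable};
    }
}
```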
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16393002#comment-16393002 ] ASF GitHub Bot commented on FLINK-8599: --- Github user ChengzhiZhao commented on the issue: https://github.com/apache/flink/pull/5521 @StephanEwen @kl0u Thanks for your feedback, I will add an option for users to choose > Improve the failure behavior of the FileInputFormat for bad files > - > > Key: FLINK-8599 > URL: https://issues.apache.org/jira/browse/FLINK-8599 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.4.0, 1.3.2 >Reporter: Chengzhi Zhao >Priority: Major > > We have an S3 path that Flink monitors for newly available files. > {code:java} > val avroInputStream_activity = env.readFile(format, path, > FileProcessingMode.PROCESS_CONTINUOUSLY, 1) {code} > > I am doing both internal and external checkpointing, and let's say a bad file > (for example, one with a different schema) is dropped into this folder; Flink > will then retry several times. I want to set those bad files aside and let the > process continue. However, since the file path persists in the checkpoint, > when I try to resume from the external checkpoint, it threw the following > error because the file could no longer be found. > > {code:java} > java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No > such file or directory: s3a://myfile{code} > > As [~fhue...@gmail.com] suggested, we could check whether a path exists before > trying to read a file, and ignore the input split instead of throwing an > exception and causing a failure. > > Also, I am thinking about adding an error output for bad files as an option for > users. If any bad files exist, we could move them to a separate path and do > further analysis. > > Not sure how people feel about it, but I'd like to contribute this if people > think it would be an improvement. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
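The suggested behavior, checking existence before opening a split, can be sketched with plain `java.nio.file` (a hypothetical helper, not `FileInputFormat` itself):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical split filter: drop input splits whose backing file has
// disappeared (e.g. a bad file moved away between checkpoint and restore),
// instead of failing the whole job with "No such file or directory".
class SplitFilter {
    static List<Path> openableSplits(List<Path> splits) {
        List<Path> result = new ArrayList<>();
        for (Path p : splits) {
            if (Files.exists(p)) {
                result.add(p);
            }
            // else: skip -- or route the path to an error output for later
            // analysis, as the reporter proposes.
        }
        return result;
    }
}
```

Whether to skip silently or divert to an error output is exactly the user-facing option the thread converges on.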
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392999#comment-16392999 ] ASF GitHub Bot commented on FLINK-6206: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5399 I am unsure about this change, so let's discuss the pros and cons a bit. So far, this purposefully logs on *INFO*, because from the JobManager's perspective, a task failing and recovering is not an erroneous situation. It conveys the assumption that failures and recoveries are perfectly expected as part of the job life cycle. Something that is logged on *ERROR* is something where a user may want to dig in and see whether they should do something about it. Flink does not follow this perspective perfectly in all parts, but that was the reasoning behind using *info* for these state changes. Happy to hear other thoughts on this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()
[ https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392978#comment-16392978 ] ASF GitHub Bot commented on FLINK-8515: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5365 This change has already gotten in via another commit. @bowenli86 can you close this PR? Thank you for the contribution! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8898) Materialize time indicators in conditions of LogicalFilter
[ https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-8898. -- Resolution: Duplicate > Materialize time indicators in conditions of LogicalFilter > -- > > Key: FLINK-8898 > URL: https://issues.apache.org/jira/browse/FLINK-8898 > Project: Flink > Issue Type: Bug >Reporter: Hequn Cheng >Priority: Major > > Currently, {{RelTimeIndicatorConverter}} does not materialize time indicators > in conditions of LogicalFilter, which leads to type mismatch exceptions. We can > reproduce the exception with the following test case. > {code:java} > @Test > def reproduceTypeMissmatch(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > env.setStateBackend(getStateBackend) > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > StreamITCase.clear > val data1 = new mutable.MutableList[(Int, Long, Int, Long)] > data1.+=((1, 1L, 1, 1L)) > data1.+=((1, 2L, 1, 1L)) > val t1 = env.fromCollection(data1) > .assignTimestampsAndWatermarks(new Row5WatermarkExtractor) > .toTable(tEnv, 'id, 'ip, 'type, 'eventTs.rowtime) > tEnv.registerTable("myTable", t1) > val sql1 = "select distinct id, eventTs as eventTs, count(*) over (partition > by id order by eventTs rows" + > " between 100 preceding and current row) as cnt1 from myTable" > val sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over > (partition by id " + > "order by eventTs rows between 50 preceding and current row) as cnt2 from > myTable" > val left = tEnv.sqlQuery(sql1) > val right = tEnv.sqlQuery(sql2) > left.join(right).where("id = r_id && eventTs === r_eventTs").select('id) > .writeToSink(new TestRetractSink, queryConfig) > env.execute() > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8898) Materialize time indicators in conditions of LogicalFilter
[ https://issues.apache.org/jira/browse/FLINK-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392975#comment-16392975 ] Hequn Cheng commented on FLINK-8898: Duplicate of [FLINK-8897|https://issues.apache.org/jira/browse/FLINK-8897]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...
Github user je-ik commented on the issue: https://github.com/apache/flink/pull/5185 @StephanEwen I think it should be configurable. As Aljoscha pointed out, it is needed to ensure that these two representations have the same serialized form in checkpoints, because that way users can switch the implementations back and forth between application restarts. Unfortunately, I didn't have time to dive into that so far. :-( ---
[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]
[ https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392976#comment-16392976 ] ASF GitHub Bot commented on FLINK-8297: --- Github user je-ik commented on the issue: https://github.com/apache/flink/pull/5185 @StephanEwen I think it should be configurable. As Aljoscha pointed out, it is needed to ensure that these two representations have the same serialized form in checkpoints, because that way users can switch the implementations back and forth between application restarts. Unfortunately, I didn't have time to dive into that so far. :-( > RocksDBListState stores whole list in single byte[] > --- > > Key: FLINK-8297 > URL: https://issues.apache.org/jira/browse/FLINK-8297 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0, 1.3.2 >Reporter: Jan Lukavský >Priority: Major > > RocksDBListState currently keeps the whole list of data in a single RocksDB > key-value pair, which implies that the list actually must fit into memory. > Larger lists are not supported and end up with OOME or other errors. The > RocksDBListState could be modified so that individual items in the list are > stored under separate keys in RocksDB and can then be iterated over. A simple > implementation could reuse the existing RocksDBMapState, with the key as index to the > list and a single RocksDBValueState keeping track of how many items have > already been added to the list. Because this implementation might be less > efficient in some cases, it would be good to make it opt-in by a construct > like > {{new RocksDBStateBackend().enableLargeListsPerKey()}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
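The layout proposed in the issue can be sketched with plain collections standing in for the RocksDB-backed states (all names here are illustrative assumptions, not code from the PR): each element lives under its own index key, as it would in a `RocksDBMapState`, while a counter plays the role of the `RocksDBValueState` that tracks how many items have been added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed large-list layout. A HashMap stands in
// for the RocksDBMapState (index -> element) and a long stands in for the
// RocksDBValueState that counts the elements added so far.
class IndexedListStateSketch<E> {
    private final Map<Long, E> elements = new HashMap<>(); // per-index keys
    private long count = 0L;                               // next free index

    void add(E element) {
        elements.put(count, element);
        count++;
    }

    Iterable<E> get() {
        // Iterates index by index; a RocksDB-backed version could stream the
        // entries instead of materializing the whole list as one byte[].
        List<E> out = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            out.add(elements.get(i));
        }
        return out;
    }
}
```

A real implementation would additionally have to keep the checkpoint serialization format compatible with the single-byte[] variant, which is exactly the concern raised in this thread.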
[jira] [Commented] (FLINK-6214) WindowAssigners do not allow negative offsets
[ https://issues.apache.org/jira/browse/FLINK-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392966#comment-16392966 ] ASF GitHub Bot commented on FLINK-6214: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5376 Ah, one problem: What happens if the offset is larger (by absolute value) than the window length? Then the offset would still be negative with this change. > WindowAssigners do not allow negative offsets > - > > Key: FLINK-6214 > URL: https://issues.apache.org/jira/browse/FLINK-6214 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Timo Walther >Priority: Major > > Both the website and the JavaDoc promote > ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For > example, in China you would have to specify an offset of Time.hours(-8)". But > both the sliding and tumbling event time assigners do not allow the offset to be > negative. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5376: [FLINK-6214] WindowAssigners do not allow negative offset...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5376 Ah, one problem: What happens if the offset is larger (by absolute value) than the window length? Then the offset would still be negative with this change. ---
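One way to address the concern about offsets whose absolute value exceeds the window length is to reduce the offset modulo the window size before computing the window start. This is a hedged sketch (not the PR's code): with a floor-style modulo, any offset yields a window start within one window length at or below the timestamp.

```java
// Sketch of window-start computation with a normalized offset (illustrative,
// not Flink's actual implementation). Java's % can return a negative value,
// so the remainder is shifted into [0, windowSize) before subtracting.
class WindowStartSketch {
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder; // always in (timestamp - windowSize, timestamp]
    }
}
```

For example, with a window size of 24 and `offset = -25`, the start is the same as with `offset = -1`, so an over-sized negative offset no longer produces an out-of-range window.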
[jira] [Commented] (FLINK-6214) WindowAssigners do not allow negative offsets
[ https://issues.apache.org/jira/browse/FLINK-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392962#comment-16392962 ] ASF GitHub Bot commented on FLINK-6214: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5376 If we decide that we want to support negative offsets, this looks like a good implementation. @aljoscha what is your take, should we support negative offsets? > WindowAssigners do not allow negative offsets > - > > Key: FLINK-6214 > URL: https://issues.apache.org/jira/browse/FLINK-6214 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Timo Walther >Priority: Major > > Both the website and the JavaDoc promote > ".window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) For > example, in China you would have to specify an offset of Time.hours(-8)". But > both the sliding and tumbling event time assigners do not allow the offset to be > negative. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5376: [FLINK-6214] WindowAssigners do not allow negative offset...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5376 If we decide that we want to support negative offsets, this looks like a good implementation. @aljoscha what is your take, should we support negative offsets? ---
[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value
[ https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392961#comment-16392961 ] ASF GitHub Bot commented on FLINK-8364: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5356 I am wondering whether this discussion is a bit confused. All state facing the user in the APIs already has the behavior that there is no `null`, but only empty iterators. That's because all state is wrapped into a `UserFacingListState` in the `DefaultKeyedStateStore`. So, is this a non-issue, actually? Something that may only affect test implementations of `ListState` > Add iterator() to ListState which returns empty iterator when it has no value > - > > Key: FLINK-8364 > URL: https://issues.apache.org/jira/browse/FLINK-8364 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Add iterator() to ListState which returns empty iterator when it has no value -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]
[ https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392957#comment-16392957 ] ASF GitHub Bot commented on FLINK-8297: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5185 Is the general thought to always have list state as a map, or only fall back to that if the lists cross a certain threshold? Since list state backs many common operations (non-aggregating windows) we have to be super careful with the performance implications of this. > RocksDBListState stores whole list in single byte[] > --- > > Key: FLINK-8297 > URL: https://issues.apache.org/jira/browse/FLINK-8297 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0, 1.3.2 >Reporter: Jan Lukavský >Priority: Major > > RocksDBListState currently keeps the whole list of data in a single RocksDB > key-value pair, which implies that the list actually must fit into memory. > Larger lists are not supported and end up with OOME or other errors. The > RocksDBListState could be modified so that individual items in the list are > stored under separate keys in RocksDB and can then be iterated over. A simple > implementation could reuse the existing RocksDBMapState, with the key as index to the > list and a single RocksDBValueState keeping track of how many items have > already been added to the list. Because this implementation might be less > efficient in some cases, it would be good to make it opt-in by a construct > like > {{new RocksDBStateBackend().enableLargeListsPerKey()}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5356 I am wondering whether this discussion is a bit confused. All state facing the user in the APIs already has the behavior that there is no `null`, but only empty iterators. That's because all state is wrapped into a `UserFacingListState` in the `DefaultKeyedStateStore`. So, is this a non-issue, actually? Something that may only affect test implementations of `ListState`? ---
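The wrapping behavior described in this comment can be sketched as follows; the class and its constructor argument are assumptions for illustration, not Flink's actual `UserFacingListState` code:

```java
import java.util.Collections;
import java.util.function.Supplier;

// Illustrative wrapper: the raw backend state may return null when no value
// exists, and the user-facing view converts that to an empty iterable, so
// user code never sees null.
class NullToEmptyListState<E> {
    private final Supplier<Iterable<E>> rawState; // stand-in for the backend state

    NullToEmptyListState(Supplier<Iterable<E>> rawState) {
        this.rawState = rawState;
    }

    Iterable<E> get() {
        Iterable<E> raw = rawState.get();
        return raw != null ? raw : Collections.<E>emptyList();
    }
}
```

With this kind of wrapping in place for all user-facing state, only code that talks to the raw backend state directly (such as test stubs) can observe a `null` list.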
[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5185 Is the general thought to always have list state as a map, or only fall back to that if the lists cross a certain threshold? Since list state backs many common operations (non-aggregating windows) we have to be super careful with the performance implications of this. ---
[jira] [Commented] (FLINK-8822) RotateLogFile may not work well when sed version is below 4.2
[ https://issues.apache.org/jira/browse/FLINK-8822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392953#comment-16392953 ] ASF GitHub Bot commented on FLINK-8822: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5609 ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list the `-E` option. If we want to be super safe we could check that `sed -r` is supported and use `-E` as a backup. > RotateLogFile may not work well when sed version is below 4.2 > - > > Key: FLINK-8822 > URL: https://issues.apache.org/jira/browse/FLINK-8822 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Xin Liu >Priority: Major > Fix For: 1.5.0 > > > In bin/config.sh, rotateLogFilesWithPrefix() uses an extended regular expression to > process filenames with "sed -E", but when the sed version is below 4.2 this fails with > "sed: invalid option -- 'E'" > and log rotation won't work: there will be only one logfile no matter > what $MAX_LOG_FILE_NUMBER is. > Using sed -r may be more suitable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5609 ehh...probably. The PR has a point as `sed --help` (v4.2.2) does not list the `-E` option. If we want to be super safe we could check that `sed -r` is supported and use `-E` as a backup. ---
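The fallback idea from the comment could look like the probe below, a sketch rather than the actual config.sh change: check whether `sed -r` works and fall back to `-E` otherwise.

```shell
# Probe which extended-regex flag the local sed supports (illustrative sketch,
# not the actual bin/config.sh code). Prefers -r, falls back to -E.
probe_sed_flag() {
    if echo "x" | sed -r 's/x/y/' >/dev/null 2>&1; then
        echo "-r"
    elif echo "x" | sed -E 's/x/y/' >/dev/null 2>&1; then
        echo "-E"
    else
        echo ""  # neither flag supported; caller must fall back to basic regex
    fi
}
```

rotateLogFilesWithPrefix() could then build its sed invocation with `$(probe_sed_flag)` instead of hard-coding a single flag.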
[jira] [Updated] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError
[ https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8897: Fix Version/s: 1.5.0 > Rowtime materialization causes "mismatched type" AssertionError > --- > > Key: FLINK-8897 > URL: https://issues.apache.org/jira/browse/FLINK-8897 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Xingcan Cui >Priority: Blocker > Fix For: 1.5.0 > > > As raised in [this > thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E], > the query created by the following code will throw a calcite "mismatch type" > ({{Timestamp(3)}} and {{TimeIndicator}}) exception. > {code:java} > String sql1 = "select id, eventTs as t1, count(*) over (partition by id order > by eventTs rows between 100 preceding and current row) as cnt1 from myTable1"; > String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over > (partition by id order by eventTs rows between 50 preceding and current row) > as cnt2 from myTable2"; > Table left = tableEnv.sqlQuery(sql1); > Table right = tableEnv.sqlQuery(sql2); > left.join(right).where("id === r_id && t1 === t2").select("id, > t1").writeToSink(...) > {code} > The logical plan is as follows. 
> {code} > LogicalProject(id=[$0], t1=[$1]) > LogicalFilter(condition=[AND(=($0, $3), =($1, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > {code} > That is because the rowtime field after an aggregation will be > materialized while the {{RexInputRef}} type for the filter's operands ({{t1 > === t2}}) is still {{TimeIndicator}}. We should make them unified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8897) Rowtime materialization causes "mismatched type" AssertionError
[ https://issues.apache.org/jira/browse/FLINK-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8897: Priority: Blocker (was: Major) > Rowtime materialization causes "mismatched type" AssertionError > --- > > Key: FLINK-8897 > URL: https://issues.apache.org/jira/browse/FLINK-8897 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Xingcan Cui >Priority: Blocker > Fix For: 1.5.0 > > > As raised in [this > thread|https://lists.apache.org/thread.html/e2ea38aa7ae224d7481145334955d84243690e9aad10d58310bdb8e7@%3Cuser.flink.apache.org%3E], > the query created by the following code will throw a calcite "mismatch type" > ({{Timestamp(3)}} and {{TimeIndicator}}) exception. > {code:java} > String sql1 = "select id, eventTs as t1, count(*) over (partition by id order > by eventTs rows between 100 preceding and current row) as cnt1 from myTable1"; > String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over > (partition by id order by eventTs rows between 50 preceding and current row) > as cnt2 from myTable2"; > Table left = tableEnv.sqlQuery(sql1); > Table right = tableEnv.sqlQuery(sql2); > left.join(right).where("id === r_id && t1 === t2").select("id, > t1").writeToSink(...) > {code} > The logical plan is as follows. 
> {code} > LogicalProject(id=[$0], t1=[$1]) > LogicalFilter(condition=[AND(=($0, $3), =($1, $4))]) > LogicalJoin(condition=[true], joinType=[inner]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > LogicalAggregate(group=[{0, 1, 2}]) > LogicalWindow(window#0=[window(partition {0} order by [1] rows > between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) > LogicalProject(id=[$0], eventTs=[$3]) > LogicalTableScan(table=[[_DataStreamTable_0]]) > {code} > That is because the rowtime field after an aggregation will be > materialized while the {{RexInputRef}} type for the filter's operands ({{t1 > === t2}}) is still {{TimeIndicator}}. We should make them unified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392937#comment-16392937 ] ASF GitHub Bot commented on FLINK-8854: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5662 Thanks for the comments @xccui. It's never too late for feedback. Sorry, maybe I merged this too quickly. We still need to call `builder.forJsonSchema()` if the schema contains a `proctime` attribute. The most common use case will be to extend the format by time attributes. With your approach the format would contain an additional timestamp that is definitely not part of the format schema. > Mapping of SchemaValidator.deriveFieldMapping() is incorrect. > - > > Key: FLINK-8854 > URL: https://issues.apache.org/jira/browse/FLINK-8854 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not > correct. > It should not only include all fields of the table schema, but also all > fields of the format schema (mapped to themselves). Otherwise, it is not > possible to use a timestamp extractor on a field that is not in the table schema. > For example this configuration would fail: > {code} > sources: > - name: TaxiRides > schema: > - name: rideId > type: LONG > - name: rowTime > type: TIMESTAMP > rowtime: > timestamps: > type: "from-field" > from: "rideTime" > watermarks: > type: "periodic-bounded" > delay: "6" > connector: > > format: > property-version: 1 > type: json > schema: "ROW(rideId LONG, rideTime TIMESTAMP)" > {code} > because {{rideTime}} is not in the table schema. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5662 Thanks for the comments @xccui. It's never too late for feedback. Sorry, maybe I merged this too quickly. We still need to call `builder.forJsonSchema()` if the schema contains a `proctime` attribute. The most common use case will be to extend the format by time attributes. With your approach the format would contain an additional timestamp that is definitely not part of the format schema. ---
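The corrected rule from the FLINK-8854 issue description — every format-schema field mapped to itself, with table-schema aliases layered on top — can be sketched like this (a simplified stand-in, not the actual `SchemaValidator` code; the proctime/rowtime special-casing discussed above is omitted):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of the mapping derivation described in FLINK-8854:
// format fields map to themselves so a timestamp extractor can reference a
// field (e.g. rideTime) that does not appear in the table schema.
class FieldMappingSketch {
    static Map<String, String> deriveFieldMapping(
            Iterable<String> formatFields, Map<String, String> tableAliases) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String f : formatFields) {
            mapping.put(f, f); // format fields mapped to themselves
        }
        mapping.putAll(tableAliases); // e.g. rowTime -> rideTime ("from-field")
        return mapping;
    }
}
```

With the TaxiRides example from the issue, `rideTime` stays in the mapping even though only `rowTime` appears in the table schema, so the "from-field" timestamp extractor can still resolve it.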
[jira] [Updated] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8912: Labels: flip6 (was: ) > Web UI does not render error messages correctly in FLIP-6 mode > -- > > Key: FLINK-8912 > URL: https://issues.apache.org/jira/browse/FLINK-8912 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.5.0, 1.6.0 > Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f >Reporter: Gary Yao >Priority: Blocker > Labels: flip6 > Fix For: 1.5.0 > > > *Description* > The Web UI renders error messages returned by the REST API incorrectly, e.g., > on the job submission page. The JSON returned by the REST API is rendered as > a whole. However, the UI should only render the contents of the {{errors}} > field. > *Steps to reproduce* > Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying > program arguments. Error message will be rendered as follows: > {noformat} > {"errors":["org.apache.flink.client.program.ProgramInvocationException: The > program plan could not be fetched - the program aborted > pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please > run 'SocketWindowWordCount --hostname --port ', where > hostname (localhost by default) and port is the address of the text > server\nTo start a simple text server, run 'netcat -l ' and type the > input text into the command line\n"]} > {noformat} > Note that flip6 mode must be enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8909) pyflink.sh not working with yarn
[ https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8909: Affects Version/s: 1.4.2 > pyflink.sh not working with yarn > > > Key: FLINK-8909 > URL: https://issues.apache.org/jira/browse/FLINK-8909 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.2 >Reporter: Hitesh Tiwari >Priority: Blocker > > Hi, > I want to run the Python application from pyflink.sh in yarn-cluster mode. > I added "-m yarn-cluster -yn 1 " in pyflink.sh, so my updated pyflink.sh is > executing the command below: > "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1 -v > "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@" > Running pyflink.sh: > ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py > While running, I get the error below: > java.lang.Exception: The user defined 'open()' method caused an exception: An > error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. > at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 
3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > 03/09/2018 11:20:23 Job execution switched to status FAILING. > java.lang.Exception: The user defined 'open()' method caused an exception: > An error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. 
> at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at
[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392933#comment-16392933 ] ASF GitHub Bot commented on FLINK-8854: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5662#discussion_r173465435 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -198,14 +205,20 @@ object SchemaValidator { val isProctime = properties .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") .orElse(false) - val isRowtime = properties -.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE") + val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE" + val isRowtime = properties.containsKey(tsType) if (!isProctime && !isRowtime) { // check for a aliasing val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM") .orElse(n) builder.field(fieldName, t) } + // only use the rowtime attribute if it references a field + else if (isRowtime && + properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) { --- End diff -- You are right, we should declare `ExistingField` `final`. In the custom extractor case, a user has to supply the format manually. Maybe we will need an explanation logic in the future such that a user can see how the derived format looks like and if it makes sense to declare it explicitly. > Mapping of SchemaValidator.deriveFieldMapping() is incorrect. > - > > Key: FLINK-8854 > URL: https://issues.apache.org/jira/browse/FLINK-8854 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not > correct. > It should not only include all fields of the table schema, but also all > fields of the format schema (mapped to themselves). 
Otherwise, it is not > possible to use a timestamp extractor on a field that is not in table schema. > For example this configuration would fail: > {code} > sources: > - name: TaxiRides > schema: > - name: rideId > type: LONG > - name: rowTime > type: TIMESTAMP > rowtime: > timestamps: > type: "from-field" > from: "rideTime" > watermarks: > type: "periodic-bounded" > delay: "6" > connector: > > format: > property-version: 1 > type: json > schema: "ROW(rideId LONG, rideTime TIMESTAMP)" > {code} > because {{rideTime}} is not in the table schema. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5662#discussion_r173465435 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -198,14 +205,20 @@ object SchemaValidator { val isProctime = properties .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") .orElse(false) - val isRowtime = properties -.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE") + val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE" + val isRowtime = properties.containsKey(tsType) if (!isProctime && !isRowtime) { // check for a aliasing val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM") .orElse(n) builder.field(fieldName, t) } + // only use the rowtime attribute if it references a field + else if (isRowtime && + properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) { --- End diff -- You are right, we should declare `ExistingField` `final`. In the custom extractor case, a user has to supply the format manually. Maybe we will need an explanation logic in the future such that a user can see how the derived format looks like and if it makes sense to declare it explicitly. ---
[jira] [Created] (FLINK-8912) Web UI does not render error messages correctly in FLIP-6 mode
Gary Yao created FLINK-8912: --- Summary: Web UI does not render error messages correctly in FLIP-6 mode Key: FLINK-8912 URL: https://issues.apache.org/jira/browse/FLINK-8912 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.5.0, 1.6.0 Environment: commit: c531486288caf5241cdf7f0f00f087f3ce82239f Reporter: Gary Yao Fix For: 1.5.0 *Description* The Web UI renders error messages returned by the REST API incorrectly, e.g., on the job submission page. The JSON returned by the REST API is rendered as a whole. However, the UI should only render the contents of the {{errors}} field. *Steps to reproduce* Submit {{examples/streaming/SocketWindowWordCount.jar}} without specifying program arguments. Error message will be rendered as follows: {noformat} {"errors":["org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.\n\nSystem.err: (none)\n\nSystem.out: No port specified. Please run 'SocketWindowWordCount --hostname --port ', where hostname (localhost by default) and port is the address of the text server\nTo start a simple text server, run 'netcat -l ' and type the input text into the command line\n"]} {noformat} Note that flip6 mode must be enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8822) RotateLogFile may not work well when sed version is below 4.2
[ https://issues.apache.org/jira/browse/FLINK-8822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392919#comment-16392919 ] ASF GitHub Bot commented on FLINK-8822: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5609 @zentol What do you think? Is this safe to merge? > RotateLogFile may not work well when sed version is below 4.2 > - > > Key: FLINK-8822 > URL: https://issues.apache.org/jira/browse/FLINK-8822 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 >Reporter: Xin Liu >Priority: Major > Fix For: 1.5.0 > > > In bin/config.sh, rotateLogFilesWithPrefix() uses an extended regular expression to > process filenames with "sed -E", but when the sed version is below 4.2 this fails with > "sed: invalid option -- 'E'" > and log rotation won't work: there will be only one logfile no matter > what $MAX_LOG_FILE_NUMBER is. > Using sed -r may be more suitable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5609: [FLINK-8822] RotateLogFile may not work well when sed ver...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5609 @zentol What do you think? Is this safe to merge? ---
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392915#comment-16392915 ] ASF GitHub Bot commented on FLINK-8655: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5538 I see the need for the feature, but I am skeptical concerning the implementation. This PR reflectively modifies the contents of a String. This is prone to cause problems, for multiple reasons. Strings cache hash codes, and strings themselves are interned and shared. Is there no other way to pass a default namespace to the mapping? > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
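The objection above can be illustrated with plain Java (a standalone sketch, unrelated to the actual PR code). Identical string literals are interned and refer to the same object, so reflectively mutating one String's backing array would silently change every other reference to that literal, and the hash code cached on first use would go stale.

```java
public class InternDemo {
    public static void main(String[] args) {
        String a = "flink";
        String b = "flink";
        // Compile-time string constants are interned: a and b reference
        // the SAME object, so an in-place reflective mutation of a would
        // also change b (and any other user of the literal).
        System.out.println(a == b); // true
        // String caches its hash code after the first hashCode() call;
        // mutating the contents afterwards would leave that cached value
        // inconsistent with the new contents.
        System.out.println(a.hashCode() == "flink".hashCode()); // true
    }
}
```

This is why passing the default keyspace through explicit configuration, rather than rewriting annotation Strings reflectively, is the safer design.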
[jira] [Commented] (FLINK-8854) Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392913#comment-16392913 ]

ASF GitHub Bot commented on FLINK-8854:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173455747

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -198,14 +205,20 @@ object SchemaValidator {
           val isProctime = properties
             .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
             .orElse(false)
    -      val isRowtime = properties
    -        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
    +      val isRowtime = properties.containsKey(tsType)
           if (!isProctime && !isRowtime) {
             // check for a aliasing
             val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
               .orElse(n)
             builder.field(fieldName, t)
           }
    +      // only use the rowtime attribute if it references a field
    +      else if (isRowtime &&
    +        properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
    --- End diff --

    What if the user defines his/her own custom `ExistingField` extractor
    that references a field?

> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -------------------------------------------------------------
>
>                 Key: FLINK-8854
>                 URL: https://issues.apache.org/jira/browse/FLINK-8854
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is
> not correct. It should include not only all fields of the table schema but
> also all fields of the format schema (mapped to themselves). Otherwise, it
> is not possible to use a timestamp extractor on a field that is not in the
> table schema.
> For example, this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
>     schema:
>       - name: rideId
>         type: LONG
>       - name: rowTime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: "from-field"
>             from: "rideTime"
>           watermarks:
>             type: "periodic-bounded"
>             delay: "6"
>     connector:
>
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.
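The fix described in the issue amounts to seeding the mapping with every format-schema field mapped to itself before layering the table schema's "from" aliases on top. A minimal sketch of that idea (hypothetical helper names; this is not the actual Scala code of SchemaValidator):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldMappingSketch {
    // Hypothetical stand-in for SchemaValidator.deriveFieldMapping():
    // every format-schema field is first mapped to itself, then the
    // table schema's alias ("from") entries are added on top.
    static Map<String, String> deriveFieldMapping(
            List<String> formatFields, Map<String, String> tableAliases) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String f : formatFields) {
            mapping.put(f, f); // identity mapping keeps format fields visible
        }
        mapping.putAll(tableAliases);
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = deriveFieldMapping(
                List.of("rideId", "rideTime"),
                Map.of("rowTime", "rideTime"));
        // rideTime survives in the mapping, so a timestamp extractor can
        // reference it even though the table schema only exposes rowTime.
        System.out.println(mapping.get("rideTime")); // rideTime
        System.out.println(mapping.get("rowTime"));  // rideTime
    }
}
```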
[jira] [Commented] (FLINK-8714) Suggest new users to use env.readTextFile method with 2 arguments (using the charset), not to rely on system charset (which varies across environments)
[ https://issues.apache.org/jira/browse/FLINK-8714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392912#comment-16392912 ]

ASF GitHub Bot commented on FLINK-8714:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5536

    Sorry, I think the JavaDoc comment that triggered this change was
    actually incorrect in the first place. By default, the read methods
    always use "UTF-8" rather than the system default charset, so the
    behavior is actually not non-deterministic.

    I would personally vote to fix the JavaDoc and other docs that
    incorrectly claim this uses the system-dependent charset, and leave the
    other docs as they are (not explicitly passing the same charset name
    that is passed anyway keeps them simpler).

> Suggest new users to use env.readTextFile method with 2 arguments (using
> the charset), not to rely on system charset (which varies across
> environments)
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8714
>                 URL: https://issues.apache.org/jira/browse/FLINK-8714
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.4.0
>            Reporter: Michal Klempa
>            Priority: Trivial
>              Labels: easyfix, newbie, patch-available
>
>
> When a newcomer (like me) goes through the docs, there are several places
> where examples encourage reading the input data using the
> {{env.readTextFile()}} method.
>
> This method variant does not take a second argument, the character set (see
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-]).
> According to its Javadoc, this version relies on the system default: "The
> file will be read with the system's default character set."
>
> This behavior is also the default in Java, e.g. in the
> {{java.lang.String.getBytes()}} method, where not supplying the charset
> means using the system locale or the one the JVM was started with (see
> [https://stackoverflow.com/questions/64038/setting-java-locale-settings]).
> There are two ways to set the locale prior to JVM start (-D arguments or
> the LC_ALL variable).
>
> Given that this is something a new Flink user may not know about, and that
> nobody wants to spend hours tracking down an environment-related bug (it
> works on localhost, but in production the locale is different), I would
> kindly suggest a change in the documentation: let's migrate the examples
> to the two-argument version, {{readTextFile(filePath, charsetName)}}.
>
> I am open to criticism and suggestions. The occurrences of
> {{readTextFile}} I was able to grep in the docs are:
> {code}
> ./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
> ./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
> ./dev/libs/storm_compatibility.md:DataStream text = env.readTextFile(localFilePath);
> ./dev/cluster_execution.md: DataSet data = env.readTextFile("hdfs://path/to/file");
> ./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
> ./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
> ./dev/batch/index.md:DataSet localLines = env.readTextFile("file:///path/to/my/textfile");
> ./dev/batch/index.md:DataSet hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
> ./dev/batch/index.md:DataSet logs = env.readTextFile("file:///path/with.nested/files")
> ./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
> ./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
> ./dev/batch/index.md:val localLines = env.readTextFile("file:///path/to/my/textfile")
> ./dev/batch/index.md:val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
> ./dev/batch/index.md:env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
> ./dev/batch/index.md:DataSet lines = env.readTextFile(pathToTextFile);
> ./dev/batch/index.md:val lines = env.readTextFile(pathToTextFile)
> ./dev/batch/examples.md:DataSet text = env.readTextFile("/path/to/file");
> ./dev/batch/examples.md:val text = env.readTextFile("/path/to/file")
> ./dev/api_concepts.md:DataStream text = env.readTextFile("file:///path/to/file");
> ./dev/api_concepts.md:val text:
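The underlying charset pitfall the reporter describes can be shown with plain Java (a standalone sketch; note Stephan's comment that Flink's read methods themselves already default to UTF-8, so this only illustrates the general Java behavior the issue cites):

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        String s = "r\u00e9sum\u00e9"; // "résumé"
        // No-argument getBytes() uses the JVM default charset, which
        // varies with the environment (locale, -Dfile.encoding, ...),
        // so the byte count can differ from machine to machine.
        byte[] implicitBytes = s.getBytes();
        // Naming the charset explicitly gives identical bytes everywhere:
        byte[] explicitBytes = s.getBytes(StandardCharsets.UTF_8);
        System.out.println(Charset.defaultCharset().name()); // environment-dependent
        // 6 characters, but the two 'e'-acute take 2 bytes each in UTF-8:
        System.out.println(explicitBytes.length); // 8
    }
}
```

The same reasoning motivates preferring the two-argument `readTextFile(filePath, charsetName)` in examples: it makes the encoding explicit rather than a property of the environment.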