[jira] [Created] (FLINK-9476) Lost sideOutPut Late Elements in CEP Operator
aitozi created FLINK-9476: - Summary: Lost sideOutPut Late Elements in CEP Operator Key: FLINK-9476 URL: https://issues.apache.org/jira/browse/FLINK-9476 Project: Flink Issue Type: Improvement Components: CEP Affects Versions: 1.4.2 Reporter: aitozi Assignee: aitozi -- This message was sent by Atlassian JIRA (v7.6.3#76005)
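For context, "sideOutPut Late Elements" refers to a pattern the windowed DataStream API already supports: records arriving after the watermark has passed their window are routed to a side output via an OutputTag instead of being dropped silently. The ticket asks for the equivalent hook in the CEP operator, which did not exist at the time. A minimal sketch of the existing windowing pattern — the String element type, key, and window size are placeholder assumptions:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {

    // Routes records that arrive after the watermark passed their window
    // into a side output instead of silently dropping them.
    public static DataStream<String> lateRecords(DataStream<String> input) {
        // Anonymous subclass so the element type survives type erasure.
        final OutputTag<String> lateTag = new OutputTag<String>("late-records") {};

        SingleOutputStreamOperator<String> windowed = input
                .keyBy(value -> value)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag) // late elements go to the tag
                .reduce((a, b) -> a + "," + b);

        // The late elements are retrieved from the main operator's side output.
        return windowed.getSideOutput(lateTag);
    }
}
{code}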
[jira] [Commented] (FLINK-9443) Remove unused parameter in StreamGraphHasherV2
[ https://issues.apache.org/jira/browse/FLINK-9443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494721#comment-16494721 ] ASF GitHub Bot commented on FLINK-9443: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/6080 @aljoscha please help review this, thanks > Remove unused parameter in StreamGraphHasherV2 > --- > > Key: FLINK-9443 > URL: https://issues.apache.org/jira/browse/FLINK-9443 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.2.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > > Since Flink 1.2, StreamGraphHasherV2 has been used to generate hashes. The method > generateNodeLocalHash no longer uses information such as the parallelism or userFunction, > so the parameters should be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494712#comment-16494712 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191647381 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- ![image](https://user-images.githubusercontent.com/20113411/4070-3b47089a-640f-11e8-8cd3-bc5684c07228.png) > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
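For readers following the thread, the reporter under review would be wired up in flink-conf.yaml roughly as follows. Only the host/port keys are grounded in the ARG_HOST/ARG_PORT constants from the diff above; the reporter name "promgateway" and the values are illustrative, and the final option set was still under discussion in this PR:

{code:yaml}
# hypothetical configuration sketch; option names beyond host/port are assumptions
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
{code}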
[jira] [Created] (FLINK-9475) introduce an approximate version of "select distinct"
Sihua Zhou created FLINK-9475: - Summary: introduce an approximate version of "select distinct" Key: FLINK-9475 URL: https://issues.apache.org/jira/browse/FLINK-9475 Project: Flink Issue Type: New Feature Components: Table API SQL Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou Based on the "Elastic Bloom Filter", it is easy to implement an approximate version of "select distinct" with excellent performance. Its accuracy should be configurable, e.g. 95% or 98%. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9474) Introduce an approximate version of "count distinct"
Sihua Zhou created FLINK-9474: - Summary: Introduce an approximate version of "count distinct" Key: FLINK-9474 URL: https://issues.apache.org/jira/browse/FLINK-9474 Project: Flink Issue Type: New Feature Components: Table API SQL Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou We can implement an approximate version of "count distinct" based on the "Elastic Bloom Filter". It could be very fast because we don't need to query the state anymore, and its accuracy should be configurable, e.g. 95% or 98%. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
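FLINK-9475 and FLINK-9474 propose the same trick for SELECT DISTINCT and COUNT(DISTINCT ...). The "Elastic Bloom Filter" state the tickets refer to is not shown here; as an illustration only, the sketch below uses a plain Guava BloomFilter held in operator memory (so it is neither checkpointed nor elastic). A false positive makes the filter skip a genuinely new value, which is why a 2% false-positive rate corresponds roughly to the ~98% accuracy mentioned:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class ApproxDistinct extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    private transient BloomFilter<CharSequence> seen;
    private long approxDistinctCount;

    @Override
    public void open(Configuration parameters) {
        // 10M expected insertions at a 2% false-positive rate (~98% accuracy).
        seen = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000_000, 0.02);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
        // No state backend lookup: one probe against the in-memory filter.
        if (!seen.mightContain(value)) {
            seen.put(value);
            approxDistinctCount++;
            // Emits each (approximately) first-seen value together with the
            // running approximate distinct count.
            out.collect(Tuple2.of(value, approxDistinctCount));
        }
    }
}
{code}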
[jira] [Updated] (FLINK-9473) Compilation fails after upgrade to Calcite 1.17
[ https://issues.apache.org/jira/browse/FLINK-9473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-9473: --- Description: {noformat} /apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40: error: class ExternalCatalogSchema needs to be abstract, since: [ERROR] it has 2 unimplemented members. [ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures are as follows. [ERROR] * For convenience, these are usable as stub implementations. [ERROR] */ [ERROR] def getType(x$1: String): org.apache.calcite.rel.type.RelProtoDataType = ??? [ERROR] def getTypeNames(): java.util.Set[String] = ??? [ERROR] [ERROR] class ExternalCatalogSchema( [ERROR] ^ [WARNING] two warnings found [ERROR] one error found {noformat} With https://issues.apache.org/jira/browse/CALCITE-2045, two more methods were introduced into the interface _org.apache.calcite.schema.Schema_: _org.apache.calcite.schema.Schema#getTypeNames_ and _org.apache.calcite.schema.Schema#getType_ was: {noformat} /apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40: error: class ExternalCatalogSchema needs to be abstract, since: [ERROR] it has 2 unimplemented members. [ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures are as follows. [ERROR] * For convenience, these are usable as stub implementations. [ERROR] */ [ERROR] def getType(x$1: String): org.apache.calcite.rel.type.RelProtoDataType = ??? [ERROR] def getTypeNames(): java.util.Set[String] = ??? [ERROR] [ERROR] class ExternalCatalogSchema( [ERROR] ^ [WARNING] two warnings found [ERROR] one error found {noformat} > Compilation fails after upgrade to Calcite 1.17 > --- > > Key: FLINK-9473 > URL: https://issues.apache.org/jira/browse/FLINK-9473 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > > {noformat} > /apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40: > error: class ExternalCatalogSchema needs to be abstract, since: > [ERROR] it has 2 unimplemented members. > [ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures > are as follows. > [ERROR] * For convenience, these are usable as stub implementations. > [ERROR] */ > [ERROR] def getType(x$1: String): > org.apache.calcite.rel.type.RelProtoDataType = ??? > [ERROR] def getTypeNames(): java.util.Set[String] = ??? > [ERROR] > [ERROR] class ExternalCatalogSchema( > [ERROR] ^ > [WARNING] two warnings found > [ERROR] one error found > {noformat} > With https://issues.apache.org/jira/browse/CALCITE-2045, two more methods were introduced into the > interface _org.apache.calcite.schema.Schema_: > _org.apache.calcite.schema.Schema#getTypeNames_ and > _org.apache.calcite.schema.Schema#getType_ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9473) Compilation fails after upgrade to Calcite 1.17
Sergey Nuyanzin created FLINK-9473: -- Summary: Compilation fails after upgrade to Calcite 1.17 Key: FLINK-9473 URL: https://issues.apache.org/jira/browse/FLINK-9473 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin {noformat} /apacheFlink/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:40: error: class ExternalCatalogSchema needs to be abstract, since: [ERROR] it has 2 unimplemented members. [ERROR] /** As seen from class ExternalCatalogSchema, the missing signatures are as follows. [ERROR] * For convenience, these are usable as stub implementations. [ERROR] */ [ERROR] def getType(x$1: String): org.apache.calcite.rel.type.RelProtoDataType = ??? [ERROR] def getTypeNames(): java.util.Set[String] = ??? [ERROR] [ERROR] class ExternalCatalogSchema( [ERROR] ^ [WARNING] two warnings found [ERROR] one error found {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
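The compile error above is mechanical: Calcite 1.17 (via CALCITE-2045) added getType and getTypeNames to the Schema interface, so every implementor must now provide them. A hedged sketch of the stubs, in Java for brevity (ExternalCatalogSchema itself is Scala, and returning null/empty assumes the external catalog defines no user-defined types):

{code:java}
import java.util.Collections;
import java.util.Set;

import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Schema;

// Abstract on purpose: only the two members added by CALCITE-2045 are shown.
public abstract class ExternalCatalogSchemaStub implements Schema {

    @Override
    public RelProtoDataType getType(String name) {
        return null; // assumption: no user-defined types in the external catalog
    }

    @Override
    public Set<String> getTypeNames() {
        return Collections.emptySet();
    }
}
{code}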
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494682#comment-16494682 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol updated the PR as suggested! Please review > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's Scala quickstarts. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
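A sketch of the five steps as shell commands. The archetype version, project coordinates, and the grep pattern are illustrative guesses; the actual end-to-end script in the PR may differ:

{code:bash}
# 1. create a new Flink project from the Scala quickstart archetype
mvn archetype:generate -B \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.5.0 \
  -DgroupId=org.example -DartifactId=quickstart-test -Dversion=0.1

cd quickstart-test

# 2. add an extra Flink connector/library dependency to pom.xml here (manual step)

# 3. build the fat jar
mvn clean package -Pbuild-jar

# 4. verify that no core dependencies leaked into the jar
if jar tf target/quickstart-test-0.1.jar | grep -q 'org/apache/flink/api/common'; then
  echo "ERROR: core Flink classes found in jar" && exit 1
fi

# 5. run the program against a local cluster, e.g.:
# flink run target/quickstart-test-0.1.jar
{code}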
[jira] [Updated] (FLINK-9364) Add doc of the memory usage in flink
[ https://issues.apache.org/jira/browse/FLINK-9364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9364: -- Summary: Add doc of the memory usage in flink (was: Add doc for the memory usage in flink) > Add doc of the memory usage in flink > > > Key: FLINK-9364 > URL: https://issues.apache.org/jira/browse/FLINK-9364 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > We need to add a doc that describes the memory usage in Flink, especially when > people use the RocksDB backend; many people get confused by this (I've seen > several questions related to it on the user mailing list). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494674#comment-16494674 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 @StephanEwen I've addressed your comments, could you please have a look again? > generate the _meta file for checkpoint only when the writing is truly > successful > > > Key: FLINK-9325 > URL: https://issues.apache.org/jira/browse/FLINK-9325 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > > We should generate the _meta file for checkpoint only when the writing is > totally successful. We should write the metadata file first to a temp file > and then atomically rename it (with an equivalent workaround for S3). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
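The "write to a temp file, then atomically rename" protocol from the description, sketched with plain java.nio for illustration only — the real fix has to go through Flink's FileSystem abstraction, and S3 needs the workaround the ticket mentions because its rename is not atomic:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public final class AtomicMetadataWriter {

    public static void writeMetadata(Path target, byte[] metadata) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".inprogress");

        // Write everything to the temp file first. If we crash here, no
        // _metadata file exists, so the checkpoint is never seen as complete.
        Files.write(tmp, metadata, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);

        // Publish atomically: readers observe either no file or a complete one.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}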
[jira] [Commented] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly
[ https://issues.apache.org/jira/browse/FLINK-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494676#comment-16494676 ] ASF GitHub Bot commented on FLINK-9468: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6094 CC @StephanEwen > get outputLimit of LimitedConnectionsFileSystem incorrectly > --- > > Key: FLINK-9468 > URL: https://issues.apache.org/jira/browse/FLINK-9468 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit incorrectly.
> {code:java}
> private <T extends StreamWithTimeout> T createStream(
>     final SupplierWithException<T, IOException> streamOpener,
>     final HashSet<T> openStreams,
>     final boolean output) throws IOException {
>
>     final int outputLimit = output && maxNumOpenInputStreams > 0 ?
>         maxNumOpenOutputStreams : Integer.MAX_VALUE;
>     /**/
> }
> {code}
> should be
> {code:java}
> private <T extends StreamWithTimeout> T createStream(
>     final SupplierWithException<T, IOException> streamOpener,
>     final HashSet<T> openStreams,
>     final boolean output) throws IOException {
>
>     final int outputLimit = output && maxNumOpenOutputStreams > 0 ?
>         maxNumOpenOutputStreams : Integer.MAX_VALUE;
>     /**/
> }
> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494606#comment-16494606 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191628800 --- Diff: docs/monitoring/metrics.md --- @@ -699,6 +699,39 @@ Flink metric types are mapped to Prometheus metric types as follows: All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. +### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) --- End diff -- thanks for review, I will improve the doc > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494605#comment-16494605 ] ASF GitHub Bot commented on FLINK-9187: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191628488 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- :+1: Right, it's better to compose the jobName from the JobManager ID / TaskManager ID, so the jobName is the `JM ID` / `TM ID`, or combined with a specified prefix like `prefix + JM ID` / `prefix + TM ID`. But for now, JM IDs are not exposed, so random strings are used instead of the JM/TM IDs. > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9460) Redundant output in table & upsert semantics
[ https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengcanbin closed FLINK-9460. -- Resolution: Not A Problem > Redundant output in table & upsert semantics > > > Key: FLINK-9460 > URL: https://issues.apache.org/jira/browse/FLINK-9460 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: zhengcanbin >Priority: Minor > Labels: patch > Fix For: 1.6.0 > > Attachments: image-2018-05-29-11-39-45-698.png, > image-2018-05-29-11-51-20-671.png > > > The output seems incorrect in my table & upsert example; here's the code:
> {code:java}
> object VerifyUpsert {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     env.setParallelism(1)
>     val input = env.socketTextStream("localhost", 9099).map { x =>
>       val tokens = x.split(",")
>       DemoSource(tokens(0), tokens(1), tokens(2))
>     }
>     tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
>     val fieldNames = Array("record_time", "pv", "uv")
>     val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
>     tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
>     tEnv.sqlUpdate(
>       """
>         |INSERT INTO demoSink
>         |SELECT
>         |  SUBSTRING(record_time, 1, 16) as record_time,
>         |  count(user_id) as pv,
>         |  count(DISTINCT user_id) as uv
>         |FROM demoSource
>         |GROUP BY SUBSTRING(record_time, 1, 16)
>       """.stripMargin)
>     env.execute()
>   }
>
>   case class DemoSource(record_time: String, user_id: String, page_id: String)
> }
>
> case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
>   override def setKeyFields(keys: Array[String]): Unit = Seq.empty
>   override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}
>   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
>     dataStream.addSink(new PrintSinkFunction())
>   override def getFieldNames: Array[String] = fNames
>   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
>   override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
>     val copy = MyPrintSink(fNames, fTypes)
>     copy.fNames = fieldNames
>     copy.fTypes = fieldTypes
>     copy
>   }
> }
> {code}
> When the application starts, I type one record at a time into the netcat client; the table below shows the output for every input record:
>
> ||input||output||
> |2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
> |2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
> |2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
> |2018-05-24 21:34:12,0,4|{color:#ff}(true,2018-05-24 21:34,2,2){color} (true,2018-05-24 21:34,4,3)|
>
> When the fourth record is consumed, two output records are printed by the sink; obviously the first record (highlighted in red) is redundant. I followed the source code and found something wrong with
>
> {code:java}
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
> {code}
> !image-2018-05-29-11-51-20-671.png!
> I think when (!generateRetraction) && !inputC.change is true, we should not invoke out.collect here.
>
> [~StephanEwen] please look over this -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494511#comment-16494511 ] ASF GitHub Bot commented on FLINK-9472: --- GitHub user alexeyts reopened a pull request: https://github.com/apache/flink/pull/6098 [FLINK-9472] fix archetype documentation typo ## What is the purpose of the change Simple typo fix ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alexeyts/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6098 commit 023670b70d2eb7d79d62c189af5f931f0f27af04 Author: Alexey Tsitkin Date: 2018-05-29T18:42:53Z [FLINK-9472] fix archetype documentation typo > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix, pull-request-available > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494510#comment-16494510 ] ASF GitHub Bot commented on FLINK-9472: --- Github user alexeyts closed the pull request at: https://github.com/apache/flink/pull/6098 > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix, pull-request-available > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494350#comment-16494350 ] ASF GitHub Bot commented on FLINK-9472: --- GitHub user alexeyts reopened a pull request: https://github.com/apache/flink/pull/6098 [FLINK-9472] fix archetype documentation typo ## What is the purpose of the change Simple typo fix ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alexeyts/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6098 commit 023670b70d2eb7d79d62c189af5f931f0f27af04 Author: Alexey Tsitkin Date: 2018-05-29T18:42:53Z [FLINK-9472] fix archetype documentation typo > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix, pull-request-available > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494349#comment-16494349 ] ASF GitHub Bot commented on FLINK-9472: --- Github user alexeyts closed the pull request at: https://github.com/apache/flink/pull/6098 > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix, pull-request-available > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494297#comment-16494297 ] ASF GitHub Bot commented on FLINK-9187: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191578999 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- It is very possible that I'm overthinking this, and I've come up with a compromise. If there is one thing we learned in regards to the metric system it is that users despise random IDs, especially so if they can't connect them with anything else. The random ID that you're suggesting is exactly that; a random piece of data that effectively is just a workaround for the questionable design of the PushGateway. For the sake of analyzing metrics this ID is irrelevant, it just eats up space. The randomness is especially problematic since this ID is used for deleting metrics (which one has to do at some point), making this arbitrary value _really_ important. For our intents however we just need a _unique_ value for each container, i.e. dispatcher/taskmanager etc., not necessarily a random one. Every distributed component already has such an ID, most notably the TaskManager ID that is already exposed to the metric system. JobManager IDs are currently not exposed, but it was only a matter of time until this becomes necessary. While technically still a random value, it at least does not add an entirely new label/dimension, but merely copies an existing one. > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
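The reason the jobName choice debated above matters is that the Pushgateway keys pushed metrics by it, and the exact same name is needed later to clean them up. A small standalone sketch against the Prometheus Java client — the host, port, metric name, and the "tm-1234" ID are made up:

{code:java}
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.PushGateway;

public class PushGatewayJobNameDemo {

    public static void main(String[] args) throws Exception {
        CollectorRegistry registry = new CollectorRegistry();
        Gauge up = Gauge.build()
                .name("flink_demo_up").help("demo gauge").register(registry);
        up.set(1);

        PushGateway gateway = new PushGateway("localhost:9091");

        // e.g. a fixed prefix plus the TaskManager ID, as proposed in the review.
        String jobName = "flink-" + "tm-1234";

        // Metrics remain on the gateway under this grouping key until deleted...
        gateway.push(registry, jobName);

        // ...and deleting them requires knowing the exact same name, which is
        // why a purely random component is painful.
        gateway.delete(jobName);
    }
}
{code}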
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494284#comment-16494284 ] ASF GitHub Bot commented on FLINK-9187: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191576816 --- Diff: docs/monitoring/metrics.md --- @@ -699,6 +699,39 @@ Flink metric types are mapped to Prometheus metric types as follows: All Flink metrics variables (see [List of all Variables](#list-of-all-variables)) are exported to Prometheus as labels. +### PrometheusPushGateway (org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter) --- End diff -- Please add a section highlighting the differences and use-cases compared to the existing reporter. In particular we should mention that this reporter, like the existing reporter, is not suited for short-lived jobs. > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Tsitkin updated FLINK-9472: -- Labels: documentation easyfix pull-request-available (was: documentation easyfix) > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix, pull-request-available > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494231#comment-16494231 ] ASF GitHub Bot commented on FLINK-9472: --- GitHub user alexeyts opened a pull request: https://github.com/apache/flink/pull/6098 [FLINK-9472] fix archetype documentation typo ## What is the purpose of the change Simple typo fix ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alexeyts/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6098 commit 023670b70d2eb7d79d62c189af5f931f0f27af04 Author: Alexey Tsitkin Date: 2018-05-29T18:42:53Z [FLINK-9472] fix archetype documentation typo > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Tsitkin updated FLINK-9472: -- Labels: documentation easyfix (was: ) > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > Labels: documentation, easyfix > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9472) Maven archetype code has typo in documentation
[ https://issues.apache.org/jira/browse/FLINK-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Tsitkin updated FLINK-9472: -- Component/s: Documentation > Maven archetype code has typo in documentation > -- > > Key: FLINK-9472 > URL: https://issues.apache.org/jira/browse/FLINK-9472 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Alexey Tsitkin >Priority: Trivial > > The word application is misspelled (as `appliation`) in java/scala code > documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9472) Maven archetype code has typo in documentation
Alexey Tsitkin created FLINK-9472: - Summary: Maven archetype code has typo in documentation Key: FLINK-9472 URL: https://issues.apache.org/jira/browse/FLINK-9472 Project: Flink Issue Type: Bug Reporter: Alexey Tsitkin The word application is misspelled (as `appliation`) in java/scala code documentation in the maven archetype. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494193#comment-16494193 ] ASF GitHub Bot commented on FLINK-9187: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191561341 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- eh forget it, that's not a viable option for containerized environments that this issue targets anyway... > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191561341 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- eh forget it, that's not a viable option for containerized environments that this issue targets anyway... ---
[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter
[ https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494189#comment-16494189 ] ASF GitHub Bot commented on FLINK-9187: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191560994 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- What would happen if every taskmanager/jobmanager has its own pushgateway? > add prometheus pushgateway reporter > --- > > Key: FLINK-9187 > URL: https://issues.apache.org/jira/browse/FLINK-9187 > Project: Flink > Issue Type: New Feature > Components: Metrics >Affects Versions: 1.4.2 >Reporter: lamber-ken >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > make flink system can send metrics to prometheus via pushgateway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5857#discussion_r191560994 --- Diff: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.util.AbstractID; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.PushGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * /** + * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus Pushgateway. + */ +@PublicEvolving +public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + + public static final char JOB_NAME_SEPARATOR = '-'; + public static final String JOB_NAME_PREFIX = "flink" + JOB_NAME_SEPARATOR; + + private PushGateway pushGateway; + private final String jobName; + + public PrometheusPushGatewayReporter() { + String random = new AbstractID().toString(); + jobName = JOB_NAME_PREFIX + random; --- End diff -- What would happen if every taskmanager/jobmanager has its own pushgateway? ---
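For readers following the FLINK-9187 thread, the essence of the reporter under review is a scheduled push of the collected metrics to a Pushgateway under a per-process job name. The following sketch condenses that cycle; the host/port values and the main() wrapper are illustrative assumptions, while the "flink-" prefix, the AbstractID suffix, and the PushGateway calls come from the diff and the Prometheus Java client.

```java
import org.apache.flink.util.AbstractID;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.PushGateway;

import java.io.IOException;

public class PushGatewayCycleSketch {

	public static void main(String[] args) throws IOException {
		// unique job name per reporter instance, mirroring JOB_NAME_PREFIX + AbstractID
		// in the diff; this is what keeps several taskmanagers/jobmanagers from
		// overwriting each other's metrics on a single Pushgateway
		String jobName = "flink-" + new AbstractID();

		// "localhost:9091" stands in for the ARG_HOST/ARG_PORT configuration values
		PushGateway gateway = new PushGateway("localhost:9091");

		// what each scheduled report boils down to: push the registry under the job name
		gateway.push(CollectorRegistry.defaultRegistry, jobName);

		// deleting the grouping on shutdown avoids stale series lingering in the gateway
		gateway.delete(jobName);
	}
}
```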
[jira] [Commented] (FLINK-5550) NotFoundException: Could not find job with id
[ https://issues.apache.org/jira/browse/FLINK-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494118#comment-16494118 ] Alexandr Arkhipov commented on FLINK-5550: -- Would you mind if I take this issue and work on it? > NotFoundException: Could not find job with id > - > > Key: FLINK-5550 > URL: https://issues.apache.org/jira/browse/FLINK-5550 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.0.3 > Environment: centos >Reporter: jiwengang >Priority: Minor > Labels: newbie > > Job is canceled, but still report the following exception: > 2017-01-18 10:35:18,677 WARN > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while > handling request > org.apache.flink.runtime.webmonitor.NotFoundException: Could not find job > with id 3b98e734c868cc2b992743cfe8911ad0 > at > org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler.handleRequest(AbstractExecutionGraphRequestHandler.java:58) > at > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135) > at > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112) > at > org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62) > at > io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57) > at > io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:104) > at > org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9471) Job ending exceptions being logged at Info level
[ https://issues.apache.org/jira/browse/FLINK-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SUBRAMANYA SURESH updated FLINK-9471: - Description: We are using Flink SQL, I see job ending logs that are logged at info level, that makes it very hard for me to tune out the Info messages in the configuration. Note: If I do end up using Info, the same executionGraph logs the entire query for the operationGraph for every info statement, and this fills up the logs easily if we have say 100-200 queries. Note the "-" below indicate an entire line of execution graph for this query (redacted for privacy). 2018-03-30 03:32:09,942 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> - - - - - (208/725).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator Source: Custom Source -> (- - - - - ) (208/725). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/067924e4-c861-4de1-823e-b255a0bf9998 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892) was: We are using Flink SQL, I see job ending logs that are logged at info level, that makes it very hard for me to tune out the Info messages in the configuration. Note: If I do end up using Info, the same executionGraph logs the entire query for the operationGraph for every info statement, and this fills up the logs easily if we have say 100-200 queries. Note the "-" below indicate an entire line of execution graph for this query (redacted for privacy). 
2018-03-30 03:32:09,943 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 because: java.lang.Exception: Could not materialize checkpoint 1 for operator Source: Custom Source -> (Map -> where: (AND(=- - - - -.} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator Source: Custom Source -> (Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, _raw, Aggregator), Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, _raw, Aggregator)) (353/725). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/31b94717-9e6d-49b8-b64d-2a1a8ba04425 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed operator state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) at
[jira] [Commented] (FLINK-6977) Add MD5/SHA1/SHA2 supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494097#comment-16494097 ] Alexandr Arkhipov commented on FLINK-6977: -- Is anyone working on this? If not, would you mind if I take this and fix it? > Add MD5/SHA1/SHA2 supported in TableAPI > --- > > Key: FLINK-6977 > URL: https://issues.apache.org/jira/browse/FLINK-6977 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: starter > > See FLINK-6926 for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
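Until such built-ins exist, a user-defined scalar function gives roughly the behavior FLINK-6977 asks for. The sketch below is a hypothetical stopgap, not the eventual Table API implementation; SHA1/SHA2 follow the same pattern with a different MessageDigest algorithm name.

```java
import org.apache.flink.table.functions.ScalarFunction;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical UDF approximating the requested MD5 built-in. */
public class Md5Function extends ScalarFunction {

	public String eval(String input) {
		try {
			MessageDigest md = MessageDigest.getInstance("MD5");
			byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
			// render the 16-byte digest as the usual 32-character hex string
			StringBuilder hex = new StringBuilder(digest.length * 2);
			for (byte b : digest) {
				hex.append(String.format("%02x", b));
			}
			return hex.toString();
		} catch (NoSuchAlgorithmException e) {
			throw new RuntimeException("MD5 digest not available", e);
		}
	}
}
```

Registered via something like `tableEnv.registerFunction("md5", new Md5Function())`, it can then be called from Table API and SQL expressions.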
[jira] [Created] (FLINK-9471) Job ending exceptions being logged at Info level
SUBRAMANYA SURESH created FLINK-9471: Summary: Job ending exceptions being logged at Info level Key: FLINK-9471 URL: https://issues.apache.org/jira/browse/FLINK-9471 Project: Flink Issue Type: Bug Affects Versions: 1.4.2 Reporter: SUBRAMANYA SURESH We are using Flink SQL, I see job ending logs that are logged at info level, that makes it very hard for me to tune out the Info messages in the configuration. Note: If I do end up using Info, the same executionGraph logs the entire query for the operationGraph for every info statement, and this fills up the logs easily if we have say 100-200 queries. Note the "-" below indicate an entire line of execution graph for this query (redacted for privacy). 2018-03-30 03:32:09,943 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 because: java.lang.Exception: Could not materialize checkpoint 1 for operator Source: Custom Source -> (Map -> where: (AND(=- - - - -.} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator Source: Custom Source -> (Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, _raw, Aggregator), Map -> where: (AND(=(Environment, _UTF-16LE'SFDC-IT'), =(RuleMatch, _UTF-16LE'SFA'), =(LogType, _UTF-16LE'SAML-AUTH'), =(Outcome, _UTF-16LE'DENY'))), select: (proctime, CAST(_UTF-16LE'SFDC-IT') AS Environment, CollectedTimestamp, EventTimestamp, _raw, Aggregator)) (353/725). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://security-temp/savedSearches/checkpoint/561eb649376bef2f2d8daa1e3a0fa6db/chk-1/31b94717-9e6d-49b8-b64d-2a1a8ba04425 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:892) ... 5 more Suppressed: java.lang.Exception: Could not properly cancel managed operator state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) ... 5 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494025#comment-16494025 ] ASF GitHub Bot commented on FLINK-9470: --- Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/6097 +1 Nice! > Allow querying the key in KeyedProcessFunction > -- > > Key: FLINK-9470 > URL: https://issues.apache.org/jira/browse/FLINK-9470 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.6.0 > > > {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing > timer while {{KeyedProcessFunction.Context}} does not allow querying the key > of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/6097 +1 Nice! ---
[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493943#comment-16493943 ] ASF GitHub Bot commented on FLINK-9470: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/6097 +1. I might have forgotten to add the interfaces back then; it would be good to have them. > Allow querying the key in KeyedProcessFunction > -- > > Key: FLINK-9470 > URL: https://issues.apache.org/jira/browse/FLINK-9470 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.6.0 > > > {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing > timer while {{KeyedProcessFunction.Context}} does not allow querying the key > of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/6097 +1. I might have forgotten to add the interfaces back then; it would be good to have them. ---
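For context on what the PR in this thread enables, here is a minimal sketch of a function using the new accessor; the getCurrentKey() name is assumed to carry over from KeyedProcessFunction.OnTimerContext, which already exposes it for firing timers.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyEchoFunction extends KeyedProcessFunction<String, Long, String> {

	@Override
	public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
		// previously the key was only queryable in onTimer() via OnTimerContext;
		// with the change under review it is available per processed element too
		out.collect("key=" + ctx.getCurrentKey() + ", value=" + value);
	}
}
```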
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493895#comment-16493895 ] ASF GitHub Bot commented on FLINK-9451: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol sure! I will update the PR with your requested changes. > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's Scala quickstarts. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user medcv commented on the issue: https://github.com/apache/flink/pull/6089 @zentol sure! I will update the PR with your requested changes. ---
[jira] [Closed] (FLINK-9356) Improve error message for when queryable state not ready / reachable
[ https://issues.apache.org/jira/browse/FLINK-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-9356. - Resolution: Fixed Merged on master with 0fd497a7bf00694302a71ccf4b3ddd214942a228 and on 1.5 with 7852054f7ec8b16a1e258cb6c766d36109738ebd > Improve error message for when queryable state not ready / reachable > > > Key: FLINK-9356 > URL: https://issues.apache.org/jira/browse/FLINK-9356 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: vinoyang >Priority: Major > > When executing the queryable state client and either > a. The queryable state is not ready > b. There is no job with the specified job-id > > one sees the following exception: > > {code:java} > Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Failed request 0. > Caused by: > org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could > not contact the state location oracle to retrieve the state location. > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122) > at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75) > Caused by: java.lang.RuntimeException: Failed request 0. > Caused by: > org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could > not contact the state location oracle to retrieve the state location. 
> at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at >
[jira] [Commented] (FLINK-9356) Improve error message for when queryable state not ready / reachable
[ https://issues.apache.org/jira/browse/FLINK-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493831#comment-16493831 ] ASF GitHub Bot commented on FLINK-9356: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6028 > Improve error message for when queryable state not ready / reachable > > > Key: FLINK-9356 > URL: https://issues.apache.org/jira/browse/FLINK-9356 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: vinoyang >Priority: Major > > When executing the queryable state client and either > a. The queryable state is not ready > b. There is no job with the specified job-id > > one sees the following exception: > > {code:java} > Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Failed request 0. > Caused by: > org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could > not contact the state location oracle to retrieve the state location. > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122) > at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75) > Caused by: java.lang.RuntimeException: Failed request 0. > Caused by: > org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could > not contact the state location oracle to retrieve the state location. 
> at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at >
[GitHub] flink pull request #6028: [FLINK-9356] Improve error message for when querya...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6028 ---
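For reference, the client-side call that surfaces the UnknownLocationException above looks roughly like the following; the state name, types, and ports are placeholders, and a fresh `new JobID()` will not match any running job, which is exactly case (b) from the issue.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QsClientSketch {

	public static void main(String[] args) throws Exception {
		// 9069 is the default queryable-state proxy port in this setup (assumption)
		QueryableStateClient client = new QueryableStateClient("localhost", 9069);

		// a random JobID corresponds to no running job, so the returned future
		// completes exceptionally with the "Failed request" / UnknownLocationException
		// chain shown in the issue description
		CompletableFuture<ValueState<Long>> future = client.getKvState(
			new JobID(),
			"my-queryable-state",
			"some-key",
			BasicTypeInfo.STRING_TYPE_INFO,
			new ValueStateDescriptor<>("state", BasicTypeInfo.LONG_TYPE_INFO));

		// get() rethrows the failure wrapped in an ExecutionException
		System.out.println(future.get().value());
	}
}
```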
[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493816#comment-16493816 ] ASF GitHub Bot commented on FLINK-9451: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6089 Let's not mix concerns here. Re-using examples can lead to situations where the quickstart tests fail because an example was modified, which by all means shouldn't happen. The examples should certainly be tested as well, but that's a separate issue. > End-to-end test: Scala Quickstarts > -- > > Key: FLINK-9451 > URL: https://issues.apache.org/jira/browse/FLINK-9451 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts >Affects Versions: 1.5.0, 1.4.1, 1.4.2 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Blocker > > We could add an end-to-end test which verifies Flink's Scala quickstarts. It > should do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6089 Let's not mix concerns here. Re-using examples can lead to situations where the quickstart tests fail because an example was modified, which by all means shouldn't happen. The examples should certainly be tested as well though, but that's a separate issue. ---
[jira] [Reopened] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-9466: - reopen to change fixVersion > LocalRecoveryRocksDBFullITCase failed on Travis > --- > > Key: FLINK-9466 > URL: https://issues.apache.org/jira/browse/FLINK-9466 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: > https://api.travis-ci.org/v3/job/385097117/log.txt. > Not sure what caused the failure where the window computes a wrong result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-9466. --- Resolution: Cannot Reproduce > LocalRecoveryRocksDBFullITCase failed on Travis > --- > > Key: FLINK-9466 > URL: https://issues.apache.org/jira/browse/FLINK-9466 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: > https://api.travis-ci.org/v3/job/385097117/log.txt. > Not sure what caused the failure where the window computes a wrong result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-9466: Fix Version/s: (was: 1.6.0) > LocalRecoveryRocksDBFullITCase failed on Travis > --- > > Key: FLINK-9466 > URL: https://issues.apache.org/jira/browse/FLINK-9466 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: > https://api.travis-ci.org/v3/job/385097117/log.txt. > Not sure what caused the failure where the window computes a wrong result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494721#comment-16493684 ] ASF GitHub Bot commented on FLINK-7836: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5593 Hi @tillrohrmann, can this PR be merged into the master branch so that we can close it? > specifying node label for flink job to run on yarn > -- > > Key: FLINK-7836 > URL: https://issues.apache.org/jira/browse/FLINK-7836 > Project: Flink > Issue Type: New Feature > Components: Client >Affects Versions: 1.3.2 >Reporter: zhaibaba >Assignee: vinoyang >Priority: Major > > The Flink client cannot specify a node label for a Flink job to run on YARN. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5593 Hi @tillrohrmann, can this PR be merged into the master branch so that we can close it? ---
[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493668#comment-16493668 ] ASF GitHub Bot commented on FLINK-9470: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460728 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator = + new KeyedProcessOperator<>(new KeyQueryingProcessFunction()); + + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO); + --- End diff -- same here. > Allow querying the key in KeyedProcessFunction > -- > > Key: FLINK-9470 > URL: https://issues.apache.org/jira/browse/FLINK-9470 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.6.0 > > > {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing > timer while {{KeyedProcessFunction.Context}} does not allow querying the key > of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493667#comment-16493667 ] ASF GitHub Bot commented on FLINK-9470: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460316 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator = --- End diff -- The `OneInputStreamOperatorTestHarness` is `AutoCloseable` so I would recommend to go with ``` try(harness=...) { ... } ``` And remove the explicit call to `harness.close()`. This is a nice practice to start enforcing in new tests as it cleans up any leaks in case of exceptions and stuff. > Allow querying the key in KeyedProcessFunction > -- > > Key: FLINK-9470 > URL: https://issues.apache.org/jira/browse/FLINK-9470 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.6.0 > > > {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing > timer while {{KeyedProcessFunction.Context}} does not allow querying the key > of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460728 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator = + new KeyedProcessOperator<>(new KeyQueryingProcessFunction()); + + OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO); + --- End diff -- same here. ---
[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460316 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator = --- End diff -- The `OneInputStreamOperatorTestHarness` is `AutoCloseable` so I would recommend to go with ``` try(harness=...) { ... } ``` And remove the explicit call to `harness.close()`. This is a nice practice to start enforcing in new tests as it cleans up any leaks in case of exceptions and stuff. ---
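Applied to the test under review, the reviewer's try-with-resources suggestion would look roughly as follows; `KeyQueryingProcessFunction` is the helper defined elsewhere in the same diff, and the surrounding class is only scaffolding for the sketch.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

import org.junit.Test;

public class KeyQueryingTestSketch {

	@Test
	public void testKeyQuerying() throws Exception {
		KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
			new KeyedProcessOperator<>(new KeyQueryingProcessFunction());

		// the harness is AutoCloseable: no explicit close() call, and the
		// resources are released even if an assertion fails mid-test
		try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> testHarness =
				new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO)) {
			testHarness.open();
			// ... process elements and assert on testHarness.getOutput() ...
		}
	}
}
```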
[jira] [Commented] (FLINK-8353) Add support for timezones
[ https://issues.apache.org/jira/browse/FLINK-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493639#comment-16493639 ] Weike Dong commented on FLINK-8353: --- I strongly support these features; preferably there would be a way to set a specific timezone for a particular job, so that all subsequent temporal processing could be based on it. As users' input data are often collected from other systems that do not follow the rule set by Flink (UTC+0), some temporal UDFs are currently needed to perform such transformations, which adds complexity to the whole system, especially in the case of watermark generation or when writing processing time to an external database. > Add support for timezones > - > > Key: FLINK-8353 > URL: https://issues.apache.org/jira/browse/FLINK-8353 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Priority: Major > > This is an umbrella issue for adding support for timezones in the Table & SQL > API. > Usually companies work with different timezones simultaneously. We could add > support for the new time classes introduced with Java 8 and enable our scalar > functions to also work with those (or some custom time class implementations > like those from Calcite). We need a good design for this to address most of > the problems users face related to timestamp and timezones. > It is up for discussion how to ship date, time, timestamp instances through > the cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
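As a concrete illustration of the workaround the comment describes, a timezone-shifting UDF today looks something like the sketch below; the target zone is an illustrative assumption, and the result must be flattened back to a zone-less SQL timestamp because that is all the current type system carries.

```java
import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical stopgap UDF: shift a UTC-based timestamp into a target zone. */
public class ToZoneFunction extends ScalarFunction {

	public Timestamp eval(Timestamp utcTimestamp) {
		// interpret the incoming value as UTC, then move the wall clock to the target zone
		ZonedDateTime shifted = utcTimestamp.toInstant()
			.atZone(ZoneId.of("UTC"))
			.withZoneSameInstant(ZoneId.of("America/Los_Angeles"));
		// drop the zone again because SQL timestamps in the current type system are zone-less
		return Timestamp.valueOf(shifted.toLocalDateTime());
	}
}
```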
[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables
[ https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493636#comment-16493636 ] ASF GitHub Bot commented on FLINK-9258: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/5959 > ConcurrentModificationException in ComponentMetricGroup.getAllVariables > --- > > Key: FLINK-9258 > URL: https://issues.apache.org/jira/browse/FLINK-9258 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0, 1.4.3, 1.5.1 > > > Seeing this exception at the job startup time. Looks like there is a race > condition when the metrics variables are constructed. > The error is intermittent. > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) > at java.util.HashMap$EntryIterator.next(HashMap.java:1471) > at java.util.HashMap$EntryIterator.next(HashMap.java:1469) > at java.util.HashMap.putMapEntries(HashMap.java:511) > at java.util.HashMap.putAll(HashMap.java:784) > at > org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63) > at > org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63) > at > com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147) > at > com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170) > at > com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75) > at > com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at 
java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/5959 ---
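The linked PR's title, "Thread-safe initialization", points at the classic fix for this class of ConcurrentModificationException: build the variables map once under a lock and publish it immutably, so readers never observe a map that is still being populated. The sketch below shows the pattern in isolation; class, field, and method names are illustrative, not the actual Flink code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public abstract class MetricGroupSketch {

	private final Object lock = new Object();

	// volatile publication: readers either see null or the fully built map
	private volatile Map<String, String> variables;

	public Map<String, String> getAllVariables() {
		Map<String, String> result = variables;
		if (result == null) {
			synchronized (lock) {
				if (variables == null) {
					// build in a local map, then publish an immutable view in one step
					Map<String, String> tmp = new HashMap<>(getParentVariables());
					putOwnVariables(tmp);
					variables = Collections.unmodifiableMap(tmp);
				}
				result = variables;
			}
		}
		return result;
	}

	protected abstract Map<String, String> getParentVariables();

	protected abstract void putOwnVariables(Map<String, String> map);
}
```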
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493635#comment-16493635 ] ASF GitHub Bot commented on FLINK-9423: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191454466 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** + * A safe maximum size for arrays in the JVM. + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** + * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. + */ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); --- End diff -- I know, I was not quite sure if this has performance implications. 
> Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191454466 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param <K> type of the key of the internal timers managed by this priority queue. + * @param <N> type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap<K, N> implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> { + + /** + * A safe maximum size for arrays in the JVM. + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** + * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. + */ + private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); --- End diff -- I know, I was not quite sure if this has performance implications. ---
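The core idea of the diff, stripped of the timer-specific details, is a binary heap paired with an element-to-index map so that contains() is O(1) and arbitrary remove() is O(log n) rather than O(n). Below is a condensed, generic sketch of that structure; it is not the actual InternalTimerHeap, which stores timers, starts indexing at 1, and additionally tracks key groups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

public class IndexedMinHeap<T> {

	private final List<T> heap = new ArrayList<>();
	private final Map<T, Integer> indexes = new HashMap<>();
	private final ToLongFunction<T> priority;

	public IndexedMinHeap(ToLongFunction<T> priority) {
		this.priority = priority;
	}

	/** Adds the element unless an equal one is already queued (de-duplication in O(1)). */
	public boolean add(T element) {
		if (indexes.containsKey(element)) {
			return false;
		}
		heap.add(element);
		indexes.put(element, heap.size() - 1);
		siftUp(heap.size() - 1);
		return true;
	}

	/** Removes an arbitrary element in O(log n): the index map replaces the O(n) scan. */
	public boolean remove(T element) {
		Integer index = indexes.remove(element);
		if (index == null) {
			return false;
		}
		int last = heap.size() - 1;
		T moved = heap.remove(last);
		if (index != last) {
			// fill the hole with the former last element, then restore heap order
			heap.set(index, moved);
			indexes.put(moved, index);
			siftDown(index);
			if (heap.get(index) == moved) {
				siftUp(index);
			}
		}
		return true;
	}

	/** Returns the element with the smallest priority, or null if empty. */
	public T peek() {
		return heap.isEmpty() ? null : heap.get(0);
	}

	private void siftUp(int i) {
		while (i > 0 && key(i) < key((i - 1) / 2)) {
			swap(i, (i - 1) / 2);
			i = (i - 1) / 2;
		}
	}

	private void siftDown(int i) {
		while (true) {
			int smallest = i;
			int left = 2 * i + 1;
			int right = 2 * i + 2;
			if (left < heap.size() && key(left) < key(smallest)) {
				smallest = left;
			}
			if (right < heap.size() && key(right) < key(smallest)) {
				smallest = right;
			}
			if (smallest == i) {
				return;
			}
			swap(i, smallest);
			i = smallest;
		}
	}

	private long key(int i) {
		return priority.applyAsLong(heap.get(i));
	}

	private void swap(int i, int j) {
		T tmp = heap.get(i);
		heap.set(i, heap.get(j));
		heap.set(j, tmp);
		indexes.put(heap.get(i), i);
		indexes.put(heap.get(j), j);
	}
}
```

remove() is the operation the ticket speeds up: the JDK's PriorityQueue.remove(Object), by contrast, does a linear scan, which is exactly the O(n) behavior described in the issue.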
[jira] [Commented] (FLINK-9470) Allow querying the key in KeyedProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-9470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493633#comment-16493633 ] ASF GitHub Bot commented on FLINK-9470: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/6097 [FLINK-9470] Allow querying the key in KeyedProcessFunction R: @kl0u @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9470-keyedprocessfunction-key Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6097.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6097 commit 0d3cccd546e782113a843df099fd5d8fe1880933 Author: Aljoscha Krettek Date: 2018-05-29T14:46:19Z [FLINK-9470] Allow querying the key in KeyedProcessFunction > Allow querying the key in KeyedProcessFunction > -- > > Key: FLINK-9470 > URL: https://issues.apache.org/jira/browse/FLINK-9470 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.6.0 > > > {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing > timer while {{KeyedProcessFunction.Context}} does not allow querying the key > of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
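For illustration, here is what the requested improvement enables; getCurrentKey() is the name already used on OnTimerContext, and it is an assumption here that the Context method ends up with the same name:

{code:java}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Does nothing useful; just shows the key being queried in both callbacks.
public class KeyEchoFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // New with this ticket: the key of the current element is queryable here too.
        out.collect("element for key " + ctx.getCurrentKey() + ": " + value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Already possible before this ticket: the key of the firing timer.
        out.collect("timer at " + timestamp + " for key " + ctx.getCurrentKey());
    }
}
{code}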
[jira] [Closed] (FLINK-9466) LocalRecoveryRocksDBFullITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-9466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9466. Resolution: Fixed Accidentally overwrote the Travis log file. Closing until this issue appears again. > LocalRecoveryRocksDBFullITCase failed on Travis > --- > > Key: FLINK-9466 > URL: https://issues.apache.org/jira/browse/FLINK-9466 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.6.0 > > > The {{LocalRecoveryRocksDBFullITCase}} failed on Travis: > https://api.travis-ci.org/v3/job/385097117/log.txt. > Not sure what caused the failure, in which the window computes a wrong result. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8790) Improve performance for recovery from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493631#comment-16493631 ] ASF GitHub Bot commented on FLINK-8790: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r191453657 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java --- @@ -138,4 +138,12 @@ private static void writeVariableIntBytes( value >>>= 8; } while (value != 0); } + + public static byte[] serializeKeyGroup(int keyGroup, int keyGroupPrefixBytes) { + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; --- End diff -- Some info about the experiment I did: - I set `ReadOptions::ignore_range_deletions = true` to speed up the read performance, because we won't read any records that belong to the key-group we have deleted. - I only call `deleteRange()` twice, because we will call it at most twice in the recovery of the incremental checkpoint. > Improve performance for recovery from incremental checkpoint > > > Key: FLINK-8790 > URL: https://issues.apache.org/jira/browse/FLINK-8790 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > When there are multiple state handles to be restored, we can improve the > performance as follows: > 1. Choose the best state handle to init the target db. > 2. Use the other state handles to create temp dbs, and clip each db according > to the target key group range (via rocksdb.deleteRange()); this can help us > get rid of the `key group check` in the > `data insertion loop` and also help us avoid traversing useless > records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
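The clipping in point 2 can be sketched as follows, assuming each RocksDB key carries a big-endian key-group prefix (as in Flink's RocksDB backend) and a RocksJava version that exposes deleteRange; the class and method names here are illustrative, not the actual restore code:

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class KeyGroupClipping {

    // Drops everything outside [startKeyGroup, endKeyGroupExclusive) with two
    // range deletes, so the later insertion loop never sees foreign key-groups.
    static void clipToKeyGroupRange(
            RocksDB tempDb,
            ColumnFamilyHandle handle,
            int startKeyGroup,
            int endKeyGroupExclusive,
            int totalNumberOfKeyGroups,
            int keyGroupPrefixBytes) throws RocksDBException {

        byte[] lowest = serializeKeyGroup(0, keyGroupPrefixBytes);
        byte[] start = serializeKeyGroup(startKeyGroup, keyGroupPrefixBytes);
        byte[] end = serializeKeyGroup(endKeyGroupExclusive, keyGroupPrefixBytes);
        byte[] highest = serializeKeyGroup(totalNumberOfKeyGroups, keyGroupPrefixBytes);

        tempDb.deleteRange(handle, lowest, start); // everything below the range
        tempDb.deleteRange(handle, end, highest);  // everything above the range
    }

    // Big-endian key-group prefix, matching RocksDB's lexicographic key order.
    static byte[] serializeKeyGroup(int keyGroup, int keyGroupPrefixBytes) {
        byte[] bytes = new byte[keyGroupPrefixBytes];
        for (int i = keyGroupPrefixBytes - 1; i >= 0; --i) {
            bytes[i] = (byte) keyGroup;
            keyGroup >>>= 8;
        }
        return bytes;
    }
}
{code}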
[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/6097 [FLINK-9470] Allow querying the key in KeyedProcessFunction R: @kl0u @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9470-keyedprocessfunction-key Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6097.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6097 commit 0d3cccd546e782113a843df099fd5d8fe1880933 Author: Aljoscha Krettek Date: 2018-05-29T14:46:19Z [FLINK-9470] Allow querying the key in KeyedProcessFunction ---
[GitHub] flink pull request #5582: [FLINK-8790][State] Improve performance for recove...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r191453657 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java --- @@ -138,4 +138,12 @@ private static void writeVariableIntBytes( value >>>= 8; } while (value != 0); } + + public static byte[] serializeKeyGroup(int keyGroup, int keyGroupPrefixBytes) { + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; --- End diff -- Some info about the experiment I did: - I set `ReadOptions::ignore_range_deletions = true` to speed up the read performance, because we won't read any records that belong to the key-group we have deleted. - I only call the `deleteRange()` twice, because we will at most call it twice in the recovery of the incremental checkpoint. ---
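For reference, the ReadOptions tweak from the experiment would look roughly like this through the RocksJava API (sketch; assumes a RocksDB version whose Java API exposes setIgnoreRangeDeletions):

{code:java}
import org.rocksdb.ReadOptions;

// After deleteRange() has clipped the temp db, reads never touch the deleted
// key-groups, so the tombstone checks during iteration can be skipped safely.
try (ReadOptions readOptions = new ReadOptions().setIgnoreRangeDeletions(true)) {
    // use readOptions for newIterator(...) / get(...) on the clipped db
}
{code}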
[jira] [Created] (FLINK-9470) Allow querying the key in KeyedProcessFunction
Aljoscha Krettek created FLINK-9470: --- Summary: Allow querying the key in KeyedProcessFunction Key: FLINK-9470 URL: https://issues.apache.org/jira/browse/FLINK-9470 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.6.0 {{KeyedProcessFunction.OnTimerContext}} allows querying the key of the firing timer while {{KeyedProcessFunction.Context}} does not allow querying the key of the event we're currently processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493615#comment-16493615 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191447784 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java --- @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Tests for {@link InternalTimerHeap}. 
+ */ +public class InternalTimerHeapTest { + + private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1); + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + int count) { + insertRandomTimers(timerPriorityQueue, null, count); + } + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + Set> checkSet, + int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < count; ++i) { + TimerHeapInternalTimer timer = + new TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE); + if (checkSet != null) { + Preconditions.checkState(checkSet.add(timer)); + } + Assert.assertTrue(timerPriorityQueue.add(timer)); + } + } + + private static InternalTimerHeap newPriorityQueue(int initialCapacity) { + return new InternalTimerHeap<>( + initialCapacity, + KEY_GROUP_RANGE, + KEY_GROUP_RANGE.getNumberOfKeyGroups()); + } + + @Test + public void testCombined() { + final int initialCapacity = 4; + final int testSize = 1000; + InternalTimerHeap timerPriorityQueue = newPriorityQueue(initialCapacity); + HashSet> checkSet = new HashSet<>(testSize); + + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + long lastTimestamp = Long.MIN_VALUE; + int lastSize = timerPriorityQueue.size(); + Assert.assertEquals(testSize, lastSize); + TimerHeapInternalTimer timer; + while ((timer = timerPriorityQueue.peek()) != null) { + Assert.assertFalse(timerPriorityQueue.isEmpty()); + Assert.assertEquals(lastSize, timerPriorityQueue.size()); + Assert.assertEquals(timer, timerPriorityQueue.poll()); + Assert.assertTrue(checkSet.remove(timer)); + Assert.assertTrue(timer.getTimestamp() >= lastTimestamp); + lastTimestamp = timer.getTimestamp(); + --lastSize; + } + + Assert.assertTrue(timerPriorityQueue.isEmpty()); + Assert.assertEquals(0, timerPriorityQueue.size()); + Assert.assertEquals(0, checkSet.size()); + } + + @Test + public void testAdd() { +
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493621#comment-16493621 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191450664 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java --- @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Tests for {@link InternalTimerHeap}. 
+ */ +public class InternalTimerHeapTest { + + private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1); + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + int count) { + insertRandomTimers(timerPriorityQueue, null, count); + } + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + Set> checkSet, + int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < count; ++i) { + TimerHeapInternalTimer timer = + new TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE); + if (checkSet != null) { + Preconditions.checkState(checkSet.add(timer)); + } + Assert.assertTrue(timerPriorityQueue.add(timer)); + } + } + + private static InternalTimerHeap newPriorityQueue(int initialCapacity) { + return new InternalTimerHeap<>( + initialCapacity, + KEY_GROUP_RANGE, + KEY_GROUP_RANGE.getNumberOfKeyGroups()); + } + + @Test + public void testCombined() { + final int initialCapacity = 4; + final int testSize = 1000; + InternalTimerHeap timerPriorityQueue = newPriorityQueue(initialCapacity); + HashSet> checkSet = new HashSet<>(testSize); + + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + long lastTimestamp = Long.MIN_VALUE; + int lastSize = timerPriorityQueue.size(); + Assert.assertEquals(testSize, lastSize); + TimerHeapInternalTimer timer; + while ((timer = timerPriorityQueue.peek()) != null) { + Assert.assertFalse(timerPriorityQueue.isEmpty()); + Assert.assertEquals(lastSize, timerPriorityQueue.size()); + Assert.assertEquals(timer, timerPriorityQueue.poll()); + Assert.assertTrue(checkSet.remove(timer)); + Assert.assertTrue(timer.getTimestamp() >= lastTimestamp); + lastTimestamp = timer.getTimestamp(); + --lastSize; + } + + Assert.assertTrue(timerPriorityQueue.isEmpty()); + Assert.assertEquals(0, timerPriorityQueue.size()); + Assert.assertEquals(0, checkSet.size()); + } + + @Test + public void testAdd() { +
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493609#comment-16493609 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191422654 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Queue>, Set> { --- End diff -- Why does this class implement the `Queue` and `Set` interface? Is it intended to be used as a `Queue` or `Set` somewhere? 
> Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes; the complexity is currently O(n), where n > is the number of registered timers. > > We can keep track of timers' positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493605#comment-16493605 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191425505 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Queue>, Set> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap, TimerHeapInternalTimer>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. 
+*/ + private TimerHeapInternalTimer[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; +
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493603#comment-16493603 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191435370 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Queue>, Set> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap, TimerHeapInternalTimer>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. 
+*/ + private TimerHeapInternalTimer[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; +
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493599#comment-16493599 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191413234 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws Exception { keySerializer.snapshotConfiguration(), namespaceSerializer, namespaceSerializer.snapshotConfiguration(), - getEventTimeTimerSetForKeyGroup(keyGroupIdx), - getProcessingTimeTimerSetForKeyGroup(keyGroupIdx)); + eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx), + processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx)); } /** * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}. * -* @param restoredTimersSnapshot the restored snapshot containing the key-group's timers, +* @param restoredSnapshot the restored snapshot containing the key-group's timers, * and the serializers that were used to write them * @param keyGroupIdx the id of the key-group to be put in the snapshot. */ @SuppressWarnings("unchecked") - public void restoreTimersForKeyGroup(InternalTimersSnapshot restoredTimersSnapshot, int keyGroupIdx) throws IOException { - this.restoredTimersSnapshot = (InternalTimersSnapshot) restoredTimersSnapshot; + public void restoreTimersForKeyGroup(InternalTimersSnapshot restoredSnapshot, int keyGroupIdx) { + this.restoredTimersSnapshot = (InternalTimersSnapshot) restoredSnapshot; - if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) || - (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer( { + if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) || --- End diff -- This check could be factored out into a method with a meaningful and easy to understand name, e.g. `checkSerializerCompatibility`. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
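The suggested extraction could look like the sketch below; checkSerializerCompatibility is the reviewer's proposed name, and the exception type and message are assumptions here, with the condition taken verbatim from the quoted diff:

{code:java}
// Inside HeapInternalTimerService<K, N>:
private void checkSerializerCompatibility(InternalTimersSnapshot<K, N> restoredSnapshot) {
    if ((this.keyDeserializer != null
                && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer()))
            || (this.namespaceDeserializer != null
                && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()))) {
        throw new IllegalArgumentException(
            "Tried to restore timers for the same service with different serializers.");
    }
}
{code}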
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493617#comment-16493617 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191446041 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java --- @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Tests for {@link InternalTimerHeap}. + */ +public class InternalTimerHeapTest { --- End diff -- Should extend `TestLogger`. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
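For context, TestLogger is Flink's shared test base class (org.apache.flink.util.TestLogger); it logs each test's name and failures through JUnit rules, which makes CI logs easier to search. The suggested change amounts to:

{code:java}
import org.apache.flink.util.TestLogger;

public class InternalTimerHeapTest extends TestLogger {
    // ... tests unchanged ...
}
{code}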
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493616#comment-16493616 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191445752 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java --- @@ -96,7 +96,7 @@ public AbstractInternalTimersSnapshotWriter(InternalTimersSnapshot timersS public final void writeTimersSnapshot(DataOutputView out) throws IOException { writeKeyAndNamespaceSerializers(out); - InternalTimer.TimerSerializer timerSerializer = new InternalTimer.TimerSerializer<>( + TimerHeapInternalTimer.TimerSerializer timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>( --- End diff -- Isn't this a bit problematic if we add a new `TimerService` implementation other than the `HeapInternalTimerService`? The `InternalTimersSnapshot` is independent of the underlying timer service implementation and so should the timer serializer be. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493602#comment-16493602 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191423366 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Queue>, Set> { --- End diff -- That way we could also save us from implementing some methods like `toArray` or `iterator`. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. 
> > We can keep track of timers' positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493607#comment-16493607 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191436243 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}. + * + * @param Type of the keys to which timers are scoped. + * @param Type of the namespace to which timers are scoped. + */ +@Internal +public final class TimerHeapInternalTimer implements InternalTimer { + + /** The index that indicates that a tracked internal timer is not tracked. */ + private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE; + + private final long timestamp; + + private final K key; + + private final N namespace; + + /** +* This field holds the current physical index of this timer when it is managed by a timer heap so that we can +* support fast deletes. +*/ + private transient int timerHeapIndex; --- End diff -- `TimerHeapInternalTimer` is non serializable. Thus, the `transient` keyword should not be needed. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
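The point about transient can be demonstrated in isolation: the keyword only affects java.io serialization, so on a class that never implements Serializable it is dead weight. A small runnable illustration with a hypothetical Timer class (not the Flink one):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // Only because this class implements Serializable does 'transient' do anything.
    static class Timer implements Serializable {
        long timestamp = 42L;
        transient int heapIndex = 7; // skipped by Java serialization
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new Timer());
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            Timer copy = (Timer) ois.readObject();
            System.out.println(copy.timestamp); // 42
            System.out.println(copy.heapIndex); // 0 -- transient fields come back as defaults
        }
    }
}
{code}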
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493604#comment-16493604 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191427611 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication maps. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Queue>, Set> { + + /** +* A safe maximum size for arrays in the JVM. +*/ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** +* Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. +*/ + private static final Comparator> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. +*/ + private final HashMap, TimerHeapInternalTimer>[] deduplicationMapsByKeyGroup; + + /** +* The array that represents the heap-organized priority queue. 
+*/ + private TimerHeapInternalTimer[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* The key-group range of timers that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + + /** +* Creates an empty {@link InternalTimerHeap} with the requested initial capacity. +* +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; +
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493620#comment-16493620 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191446801 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java --- @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Tests for {@link InternalTimerHeap}. + */ +public class InternalTimerHeapTest { + + private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1); + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + int count) { + insertRandomTimers(timerPriorityQueue, null, count); + } + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + Set> checkSet, --- End diff -- `@Nullable` missing > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493618#comment-16493618 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191447618 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java --- @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Tests for {@link InternalTimerHeap}. + */ +public class InternalTimerHeapTest { + + private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1); + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + int count) { + insertRandomTimers(timerPriorityQueue, null, count); + } + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + Set> checkSet, + int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < count; ++i) { + TimerHeapInternalTimer timer = + new TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE); + if (checkSet != null) { + Preconditions.checkState(checkSet.add(timer)); + } + Assert.assertTrue(timerPriorityQueue.add(timer)); + } + } + + private static InternalTimerHeap newPriorityQueue(int initialCapacity) { + return new InternalTimerHeap<>( + initialCapacity, + KEY_GROUP_RANGE, + KEY_GROUP_RANGE.getNumberOfKeyGroups()); + } + + @Test + public void testCombined() { --- End diff -- A more descriptive test method name would be helpful. > Implement efficient deletes for heap based timer service > > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. 
> > We can keep track of timers' positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493614#comment-16493614 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191441546 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+	/** The index that indicates that an internal timer is currently not managed by a timer heap. */
+	private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+	private final long timestamp;
+
+	private final K key;
+
+	private final N namespace;
+
+	/**
+	 * This field holds the current physical index of this timer while it is managed by a timer heap, so that we
+	 * can support fast deletes.
+	 */
+	private transient int timerHeapIndex;
+
+	TimerHeapInternalTimer(long timestamp, K key, N namespace) {
+		this.timestamp = timestamp;
+		this.key = key;
+		this.namespace = namespace;
+		this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX;
+	}
+
+	@Override
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	@Override
+	public K getKey() {
+		return key;
+	}
+
+	@Override
+	public N getNamespace() {
+		return namespace;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o instanceof InternalTimer) {
+			InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
+			return timestamp == timer.getTimestamp()
+				&& key.equals(timer.getKey())
+				&& namespace.equals(timer.getNamespace());
+		}
+
+		return false;
+	}
+
+	/**
+	 * Returns the current index of this timer in the owning timer heap.
+	 */
+	int getTimerHeapIndex() {
+		return timerHeapIndex;
+	}
+
+	/**
+	 * Sets the current index of this timer in the owning timer heap; this should only be called by the managing heap.
+	 *
+	 * @param timerHeapIndex the new index in the timer heap.
+	 */
+	void setTimerHeapIndex(int timerHeapIndex) {
+		this.timerHeapIndex = timerHeapIndex;
+	}
+
+	/**
+	 * This method can be called to indicate that the timer is no longer managed by a timer heap, e.g. because it was
+	 * removed.
+	 */
+	void removedFromTimerQueue() {
+		setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (timestamp ^ (timestamp >>> 32));
+		result = 31 * result + key.hashCode();
+		result = 31 * result +
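The quoted diff above breaks off inside hashCode(), and the rest of this email was lost in the digest. For readers following the review thread, here is a minimal, self-contained sketch of the equals/hashCode pairing that the quoted fields suggest: the long timestamp folded into an int, then key and namespace mixed in with the usual factor of 31. The class name and Object-typed fields are illustrative stand-ins, not the PR's exact code.

// Illustrative stand-in for a (timestamp, key, namespace) timer; not the PR's exact code.
final class TimerSketch {
	private final long timestamp;
	private final Object key;       // stands in for K
	private final Object namespace; // stands in for N

	TimerSketch(long timestamp, Object key, Object namespace) {
		this.timestamp = timestamp;
		this.key = key;
		this.namespace = namespace;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o instanceof TimerSketch) {
			TimerSketch other = (TimerSketch) o;
			return timestamp == other.timestamp
				&& key.equals(other.key)
				&& namespace.equals(other.namespace);
		}
		return false;
	}

	@Override
	public int hashCode() {
		int result = (int) (timestamp ^ (timestamp >>> 32)); // fold the long into 32 bits
		result = 31 * result + key.hashCode();
		result = 31 * result + namespace.hashCode();
		return result;
	}
}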
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493611#comment-16493611 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191412983 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -199,17 +186,9 @@ public long currentWatermark() {
 	@Override
 	public void registerProcessingTimeTimer(N namespace, long time) {
-		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-
-		// make sure we only put one timer per key into the queue
-		Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
-		if (timerSet.add(timer)) {
-
-			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+		InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
+		if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) {
--- End diff --
I know this is out of scope, but I think we could get rid of the `KeyContext` by passing the current key to the `registerProcessingTimeTimer` method. Moreover, instead of calling `KeyContext#setCurrentKey` we could pass the key value to the `Triggerable#onEventTime`/`#onProcessingTime` methods. Triggering side effects via the `KeyContext` before calling certain methods is, imo, very brittle.
> Implement efficient deletes for heap based timer service
> ---
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.6.0
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O(n), where n is
> the number of registered timers.
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
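The comment above proposes threading the key through method arguments rather than through KeyContext side effects. A hedged sketch of the interface shapes that suggestion implies; these signatures are illustrative assumptions, not Flink's actual API:

// Hypothetical interface shapes implied by the review comment: the key travels as an
// explicit argument instead of via KeyContext#setCurrentKey. Assumed names, not Flink API.
interface ExplicitKeyTimerService<K, N> {
	void registerProcessingTimeTimer(K key, N namespace, long time);
}

interface ExplicitKeyTriggerable<K, N> {
	void onProcessingTime(K key, N namespace, long timestamp) throws Exception;
	void onEventTime(K key, N namespace, long timestamp) throws Exception;
}

The design point is that an explicit parameter makes the data dependency visible at the call site, whereas setCurrentKey couples correctness to invocation order.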
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493613#comment-16493613 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191440142 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+	/** The index that indicates that an internal timer is currently not managed by a timer heap. */
+	private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+	private final long timestamp;
+
+	private final K key;
--- End diff --
Can this be `null`?
> Implement efficient deletes for heap based timer service
> ---
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.6.0
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O(n), where n is
> the number of registered timers.
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493612#comment-16493612 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191440239 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> {
+
+	/** The index that indicates that an internal timer is currently not managed by a timer heap. */
+	private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE;
+
+	private final long timestamp;
+
+	private final K key;
+
+	private final N namespace;
--- End diff --
`null`?
> Implement efficient deletes for heap based timer service
> ---
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.6.0
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O(n), where n is
> the number of registered timers.
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
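The two review questions above ask whether `key` and `namespace` may be null. One way to make the contract explicit, shown here purely as a sketch (the digest does not show what the PR ultimately did):

import java.util.Objects;

// Sketch only: enforcing non-null key/namespace at construction would answer the
// reviewer's question in code. Whether the PR adopted this is not shown in this digest.
final class NonNullTimer<K, N> {
	private final long timestamp;
	private final K key;
	private final N namespace;

	NonNullTimer(long timestamp, K key, N namespace) {
		this.timestamp = timestamp;
		this.key = Objects.requireNonNull(key, "key must not be null");
		this.namespace = Objects.requireNonNull(namespace, "namespace must not be null");
	}
}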
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493619#comment-16493619 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191446484 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java ---
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Tests for {@link InternalTimerHeap}.
+ */
+public class InternalTimerHeapTest {
+
+	private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1);
+
+	private static void insertRandomTimers(
+		InternalTimerHeap<Integer, VoidNamespace> timerPriorityQueue,
+		int count) {
--- End diff --
Double indentation for parameter lists that are wrapped.
> Implement efficient deletes for heap based timer service
> ---
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.6.0
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O(n), where n is
> the number of registered timers.
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
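The issue description repeated above captures the core technique of this PR: store each timer's current slot in a side structure so a delete can locate the element in O(1) and restore heap order with a single sift in O(log n), rather than scanning all n timers. Below is a compact, self-contained sketch of an index-tracked 1-based binary min-heap, using plain long "timers" and one HashMap in place of the PR's per-key-group maps; all names and simplifications are assumptions for illustration, not the PR's code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Index-tracked binary min-heap: the map makes remove(t) O(log n) instead of O(n).
final class IndexedTimerHeap {
	private long[] heap = new long[17];                        // slot 0 unused: 1-based indexing
	private final Map<Long, Integer> slotOf = new HashMap<>(); // timer -> current heap slot
	private int size;

	/** Inserts the timer unless an equal one is already queued (de-duplication). */
	boolean add(long timer) {
		if (slotOf.containsKey(timer)) {
			return false;
		}
		if (size + 1 == heap.length) {
			heap = Arrays.copyOf(heap, heap.length * 2);
		}
		heap[++size] = timer;
		slotOf.put(timer, size);
		siftUp(size);
		return true;
	}

	/** Deletes the timer in O(log n): the map finds its slot without scanning the array. */
	boolean remove(long timer) {
		Integer slot = slotOf.remove(timer);
		if (slot == null) {
			return false;
		}
		long last = heap[size--];
		if (slot <= size) {          // removed element was not the last one:
			heap[slot] = last;       // move the last element into the hole and
			slotOf.put(last, slot);  // restore the heap property in both directions
			siftDown(slot);
			siftUp(slot);
		}
		return true;
	}

	long peek() {
		return heap[1]; // smallest timestamp; caller must check size > 0 first
	}

	private void siftUp(int i) {
		while (i > 1 && heap[i] < heap[i / 2]) { // parent of slot i is i / 2 in a 1-based heap
			swap(i, i / 2);
			i /= 2;
		}
	}

	private void siftDown(int i) {
		while (2 * i <= size) {                  // children of slot i are 2i and 2i + 1
			int child = 2 * i;
			if (child < size && heap[child + 1] < heap[child]) {
				child++;
			}
			if (heap[i] <= heap[child]) {
				break;
			}
			swap(i, child);
			i = child;
		}
	}

	private void swap(int a, int b) {
		long tmp = heap[a];
		heap[a] = heap[b];
		heap[b] = tmp;
		slotOf.put(heap[a], a); // keep the tracked slots consistent after every move
		slotOf.put(heap[b], b);
	}
}

Note how swap() updates the slot map on every move; that bookkeeping is the whole price of the O(log n) delete.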
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493610#comment-16493610 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191440434 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java ---
[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493606#comment-16493606 ] ASF GitHub Bot commented on FLINK-9423: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191426419 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue for internal timers. This heap is supported by hash maps for fast contains
+ * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element
+ * indexes in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot
+ * methods.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap and the deduplication maps.</li>
+ * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc.</li>
+ * </ul>
+ *
+ * @param <K> type of the key of the internal timers managed by this priority queue.
+ * @param <N> type of the namespace of the internal timers managed by this priority queue.
+ */
+public class InternalTimerHeap<K, N>
+	implements Queue<TimerHeapInternalTimer<K, N>>, Set<TimerHeapInternalTimer<K, N>> {
+
+	/**
+	 * A safe maximum size for arrays in the JVM.
+	 */
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+	/**
+	 * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order.
+	 */
+	private static final Comparator<TimerHeapInternalTimer<?, ?>> COMPARATOR =
+		(o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+	/**
+	 * This array contains one hash map per key-group. The maps are used for fast de-duplication and deletes of
+	 * timers.
+	 */
+	private final HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
+
+	/**
+	 * The array that represents the heap-organized priority queue.
+	 */
+	private TimerHeapInternalTimer<K, N>[] queue;
+
+	/**
+	 * The current size of the priority queue.
+	 */
+	private int size;
+
+	/**
+	 * The key-group range of timers that are managed by this queue.
+	 */
+	private final KeyGroupRange keyGroupRange;
+
+	/**
+	 * The total number of key-groups of the job.
+	 */
+	private final int totalNumberOfKeyGroups;
+
+	/**
+	 * Creates an empty {@link InternalTimerHeap} with the requested initial capacity.
+	 *
+	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
+	 */
+	@SuppressWarnings("unchecked")
+	InternalTimerHeap(
+		@Nonnegative int minimumCapacity,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int totalNumberOfKeyGroups) {
+
+		this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+
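The quoted class keeps one de-duplication map per key-group, selected by the key's key-group within the operator's KeyGroupRange. Here is a self-contained sketch of that layout; in Flink the routing is done by KeyGroupRangeAssignment, so the plain floorMod hash and all names below are assumptions for illustration, not the PR's code.

import java.util.HashMap;

// Per-key-group de-duplication maps: a key is hashed to a key-group, and the
// key-group is offset into this operator's local range to pick the map.
final class KeyGroupedDedupMaps<T> {
	private final HashMap<T, T>[] mapsByKeyGroup; // one map per key-group in our range
	private final int firstKeyGroup;              // start of this operator's key-group range
	private final int totalNumberOfKeyGroups;     // key-groups in the whole job

	@SuppressWarnings("unchecked")
	KeyGroupedDedupMaps(int firstKeyGroup, int lastKeyGroup, int totalNumberOfKeyGroups) {
		this.firstKeyGroup = firstKeyGroup;
		this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
		this.mapsByKeyGroup = new HashMap[lastKeyGroup - firstKeyGroup + 1];
		for (int i = 0; i < mapsByKeyGroup.length; i++) {
			mapsByKeyGroup[i] = new HashMap<>();
		}
	}

	/** Routes a key to the de-duplication map of its key-group. */
	HashMap<T, T> mapForKey(Object key) {
		int keyGroup = Math.floorMod(key.hashCode(), totalNumberOfKeyGroups); // stand-in hash
		return mapsByKeyGroup[keyGroup - firstKeyGroup]; // offset into our local range
	}
}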