[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607912#comment-16607912 ] ASF GitHub Bot commented on FLINK-7243: --- HuangZhenQiu commented on issue #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#issuecomment-419613742 @lvhuyen Thanks for digging out the root cause. I guess I should pass the logical type into RowPrimitiveConverter, so that different types of data stored as Binary can be handled differently. I am working on a fix for it with more test cases. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
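The fix being discussed — dispatching on the column's Parquet logical type when converting a Binary value — can be sketched in plain Java. This is a simplified, hypothetical illustration (the names `LogicalType` and `convertBinary` are illustrative stand-ins, not the actual Flink or Parquet converter API): a UTF8-annotated Binary becomes a String, while a DECIMAL-annotated Binary is an unscaled big-endian integer combined with the column's declared scale.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: interpret a Parquet Binary differently per logical type. */
public class BinaryConverterSketch {

    // Stand-in for Parquet's logical type annotation (OriginalType in parquet-mr).
    enum LogicalType { UTF8, DECIMAL }

    /** Convert raw binary bytes according to the column's logical type. */
    static Object convertBinary(byte[] bytes, LogicalType type, int scale) {
        switch (type) {
            case UTF8:
                // UTF8-annotated Binary is simply an encoded string.
                return new String(bytes, StandardCharsets.UTF_8);
            case DECIMAL:
                // DECIMAL stored as Binary: unscaled big-endian integer plus declared scale.
                return new BigDecimal(new BigInteger(bytes), scale);
            default:
                throw new IllegalArgumentException("Unsupported logical type: " + type);
        }
    }

    public static void main(String[] args) {
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(convertBinary(text, LogicalType.UTF8, 0)); // hello

        byte[] unscaled = BigInteger.valueOf(12345).toByteArray();
        System.out.println(convertBinary(unscaled, LogicalType.DECIMAL, 2)); // 123.45
    }
}
```

Without such a dispatch, both columns would come back as raw bytes (or be coerced to a String), which is exactly the kind of misinterpretation the commenters observed.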
[jira] [Commented] (FLINK-10273) Access composite type fields after a function
[ https://issues.apache.org/jira/browse/FLINK-10273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607907#comment-16607907 ] Rong Rong commented on FLINK-10273: --- I found a solution via modifying Calcite {{Parser.jj}}. However, it will probably need some additional polish before it can be added to a Calcite release. I will create a JIRA ticket though. > Access composite type fields after a function > - > > Key: FLINK-10273 > URL: https://issues.apache.org/jira/browse/FLINK-10273 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > > If a function returns a composite type, for example, {{Row(lon: Float, lat: > Float)}}, there is currently no way of accessing its fields. > Both queries fail with exceptions: > {code} > select t.c.lat, t.c.lon FROM (select toCoords(12) as c) AS t > {code} > {code} > select toCoords(12).lat > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10303) Fix critical vulnerabilities Python API
[ https://issues.apache.org/jira/browse/FLINK-10303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607869#comment-16607869 ] vinoyang commented on FLINK-10303: -- cc [~Zentol] Maybe this issue deserves our attention? > Fix critical vulnerabilities Python API > --- > > Key: FLINK-10303 > URL: https://issues.apache.org/jira/browse/FLINK-10303 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.6.0 >Reporter: Konstantin Knauf >Priority: Major > > A user has reported two "critical" vulnerabilities in the Python API, which > we should probably fix: > * https://nvd.nist.gov/vuln/detail/CVE-2016-4000 > * https://cwe.mitre.org/data/definitions/384.html in > flink-streaming-python_2.11-1.6.0.jar <= pip-1.6-py2.py3-none-any.whl <= > sessions.py : [2.1.0, 2.6.0) > For users who don't need the Python API, an easy workaround is to exclude the > flink-streaming-python_2.11.jar from the distribution. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6674: [hotfix] [runtime] Remove unused legacy exception class
TisonKun commented on issue #6674: [hotfix] [runtime] Remove unused legacy exception class URL: https://github.com/apache/flink/pull/6674#issuecomment-419529625 cc @zentol @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10305) flink-conf.yaml grows continuously
Dimitrije created FLINK-10305: - Summary: flink-conf.yaml grows continuously Key: FLINK-10305 URL: https://issues.apache.org/jira/browse/FLINK-10305 Project: Flink Issue Type: Bug Reporter: Dimitrije The `query.server.port` & `blob.server.port` variables are continuously appended to flink-conf.yaml when the job manager restarts. I am running a jobmanager & taskmanager with docker-compose, using a single `flink-conf.yaml` that is mounted as a volume into both the jobmanager and taskmanager containers. Every time the jobmanager restarts, it appends the two variables to the end of the file, causing it to grow. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607460#comment-16607460 ] 陈梓立 edited comment on FLINK-10298 at 9/7/18 6:14 PM: - Hi [~isunjin], Thanks for raising this JIRA! After reading the documentation, it seems that the main issue this design addresses is downstream recovery from upstream data missing or data consumption exceptions. Thus I wonder whether you have read [FLINK-6227|https://issues.apache.org/jira/browse/FLINK-6227], which introduces the DataConsumptionException for downstream task failures and would resolve this case? was (Author: tison): Hi [~isunjin], Thanks for raising this JIRA! After reading the documentation, it seems that the main issue this design addresses is downstream recovery from upstream missing data or data consumption exceptions. Thus I wonder whether you have read [FLINK-6227], which introduces the DataConsumptionException for downstream task failures and would resolve this case? > Batch Job Failover Strategy > --- > > Key: FLINK-10298 > URL: https://issues.apache.org/jira/browse/FLINK-10298 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > The new failover strategy needs to consider handling failures according to > different failure types. It orchestrates all the logic we mentioned in this > [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit]; > we can put the logic in the onTaskFailure method of the FailoverStrategy > interface, with the logic inline: > {code:java} > public void onTaskFailure(Execution taskExecution, Throwable cause) { > //1. Get the throwable type > //2. If the type is NonrecoverableType, fail the job > //3. If the type is PartitionDataMissingError, do revocation > //4. If the type is EnvironmentError, check the blacklist > //5. Other failure types are recoverable, but we need to remember the > count of the failures; > //6. if it exceeds the threshold, fail the job > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607460#comment-16607460 ] 陈梓立 edited comment on FLINK-10298 at 9/7/18 6:13 PM: - Hi [~isunjin], Thanks for raising this JIRA! After reading the documentation, it seems that the main issue this design addresses is downstream recovery from upstream missing data or data consumption exceptions. Thus I wonder whether you have read [FLINK-6227], which introduces the DataConsumptionException for downstream task failures and would resolve this case? was (Author: tison): Hi [~isunjin], Thanks for raising this JIRA! After reading the documentation, it seems that the main issue this design addresses is downstream recovery from an upstream DataConsumptionException. Thus I wonder whether you have read [FLINK-6227], which introduces the DataConsumptionException for downstream task failures and would resolve this case? > Batch Job Failover Strategy > --- > > Key: FLINK-10298 > URL: https://issues.apache.org/jira/browse/FLINK-10298 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > The new failover strategy needs to consider handling failures according to > different failure types. It orchestrates all the logic we mentioned in this > [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit]; > we can put the logic in the onTaskFailure method of the FailoverStrategy > interface, with the logic inline: > {code:java} > public void onTaskFailure(Execution taskExecution, Throwable cause) { > //1. Get the throwable type > //2. If the type is NonrecoverableType, fail the job > //3. If the type is PartitionDataMissingError, do revocation > //4. If the type is EnvironmentError, check the blacklist > //5. Other failure types are recoverable, but we need to remember the > count of the failures; > //6. if it exceeds the threshold, fail the job > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607460#comment-16607460 ] 陈梓立 commented on FLINK-10298: - Hi [~isunjin], Thanks for raising this JIRA! After reading the documentation, it seems that the main issue this design addresses is downstream recovery from an upstream DataConsumptionException. Thus I wonder whether you have read [FLINK-6227], which introduces the DataConsumptionException for downstream task failures and would resolve this case? > Batch Job Failover Strategy > --- > > Key: FLINK-10298 > URL: https://issues.apache.org/jira/browse/FLINK-10298 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > The new failover strategy needs to consider handling failures according to > different failure types. It orchestrates all the logic we mentioned in this > [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit]; > we can put the logic in the onTaskFailure method of the FailoverStrategy > interface, with the logic inline: > {code:java} > public void onTaskFailure(Execution taskExecution, Throwable cause) { > //1. Get the throwable type > //2. If the type is NonrecoverableType, fail the job > //3. If the type is PartitionDataMissingError, do revocation > //4. If the type is EnvironmentError, check the blacklist > //5. Other failure types are recoverable, but we need to remember the > count of the failures; > //6. if it exceeds the threshold, fail the job > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
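The pseudocode in the ticket description can be fleshed out as a small, self-contained Java sketch. The type and method names here are illustrative only (they are not Flink's actual FailoverStrategy API), and the string return values stand in for the real recovery actions:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of failure-type-driven failover handling (steps 1-6 of the pseudocode). */
public class FailoverSketch {

    // Stand-in for the classified throwable type from step 1.
    enum ThrowableType { NONRECOVERABLE, PARTITION_DATA_MISSING, ENVIRONMENT_ERROR, RECOVERABLE }

    private final int maxRecoverableFailures;
    private final Map<String, Integer> failureCounts = new HashMap<>();

    FailoverSketch(int maxRecoverableFailures) {
        this.maxRecoverableFailures = maxRecoverableFailures;
    }

    /** Returns the action taken for a task failure. */
    String onTaskFailure(String taskId, ThrowableType type) {
        switch (type) {
            case NONRECOVERABLE:
                return "fail-job";            // 2. unrecoverable error: fail the whole job
            case PARTITION_DATA_MISSING:
                return "revoke-producer";     // 3. re-run the upstream producer (revocation)
            case ENVIRONMENT_ERROR:
                return "check-blacklist";     // 4. possibly blacklist the bad machine
            default:
                // 5/6. recoverable: count failures per task; past the threshold, fail the job.
                int count = failureCounts.merge(taskId, 1, Integer::sum);
                return count > maxRecoverableFailures ? "fail-job" : "restart-task";
        }
    }

    public static void main(String[] args) {
        FailoverSketch s = new FailoverSketch(2);
        System.out.println(s.onTaskFailure("t1", ThrowableType.RECOVERABLE));     // restart-task
        System.out.println(s.onTaskFailure("t1", ThrowableType.RECOVERABLE));     // restart-task
        System.out.println(s.onTaskFailure("t1", ThrowableType.RECOVERABLE));     // fail-job
        System.out.println(s.onTaskFailure("t2", ThrowableType.NONRECOVERABLE)); // fail-job
    }
}
```

The per-task failure counter is the key design point: transient failures restart only the affected task, while a task that keeps failing eventually escalates to a full job failure.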
[jira] [Commented] (FLINK-9964) Add a CSV table format factory
[ https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607420#comment-16607420 ] ASF GitHub Bot commented on FLINK-9964: --- buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a CSV table format factory URL: https://github.com/apache/flink/pull/6541#discussion_r216035112 ## File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * Deserialization schema from CSV to Flink types. + * + * Deserializes a byte[] message as a {@link JsonNode} and + * converts it to {@link Row}. + * + * Failures during deserialization are forwarded as wrapped IOExceptions. + */ +@PublicEvolving +public class CsvRowDeserializationSchema implements DeserializationSchema<Row> { + + /** Schema describing the input csv data. */ + private CsvSchema csvSchema; + + /** Type information describing the input csv data. */ + private TypeInformation<Row> rowTypeInfo; + + /** CsvMapper used to read bytes into {@link JsonNode}. */ + private CsvMapper csvMapper = new CsvMapper(); + + /** Charset for byte[]. */ + private String charset = "UTF-8"; + + + /** + * Create a csv row DeserializationSchema with the given {@link TypeInformation}. + */ + CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) { + Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null!"); + this.rowTypeInfo = rowTypeInfo; + this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + JsonNode root = csvMapper.readerFor(JsonNode.class) + .with(csvSchema).readValue(message); + return convertRow(root, (RowTypeInfo) rowTypeInfo); + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + @Override + public TypeInformation<Row> getProducedType() { + return rowTypeInfo; + } + + /** + * @param root json node that contains a row's data. + * @param rowTypeInfo type information for root. + * @return result row + */ + private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) { + String[] fields = rowTypeInfo.getFieldNames(); + TypeInformation<?>[] types = rowTypeInfo.getFieldTypes(); + Row row = new Row(fields.length); + + for (int i = 0; i < fields.length; i++) { + String columnName = fields[i]; + JsonNode node = root.get(columnName); + row.setField(i, convert(node, types[i])); + } + return row; + } + + /** + * @param node array node that contains a row's data.
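The per-field conversion loop in `convertRow` above (look up each column by name, convert it to the declared field type, set it on the row) can be illustrated with a dependency-free sketch. This is a simplified model, not the actual Flink class: a Row is modeled as `Object[]`, target types as `Class<?>[]`, and the real schema parses via Jackson's CsvMapper rather than a naive `split(",")`:

```java
import java.util.Arrays;

/** Simplified model of CsvRowDeserializationSchema's per-field conversion loop. */
public class CsvRowSketch {

    /** Convert one CSV line into a typed "row" using per-column target types. */
    static Object[] convertRow(String line, Class<?>[] types) {
        // Naive split for illustration only; a real parser must handle quoting/escaping.
        String[] fields = line.split(",");
        Object[] row = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            row[i] = convert(fields[i], types[i]);
        }
        return row;
    }

    /** Convert a single field to its declared target type. */
    static Object convert(String value, Class<?> type) {
        if (type == Integer.class) return Integer.valueOf(value);
        if (type == Double.class)  return Double.valueOf(value);
        if (type == Boolean.class) return Boolean.valueOf(value);
        return value; // default: keep as String
    }

    public static void main(String[] args) {
        Object[] row = convertRow(
            "42,3.5,true,flink",
            new Class<?>[] {Integer.class, Double.class, Boolean.class, String.class});
        System.out.println(Arrays.toString(row)); // [42, 3.5, true, flink]
    }
}
```

The design point mirrored here is that the type information drives the conversion, so the same generic loop handles any row schema without per-schema code.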
[jira] [Commented] (FLINK-9964) Add a CSV table format factory
[ https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607413#comment-16607413 ] ASF GitHub Bot commented on FLINK-9964: --- buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a CSV table format factory URL: https://github.com/apache/flink/pull/6541#discussion_r216034480 ## File path: flink-formats/flink-csv/pom.xml ## @@ -0,0 +1,88 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.flink</groupId> <artifactId>flink-formats</artifactId> <version>1.7-SNAPSHOT</version> <relativePath>..</relativePath> </parent> <artifactId>flink-csv</artifactId> <name>flink-csv</name> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>${project.version}</version> <scope>provided</scope> <optional>true</optional> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> Review comment: @twalthr ok, I've updated it to 2.7.9. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a CSV table format factory > -- > > Key: FLINK-9964 > URL: https://issues.apache.org/jira/browse/FLINK-9964 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Jiayi Liao >Priority: Major > Labels: pull-request-available > > We should add an RFC 4180-compliant CSV table format factory to read and write > data into Kafka and other connectors. This requires a > {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we > want to represent all data types and nested types is still up for discussion. > For example, we could flatten and deflatten nested types as it is done > [here|http://support.gnip.com/articles/json2csv.html]. We can also have a > look at how tools such as the Avro to CSV tool perform the conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607382#comment-16607382 ] ASF GitHub Bot commented on FLINK-9190: --- TisonKun commented on issue #5931: [FLINK-9190][flip6,yarn] Request new container if container completed unexpectedly URL: https://github.com/apache/flink/pull/5931#issuecomment-419507178 IIRC it is [FLINK-9455](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-9455?filter=allopenissues) :-) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6, pull-request-available > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master. Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10242. Resolution: Fixed Release Note: Latency metrics are now disabled by default, which affects all jobs that do not explicitly set the latencyTrackingInterval via the ExecutionConfig. To restore the previous default behavior, users have to configure metrics.latency.interval in flink-conf.yaml. > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issues around latency metrics, we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-10242: -- > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issues around latency metrics, we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10243) Add option to reduce latency metrics granularity
[ https://issues.apache.org/jira/browse/FLINK-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10243. Resolution: Fixed Release Note: The default granularity for latency metrics was modified. To restore the previous behavior users have to explicitly set the granularity to SUBTASK. master: 91f8fe831c6af21928fceb3a13d87c9ed5019981 > Add option to reduce latency metrics granularity > > > Key: FLINK-10243 > URL: https://issues.apache.org/jira/browse/FLINK-10243 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The latency is currently tracked separately from each operator subtask to > each source subtask. The total number of latency metrics in the cluster is > thus {{(# of sources) * (# of operators) * parallelism²}}, i.e. quadratic > scaling. > If we'd ignore the source subtask the scaling would be a lot more manageable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10243) Add option to reduce latency metrics granularity
[ https://issues.apache.org/jira/browse/FLINK-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607357#comment-16607357 ] ASF GitHub Bot commented on FLINK-10243: zentol closed pull request #6658: [FLINK-10243][metrics] Make latency metrics granularity configurable URL: https://github.com/apache/flink/pull/6658 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 02f4ceb162f..0c0b0dd2ffb 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -7,6 +7,11 @@ + +metrics.latency.granularity +"operator" +Defines the granularity of latency metrics. Accepted values are: single - Track latency without differentiating between sources and subtasks. operator - Track latency while differentiating between sources, but not subtasks. subtask - Track latency while differentiating between sources and subtasks. + metrics.latency.history-size 128 diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 336ead6e193..67444a5397c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.description.Description; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.TextElement.text; /** * Configuration options for metrics and metric reporters. @@ -112,6 +113,17 @@ " Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" + " impact the performance of the cluster."); + public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY = + key("metrics.latency.granularity") + .defaultValue("operator") + .withDescription(Description.builder() + .text("Defines the granularity of latency metrics. Accepted values are:") + .list( + text("single - Track latency without differentiating between sources and subtasks."), + text("operator - Track latency while differentiating between sources, but not subtasks."), + text("subtask - Track latency while differentiating between sources and subtasks.")) + .build()); + /** The number of measured latencies to maintain at each operator. */ public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE = key("metrics.latency.history-size") diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index f52168bd9b9..f3c22080ab7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -72,6 +72,7 @@ import java.io.Closeable; import java.io.Serializable; +import java.util.Locale; /** * Base class for all stream operators. Operators that contain a user function should extend the class @@ -193,11 +194,33 @@ public void setup(StreamTask containingTask, StreamConfig config, Outputhttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import
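The new option's three accepted values map naturally to an enum parsed case-insensitively (the diff above adds a `java.util.Locale` import to `AbstractStreamOperator` for exactly this kind of normalization). The sketch below is illustrative only: the `Granularity` enum name and the `parse` helper are assumptions made for this example, not the actual Flink code.

```java
import java.util.Locale;

// Illustrative sketch only: the enum and helper below are assumptions made
// for this example; they are not the actual Flink implementation.
public class LatencyGranularityDemo {

    /** The three accepted values of "metrics.latency.granularity". */
    public enum Granularity { SINGLE, OPERATOR, SUBTASK }

    /** Case-insensitive parsing of the configured string. */
    public static Granularity parse(String configured) {
        return Granularity.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("operator")); // the option's default value
        System.out.println(parse("SubTask"));
    }
}
```

With this shape, an unrecognized value such as "task" fails fast with an `IllegalArgumentException` instead of silently falling back to the default.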
[GitHub] zentol closed pull request #6658: [FLINK-10243][metrics] Make latency metrics granularity configurable
zentol closed pull request #6658: [FLINK-10243][metrics] Make latency metrics granularity configurable URL: https://github.com/apache/flink/pull/6658 This is a PR merged from a forked repository; the full diff is quoted in the JIRA comment above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun opened a new pull request #6674: [hotfix] [runtime] Remove unused legacy exception class
TisonKun opened a new pull request #6674: [hotfix] [runtime] Remove unused legacy exception class URL: https://github.com/apache/flink/pull/6674 ## What is the purpose of the change Remove unused legacy exception class `SlotException`. This exception class was introduced by a569f38f16186518b53461842d37b09fb1df45e9 and has been unused since 0ef7fddeff8430fd40d2d7a1b8a6454fd9416ced, which replaced it with `FlinkException`. Since the slot allocation design has changed and we now have more specific "SlotException" variants such as `SlotAllocationException` and `SlotOccupiedException`, we can remove this class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8660) Enable the user to provide custom HAServices implementation
[ https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607352#comment-16607352 ] ASF GitHub Bot commented on FLINK-8660: --- tillrohrmann commented on issue #5530: [FLINK-8660] Enable the user to provide custom HAServices implementation URL: https://github.com/apache/flink/pull/5530#issuecomment-419501245 Yes, I think we should try to get this PR into 1.7 since it allows you to implement your own `HighAvailabilityServices`. I'll try to find some time next week to take a look at what the current state is. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable the user to provide custom HAServices implementation > > > Key: FLINK-8660 > URL: https://issues.apache.org/jira/browse/FLINK-8660 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Configuration, Distributed > Coordination >Affects Versions: 1.4.0, 1.5.0 >Reporter: Krzysztof Białek >Assignee: Krzysztof Białek >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > At the moment Flink uses ZooKeeper as its HA backend. > The goal of this improvement is to make Flink support more HA backends, > including ones maintained as independent projects. > The following changes are required to achieve it: > # Add {{HighAvailabilityServicesFactory}} interface > # Add new option {{HighAvailabilityMode.CUSTOM}} > # Add new configuration property {{high-availability.factoryClass}} > # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate > {{HighAvailabilityServices}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #5530: [FLINK-8660] Enable the user to provide custom HAServices implementation
tillrohrmann commented on issue #5530: [FLINK-8660] Enable the user to provide custom HAServices implementation URL: https://github.com/apache/flink/pull/5530#issuecomment-419501245 Yes, I think we should try to get this PR into 1.7 since it allows you to implement your own `HighAvailabilityServices`. I'll try to find some time next week to take a look at what the current state is. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
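Once this lands, wiring in a custom implementation should only require configuration. Based on the option names proposed in the JIRA description (not a final API), a flink-conf.yaml fragment might look like this; `com.example.MyHAServicesFactory` is a placeholder for the user's factory class:

```yaml
# Hypothetical fragment; option names taken from the FLINK-8660 proposal.
high-availability: CUSTOM
high-availability.factoryClass: com.example.MyHAServicesFactory
```

The factory class would implement the proposed `HighAvailabilityServicesFactory` interface and be loaded by `HighAvailabilityServicesUtils` when the CUSTOM mode is selected.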
[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped
[ https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607349#comment-16607349 ] ASF GitHub Bot commented on FLINK-9891: --- tillrohrmann commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-419500426 I like the idea of making this behaviour configurable. @packet23 do you have time to add this functionality? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink cluster is not shutdown in YARN mode when Flink client is stopped > --- > > Key: FLINK-9891 > URL: https://issues.apache.org/jira/browse/FLINK-9891 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0, 1.5.1 >Reporter: Sergey Krasovskiy >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > We are not using session mode and detached mode. The command to run Flink job > on YARN is: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-07-18 12:47:03,747 INFO > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service > address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/ > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,248 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-07-18 12:47:04,409 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: > ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, > numberTaskManagers=1, slotsPerTaskManager=1} > 2018-07-18 12:47:04,783 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit > local reads feature cannot be used because libhadoop cannot be loaded. > 2018-07-18 12:47:04,788 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration > directory > ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf') > contains both LOG4J and Logback configuration files. Please delete or rename > one of them. 
> 2018-07-18 12:47:07,846 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application > master application_1531474158783_10814 > 2018-07-18 12:47:08,073 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application > application_1531474158783_10814 > 2018-07-18 12:47:08,074 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster > to be allocated > 2018-07-18 12:47:08,076 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, > current state ACCEPTED > 2018-07-18 12:47:12,864 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has > been deployed successfully. > {code} > Job Manager logs: > {code:java} > 2018-07-18 12:47:09,913 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > 2018-07-18 12:47:09,915 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting > YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ >
[GitHub] tillrohrmann commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
tillrohrmann commented on issue #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. URL: https://github.com/apache/flink/pull/6540#issuecomment-419500426 I like the idea of making this behaviour configurable. @packet23 do you have time to add this functionality? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607348#comment-16607348 ] ASF GitHub Bot commented on FLINK-9190: --- tillrohrmann commented on issue #5931: [FLINK-9190][flip6,yarn] Request new container if container completed unexpectedly URL: https://github.com/apache/flink/pull/5931#issuecomment-419500186 Yes, at the moment this could happen. However, the superfluous `TaskManager` should be released after it has been idle for too long. Moreover, I'm currently working on making the `SlotManager` aware of how many outstanding slots it has requested. That way it should not allocate additional containers in case of a failover of the `ExecutionGraph`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6, pull-request-available > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master.
Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #5931: [FLINK-9190][flip6, yarn] Request new container if container completed unexpectedly
tillrohrmann commented on issue #5931: [FLINK-9190][flip6,yarn] Request new container if container completed unexpectedly URL: https://github.com/apache/flink/pull/5931#issuecomment-419500186 Yes, at the moment this could happen. However, the superfluous `TaskManager` should be released after it has been idle for too long. Moreover, I'm currently working on making the `SlotManager` aware of how many outstanding slots it has requested. That way it should not allocate additional containers in case of a failover of the `ExecutionGraph`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
[ https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607335#comment-16607335 ] ASF GitHub Bot commented on FLINK-10304: TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#discussion_r216017006 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -997,7 +980,6 @@ public ApplicationReport startAppMaster( appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); - appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); Review comment: I wonder whether YARN needs this config to distinguish a DETACHED session; if so, `AbstractYarnClusterDescriptor` needs to know whether or not it is in detached mode, and we should keep the original code and remove the `@Deprecated` annotation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated AbstractYarnClusterDescriptor field > - > > Key: FLINK-10304 > URL: https://issues.apache.org/jira/browse/FLINK-10304 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Based on [~trohrm...@apache.org]'s > [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], > {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED > mode. > After digging, I found the main usages of it are: > 1. {{FlinkYarnSessionCli#run}}, this can be resolved by checking whether > {{allOptions}} has {{DETACHED_OPTION}} locally. > 2. When AbstractYarnClusterDescriptor starts an AM, it sets > {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. > At this point it seems that YarnClusterDescriptor should know whether or not > it is in detached mode. > If usage 2 is irrelevant now, we can get rid of the deprecated method in the FLIP-6 > codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field
TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#discussion_r216017006 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -997,7 +980,6 @@ public ApplicationReport startAppMaster( appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); - appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); Review comment: I wonder whether YARN needs this config to distinguish a DETACHED session; if so, `AbstractYarnClusterDescriptor` needs to know whether or not it is in detached mode, and we should keep the original code and remove the `@Deprecated` annotation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
[ https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607332#comment-16607332 ] ASF GitHub Bot commented on FLINK-10304: TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#discussion_r216017006 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -997,7 +980,6 @@ public ApplicationReport startAppMaster( appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); - appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); Review comment: I wonder if YARN need this config to distinguish a DETACHED session, if so, `AbstractYarnClusterDescriptor` need to know whether or not it is in a detached mode. We should keep the original code and remove the `@Deprecated` annotation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated AbstractYarnClusterDescriptor field > - > > Key: FLINK-10304 > URL: https://issues.apache.org/jira/browse/FLINK-10304 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Depend on [~trohrm...@apache.org]'s > [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], > {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED > mode. > After digging I found the main usages of it are > 1. 
{{FlinkYarnSessionCli#run}}, this can be resolved by checking whether > {{allOptions}} has {{DETACHED_OPTION}} locally. > 2. when AbstractYarnClusterDescriptor start a AM, it sets > {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. > At this point it seems that YarnClusterDescriptor should know whether or not > it is in detached mode. > If usage 2 is irrelevant now, we can get rid of deprecated method in FLIP-6 > codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field
TisonKun commented on a change in pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673#discussion_r216017006 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ## @@ -997,7 +980,6 @@ public ApplicationReport startAppMaster( appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); - appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); Review comment: I wonder whether YARN needs this config to distinguish a DETACHED session; if so, `AbstractYarnClusterDescriptor` needs to know whether or not it is in detached mode. We should keep the original code and remove the `@Deprecated` annotation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
[ https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607314#comment-16607314 ] ASF GitHub Bot commented on FLINK-10304: TisonKun opened a new pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673 ## Brief change log Remove deprecated AbstractYarnClusterDescriptor field ## Verifying this change This change is already covered by existing tests, such as `FlinkYarnSessionCliTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers:(no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated AbstractYarnClusterDescriptor field > - > > Key: FLINK-10304 > URL: https://issues.apache.org/jira/browse/FLINK-10304 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Depend on [~trohrm...@apache.org]'s > [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], > {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED > mode. > After digging I found the main usages of it are > 1. 
{{FlinkYarnSessionCli#run}}, this can be resolved by checking whether > {{allOptions}} has {{DETACHED_OPTION}} locally. > 2. when AbstractYarnClusterDescriptor start a AM, it sets > {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. > At this point it seems that YarnClusterDescriptor should know whether or not > it is in detached mode. > If usage 2 is irrelevant now, we can get rid of deprecated method in FLIP-6 > codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
[ https://issues.apache.org/jira/browse/FLINK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10304: --- Labels: pull-request-available (was: ) > Remove deprecated AbstractYarnClusterDescriptor field > - > > Key: FLINK-10304 > URL: https://issues.apache.org/jira/browse/FLINK-10304 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.7.0 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Depend on [~trohrm...@apache.org]'s > [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], > {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED > mode. > After digging I found the main usages of it are > 1. {{FlinkYarnSessionCli#run}}, this can be resolved by checking whether > {{allOptions}} has {{DETACHED_OPTION}} locally. > 2. when AbstractYarnClusterDescriptor start a AM, it sets > {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. > At this point it seems that YarnClusterDescriptor should know whether or not > it is in detached mode. > If usage 2 is irrelevant now, we can get rid of deprecated method in FLIP-6 > codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field
TisonKun opened a new pull request #6673: [FLINK-10304] [client] Remove deprecated AbstractYarnClusterDescriptor field URL: https://github.com/apache/flink/pull/6673 ## Brief change log Remove deprecated AbstractYarnClusterDescriptor field ## Verifying this change This change is already covered by existing tests, such as `FlinkYarnSessionCliTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers:(no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
陈梓立 created FLINK-10304: --- Summary: Remove deprecated AbstractYarnClusterDescriptor field Key: FLINK-10304 URL: https://issues.apache.org/jira/browse/FLINK-10304 Project: Flink Issue Type: Improvement Components: Client, YARN Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 Based on [~trohrm...@apache.org]'s [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED mode. After digging, I found the main usages of it are: 1. {{FlinkYarnSessionCli#run}}, this can be resolved by checking whether {{allOptions}} has {{DETACHED_OPTION}} locally. 2. When AbstractYarnClusterDescriptor starts an AM, it sets {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}. At this point it seems that YarnClusterDescriptor should know whether or not it is in detached mode. If usage 2 is irrelevant now, we can get rid of the deprecated method in the FLIP-6 codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10303) Fix critical vulnerabilities Python API
Konstantin Knauf created FLINK-10303: Summary: Fix critical vulnerabilities Python API Key: FLINK-10303 URL: https://issues.apache.org/jira/browse/FLINK-10303 Project: Flink Issue Type: Improvement Components: Python API Affects Versions: 1.6.0 Reporter: Konstantin Knauf A user has reported two "critical" vulnerabilities in the Python API, which we should probably fix: * https://nvd.nist.gov/vuln/detail/CVE-2016-4000 * https://cwe.mitre.org/data/definitions/384.html in flink-streaming-python_2.11-1.6.0.jar <= pip-1.6-py2.py3-none-any.whl <= sessions.py : [2.1.0, 2.6.0) For users who don't need the Python API, an easy workaround is to exclude the flink-streaming-python_2.11.jar from the distribution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
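The suggested workaround amounts to deleting one jar from the distribution's lib directory. The sketch below simulates this against a throwaway directory standing in for $FLINK_HOME; the second jar name is an assumption, included only to show the rest of the distribution is untouched.

```shell
# Stand-in for a real Flink distribution directory.
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/lib"
touch "$FLINK_HOME/lib/flink-dist_2.11-1.6.0.jar" \
      "$FLINK_HOME/lib/flink-streaming-python_2.11-1.6.0.jar"

# Workaround from the ticket: drop the Python API jar if it is not needed.
rm "$FLINK_HOME/lib/flink-streaming-python_2.11-1.6.0.jar"

ls "$FLINK_HOME/lib"
```

On a real installation this would be done before starting the cluster; the jar can be restored from the original download if the Python API is needed later.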
[jira] [Closed] (FLINK-10293) RemoteStreamEnvironment does not forward port to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10293. Resolution: Fixed master: 2dbd41354dc85b68754cad1777f733ca070eea9a 1.6: d8cc7684f5ffbea782a9b3acb98759bf3c2913b0 1.5: c1815463e47abf93e9906041f0c160ad40ff5413 > RemoteStreamEnvironment does not forward port to RestClusterClient > -- > > Key: FLINK-10293 > URL: https://issues.apache.org/jira/browse/FLINK-10293 > Project: Flink > Issue Type: Bug > Components: Client, Streaming >Affects Versions: 1.5.1, 1.6.0, 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > A user reported on the ML that the port given to the RemoteStreamEnvironment > is not forwarded to the RestClusterClient. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9964) Add a CSV table format factory
[ https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607236#comment-16607236 ] ASF GitHub Bot commented on FLINK-9964: --- twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a CSV table format factory URL: https://github.com/apache/flink/pull/6541#discussion_r215992673 ## File path: flink-formats/flink-csv/pom.xml ## @@ -0,0 +1,88 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-formats + 1.7-SNAPSHOT + .. + + + flink-csv + flink-csv + + jar + + + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + + flink-table_2.11 + ${project.version} + provided + + true + + + + + com.fasterxml.jackson.dataformat Review comment: I think that is ok. The features don't look very important: https://github.com/FasterXML/jackson-dataformats-text/blob/master/csv/release-notes/VERSION @buptljy could you update the version accordingly. I will review your changes once we bumped up flink-shaded. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a CSV table format factory > -- > > Key: FLINK-9964 > URL: https://issues.apache.org/jira/browse/FLINK-9964 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Jiayi Liao >Priority: Major > Labels: pull-request-available > > We should add a RFC 4180 compliant CSV table format factory to read and write > data into Kafka and other connectors. This requires a > {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. 
How we > want to represent all data types and nested types is still up for discussion. > For example, we could flatten and deflatten nested types as it is done > [here|http://support.gnip.com/articles/json2csv.html]. We can also have a > look how tools such as the Avro to CSV tool perform the conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9964) Add a CSV table format factory
[ https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607219#comment-16607219 ] ASF GitHub Bot commented on FLINK-9964: --- zentol commented on a change in pull request #6541: [FLINK-9964] [table] Add a CSV table format factory URL: https://github.com/apache/flink/pull/6541#discussion_r215989983 ## File path: flink-formats/flink-csv/pom.xml ## @@ -0,0 +1,88 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-formats + 1.7-SNAPSHOT + .. + + + flink-csv + flink-csv + + jar + + + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + + flink-table_2.11 + ${project.version} + provided + + true + + + + + com.fasterxml.jackson.dataformat Review comment: I guess this will also mean that we will use 2.7.9 instead. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a CSV table format factory > -- > > Key: FLINK-9964 > URL: https://issues.apache.org/jira/browse/FLINK-9964 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Jiayi Liao >Priority: Major > Labels: pull-request-available > > We should add a RFC 4180 compliant CSV table format factory to read and write > data into Kafka and other connectors. This requires a > {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we > want to represent all data types and nested types is still up for discussion. > For example, we could flatten and deflatten nested types as it is done > [here|http://support.gnip.com/articles/json2csv.html]. 
We can also have a > look how tools such as the Avro to CSV tool perform the conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10302) Mark legacy(non-flip 6) code as Deprecated
[ https://issues.apache.org/jira/browse/FLINK-10302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607217#comment-16607217 ] 陈梓立 commented on FLINK-10302: - Here are some examples; their names are quite misleading: {{JobClientActor}}, {{YarnApplicationMasterRunner}}, {{JobInfo}}. Note that FLIP-6 also has some classes whose names start with JobManager, which makes things worse. > Mark legacy(non-flip 6) code as Deprecated > -- > > Key: FLINK-10302 > URL: https://issues.apache.org/jira/browse/FLINK-10302 > Project: Flink > Issue Type: Improvement >Reporter: 陈梓立 >Priority: Major > > Several times I have dashed into some classes/methods, found them weird > compared to the FLIP-6 codebase, and finally figured out that they are legacy code. > Currently we mix legacy code with FLIP-6 code in the same places (i.e. some > packages); a new contributor might casually get lost in such code and end up > working in vain. > With [FLINK-4319] closed we announced that FLIP-6 is production ready, and > [~trohrm...@apache.org]'s comment on this > [commit|https://github.com/apache/flink/commit/ddd6a99a95b56c52ea5b5153b7270b578f5479bc#commitcomment-30330739] > shows that it is planned to remove the legacy code. > I'd prefer marking all legacy classes as Deprecated for now, so that our > contributors can recognize them quickly and not waste their work. > What do you think? cc [~Zentol]
[jira] [Commented] (FLINK-9964) Add a CSV table format factory
[ https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607214#comment-16607214 ] ASF GitHub Bot commented on FLINK-9964: --- zentol commented on a change in pull request #6541: [FLINK-9964] [table] Add a CSV table format factory URL: https://github.com/apache/flink/pull/6541#discussion_r215989141 ## File path: flink-formats/flink-csv/pom.xml ## @@ -0,0 +1,88 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-formats + 1.7-SNAPSHOT + .. + + + flink-csv + flink-csv + + jar + + + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + + flink-table_2.11 + ${project.version} + provided + + true + + + + + com.fasterxml.jackson.dataformat Review comment: well yeah because it pulls in standard jackson. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a CSV table format factory > -- > > Key: FLINK-9964 > URL: https://issues.apache.org/jira/browse/FLINK-9964 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Jiayi Liao >Priority: Major > Labels: pull-request-available > > We should add a RFC 4180 compliant CSV table format factory to read and write > data into Kafka and other connectors. This requires a > {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we > want to represent all data types and nested types is still up for discussion. > For example, we could flatten and deflatten nested types as it is done > [here|http://support.gnip.com/articles/json2csv.html]. 
We can also have a > look how tools such as the Avro to CSV tool perform the conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10302) Mark legacy(non-flip 6) code as Deprecated
陈梓立 created FLINK-10302: --- Summary: Mark legacy(non-flip 6) code as Deprecated Key: FLINK-10302 URL: https://issues.apache.org/jira/browse/FLINK-10302 Project: Flink Issue Type: Improvement Reporter: 陈梓立 Several times I have dashed into some classes/methods, found them weird compared to the FLIP-6 codebase, and finally figured out that they are legacy code. Currently we mix legacy code with FLIP-6 code in the same places (i.e. some packages); a new contributor might casually get lost in such code and end up working in vain. With [FLINK-4319] closed we announced that FLIP-6 is production ready, and [~trohrm...@apache.org]'s comment on this [commit|https://github.com/apache/flink/commit/ddd6a99a95b56c52ea5b5153b7270b578f5479bc#commitcomment-30330739] shows that it is planned to remove the legacy code. I'd prefer marking all legacy classes as Deprecated for now, so that our contributors can recognize them quickly and not waste their work. What do you think? cc [~Zentol]
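The proposed marking is mechanical; a sketch of what it amounts to, using a hypothetical stand-in for one of the legacy classes (not an actual Flink change):

```java
// Sketch of marking a legacy (pre-FLIP-6) class as deprecated so that
// IDEs and the compiler flag any accidental use. The nested class is a
// hypothetical stand-in, not Flink's real JobClientActor.
public class DeprecationDemo {

    /** Legacy (non-FLIP-6) code path; kept only until removal. */
    @Deprecated
    static class LegacyJobClientActor {}

    public static void main(String[] args) {
        // @Deprecated has runtime retention, so tooling can detect it.
        System.out.println(
            LegacyJobClientActor.class.isAnnotationPresent(Deprecated.class));  // true
    }
}
```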
[jira] [Resolved] (FLINK-10174) getbytes with no charsets test error for hex and toBase64
[ https://issues.apache.org/jira/browse/FLINK-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10174. -- Resolution: Fixed Fixed in 1.7.0: 99bf3a5087ac80ff20c8d22adef82f30e53ba568 > getbytes with no charsets test error for hex and toBase64 > - > > Key: FLINK-10174 > URL: https://issues.apache.org/jira/browse/FLINK-10174 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The Hex and toBase64 built-in methods use str.getBytes() with no Charset. The > result may depend on the local execution environment for special Unicode and > may cause errors when testing Hex on special Unicode
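The failure mode is easy to reproduce: `String.getBytes()` with no argument uses the JVM's default charset, so the hex of a non-ASCII string can differ across environments, while pinning UTF-8 is deterministic. A minimal standalone sketch of the fixed behavior (not the Flink code itself):

```java
import java.nio.charset.StandardCharsets;

// Demonstrates why the fix pins UTF-8: getBytes() with no charset uses
// the platform default, so the hex of "你好" can vary by environment,
// while getBytes(StandardCharsets.UTF_8) always yields the same bytes.
public class HexCharsetDemo {

    static String hexUtf8(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hexUtf8("你好"));  // E4BDA0E5A5BD on every platform
    }
}
```

This matches the expected value asserted by the new test in the merged diff (`HEX('你好')` yields `E4BDA0E5A5BD`).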
[jira] [Commented] (FLINK-10174) getbytes with no charsets test error for hex and toBase64
[ https://issues.apache.org/jira/browse/FLINK-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607199#comment-16607199 ] ASF GitHub Bot commented on FLINK-10174: asfgit closed pull request #6579: [FLINK-10174][table] fix getbytes in hex and toBase64 URL: https://github.com/apache/flink/pull/6579 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala index d92af7aaae2..3cf618ee717 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions import java.lang.{StringBuilder, Long => JLong} import java.math.{BigDecimal => JBigDecimal} +import org.apache.commons.codec.Charsets import org.apache.commons.codec.binary.{Base64, Hex} import scala.annotation.varargs @@ -206,12 +207,12 @@ object ScalarFunctions { /** * Returns the base string decoded with base64. */ - def fromBase64(str: String): String = new String(Base64.decodeBase64(str)) + def fromBase64(str: String): String = new String(Base64.decodeBase64(str), Charsets.UTF_8) /** * Returns the base64-encoded result of the input string. */ - def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes()) + def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes(Charsets.UTF_8)) /** * Returns the hex string of a long argument. 
@@ -221,7 +222,7 @@ object ScalarFunctions { /** * Returns the hex string of a string argument. */ - def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase() + def hex(x: String): String = Hex.encodeHexString(x.getBytes(Charsets.UTF_8)).toUpperCase() /** * Returns an UUID string using Java utilities. diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 145f3c5fba3..4a7a4f881a7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -483,6 +483,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f24.hex()", "HEX(f24)", "2A5F546869732069732061207465737420537472696E672E") + +testAllApis( + "你好".hex(), + "'你好'.hex()", + "HEX('你好')", + "E4BDA0E5A5BD" +) } @Test @@ -563,6 +570,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f33.fromBase64()", "FROM_BASE64(f33)", "null") + +testAllApis( + "5L2g5aW9".fromBase64(), + "'5L2g5aW9'.fromBase64()", + "FROM_BASE64('5L2g5aW9')", + "你好" +) } @Test @@ -591,6 +605,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f33.toBase64()", "TO_BASE64(f33)", "null") + +testAllApis( + "你好".toBase64(), + "'你好'.toBase64()", + "TO_BASE64('你好')", + "5L2g5aW9" +) } @Test This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > getbytes with no charsets test error for hex and toBase64 > - > > Key: FLINK-10174 > URL: https://issues.apache.org/jira/browse/FLINK-10174 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Hex and toBase64 builtin method use str.getBytes() with no Charset. It maybe > depend on local execution environment for special Unicode and maybe result in > errors when test Hex for special Unicode -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10174) getbytes with no charsets test error for hex and toBase64
[ https://issues.apache.org/jira/browse/FLINK-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607145#comment-16607145 ] ASF GitHub Bot commented on FLINK-10174: xueyumusic commented on a change in pull request #6579: [FLINK-10174][table] fix getbytes in hex and toBase64 URL: https://github.com/apache/flink/pull/6579#discussion_r215964734 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -221,7 +222,7 @@ object ScalarFunctions { /** * Returns the hex string of a string argument. */ - def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase() + def hex(x: String): String = Hex.encodeHexString(x.getBytes(Charsets.UTF_8)).toUpperCase() Review comment: Thanks for the suggestion! @twalthr . It looks this place https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala#L71 also used Charsets.UTF_8, maybe could update at the same time. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > getbytes with no charsets test error for hex and toBase64 > - > > Key: FLINK-10174 > URL: https://issues.apache.org/jira/browse/FLINK-10174 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Hex and toBase64 builtin method use str.getBytes() with no Charset. It maybe > depend on local execution environment for special Unicode and maybe result in > errors when test Hex for special Unicode -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10193) Default RPC timeout is used when triggering savepoint via JobMasterGateway
[ https://issues.apache.org/jira/browse/FLINK-10193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10193. --- Resolution: Fixed Fixed via 1.7.0: bfbcd908bf9b59f64ddf8c783293addc098ee516 1.6.1: 5d93520ae2a051f141cc5e92cf73411ecbde4390 1.5.4: d1c77f9686c36bf09939efd9bbf07ac554b41902 > Default RPC timeout is used when triggering savepoint via JobMasterGateway > -- > > Key: FLINK-10193 > URL: https://issues.apache.org/jira/browse/FLINK-10193 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > Attachments: SlowToCheckpoint.java > > > When calling {{JobMasterGateway#triggerSavepoint(String, boolean, Time)}}, > the default timeout is used because the time parameter of the method is not > annotated with {{@RpcTimeout}}. > *Expected behavior* > * timeout for the RPC should be {{RpcUtils.INF_TIMEOUT}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
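The mechanism behind the fix can be modeled with a runtime-retained parameter annotation: the RPC layer scans the invoked method's parameters for the marker, and an unmarked timeout argument is simply ignored in favor of the default. A simplified standalone model, not Flink's actual `RpcTimeout` annotation or its invocation handler:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Simplified model of a parameter-marker timeout annotation.
// This is an illustrative sketch, not Flink's real RPC machinery.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface RpcTimeout {}

interface Gateway {
    void triggerSavepoint(String targetDirectory, boolean cancelJob, @RpcTimeout long timeoutMs);
}

public class RpcTimeoutDemo {

    // Returns the index of the parameter carrying the timeout marker,
    // or -1 when none is marked; a missing marker means the default RPC
    // timeout is used, which was exactly the bug in FLINK-10193.
    static int timeoutParameterIndex(Method m) {
        Parameter[] params = m.getParameters();
        for (int i = 0; i < params.length; i++) {
            if (params[i].isAnnotationPresent(RpcTimeout.class)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        Method m = Gateway.class.getMethod(
            "triggerSavepoint", String.class, boolean.class, long.class);
        System.out.println(timeoutParameterIndex(m));  // 2
    }
}
```

In this model, adding the annotation to the `timeout` parameter is precisely the one-line change the merged diff makes to `JobMasterGateway.triggerSavepoint`.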
[jira] [Commented] (FLINK-10193) Default RPC timeout is used when triggering savepoint via JobMasterGateway
[ https://issues.apache.org/jira/browse/FLINK-10193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607137#comment-16607137 ] ASF GitHub Bot commented on FLINK-10193: asfgit closed pull request #6601: [FLINK-10193] Add @RpcTimeout to JobMasterGateway.triggerSavepoint URL: https://github.com/apache/flink/pull/6601 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 01cb2b6b099..4a66d32a2ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -95,7 +95,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.AccumulatorReport; @@ -909,7 +908,7 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { } @Override - public CompletableFuture requestJobDetails(@RpcTimeout Time timeout) { + public CompletableFuture requestJobDetails(Time timeout) { final ExecutionGraph currentExecutionGraph = executionGraph; return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 981222d17a6..bc073c192bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -268,7 +268,7 @@ void heartbeatFromTaskManager( CompletableFuture triggerSavepoint( @Nullable final String targetDirectory, final boolean cancelJob, - final Time timeout); + @RpcTimeout final Time timeout); /** * Requests the statistics on operator back pressure. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 66ca769165a..9a2bc97b62b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -92,6 +92,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -108,6 +109,7 @@ import org.junit.rules.TemporaryFolder; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.FileOutputStream; @@ -122,24 +124,28 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static 
org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for {@link JobMaster}. */ public class JobMasterTest extends TestLogger { - static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; + private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -418,7 +424,7 @@ public void testAutomaticRestartingWhenCheckpointing()
[GitHub] asfgit closed pull request #6601: [FLINK-10193] Add @RpcTimeout to JobMasterGateway.triggerSavepoint
asfgit closed pull request #6601: [FLINK-10193] Add @RpcTimeout to JobMasterGateway.triggerSavepoint URL: https://github.com/apache/flink/pull/6601 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 01cb2b6b099..4a66d32a2ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -95,7 +95,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.AccumulatorReport; @@ -909,7 +908,7 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { } @Override - public CompletableFuture requestJobDetails(@RpcTimeout Time timeout) { + public CompletableFuture requestJobDetails(Time timeout) { final ExecutionGraph currentExecutionGraph = executionGraph; return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 981222d17a6..bc073c192bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -268,7 +268,7 @@ void heartbeatFromTaskManager( CompletableFuture triggerSavepoint( @Nullable final String targetDirectory, final boolean cancelJob, - final Time timeout); + @RpcTimeout final Time timeout); /** * Requests the statistics on operator back pressure. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 66ca769165a..9a2bc97b62b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -92,6 +92,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -108,6 +109,7 @@ import org.junit.rules.TemporaryFolder; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.FileOutputStream; @@ -122,24 +124,28 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static 
org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for {@link JobMaster}. */ public class JobMasterTest extends TestLogger { - static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; + private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -418,7 +424,7 @@ public void testAutomaticRestartingWhenCheckpointing() throws Exception { } /** -* Tests that an existing checkpoint will have precedence over an savepoint +* Tests that an existing checkpoint will have precedence over an savepoint. */ @Test
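The core of the change in the diff above is the `@RpcTimeout` parameter annotation, which tells Flink's RPC framework which argument of a gateway method to use as the ask timeout. The following self-contained sketch is a hypothetical illustration of that mechanism, using a stand-in annotation and gateway interface rather than Flink's actual `org.apache.flink.runtime.rpc.RpcTimeout` and `JobMasterGateway`: the framework can locate the timeout argument by reflection instead of by position.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class RpcTimeoutDemo {
    // Hypothetical stand-in for Flink's parameter annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface RpcTimeout {}

    // Simplified gateway: only the annotated parameter is treated as the RPC timeout.
    interface Gateway {
        void triggerSavepoint(String targetDirectory, boolean cancelJob, @RpcTimeout long timeoutMillis);
    }

    // Mimics how an RPC framework could find the timeout argument by annotation.
    static int findTimeoutParameterIndex(Class<?> gateway, String methodName) {
        for (Method m : gateway.getMethods()) {
            if (!m.getName().equals(methodName)) {
                continue;
            }
            Parameter[] params = m.getParameters();
            for (int i = 0; i < params.length; i++) {
                if (params[i].isAnnotationPresent(RpcTimeout.class)) {
                    return i;
                }
            }
        }
        return -1; // no annotated parameter: a framework would fall back to a default timeout
    }

    public static void main(String[] args) {
        System.out.println("timeout parameter index: "
            + findTimeoutParameterIndex(Gateway.class, "triggerSavepoint")); // 2
    }
}
```

Without the annotation (the bug fixed by FLINK-10193), the lookup returns -1 and the caller's timeout argument is silently ignored in favor of the default.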
[jira] [Commented] (FLINK-10281) Table function parse regular expression contains backslash failed
[ https://issues.apache.org/jira/browse/FLINK-10281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607122#comment-16607122 ] ASF GitHub Bot commented on FLINK-10281: dawidwys commented on issue #6671: [FLINK-10281] [table] Fix string literal parsing in Table API URL: https://github.com/apache/flink/pull/6671#issuecomment-419438925 +1, Looks good from my side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Table function parse regular expression contains backslash failed > - > > Key: FLINK-10281 > URL: https://issues.apache.org/jira/browse/FLINK-10281 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > for example, regular expression matches text ("\w") or number ("\d") : > {code:java} > testAllApis( > "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the method got > 'foo([\w]+)' > "'foothebar'.regexExtract('foo([w]+)', 1)", //failed, the method got > 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error. > "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", //OK, the method got > 'foo([\w]+)' but must pass four '\' > "thebar" > ) > {code} > the "similar to" function has the same issue. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dawidwys commented on issue #6671: [FLINK-10281] [table] Fix string literal parsing in Table API
dawidwys commented on issue #6671: [FLINK-10281] [table] Fix string literal parsing in Table API URL: https://github.com/apache/flink/pull/6671#issuecomment-419438925 +1, Looks good from my side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
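The confusion behind FLINK-10281 comes from stacked escaping layers: Java source escaping, the Table API's string-literal parsing, and the regex engine itself. The sketch below isolates the baseline Java layer with a hypothetical `regexExtract` helper built on plain `java.util.regex` (not Flink's implementation): the Java literal `"foo([\\w]+)"` already denotes the ten-character regex `foo([\w]+)`, and the Table API expression parser used to add yet another layer on top of that.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexEscapeDemo {
    // Illustrative helper only; Flink's REGEX_EXTRACT has its own code path.
    static String regexExtract(String input, String regex, int group) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group(group) : "";
    }

    public static void main(String[] args) {
        // In Java source, "foo([\\w]+)" is the regex foo([\w]+): one escaping layer.
        String regex = "foo([\\w]+)";
        System.out.println(regexExtract("foothebar", regex, 1)); // thebar
        // An embedded Table API expression string adds a second layer, which is
        // why the literal parsing had to be fixed in the Table API itself.
    }
}
```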
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607114#comment-16607114 ] Shimin Yang commented on FLINK-10247: - Hi [~Zentol], sounds good. I am going to work on it. > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10247: --- Assignee: Shimin Yang > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
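The idea behind FLINK-10247 is isolation: if the metric query service runs on its own executor, a slow or large metric dump cannot block the main RPC threads. The following is a hypothetical, minimal sketch of that pattern using plain `java.util.concurrent` (the thread name `flink-metrics-query` is invented for illustration and is not Flink's actual naming):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class DedicatedPoolDemo {
    // Runs a task on a dedicated, named, daemon thread and reports which
    // thread executed it (or the failure message if execution failed).
    static String runOnDedicatedPool() {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "flink-metrics-query");
            t.setDaemon(true); // must not keep the JVM alive on shutdown
            return t;
        };
        ExecutorService metricsPool = Executors.newSingleThreadExecutor(factory);
        try {
            return metricsPool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            return e.getMessage();
        } finally {
            metricsPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool()); // flink-metrics-query
    }
}
```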
[jira] [Commented] (FLINK-10174) getbytes with no charsets test error for hex and toBase64
[ https://issues.apache.org/jira/browse/FLINK-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607082#comment-16607082 ] ASF GitHub Bot commented on FLINK-10174: twalthr commented on a change in pull request #6579: [FLINK-10174][table] fix getbytes in hex and toBase64 URL: https://github.com/apache/flink/pull/6579#discussion_r215946888 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -221,7 +222,7 @@ object ScalarFunctions { /** * Returns the hex string of a string argument. */ - def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase() + def hex(x: String): String = Hex.encodeHexString(x.getBytes(Charsets.UTF_8)).toUpperCase() Review comment: @xueyumusic please enable more IDE checkstyles. `Charsets` is deprecated. I will fix this while merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > getbytes with no charsets test error for hex and toBase64 > - > > Key: FLINK-10174 > URL: https://issues.apache.org/jira/browse/FLINK-10174 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: xueyu >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The Hex and toBase64 builtin methods use str.getBytes() with no Charset. The result may > depend on the local execution environment for special Unicode characters and may result in > errors when testing Hex for special Unicode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on a change in pull request #6579: [FLINK-10174][table] fix getbytes in hex and toBase64
twalthr commented on a change in pull request #6579: [FLINK-10174][table] fix getbytes in hex and toBase64 URL: https://github.com/apache/flink/pull/6579#discussion_r215946888 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -221,7 +222,7 @@ object ScalarFunctions { /** * Returns the hex string of a string argument. */ - def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase() + def hex(x: String): String = Hex.encodeHexString(x.getBytes(Charsets.UTF_8)).toUpperCase() Review comment: @xueyumusic please enable more IDE checkstyles. `Charsets` is deprecated. I will fix this while merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
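The bug discussed above is that `String.getBytes()` without an argument uses the JVM's platform-default encoding, so hex output can differ between machines for non-ASCII strings. A minimal Java sketch of the fixed behavior (a hand-rolled hex encoder standing in for commons-codec's `Hex.encodeHexString`, so it needs no extra dependency):

```java
import java.nio.charset.StandardCharsets;

public class HexDemo {
    // Always pass an explicit charset so the byte representation, and hence
    // the hex string, is deterministic across environments.
    static String hex(String s) {
        StringBuilder sb = new StringBuilder();
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex("hello")); // 68656C6C6F
        // For non-ASCII input, s.getBytes() (no charset) could yield different
        // bytes on a platform whose default encoding is not UTF-8.
    }
}
```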
[jira] [Commented] (FLINK-10281) Table function parse regular expression contains backslash failed
[ https://issues.apache.org/jira/browse/FLINK-10281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607069#comment-16607069 ] ASF GitHub Bot commented on FLINK-10281: yanghua commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed URL: https://github.com/apache/flink/pull/6659#issuecomment-419427295 @twalthr It looks good. In fact, I had only noticed the problem of parsing backslashes in single quotes, because we ran into trouble using regular expressions. You have found and solved more problems for special symbols. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Table function parse regular expression contains backslash failed > - > > Key: FLINK-10281 > URL: https://issues.apache.org/jira/browse/FLINK-10281 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > for example, regular expression matches text ("\w") or number ("\d") : > {code:java} > testAllApis( > "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the method got > 'foo([\w]+)' > "'foothebar'.regexExtract('foo([w]+)', 1)", //failed, the method got > 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error. > "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", //OK, the method got > 'foo([\w]+)' but must pass four '\' > "thebar" > ) > {code} > the "similar to" function has the same issue. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed
yanghua commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed URL: https://github.com/apache/flink/pull/6659#issuecomment-419427295 @twalthr It looks good. In fact, I had only noticed the problem of parsing backslashes in single quotes, because we ran into trouble using regular expressions. You have found and solved more problems for special symbols. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607064#comment-16607064 ] ASF GitHub Bot commented on FLINK-7964: --- yanghua commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-419426033 @pnowojski What do you think about me following @eliaslevy's advice and providing a connector implementation based on Kafka client 2.0, to verify that the approaches discussed are feasible? The existing connector would remain unchanged for the time being; what do you think? cc @aljoscha This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). 
Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. > * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better. Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. 
> * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
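The idempotent-producer point above can be made concrete with a small configuration sketch. These are standard Kafka producer property keys, but the snippet deliberately uses plain `java.util.Properties` (and a placeholder broker address) so it compiles without the Kafka client on the classpath; treat it as an illustration, not Flink connector code:

```java
import java.util.Properties;

public class KafkaProducerConfigDemo {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("acks", "all"); // required by the idempotent producer
        props.setProperty("enable.idempotence", "true");
        // Thanks to KAFKA-5949 this may now be as large as 5 (previously it had
        // to be 1 with idempotence), relaxing the throughput constraint.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("max.in.flight.requests.per.connection"));
    }
}
```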
[GitHub] yanghua commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors
yanghua commented on issue #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#issuecomment-419426033 @pnowojski What do you think about me following @eliaslevy's advice and providing a connector implementation based on Kafka client 2.0, to verify that the approaches discussed are feasible? The existing connector would remain unchanged for the time being; what do you think? cc @aljoscha This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10281) Table function parse regular expression contains backslash failed
[ https://issues.apache.org/jira/browse/FLINK-10281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607061#comment-16607061 ] ASF GitHub Bot commented on FLINK-10281: twalthr commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed URL: https://github.com/apache/flink/pull/6659#issuecomment-419425099 @yanghua I found more issues related to this topic when testing your code. I fixed them in #6671. What do you think about these changes? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Table function parse regular expression contains backslash failed > - > > Key: FLINK-10281 > URL: https://issues.apache.org/jira/browse/FLINK-10281 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > for example, regular expression matches text ("\w") or number ("\d") : > {code:java} > testAllApis( > "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the method got > 'foo([\w]+)' > "'foothebar'.regexExtract('foo([w]+)', 1)", //failed, the method got > 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error. > "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", //OK, the method got > 'foo([\w]+)' but must pass four '\' > "thebar" > ) > {code} > the "similar to" function has the same issue. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed
twalthr commented on issue #6659: [FLINK-10281] [table] Table function parse regular expression contains backslash failed URL: https://github.com/apache/flink/pull/6659#issuecomment-419425099 @yanghua I found more issues related to this topic when testing your code. I fixed them in #6671. What do you think about these changes? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10242. Resolution: Fixed master: b0522e37181f8485610fd37dda9fc45858b6429e > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issue around latency metrics we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
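The semantics that FLINK-10242 changes (visible in the diff further below) are simple: the default latency-tracking interval becomes 0, meaning disabled, and only a positive value enables the feature. A hypothetical stand-in class mirroring that logic, not Flink's actual `ExecutionConfig`:

```java
public class LatencyIntervalDemo {
    // After FLINK-10242 the default interval is 0 (latency tracking disabled);
    // previously it defaulted to 2000 ms.
    static final long DEFAULT_LATENCY_INTERVAL = 0L;

    private long latencyTrackingInterval = DEFAULT_LATENCY_INTERVAL;

    void setLatencyTrackingInterval(long millis) {
        this.latencyTrackingInterval = millis;
    }

    // A tracking interval <= 0 disables latency tracking.
    boolean isLatencyTrackingEnabled() {
        return latencyTrackingInterval > 0;
    }

    public static void main(String[] args) {
        LatencyIntervalDemo cfg = new LatencyIntervalDemo();
        System.out.println(cfg.isLatencyTrackingEnabled()); // false: off by default
        cfg.setLatencyTrackingInterval(2000L); // opt back in, e.g. while debugging
        System.out.println(cfg.isLatencyTrackingEnabled()); // true
    }
}
```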
[jira] [Updated] (FLINK-9523) Add Kafka examples for Flink Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-9523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9523: -- Labels: pull-request-available (was: ) > Add Kafka examples for Flink Table/SQL API > --- > > Key: FLINK-9523 > URL: https://issues.apache.org/jira/browse/FLINK-9523 > Project: Flink > Issue Type: Task > Components: Examples >Reporter: Shuyi Chen >Assignee: Mikhail Sokolov >Priority: Major > Labels: pull-request-available > > Given the popularity of Flink SQL and Kafka as streaming source, we want to > add some examples of using Kafka JSON/Avro TableSource in > flink-examples/flink-examples-table module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9523) Add Kafka examples for Flink Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-9523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607053#comment-16607053 ] ASF GitHub Bot commented on FLINK-9523: --- SokolovMS opened a new pull request #6672: [FLINK-9523] Add Kafka examples for Flink Table/SQL API URL: https://github.com/apache/flink/pull/6672 ## What is the purpose of the change Implementation of FLINK-9523 ## Brief change log Added an example of using Kafka JSON TableSource with executing SQL queries on input table. ## Verifying this change 1. Start ZooKeeper 2. Start Kafka 3. Run Consumer with arguments specified in the JavaDoc 4. Run Producer with arguments specified in the JavaDoc ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no. Could only use them. - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Kafka examples for Flink Table/SQL API > --- > > Key: FLINK-9523 > URL: https://issues.apache.org/jira/browse/FLINK-9523 > Project: Flink > Issue Type: Task > Components: Examples >Reporter: Shuyi Chen >Assignee: Mikhail Sokolov >Priority: Major > Labels: pull-request-available > > Given the popularity of Flink SQL and Kafka as streaming source, we want to > add some examples of using Kafka JSON/Avro TableSource in > flink-examples/flink-examples-table module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607054#comment-16607054 ] ASF GitHub Bot commented on FLINK-10242: zentol closed pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default URL: https://github.com/apache/flink/pull/6656 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 98054e94224..02f4ceb162f 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -12,6 +12,11 @@ 128 Defines the number of measured latencies to maintain at each operator. + +metrics.latency.interval +0 +Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster. + metrics.reporter.name.parameter (none) diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 7d88a36393c..85c60a67a22 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup. ## Latency tracking -Flink allows to track the latency of records traveling through the system. To enable the latency tracking -a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`. +Flink allows to track the latency of records traveling through the system. This feature is disabled by default. 
+To enable the latency tracking you must set the `latencyTrackingInterval` to a positive number in either the +[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`. At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`. The marker contains a timestamp from the time when the record has been emitted at the sources. @@ -1659,6 +1660,9 @@ latency issues caused by individual machines. Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results. +Warning Enabling latency metrics can significantly impact the performance +of the cluster. It is highly recommended to only use them for debugging purposes. + ## REST API integration Metrics can be queried through the [Monitoring REST API]({{ site.baseurl }}/monitoring/rest_api.html). diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 59fa803791a..6b7caaac6ec 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.util.Preconditions; @@ -131,7 +132,9 @@ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. 
*/ - private long latencyTrackingInterval = 2000L; + private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); + + private boolean isLatencyTrackingConfigured = false; /** * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration @@ -234,8 +237,6 @@ public long getAutoWatermarkInterval() { * Interval for sending latency tracking marks from the sources to the sinks. * Flink will send latency tracking marks from the sources at the specified interval. * -* Recommended value: 2000 (2 seconds). -* * Setting a tracking interval <= 0 disables the latency tracking. * * @param interval Interval in milliseconds. @@ -243,6 +244,7 @@ public long getAutoWatermarkInterval() { @PublicEvolving public ExecutionConfig setLatencyTrackingInterval(long interval) { this.latencyTrackingInterval = interval; + this.isLatencyTrackingConfigured = true; return
[GitHub] zentol closed pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default
zentol closed pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default URL: https://github.com/apache/flink/pull/6656 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 98054e94224..02f4ceb162f 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -12,6 +12,11 @@ 128 Defines the number of measured latencies to maintain at each operator. + +metrics.latency.interval +0 +Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster. + metrics.reporter.name.parameter (none) diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 7d88a36393c..85c60a67a22 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup. ## Latency tracking -Flink allows to track the latency of records traveling through the system. To enable the latency tracking -a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`. +Flink allows to track the latency of records traveling through the system. This feature is disabled by default. +To enable the latency tracking you must set the `latencyTrackingInterval` to a positive number in either the +[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`. At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`. 
The marker contains a timestamp from the time when the record has been emitted at the sources. @@ -1659,6 +1660,9 @@ latency issues caused by individual machines. Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results. +Warning Enabling latency metrics can significantly impact the performance +of the cluster. It is highly recommended to only use them for debugging purposes. + ## REST API integration Metrics can be queried through the [Monitoring REST API]({{ site.baseurl }}/monitoring/rest_api.html). diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 59fa803791a..6b7caaac6ec 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.util.Preconditions; @@ -131,7 +132,9 @@ /** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. */ - private long latencyTrackingInterval = 2000L; + private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue(); + + private boolean isLatencyTrackingConfigured = false; /** * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration @@ -234,8 +237,6 @@ public long getAutoWatermarkInterval() { * Interval for sending latency tracking marks from the sources to the sinks. * Flink will send latency tracking marks from the sources at the specified interval. 
* -* Recommended value: 2000 (2 seconds). -* * Setting a tracking interval <= 0 disables the latency tracking. * * @param interval Interval in milliseconds. @@ -243,6 +244,7 @@ public long getAutoWatermarkInterval() { @PublicEvolving public ExecutionConfig setLatencyTrackingInterval(long interval) { this.latencyTrackingInterval = interval; + this.isLatencyTrackingConfigured = true; return this; } @@ -256,12 +258,17 @@ public long getLatencyTrackingInterval() { } /** -* Returns if latency tracking is enabled -* @return True, if the tracking is enabled, false otherwise. +*
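The diff above makes latency tracking opt-in: the default interval moves from a hard-coded 2000 ms in `ExecutionConfig` to the new `metrics.latency.interval` option, whose default of 0 disables the feature. A minimal sketch of re-enabling it cluster-wide, using the key from the generated configuration table in the diff:

```yaml
# flink-conf.yaml — latency tracking is disabled by default (interval 0 or negative);
# any positive interval (in milliseconds) re-enables it.
metrics.latency.interval: 2000   # sources emit a LatencyMarker every 2 seconds
```

Per the docs change in the same diff, enabling this can significantly impact cluster performance, so it is recommended only for debugging.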
[jira] [Commented] (FLINK-10300) Prometheus job level metrics not removed after job finished
[ https://issues.apache.org/jira/browse/FLINK-10300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607051#comment-16607051 ] Helmut Zechmann commented on FLINK-10300: - Yes, we had the problem with flink 1.5.2, but I also reproduced it with flink 1.5.3 using the steps described above. > Prometheus job level metrics not removed after job finished > --- > > Key: FLINK-10300 > URL: https://issues.apache.org/jira/browse/FLINK-10300 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.3 >Reporter: Helmut Zechmann >Priority: Major > > Flink provides job level metrics for flink jobs. After a job is finished > those metrics should be removed, else we run into problems when many jobs are > executed on a cluster. > How to reproduce this: > Setup: > * flink 1.5.3 in standalone mode > * 1 JobManager > * 1 TaskManager > * flink-metrics-prometheus-1.5.3.jar in lib dir > Metrics config: > {code:java} > metrics.reporters: prom > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > metrics.reporter.prom.port: 7000-7001 > {code} > Run the wordcount job. 
After running the job, job related metrics are still > available: > > {code:java} > flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count{tm_id="ee893c28f70d285e701f838706ce8810",host="helmuts_mbp",} > 1.0 > # HELP flink_taskmanager_job_task_operator_numRecordsOutPerSecond > numRecordsOutPerSecond (scope: taskmanager_job_task_operator) > # TYPE flink_taskmanager_job_task_operator_numRecordsOutPerSecond gauge > flink_taskmanager_job_task_operator_numRecordsOutPerSecond{job_id="2a7c77aacf6b18da389189a3bae6ff48",task_id="529e7a1eaba520b18dc7864f821ada08",task_attempt_id="3bc0d07eb56df676b088a8ec13531c98",host="helmuts_mbp",operator_id="529e7a1eaba520b18dc7864f821ada08",operator_name="DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flin",task_name="CHAIN_DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flink_api_java_io_CollectionInputFormat__FlatMap__FlatMap_at_main_WordCount_java:77__Combine__SUM_1___at_main_WordCount_java:80_",task_attempt_num="0",job_name="Flink_Java_Job_at_Fri_Sep_07_13:00:12_CEST_2018",tm_id="ee893c28f70d285e701f838706ce8810",subtask_index="0",} > 0.0 > ... > {code} > > With each finished job the prometheus output gets bigger and bigger until > the prometheus output fails to load. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10281) Table function parse regular expression contains backslash failed
[ https://issues.apache.org/jira/browse/FLINK-10281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607052#comment-16607052 ] ASF GitHub Bot commented on FLINK-10281: twalthr opened a new pull request #6671: [FLINK-10281] [table] Fix string literal parsing in Table API URL: https://github.com/apache/flink/pull/6671 ## What is the purpose of the change This PR fixes the string literal parsing of the Table API. Proper escaping of the quotes was not possible in the past. The logic was overly complicated and was not handling escaping correctly. The new logic relies on the Java/Scala programs escaping and uses duplicate quotes for escaping the quotes (see the docs). ## Brief change log - Rework of the string literals for `ExpressionParser` ## Verifying this change This change added tests and can be verified as follows: - `org.apache.flink.table.expressions.LiteralTest#testStringLiterals` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? docs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Table function parse regular expression contains backslash failed > - > > Key: FLINK-10281 > URL: https://issues.apache.org/jira/browse/FLINK-10281 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > for example, regular expression matches text ("\w") or number ("\d") : > {code:java} > testAllApis( > "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the method got > 'foo([\w]+)' > "'foothebar'.regexExtract('foo([w]+)', 1)", //failed, the method got > 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error. > "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", //OK, the method got > 'foo([\w]+)' but must pass four '\' > "thebar" > ) > {code} > the "similar to" function has the same issue. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
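The confusion behind FLINK-10281 is that two escaping layers apply: the Java/Scala compiler consumes one level of backslashes, and the regex engine consumes another. A self-contained sketch of the two layers using plain `java.util.regex` (independent of the Table API's `ExpressionParser`; the helper name is illustrative):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexEscapeDemo {

    // In Java source, the literal "foo([\\w]+)" denotes the regex foo([\w]+):
    // the compiler consumes one backslash, the regex engine sees the other.
    static String extractGroup(String input, String regex, int group) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group(group) : "";
    }

    public static void main(String[] args) {
        // foo([\w]+) captures the word characters following "foo"
        System.out.println(extractGroup("foothebar", "foo([\\w]+)", 1)); // thebar
    }
}
```

The Table API's string-based expression parser adds a third escaping layer on top of these two, which is why the ticket's string variant needed four backslashes before the fix.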
[GitHub] SokolovMS opened a new pull request #6672: [FLINK-9523] Add Kafka examples for Flink Table/SQL API
SokolovMS opened a new pull request #6672: [FLINK-9523] Add Kafka examples for Flink Table/SQL API URL: https://github.com/apache/flink/pull/6672 ## What is the purpose of the change Implementation of FLINK-9523 ## Brief change log Added an example of using Kafka JSON TableSource with executing SQL queries on input table. ## Verifying this change 1. Start ZooKeeper 2. Start Kafka 3. Run Consumer with arguments specified in the JavaDoc 4. Run Producer with arguments specified in the JavaDoc ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no. Could only use them. - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr opened a new pull request #6671: [FLINK-10281] [table] Fix string literal parsing in Table API
twalthr opened a new pull request #6671: [FLINK-10281] [table] Fix string literal parsing in Table API URL: https://github.com/apache/flink/pull/6671 ## What is the purpose of the change This PR fixes the string literal parsing of the Table API. Proper escaping of the quotes was not possible in the past. The logic was overly complicated and was not handling escaping correctly. The new logic relies on the Java/Scala programs escaping and uses duplicate quotes for escaping the quotes (see the docs). ## Brief change log - Rework of the string literals for `ExpressionParser` ## Verifying this change This change added tests and can be verified as follows: - `org.apache.flink.table.expressions.LiteralTest#testStringLiterals` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? docs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10290) Conversion error in StreamScan and BatchScan
[ https://issues.apache.org/jira/browse/FLINK-10290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607048#comment-16607048 ] ASF GitHub Bot commented on FLINK-10290: jrthe42 commented on issue #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#issuecomment-419422931 Sorry for the missed checkstyle error, already fixed. @yanghua cc @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Conversion error in StreamScan and BatchScan > > > Key: FLINK-10290 > URL: https://issues.apache.org/jira/browse/FLINK-10290 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.3, 1.6.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > `RowTypeInfo#equals()` only compares field types, and fields names are not > considered. When checking the equality of `inputType` and `internalType`, we > should compare both filed types and field names. > Behavior of this bug: > A table T with schema (a: Long, b:Long, c:Long) > SELECT b,c,a from T > expected: b,c,a > actually: a,b,c -- This message was sent by Atlassian JIRA (v7.6.3#76005)
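The bug described above is that schema equality compared only field types, so two row schemas with the same types but reordered names were treated as identical and the reordering conversion was skipped. A simplified model of the fix (class and method names here are illustrative stand-ins, not Flink's actual `RowTypeInfo`):

```java
import java.util.Arrays;

public class SchemaEqualityDemo {

    // Simplified stand-in for a row schema: field types plus field names.
    static final class RowSchema {
        final Class<?>[] types;
        final String[] names;

        RowSchema(Class<?>[] types, String[] names) {
            this.types = types;
            this.names = names;
        }

        // Buggy variant: only field types are compared.
        boolean equalsTypesOnly(RowSchema o) {
            return Arrays.equals(types, o.types);
        }

        // Fixed variant: field types AND field names must match.
        boolean equalsTypesAndNames(RowSchema o) {
            return Arrays.equals(types, o.types) && Arrays.equals(names, o.names);
        }
    }

    public static void main(String[] args) {
        RowSchema abc = new RowSchema(
            new Class<?>[]{Long.class, Long.class, Long.class}, new String[]{"a", "b", "c"});
        RowSchema bca = new RowSchema(
            new Class<?>[]{Long.class, Long.class, Long.class}, new String[]{"b", "c", "a"});

        // Types-only equality wrongly treats SELECT b,c,a as identical to a,b,c,
        // so the scan skips the reordering and emits columns a,b,c.
        System.out.println(abc.equalsTypesOnly(bca));     // true  (bug)
        System.out.println(abc.equalsTypesAndNames(bca)); // false (fix)
    }
}
```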
[GitHub] jrthe42 commented on issue #6666: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan
jrthe42 commented on issue #: [FLINK-10290] [table] Fix conversion error in StreamScan and BatchScan URL: https://github.com/apache/flink/pull/#issuecomment-419422931 Sorry for the missed checkstyle error, already fixed. @yanghua cc @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10293) RemoteStreamEnvironment does not forward port to RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607046#comment-16607046 ] ASF GitHub Bot commented on FLINK-10293: zentol closed pull request #6665: [FLINK-10293][streaming] Properly forward REST port for remote environments URL: https://github.com/apache/flink/pull/6665 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 480f981bc75..9c36dab75fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; @@ -201,6 +202,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List configuration.setString(JobManagerOptions.ADDRESS, host); configuration.setInteger(JobManagerOptions.PORT, port); + configuration.setInteger(RestOptions.PORT, port); + final ClusterClient client; try { if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java new file mode 100644 index 000..60ee66f4246 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Iterator; + +/** + * Tests for the {@link RemoteStreamEnvironment}. 
+ */ +public class RemoteStreamExecutionEnvironmentTest extends TestLogger { + + private static MiniCluster flink; + + @BeforeClass + public static void setUp() throws Exception { + final Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(config) + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(1) + .build(); + + flink = new MiniCluster(miniClusterConfiguration); + + flink.start(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (flink != null) { + flink.close(); + } + } + + /** +* Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster. +*/ +
[GitHub] zentol closed pull request #6665: [FLINK-10293][streaming] Properly forward REST port for remote environments
zentol closed pull request #6665: [FLINK-10293][streaming] Properly forward REST port for remote environments URL: https://github.com/apache/flink/pull/6665 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 480f981bc75..9c36dab75fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; @@ -201,6 +202,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List configuration.setString(JobManagerOptions.ADDRESS, host); configuration.setInteger(JobManagerOptions.PORT, port); + configuration.setInteger(RestOptions.PORT, port); + final ClusterClient client; try { if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java new file mode 100644 index 000..60ee66f4246 --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Iterator; + +/** + * Tests for the {@link RemoteStreamEnvironment}. 
+ */ +public class RemoteStreamExecutionEnvironmentTest extends TestLogger { + + private static MiniCluster flink; + + @BeforeClass + public static void setUp() throws Exception { + final Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(config) + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(1) + .build(); + + flink = new MiniCluster(miniClusterConfiguration); + + flink.start(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (flink != null) { + flink.close(); + } + } + + /** +* Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster. +*/ + @Test + public void testPortForwarding() throws Exception { + final Configuration clientConfiguration = new Configuration(); + clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0); + +
[jira] [Commented] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607032#comment-16607032 ] ASF GitHub Bot commented on FLINK-10242: zentol commented on a change in pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default URL: https://github.com/apache/flink/pull/6656#discussion_r215938129 ## File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ## @@ -104,6 +104,12 @@ .defaultValue(".taskmanager") .withDescription("Defines the scope format string that is applied to all metrics scoped to an operator."); + public static final ConfigOption LATENCY_INTERVAL = + key("metrics.latency.interval") + .defaultValue(0L) + .withDescription("Defines the interval at which latency tracking marks are emitted from the sources." + Review comment: I'll update the docs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issue around latency metrics we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)

[jira] [Commented] (FLINK-9991) Add regexp_replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607029#comment-16607029 ] ASF GitHub Bot commented on FLINK-9991: --- yanghua commented on issue #6450: [FLINK-9991] [table] Add regexp_replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6450#issuecomment-419417892 @xccui Can you review this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add regexp_replace supported in TableAPI and SQL > > > Key: FLINK-9991 > URL: https://issues.apache.org/jira/browse/FLINK-9991 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regexp_replace is a very userful function to process String. > For example : > {code:java} > regexp_replace("foobar", "oo|ar", "") //returns 'fb.' > {code} > It is supported as a UDF in Hive, more details please see[1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
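Independent of how the Flink PR implements it, the `regexp_replace` semantics described in the ticket match Java's built-in `String.replaceAll`. A minimal sketch (the wrapper name mirrors the proposed function but is only a stand-in):

```java
public class RegexpReplaceDemo {

    // Hypothetical stand-in for regexp_replace(str, regex, replacement):
    // replaces every substring matching the regex with the replacement.
    static String regexpReplace(String str, String regex, String replacement) {
        return str.replaceAll(regex, replacement);
    }

    public static void main(String[] args) {
        // The ticket's example: removing "oo" and "ar" from "foobar" yields "fb".
        System.out.println(regexpReplace("foobar", "oo|ar", "")); // fb
    }
}
```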
[jira] [Commented] (FLINK-10300) Prometheus job level metrics not removed after job finished
[ https://issues.apache.org/jira/browse/FLINK-10300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607025#comment-16607025 ] Chesnay Schepler commented on FLINK-10300: -- On the ML you said you were using 1.5.2, can i conclude that this issue occurs in both versions? There was a similar issue that was (supposedly) fixed for 1.5.1: FLINK-9665 > Prometheus job level metrics not removed after job finished > --- > > Key: FLINK-10300 > URL: https://issues.apache.org/jira/browse/FLINK-10300 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.3 >Reporter: Helmut Zechmann >Priority: Major > > Flink provides job level metrics for flink jobs. After a job is finished > those metrics should be removed, else we run into problems when many jobs are > executed on a cluster. > How to reproduce this: > Setup: > * flink 1.5.3 in standalone mode > * 1 JobManager > * 1 TaskManager > * flink-metrics-prometheus-1.5.3.jar in lib dir > Metrics config: > {code:java} > metrics.reporters: prom > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > metrics.reporter.prom.port: 7000-7001 > {code} > Run the wordcount job. 
After running the job, job related metrics are still > available: > > {code:java} > flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count{tm_id="ee893c28f70d285e701f838706ce8810",host="helmuts_mbp",} > 1.0 > # HELP flink_taskmanager_job_task_operator_numRecordsOutPerSecond > numRecordsOutPerSecond (scope: taskmanager_job_task_operator) > # TYPE flink_taskmanager_job_task_operator_numRecordsOutPerSecond gauge > flink_taskmanager_job_task_operator_numRecordsOutPerSecond{job_id="2a7c77aacf6b18da389189a3bae6ff48",task_id="529e7a1eaba520b18dc7864f821ada08",task_attempt_id="3bc0d07eb56df676b088a8ec13531c98",host="helmuts_mbp",operator_id="529e7a1eaba520b18dc7864f821ada08",operator_name="DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flin",task_name="CHAIN_DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flink_api_java_io_CollectionInputFormat__FlatMap__FlatMap_at_main_WordCount_java:77__Combine__SUM_1___at_main_WordCount_java:80_",task_attempt_num="0",job_name="Flink_Java_Job_at_Fri_Sep_07_13:00:12_CEST_2018",tm_id="ee893c28f70d285e701f838706ce8810",subtask_index="0",} > 0.0 > ... > {code} > > With each finished job the prometheus output gets bigger and bigger until > the prometheus output fails to load. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10301) Allow a custom Configuration in StreamNetworkBenchmarkEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607024#comment-16607024 ] ASF GitHub Bot commented on FLINK-10301: zentol commented on a change in pull request #6670: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances URL: https://github.com/apache/flink/pull/6670#discussion_r215934568 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -115,7 +115,8 @@ public void setUp( int channels, boolean localMode, int senderBufferPoolSize, - int receiverBufferPoolSize) throws Exception { + int receiverBufferPoolSize, + Configuration config) throws Exception { Review comment: How about overloading this method instead with a variant that also accepts a configuration? Then it would be easier to create benchmarks across different flink versions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow a custom Configuration in StreamNetworkBenchmarkEnvironment > - > > Key: FLINK-10301 > URL: https://issues.apache.org/jira/browse/FLINK-10301 > Project: Flink > Issue Type: Improvement > Components: Network, Tests >Affects Versions: 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{StreamNetworkBenchmarkEnvironment}} currently instantiates its > {{NettyConfig}} with {{new Configuration()}} but for testing with different > options, it would be nice to allow the user to provide a custom instance > instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
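The review suggestion above — keep the existing `setUp` signature and add an overload that also accepts a `Configuration`, so benchmarks built against older Flink versions still compile — can be sketched as follows (class and field names here are simplified stand-ins, not the real `StreamNetworkBenchmarkEnvironment`):

```java
import java.util.HashMap;

public class BenchmarkEnvSketch {

    // Hypothetical minimal stand-in for Flink's Configuration.
    static final class Configuration extends HashMap<String, String> {}

    Configuration appliedConfig;

    // Existing signature stays intact for callers on older versions;
    // it delegates with an empty (default) configuration.
    void setUp(int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize) {
        setUp(channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, new Configuration());
    }

    // New overload lets callers inject custom options (e.g. Netty settings).
    void setUp(int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize,
               Configuration config) {
        this.appliedConfig = config;
    }
}
```

Overloading rather than changing the signature keeps cross-version benchmark code source-compatible, which is exactly the reviewer's point.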
[jira] [Updated] (FLINK-10301) Allow a custom Configuration in StreamNetworkBenchmarkEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10301: --- Labels: pull-request-available (was: ) > Allow a custom Configuration in StreamNetworkBenchmarkEnvironment > - > > Key: FLINK-10301 > URL: https://issues.apache.org/jira/browse/FLINK-10301 > Project: Flink > Issue Type: Improvement > Components: Network, Tests >Affects Versions: 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{StreamNetworkBenchmarkEnvironment}} currently instantiates its > {{NettyConfig}} with {{new Configuration()}} but for testing with different > options, it would be nice to allow the user to provide a custom instance > instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10301) Allow a custom Configuration in StreamNetworkBenchmarkEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607016#comment-16607016 ] ASF GitHub Bot commented on FLINK-10301: NicoK opened a new pull request #6670: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances URL: https://github.com/apache/flink/pull/6670 ## What is the purpose of the change `StreamNetworkBenchmarkEnvironment` currently instantiates its `NettyConfig` with `new Configuration()` but for testing and benchmarking with different options, it would be nice to allow the user to provide a custom instance instead. ## Brief change log - extend `StreamNetworkBenchmarkEnvironment#createNettyNetworkEnvironment` and related methods with a `Configuration` parameter ## Verifying this change This change is already covered by existing tests, such as `StreamNetworkPointToPointBenchmarkTest` and `StreamNetworkThroughputBenchmarkTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow a custom Configuration in StreamNetworkBenchmarkEnvironment > - > > Key: FLINK-10301 > URL: https://issues.apache.org/jira/browse/FLINK-10301 > Project: Flink > Issue Type: Improvement > Components: Network, Tests >Affects Versions: 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > {{StreamNetworkBenchmarkEnvironment}} currently instantiates its > {{NettyConfig}} with {{new Configuration()}} but for testing with different > options, it would be nice to allow the user to provide a custom instance > instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10301) Allow a custom Configuration in StreamNetworkBenchmarkEnvironment
Nico Kruber created FLINK-10301: --- Summary: Allow a custom Configuration in StreamNetworkBenchmarkEnvironment Key: FLINK-10301 URL: https://issues.apache.org/jira/browse/FLINK-10301 Project: Flink Issue Type: Improvement Components: Network, Tests Affects Versions: 1.7.0 Reporter: Nico Kruber Assignee: Nico Kruber {{StreamNetworkBenchmarkEnvironment}} currently instantiates its {{NettyConfig}} with {{new Configuration()}} but for testing with different options, it would be nice to allow the user to provide a custom instance instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607003#comment-16607003 ] ASF GitHub Bot commented on FLINK-10242: StefanRRichter commented on a change in pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default URL: https://github.com/apache/flink/pull/6656#discussion_r215928553 ## File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ## @@ -104,6 +104,12 @@ .defaultValue(".taskmanager") .withDescription("Defines the scope format string that is applied to all metrics scoped to an operator."); + public static final ConfigOption LATENCY_INTERVAL = + key("metrics.latency.interval") + .defaultValue(0L) + .withDescription("Defines the interval at which latency tracking marks are emitted from the sources." + Review comment: Same for the documentation, I guess: mention that it is disabled by default. Right now I have to infer this from the default value. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issues around latency metrics we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10242) Disable latency metrics by default
[ https://issues.apache.org/jira/browse/FLINK-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607001#comment-16607001 ] ASF GitHub Bot commented on FLINK-10242: StefanRRichter commented on a change in pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default URL: https://github.com/apache/flink/pull/6656#discussion_r215928326 ## File path: flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java ## @@ -104,6 +104,12 @@ .defaultValue(".taskmanager") .withDescription("Defines the scope format string that is applied to all metrics scoped to an operator."); + public static final ConfigOption LATENCY_INTERVAL = + key("metrics.latency.interval") + .defaultValue(0L) + .withDescription("Defines the interval at which latency tracking marks are emitted from the sources." + Review comment: Maybe enhance the description a bit to warn that this can have significant overhead and should be used more for debugging and not so much in production? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Disable latency metrics by default > -- > > Key: FLINK-10242 > URL: https://issues.apache.org/jira/browse/FLINK-10242 > Project: Flink > Issue Type: Sub-task > Components: Configuration, Metrics >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > With the plethora of recent issues around latency metrics we should disable > them by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
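The semantics under review can be sketched as follows, with illustrative names (the real option is built via Flink's ConfigOptions builder in MetricOptions and consumed elsewhere in the runtime): a default of 0L disables latency tracking, because markers are only emitted for a strictly positive interval, which is what makes the metrics disabled by default.

```java
public class LatencyTrackingSketch {
    // Mirrors the proposed default of metrics.latency.interval in the diff above.
    static final long DEFAULT_LATENCY_INTERVAL = 0L;

    // Latency markers are only scheduled for a strictly positive interval,
    // so the 0L default turns the feature off.
    static boolean latencyTrackingEnabled(long intervalMillis) {
        return intervalMillis > 0;
    }

    public static void main(String[] args) {
        System.out.println(latencyTrackingEnabled(DEFAULT_LATENCY_INTERVAL)); // false
        System.out.println(latencyTrackingEnabled(2000L)); // true
    }
}
```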
[jira] [Created] (FLINK-10300) Prometheus job level metrics not removed after job finished
Helmut Zechmann created FLINK-10300: --- Summary: Prometheus job level metrics not removed after job finished Key: FLINK-10300 URL: https://issues.apache.org/jira/browse/FLINK-10300 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.3 Reporter: Helmut Zechmann Flink provides job level metrics for flink jobs. After a job is finished those metrics should be removed, else we run into problems when many jobs are executed on a cluster. How to reproduce this: Setup: * flink 1.5.3 in standalone mode * 1 JobManager * 1 TaskManager * flink-metrics-prometheus-1.5.3.jar in lib dir Metrics config: {code:java} metrics.reporters: prom metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter metrics.reporter.prom.port: 7000-7001 {code} Run the wordcount job. After running the job, job related metrics are still available: {code:java} flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count{tm_id="ee893c28f70d285e701f838706ce8810",host="helmuts_mbp",} 1.0 # HELP flink_taskmanager_job_task_operator_numRecordsOutPerSecond numRecordsOutPerSecond (scope: taskmanager_job_task_operator) # TYPE flink_taskmanager_job_task_operator_numRecordsOutPerSecond gauge flink_taskmanager_job_task_operator_numRecordsOutPerSecond{job_id="2a7c77aacf6b18da389189a3bae6ff48",task_id="529e7a1eaba520b18dc7864f821ada08",task_attempt_id="3bc0d07eb56df676b088a8ec13531c98",host="helmuts_mbp",operator_id="529e7a1eaba520b18dc7864f821ada08",operator_name="DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flin",task_name="CHAIN_DataSource__at_getDefaultTextLineDataSet_WordCountData_java:70___org_apache_flink_api_java_io_CollectionInputFormat__FlatMap__FlatMap_at_main_WordCount_java:77__Combine__SUM_1___at_main_WordCount_java:80_",task_attempt_num="0",job_name="Flink_Java_Job_at_Fri_Sep_07_13:00:12_CEST_2018",tm_id="ee893c28f70d285e701f838706ce8810",subtask_index="0",} 0.0 ... 
{code} With each finished job, the Prometheus output grows until it eventually fails to load. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
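The lifecycle a reporter needs here can be sketched with a plain map standing in for the reporter's registry (the real PrometheusReporter keeps Prometheus collector objects; the names below are hypothetical): job-scoped metrics are registered when the job's metric group opens and must be dropped when the job finishes, otherwise the scrape output grows with every completed job, as in this report.

```java
import java.util.HashMap;
import java.util.Map;

public class JobMetricRegistrySketch {
    // Stand-in for the reporter's registry of job-scoped collectors.
    private final Map<String, Double> collectorsByJobId = new HashMap<>();

    // Called when a job's metric group is opened.
    public void onJobMetricRegistered(String jobId, double value) {
        collectorsByJobId.put(jobId, value);
    }

    // The missing step in the reported bug: remove the job's collectors
    // when the job finishes, so the scrape output stops growing.
    public void onJobFinished(String jobId) {
        collectorsByJobId.remove(jobId);
    }

    // Size of what a scrape would export.
    public int scrapeSize() {
        return collectorsByJobId.size();
    }
}
```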
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606958#comment-16606958 ] Chesnay Schepler commented on FLINK-10247: -- A possibly less invasive option would be to introduce a dedicated thread-pool into the MetricQueryService that is used for processing messages. So we keep the MQS in the same actor system as the dispatcher, but pass all messages to the thread-pool for processing. > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
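Chesnay's suggestion can be sketched with plain JDK executors (the actual MetricQueryService is an Akka actor, so the hand-off would live in its message handler; all names below are illustrative): the actor thread only enqueues, and a dedicated thread pool performs the potentially slow metric serialization so it cannot block the dispatcher's actor system.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MetricQueryServiceSketch {
    // Dedicated single-thread pool: slow metric dumps run here, not on the
    // actor system shared with the dispatcher.
    private final ExecutorService queryExecutor =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "metric-query-service");
            t.setDaemon(true);
            return t;
        });

    // Called from the actor's message handler; returns immediately.
    public Future<String> handleQuery() {
        return queryExecutor.submit(this::dumpMetrics);
    }

    // Placeholder for the expensive serialization of all registered metrics.
    private String dumpMetrics() {
        return "serialized-metrics";
    }

    public void shutdown() throws InterruptedException {
        queryExecutor.shutdown();
        queryExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        MetricQueryServiceSketch service = new MetricQueryServiceSketch();
        System.out.println(service.handleQuery().get()); // prints serialized-metrics
        service.shutdown();
    }
}
```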
[jira] [Commented] (FLINK-7800) Enable window joins without equi-join predicates
[ https://issues.apache.org/jira/browse/FLINK-7800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606916#comment-16606916 ] ASF GitHub Bot commented on FLINK-7800: --- xccui commented on issue #4934: [FLINK-7800] [table] Enable window joins without equi-join predicates URL: https://github.com/apache/flink/pull/4934#issuecomment-419388818 Sounds good. Then I'll do some refactorings to make sure that it works correctly for the current codebase. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable window joins without equi-join predicates > > > Key: FLINK-7800 > URL: https://issues.apache.org/jira/browse/FLINK-7800 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > Currently, windowed joins can only be translated if they have at least one > equi-join predicate. This limitation exists due to the lack of a good cross > join strategy for the DataSet API. > Due to the window, windowed joins do not have to be executed as cross joins. > Hence, the equi-join limitation does not need to be enforced (even though > non-equi joins are executed with a parallelism of 1 right now). > We can resolve this issue by adding a boolean flag to the > {{FlinkLogicalJoinConverter}} rule to permit non-equi joins and add such a > rule to the logical optimization set of the DataStream API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10299) RowSerializer.copy data value cast exception
[ https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606915#comment-16606915 ] Fabian Hueske commented on FLINK-10299: --- Thanks for reporting this issue [~ambition]. Can you provide a test case to reproduce the problem? > RowSerializer.copy data value cast exception > > > Key: FLINK-10299 > URL: https://issues.apache.org/jira/browse/FLINK-10299 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.0 >Reporter: ambition >Priority: Minor > Attachments: image-2018-09-07-17-47-04-343.png > > > Flink SQL processes collected user behavior data, such as: > {code:java} > { > "event_id": "session_start", > "timestamp": "-", // malformed data > "viewport_height": "667", > "viewport_width": "-" // malformed data > } > {code} > This causes the following exception: > {code:java} > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - > Could not restart the job Flink Streaming Job > (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it. 
> java.lang.ClassCastException: java.lang.String cannot be cast to > java.sql.Timestamp > at > org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - > Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91. > 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO > checkpoint.StandaloneCompletedCheckpointStore > (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down > {code} > We use Flink's checkpointing feature, and this uncaught exception means the job could not > be restarted. We hope the Flink committers can provide a better > solution. > !image-2018-09-07-17-47-04-343.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
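A minimal, Flink-free reproduction of the failure mode in the stack trace above (the method below is a hand-written stand-in for SqlTimestampSerializer.copy, not the actual Flink code): the serializer assumes the field really holds a java.sql.Timestamp and casts unconditionally, so a row that still carries the raw string "-" in that position fails with exactly this ClassCastException.

```java
import java.sql.Timestamp;

public class RowCopyCastDemo {
    // Stand-in for SqlTimestampSerializer.copy: it casts the untyped field
    // and copies it, which throws if the value is not actually a Timestamp.
    static Timestamp copyTimestampField(Object field) {
        Timestamp from = (Timestamp) field; // throws ClassCastException for a String
        return new Timestamp(from.getTime());
    }

    public static void main(String[] args) {
        // A well-formed field copies fine.
        copyTimestampField(new Timestamp(0L));

        // The "-" placeholder from the malformed event is still a String here,
        // reproducing the reported ClassCastException.
        try {
            copyTimestampField("-");
        } catch (ClassCastException e) {
            System.out.println("reproduced: " + e.getClass().getSimpleName());
        }
    }
}
```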
[jira] [Commented] (FLINK-7800) Enable window joins without equi-join predicates
[ https://issues.apache.org/jira/browse/FLINK-7800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606908#comment-16606908 ] ASF GitHub Bot commented on FLINK-7800: --- fhueske commented on issue #4934: [FLINK-7800] [table] Enable window joins without equi-join predicates URL: https://github.com/apache/flink/pull/4934#issuecomment-419387750 If I remember correctly, we would have to disable some checks for equi-joins and push them to later optimization phases. It's probably not a super important feature, but then again it is fairly easy to add and we also support non-keyed window aggregates which run with parallelism 1. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable window joins without equi-join predicates > > > Key: FLINK-7800 > URL: https://issues.apache.org/jira/browse/FLINK-7800 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui >Priority: Major > Labels: pull-request-available > > Currently, windowed joins can only be translated if they have at least one > equi-join predicate. This limitation exists due to the lack of a good cross > join strategy for the DataSet API. > Due to the window, windowed joins do not have to be executed as cross joins. > Hence, the equi-join limitation does not need to be enforced (even though > non-equi joins are executed with a parallelism of 1 right now). > We can resolve this issue by adding a boolean flag to the > {{FlinkLogicalJoinConverter}} rule to permit non-equi joins and add such a > rule to the logical optimization set of the DataStream API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)