[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6040 @snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore that for now. I'll take another look at your changes soon. Thanks! ---
[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics
[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480262#comment-16480262 ] ASF GitHub Bot commented on FLINK-9349: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6040 @snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore that for now. I'll take another look at your changes soon. Thanks! > KafkaConnector Exception while fetching from multiple kafka topics > --- > > Key: FLINK-9349 > URL: https://issues.apache.org/jira/browse/FLINK-9349 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0 >Reporter: Vishal Santoshi >Assignee: Sergey Nuyanzin >Priority: Critical > Attachments: Flink9349Test.java > > > ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java > > It seems the List<KafkaTopicPartitionState<TopicPartition>> subscribedPartitionStates was being modified while > runFetchLoop iterated over it. > This can happen if, e.g., FlinkKafkaConsumer runs the following code > concurrently: > kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); > > {code:java} > java.util.ConcurrentModificationException > at > java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966) > at java.util.LinkedList$ListItr.next(LinkedList.java:888) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
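The failure mode above is easy to reproduce in isolation: java.util.LinkedList throws a `ConcurrentModificationException` whenever the list is structurally modified while an iterator over it is live. A minimal, single-threaded sketch (the list and string values are illustrative stand-ins, not the actual Flink code; in the real fetcher the modification comes from the partition-discovery thread racing with runFetchLoop):

{code:java}
import java.util.LinkedList;
import java.util.List;

public class ComodificationDemo {
    public static void main(String[] args) {
        // Stands in for subscribedPartitionStates in the fetcher.
        List<String> partitionStates = new LinkedList<>();
        partitionStates.add("topic-a / partition 0");
        partitionStates.add("topic-a / partition 1");

        for (String state : partitionStates) {
            System.out.println("fetching for " + state);
            // Simulates addDiscoveredPartitions() being called while the
            // fetch loop iterates: the structural modification makes the
            // iterator's next() call throw ConcurrentModificationException.
            partitionStates.add("topic-b / partition 0");
        }
    }
}
{code}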
[GitHub] flink issue #5978: [FLINK-8554] Upgrade AWS SDK
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5978 Is there anything in `1.11.325` we desperately need? If not, I would oppose upgrading the AWS SDK too frequently. It is highly likely that we don't need any of the new changes in `1.11.325`. As you can see, the current SDK version is `1.11.319`, which was upgraded just a few days ago. There are a few reasons we should discourage it: - It doesn't add much value, and we don't really need it - It costs lots of unnecessary work from both contributors and the Flink community (committers, reviewers, etc.) - AWS releases their SDK very frequently, at a much faster pace than we can possibly keep up with ---
[jira] [Commented] (FLINK-8554) Upgrade AWS SDK
[ https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480264#comment-16480264 ] ASF GitHub Bot commented on FLINK-8554: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5978 Is there anything in `1.11.325` we desperately need? If not, I would oppose upgrading the AWS SDK too frequently. It is highly likely that we don't need any of the new changes in `1.11.325`. As you can see, the current SDK version is `1.11.319`, which was upgraded just a few days ago. There are a few reasons we should discourage it: - It doesn't add much value, and we don't really need it - It costs lots of unnecessary work from both contributors and the Flink community (committers, reviewers, etc.) - AWS releases their SDK very frequently, at a much faster pace than we can possibly keep up with > Upgrade AWS SDK > --- > > Key: FLINK-8554 > URL: https://issues.apache.org/jira/browse/FLINK-8554 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > AWS SDK 1.11.271 fixes a lot of bugs. > One of the fixed bugs would exhibit the following: > {code} > Caused by: java.lang.NullPointerException > at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729) > at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67) > at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5374 @cjolif since we have now reached a conclusion on where the Elasticsearch connector should be improved in the future, could you maybe close this PR? I assume a new PR will be opened that subsumes this one. ---
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480277#comment-16480277 ] ASF GitHub Bot commented on FLINK-8101: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5374 @cjolif since we have now reached a conclusion on where the Elasticsearch connector should be improved in the future, could you maybe close this PR? I assume a new PR will be opened that subsumes this one. > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > Recently, Elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of the Elasticsearch Java Client that is compatible with ES 6 is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480280#comment-16480280 ] ASF GitHub Bot commented on FLINK-8101: --- Github user cjolif closed the pull request at: https://github.com/apache/flink/pull/5374 > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > Recently, Elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of the Elasticsearch Java Client that is compatible with ES 6 is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...
Github user cjolif closed the pull request at: https://github.com/apache/flink/pull/5374 ---
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480279#comment-16480279 ] ASF GitHub Bot commented on FLINK-8101: --- Github user cjolif commented on the issue: https://github.com/apache/flink/pull/5374 @tzulitai sure. done. > Elasticsearch 6.x support > - > > Key: FLINK-8101 > URL: https://issues.apache.org/jira/browse/FLINK-8101 > Project: Flink > Issue Type: New Feature > Components: ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > Recently, Elasticsearch 6.0.0 was released: > https://www.elastic.co/blog/elasticsearch-6-0-0-released > The minimum version of the Elasticsearch Java Client that is compatible with ES 6 is 5.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...
Github user cjolif commented on the issue: https://github.com/apache/flink/pull/5374 @tzulitai sure. done. ---
[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5823 Thanks @zhangminglei. This looks good to merge, will proceed to merge this. ---
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480282#comment-16480282 ] ASF GitHub Bot commented on FLINK-9008: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5823 Thanks @zhangminglei. This looks good to merge, will proceed to merge this. > End-to-end test: Quickstarts > > > Key: FLINK-9008 > URL: https://issues.apache.org/jira/browse/FLINK-9008 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.6.0 > > > We could add an end-to-end test which verifies Flink's quickstarts. It should > do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
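The five steps from the issue map directly onto shell commands. A rough sketch of the shape of such a test (archetype version, artifact names, and the grep pattern are illustrative assumptions, not the actual test script):

{code:bash}
#!/usr/bin/env bash
set -e

# 1. Create a new Flink project from the quickstart archetype.
mvn archetype:generate -B \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.5.0 \
    -DgroupId=org.example -DartifactId=quickstart-test -Dversion=0.1
cd quickstart-test

# 2. A Flink connector or library dependency would be added to pom.xml here.

# 3. Build the fat jar.
mvn clean package -Pbuild-jar

# 4. Verify that no core Flink classes leaked into the jar.
if jar tf target/quickstart-test-0.1.jar | grep -q "org/apache/flink/api/java"; then
    echo "Core dependencies found in the jar!"
    exit 1
fi

# 5. Run the program against a local cluster.
$FLINK_DIR/bin/flink run -c org.example.StreamingJob target/quickstart-test-0.1.jar
{code}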
[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module
[ https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480284#comment-16480284 ] Kostas Kloudas commented on FLINK-9387: --- Well there is no change apart from the message wording right? Both before and after it is future.cause(). Or am I missing something? > Several log message errors in queryable-state module > > > Key: FLINK-9387 > URL: https://issues.apache.org/jira/browse/FLINK-9387 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185186 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. 
+*/ + private transient Schema reader; + + /** +* Creates an Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480292#comment-16480292 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185186 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. 
+*/ + private transient GenericDatumReader<T> datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates an Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = record
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480293#comment-16480293 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185420 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); --- End diff -- I would skip the initialization in the constructor, if you have the initialization in `checkAvroInitialized()`. Simpler, and it avoids having two places that do the initialization and have to be kept in sync. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185420 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); --- End diff -- I would skip the initialization in the constructor, if you have he initialization in `checkAvroInitialized()`. 
Simpler, and it avoids having two places that do the initialization and have to be kept in sync. ---
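The suggestion is the usual lazy-initialization pattern for transient members: keep the constructor minimal and route every use through a single check method, so that first use and first use after Java deserialization share one code path. A self-contained sketch of the pattern in general form (simplified field types; not the code that was eventually committed):

{code:java}
import java.io.Serializable;

public class LazyInitSketch implements Serializable {

    private final String config;                 // serializable state
    private transient StringBuilder workBuffer;  // rebuilt lazily after deserialization

    LazyInitSketch(String config) {
        this.config = config;                    // no transient setup here
    }

    String process(String input) {
        checkInitialized();                      // the single initialization path
        workBuffer.setLength(0);
        return workBuffer.append(config).append(':').append(input).toString();
    }

    private void checkInitialized() {
        if (workBuffer == null) {                // first use, in any incarnation
            workBuffer = new StringBuilder();
        }
    }
}
{code}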
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185047 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- There probably should be some verification that the job actually runs with DOP=4 ---
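One way to add such a check would be to ask the REST API for the job's vertices and compare their parallelism against the requested value (a sketch only; the jq filter and the assumption that every vertex inherits the job-wide parallelism of 4 are illustrative):

{code:bash}
# Hypothetical helper: assert that every vertex of the job runs with parallelism 4.
JOB_ID="<job id extracted from the submission output>"
for p in $(curl -s "http://localhost:8081/jobs/${JOB_ID}" | jq '.vertices[].parallelism'); do
    if [ "$p" != "4" ]; then
        echo "Expected parallelism 4 but found $p"
        exit 1
    fi
done
{code}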
[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module
[ https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480296#comment-16480296 ] vinoyang commented on FLINK-9387: - this change adds a log argument placeholder for future.cause(), which the original code was missing. > Several log message errors in queryable-state module > > > Key: FLINK-9387 > URL: https://issues.apache.org/jira/browse/FLINK-9387 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
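For readers following the thread: SLF4J only substitutes an argument where the message string has a matching `{}` placeholder; a non-Throwable argument passed without one is silently dropped. A minimal illustration with a generic logger (not the actual queryable-state code):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderDemo {
    private static final Logger LOG = LoggerFactory.getLogger(PlaceholderDemo.class);

    public static void main(String[] args) {
        Object detail = "connection reset";

        // No placeholder: logs only "Query failed." and drops 'detail'.
        // (A trailing Throwable would instead be printed as a stack trace.)
        LOG.warn("Query failed.", detail);

        // With the placeholder, the argument appears in the message.
        LOG.warn("Query failed: {}", detail);
    }
}
{code}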
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185689 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -s" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -r" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test operation on running streaming jobs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +RETURN=`$FLINK_DIR/bin/flink run -d \ +$PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` +echo "job submission returns: $RETURN" +JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"` +eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}" +EXIT_CODE=$? +fi + +printf "\n
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480298#comment-16480298 ] ASF GitHub Bot commented on FLINK-8985: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185232 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" --- End diff -- Should we verify the output of `info`? > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185232 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" --- End diff -- Should we verify the output of `info`? ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185140 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Since this is a detached execution, we probably want to wait until this job completes before continuing? ---
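A simple way to implement that wait in the script would be to poll `flink list -r` until the detached job disappears from the running list (sketch only; the 60-second timeout is an arbitrary assumption):

{code:bash}
# Hypothetical helper: block until job $1 is no longer reported as running.
wait_for_job_completion() {
    local job_id=$1
    for i in $(seq 1 60); do
        if ! $FLINK_DIR/bin/flink list -r | grep -q "$job_id"; then
            return 0
        fi
        sleep 1
    done
    echo "Job $job_id did not finish within 60 seconds"
    return 1
}
{code}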
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480301#comment-16480301 ] ASF GitHub Bot commented on FLINK-8985: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185047 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- There probably should be some verification that the job actually runs with DOP=4 > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185253 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" --- End diff -- Should we verify the output of `list`? ---
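Such a verification could grep the command's output rather than checking only the exit code, along these lines (the expected header string is an illustrative assumption about the CLI's output format):

{code:bash}
# Hypothetical content check for `flink list`.
LIST_OUTPUT=$($FLINK_DIR/bin/flink list 2>&1)
if ! echo "$LIST_OUTPUT" | grep -q "Running/Restarting Jobs"; then
    echo "Unexpected 'flink list' output: $LIST_OUTPUT"
    EXIT_CODE=1
fi
{code}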
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480297#comment-16480297 ] ASF GitHub Bot commented on FLINK-8985: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185253 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" --- End diff -- Should we verify the output of `list`? > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480300#comment-16480300 ] ASF GitHub Bot commented on FLINK-8985: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185140 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Since this is a detached execution, we probably want to wait until this job completes before continuing? > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480299#comment-16480299 ] ASF GitHub Bot commented on FLINK-8985: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189185689 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ + -c org.apache.flink.examples.java.wordcount.WordCount \ + $FLINK_DIR/examples/batch/WordCount.jar \ + --input file:///$FLINK_DIR/README.txt \ + --output file:///${TEST_DATA_DIR}/out/result" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test information APIs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink info $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -s" +EXIT_CODE=$? +fi +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink list -r" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test operation on running streaming jobs\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +RETURN=`$FLINK_DIR/bin/flink run -d \ +$PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result` +echo "job submission returns: $
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5849 Hi @walterddr, what is the status of this PR? It would be nice if we could move forward with this PR (and also the CLI e2e test PR that you also opened). ---
[jira] [Commented] (FLINK-8986) End-to-end test: REST
[ https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480304#comment-16480304 ] ASF GitHub Bot commented on FLINK-8986: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5849 Hi @walterddr, what is the status of this PR? It would be nice if we could move forward with this PR (and also the CLI e2e test PR that you also opened). > End-to-end test: REST > - > > Key: FLINK-8986 > URL: https://issues.apache.org/jira/browse/FLINK-8986 > Project: Flink > Issue Type: Sub-task > Components: REST, Tests >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should add an end-to-end test which verifies that we can use the REST > interface to obtain information about a running job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6041: [FLINK-9326] TaskManagerOptions.NUM_TASK_SLOTS does not w...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6041 cc @tillrohrmann ---
[jira] [Commented] (FLINK-9326) TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode
[ https://issues.apache.org/jira/browse/FLINK-9326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480307#comment-16480307 ] ASF GitHub Bot commented on FLINK-9326: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6041 cc @tillrohrmann > TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode > --- > > Key: FLINK-9326 > URL: https://issues.apache.org/jira/browse/FLINK-9326 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0 > Environment: Linux 64bit > Flink branch release-1.5 >Reporter: Samuel Doyle >Assignee: vinoyang >Priority: Major > > When attempting to set the number of task slots via the API, such as > {code:java} > configuration = new Configuration(); > configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16); > configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1); > {code} > I always end up with the default slot setting based on the number of cores on > the machine where my standalone instance is running. It doesn't matter what I > set NUM_TASK_SLOTS to; it has no effect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
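For context, a minimal sketch of how such a configuration is typically handed to a local environment (the job body is hypothetical; the report above is that NUM_TASK_SLOTS is ignored in this mode):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSlotsExample {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16);
        configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);

        // The configuration is passed explicitly when the local environment is
        // created; per this report, the slot setting still has no effect here.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

        // ... build and execute the job against env ...
    }
}
{code}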
[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6039 Thanks, I'll also commit an empty release notes page to the 1.6 / master docs. ---
[GitHub] flink issue #5798: [FLINK-7917] The return of taskInformationOrBlobKey shoul...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5798 cc @kl0u ---
[jira] [Commented] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex
[ https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480308#comment-16480308 ] ASF GitHub Bot commented on FLINK-7917: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5798 cc @kl0u > The return of taskInformationOrBlobKey should be placed inside synchronized > in ExecutionJobVertex > - > > Key: FLINK-7917 > URL: https://issues.apache.org/jira/browse/FLINK-7917 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > Currently in ExecutionJobVertex#getTaskInformationOrBlobKey: > {code} > } > return taskInformationOrBlobKey; > {code} > The return should be placed inside the synchronized block. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
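To illustrate why the return belongs inside the synchronized block, here is a hypothetical sketch (not the actual ExecutionJobVertex code): once the lock is released, reading the field outside the block has no happens-before relationship with the write, so a racing thread may observe a stale value.
{code:java}
class LazyHolder {
    private Object taskInformationOrBlobKey;

    Object getTaskInformationOrBlobKey() {
        synchronized (this) {
            if (taskInformationOrBlobKey == null) {
                taskInformationOrBlobKey = expensiveInitialization();
            }
            // Returning while still holding the lock guarantees the caller
            // sees the value that was written under the same lock.
            return taskInformationOrBlobKey;
        }
    }

    private Object expensiveInitialization() {
        return new Object();
    }
}
{code}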
[jira] [Commented] (FLINK-9393) LocatableInputSplit#hashCode should take hostnames into account
[ https://issues.apache.org/jira/browse/FLINK-9393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480310#comment-16480310 ] Stephan Ewen commented on FLINK-9393: --- The leaking of the hostnames array makes {{equals()}} vulnerable, but it still does not violate the equals/hashCode contract. In some sense it is almost an argument to not change {{hashCode()}}, unless the argument is "equals is partly broken, so hash code should be broken as well". The way that "splitNumber" gets initialized, it is a perfect hashcode, no collisions at all. That is admittedly not guaranteed from the class itself, but it is the effect of the way it is currently used. Adding the hostnames to the hash code can introduce some collisions that were not there before. It's not going to be a problem, but looking at the bigger picture, things may always be a bit different than an individual hashCode function suggests. I am not opposed to fixing this issue. What I am trying to do is make contributors aware of the mechanics and implications of contributing to such a large project. We can probably spend two years of everyone's time to just rewrite parts of the code that are already fine, but could be done differently to satisfy some checkstyle, default pattern, common guideline, etc. Every time we do that, we have to be really careful, because changing something that is not wrong cannot really make anything better, but could make things worse (again, implications can be subtle), so great care is needed, and often more time (from reviewing committers) than it may initially seem. Please understand that I am only writing this in order to raise some awareness. I have seen projects that came into a state that they mainly worked on "moving things around" internally, and slowed down a lot on adding user/developer benefit. So in my opinion we should avoid that. In that sense, yes, please go ahead, this is fine to be changed and merged. Just take the above into account when looking for parts of Flink you want to change. > LocatableInputSplit#hashCode should take hostnames into account > --- > > Key: FLINK-9393 > URL: https://issues.apache.org/jira/browse/FLINK-9393 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > > Currently: > {code} > public int hashCode() { > return this.splitNumber; > } > {code} > This is not symmetrical with the {{equals}} method where hostnames are compared. > LocatableInputSplit#hashCode should take hostnames into account. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
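A sketch of the kind of change under discussion (a simplified stand-in class, not the committed patch): mixing the hostnames into the hash keeps {{hashCode()}} symmetrical with an {{equals()}} that compares both fields, at the cost of the collision-free splitNumber-only hash Stephan describes above.
{code:java}
import java.util.Arrays;

class LocatableSplitSketch {
    private final int splitNumber;
    private final String[] hostnames;

    LocatableSplitSketch(int splitNumber, String[] hostnames) {
        this.splitNumber = splitNumber;
        this.hostnames = hostnames;
    }

    @Override
    public int hashCode() {
        // Combine both fields that equals() compares.
        return 31 * splitNumber + Arrays.hashCode(hostnames);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof LocatableSplitSketch)) {
            return false;
        }
        LocatableSplitSketch other = (LocatableSplitSketch) obj;
        return splitNumber == other.splitNumber
            && Arrays.equals(hostnames, other.hostnames);
    }
}
{code}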
[GitHub] flink pull request #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6039 ---
[jira] [Closed] (FLINK-9397) Individual Buffer Timeout of 0 incorrectly leads to default timeout
[ https://issues.apache.org/jira/browse/FLINK-9397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-9397. --- > Individual Buffer Timeout of 0 incorrectly leads to default timeout > --- > > Key: FLINK-9397 > URL: https://issues.apache.org/jira/browse/FLINK-9397 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > When configuring the buffer timeout of an individual operation to {{0}}, the > StreamGraphGenerator incorrectly uses the default value of the application > (typically 100). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9397) Individual Buffer Timeout of 0 incorrectly leads to default timeout
[ https://issues.apache.org/jira/browse/FLINK-9397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-9397. - Resolution: Fixed Fixed in - 1.4.3 via 611ead023ff0be995a4cebb42d579e7e39442c3c - 1.5.0 via 884c2e39b401fc4f1e0623e856008af53ed5f98e - 1.6.0 via f41debef3f98cbbd49107fd02e101baa7c94fc6e > Individual Buffer Timeout of 0 incorrectly leads to default timeout > --- > > Key: FLINK-9397 > URL: https://issues.apache.org/jira/browse/FLINK-9397 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > When configuring the buffer timeout of an individual operation to {{0}}, the > StreamGraphGenerator incorrectly uses the default value of the application > (typically 100). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
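For context, a minimal sketch of the semantics this fix restores (the job body is hypothetical): a per-operator timeout of {{0}} should flush after every record instead of inheriting the application default.
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(100); // application-wide default

        env.socketTextStream("localhost", 9999)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toLowerCase();
                }
            })
            // 0 should mean "flush after every record" for this operator only;
            // the bug made it fall back to the 100 ms application default.
            .setBufferTimeout(0)
            .print();

        env.execute("buffer-timeout-example");
    }
}
{code}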
[jira] [Resolved] (FLINK-9392) Add @FunctionalInterface annotations to all core functional interfaces
[ https://issues.apache.org/jira/browse/FLINK-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-9392. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in fafef15eaca1f4277ac1dddb018a8e01eca22817 > Add @FunctionalInterface annotations to all core functional interfaces > -- > > Key: FLINK-9392 > URL: https://issues.apache.org/jira/browse/FLINK-9392 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.6.0 > > > The {{@FunctionalInterface}} annotation should be added to all SAM interfaces > in order to prevent accidentally breaking them (as non SAMs). > We had a case of that before for the {{SinkFunction}} which was compatible > through default methods, but incompatible for users that previously > instantiated that interface through a lambda. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9392) Add @FunctionalInterface annotations to all core functional interfaces
[ https://issues.apache.org/jira/browse/FLINK-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-9392. --- > Add @FunctionalInterface annotations to all core functional interfaces > -- > > Key: FLINK-9392 > URL: https://issues.apache.org/jira/browse/FLINK-9392 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.6.0 > > > The {{@FunctionalInterface}} annotation should be added to all SAM interfaces > in order to prevent accidentally breaking them (as non SAMs). > We had a case of that before for the {{SinkFunction}} which was compatible > through default methods, but incompatible for users that previously > instantiated that interface through a lambda. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
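To illustrate the guard the annotation provides, here is a sketch with a hypothetical {{SinkLike}} interface standing in for {{SinkFunction}}:
{code:java}
@FunctionalInterface
interface SinkLike<IN> {
    void invoke(IN value) throws Exception;

    // Default methods do not break the SAM property, so this is allowed:
    default void close() {}

    // void flush();  // <- a second abstract method would now fail compilation
}

public class FunctionalInterfaceDemo {
    public static void main(String[] args) throws Exception {
        // Users can keep instantiating the interface through a lambda / method reference.
        SinkLike<String> sink = System.out::println;
        sink.invoke("hello");
    }
}
{code}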
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189195014
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+	private static final long serialVersionUID = -884738268437806062L;
+
+	/** Provider for schema coder. Used for initializing in each task. */
+	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+	/** Coder used for reading schema from incoming stream. */
+	private transient SchemaCoder schemaCoder;
+
+	/**
+	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+	 *
+	 * @param recordClazz class to which deserialize. Should be either
+	 *                    {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader reader's Avro schema. Should be provided if recordClazz is
+	 *                    {@link GenericRecord}
+	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder}
+	 *                    that will be used for schema reading
+	 */
+	protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader);
+		this.schemaCoderProvider = schemaCoderProvider;
+		this.schemaCoder = schemaCoderProvider.get();
+	}
+
+	@Override
+	public T deserialize(byte[] message) {
+		// read record
+		try {
+			checkAvroInitialized();
+			getInputStream().setBuffer(message);
+			Schema writerSchema = schemaCoder.readSchema(getInputStream());
+			Schema readerSchema = getReaderSchema();
+
+			GenericDatumReader<T> datumReader = getDatumReader();
+
+			datumReader.setSchema(writerSchema);
+			datumReader.setExpected(readerSchema);
+
+			return datumReader.read(null, getDecoder());
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --
The method `deserialize()` can throw an IOException. That got dropped from the signature, and exceptions are now wrapped into a RuntimeException. That makes exception stack traces more complicated, and hides the fact that "there is a possible exceptional case to handle" from the consumers of that code.
I think this makes for a general rule: Whenever using `RuntimeException`, take a step back and look at the exception structure and signatures, and see if something is not declared well. ---
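A sketch of the alternative being suggested (simplified interfaces, not Flink's actual signatures): keeping `IOException` in the `deserialize()` signature preserves the exceptional case for callers instead of hiding it behind `RuntimeException`.
```java
import java.io.IOException;

interface DeserializationSketch<T> {
    T deserialize(byte[] message) throws IOException;
}

class AvroDeserializationSketch implements DeserializationSketch<Object> {
    @Override
    public Object deserialize(byte[] message) throws IOException {
        try {
            return readRecord(message);
        } catch (IOException e) {
            throw e; // propagate the declared, meaningful exception type
        } catch (Exception e) {
            // wrap only truly unexpected failures, preserving the cause
            throw new IOException("Failed to deserialize Avro record.", e);
        }
    }

    private Object readRecord(byte[] message) throws IOException {
        return new String(message); // stand-in for the actual Avro decoding
    }
}
```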
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480333#comment-16480333 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189195014 (same diff and review comment as quoted above) > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197633
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = -6766681879020862312L;
+
+	/** Class to deserialize to. */
+	private final Class<T> recordClazz;
+
+	/** Schema in case of GenericRecord for serialization purpose. */
+	private final String schemaString;
+
+	/** Reader that deserializes byte array into a record. */
+	private transient GenericDatumReader<T> datumReader;
+
+	/** Input stream to read message from. */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/** Avro decoder that decodes binary data. */
+	private transient Decoder decoder;
+
+	/** Avro schema for the reader. */
+	private transient Schema reader;
+
+	/**
+	 * Creates a Avro deserialization schema.
+	 *
+	 * @param recordClazz class to which deserialize. Should be one of:
+	 *                    {@link org.apache.avro.specific.SpecificRecord},
+	 *                    {@link org.apache.avro.generic.GenericRecord}.
+	 * @param reader reader's Avro schema. Should be provided if recordClazz is
+	 *                    {@link GenericRecord}
+	 */
+	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		this.reader = reader;
+		if (reader != null) {
+			this.schemaString = reader.toString();
+		} else {
+			this.schemaString = null;
+		}
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+	 *
+	 * @param schema schema of produced records
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --
Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197766
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = -6766681879020862312L;
+
+	/** Class to deserialize to. */
+	private final Class<T> recordClazz;
+
+	/** Schema in case of GenericRecord for serialization purpose. */
+	private final String schemaString;
+
+	/** Reader that deserializes byte array into a record. */
+	private transient GenericDatumReader<T> datumReader;
+
+	/** Input stream to read message from. */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/** Avro decoder that decodes binary data. */
+	private transient Decoder decoder;
+
+	/** Avro schema for the reader. */
+	private transient Schema reader;
+
+	/**
+	 * Creates a Avro deserialization schema.
+	 *
+	 * @param recordClazz class to which deserialize. Should be one of:
+	 *                    {@link org.apache.avro.specific.SpecificRecord},
+	 *                    {@link org.apache.avro.generic.GenericRecord}.
+	 * @param reader reader's Avro schema. Should be provided if recordClazz is
+	 *                    {@link GenericRecord}
+	 */
+	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		this.reader = reader;
+		if (reader != null) {
+			this.schemaString = reader.toString();
+		} else {
+			this.schemaString = null;
+		}
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+	 *
+	 * @param schema schema of produced records
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
+	 *
+	 * @param tClass class of record to be produced
+	 * @return deserialized record
+	 */
+	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
+		return new Avro
[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5823 Thank you very much to @tzulitai and @zentol for reviewing this PR. ---
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480344#comment-16480344 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197633 (same diff and review comment as quoted above) > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480345#comment-16480345 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197766 (same diff and review comment as quoted above) > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480347#comment-16480347 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5823 Thank you very much to @tzulitai and @zentol for reviewing this PR. > End-to-end test: Quickstarts > > > Key: FLINK-9008 > URL: https://issues.apache.org/jira/browse/FLINK-9008 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.6.0 > > > We could add an end-to-end test which verifies Flink's quickstarts. It should > do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Added a few more comments, most importantly around exception wrapping. Otherwise, looking good... ---
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480350#comment-16480350 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Added a few more comments, most importantly around exception wrapping. Otherwise, looking good... > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5979 ---
[jira] [Commented] (FLINK-9070) Improve performance of RocksDBMapState.clear()
[ https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480382#comment-16480382 ] ASF GitHub Bot commented on FLINK-9070: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5979 > Improve performance of RocksDBMapState.clear() > -- > > Key: FLINK-9070 > URL: https://issues.apache.org/jira/browse/FLINK-9070 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Truong Duc Kien >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Currently, RocksDBMapState.clear() is implemented by iterating over all the > keys and dropping them one by one. This iteration can be quite slow with: > * Large maps > * High-churn maps with a lot of tombstones > There are a few methods to speed up deletion for a range of keys, each with > their own caveats: > * DeleteRange: still experimental, likely buggy > * DeleteFilesInRange + CompactRange: only good for large ranges > > Flink can also keep a list of inserted keys in memory, then directly delete > them without having to iterate over the RocksDB database again. > > Reference: > * [RocksDB article about range > deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys] > * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
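A sketch contrasting the two strategies discussed in this issue, using the RocksDB Java API directly (key layout and prefix handling are simplified relative to Flink's actual state backend):
{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

class MapStateClearSketch {

    // Current approach: iterate all keys under the state's prefix and delete
    // them one by one (slow for large or tombstone-heavy maps).
    static void clearByIteration(RocksDB db, ColumnFamilyHandle cf, byte[] prefix)
            throws RocksDBException {
        try (RocksIterator iter = db.newIterator(cf)) {
            for (iter.seek(prefix); iter.isValid() && startsWith(iter.key(), prefix); iter.next()) {
                db.delete(cf, iter.key());
            }
        }
    }

    // Alternative: delete the whole key range in one call (DeleteRange was
    // still considered experimental at the time of this discussion).
    static void clearByRange(RocksDB db, ColumnFamilyHandle cf, byte[] begin, byte[] end)
            throws RocksDBException {
        db.deleteRange(cf, begin, end);
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
{code}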
[jira] [Commented] (FLINK-9395) multiple left outer joins to subqueries with array values fail
[ https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480447#comment-16480447 ] Fabian Hueske commented on FLINK-9395: -- Hi [~kgeis], your analysis is correct. The problem is caused by the array type on the second outer join. The same problem would arise if table {{a}} had an array type, so it is not about the number of joins but about the field types of the outer side of a join (I'll update the JIRA to reflect that). We need to sort (well actually the order is not important, only that values are correctly grouped) on all fields of the outer join input, because the outer join of the DataSet API only supports equality predicates. Hence, all other predicates need to be applied later. If the later applied predicates filter out all join results we still need to ensure to emit a null-padded result. To do that, we group on all fields of the outer side to check if we have a result for an input row or not. There's not really much else we can do with the current state of the DataSet API. To solve this problem, we need to extend the TypeInformation of the array type by adding a TypeComparator that can be used to sort the array type. We will face similar issues for other types that are not sortable. > multiple left outer joins to subqueries with array values fail > -- > > Key: FLINK-9395 > URL: https://issues.apache.org/jira/browse/FLINK-9395 > Project: Flink > Issue Type: Bug >Reporter: Ken Geis >Priority: Major > Attachments: JoinTest.java > > > Where {{a}} is a table with column {{id}}, the following query succeeds: > {code:sql} > SELECT * FROM a > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id > {code} > I add another join: > {code:sql} > SELECT * FROM a > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id > {code} > This fails with the error: > {noformat} > org.apache.flink.api.common.InvalidProgramException: Selected sort key is not > a sortable type > at > org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145) > at > org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465) > at > scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) > at > scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) > at > scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378) > at > org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9395) Outer Joins with array types on the outer join input fail
[ https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9395: - Summary: Outer Joins with array types on the outer join input fail (was: multiple left outer joins to subqueries with array values fail) > Outer Joins with array types on the outer join input fail > - > > Key: FLINK-9395 > URL: https://issues.apache.org/jira/browse/FLINK-9395 > Project: Flink > Issue Type: Bug >Reporter: Ken Geis >Priority: Major > Attachments: JoinTest.java > > > Where {{a}} is a table with column {{id}}, the following query succeeds: > {code:sql} > SELECT * FROM a > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id > {code} > I add another join: > {code:sql} > SELECT * FROM a > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id > LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id > {code} > This fails with the error: > {noformat} > org.apache.flink.api.common.InvalidProgramException: Selected sort key is not > a sortable type > at > org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145) > at > org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465) > at > scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) > at > scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) > at > scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378) > at > org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9395) Outer Joins with array types on the outer join input fail
[ https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9395: - Description: Given a table {{a}} with a single column {{id}}, the following query {code:sql} SELECT * FROM (SELECT id, ARRAY[id] AS b FROM a) b LEFT OUTER JOIN a AS b ON a.id = b.id {code} fails with the error: {noformat} org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145) at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378) at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) {noformat} was: Where {{a}} is a table with column {{id}}, the following query succeeds: {code:sql} SELECT * FROM a LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id {code} I add another join: {code:sql} SELECT * FROM a LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id {code} This fails with the error: {noformat} org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145) at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268) at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378) at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146) {noformat} > Outer Joins with 
array types on the outer join input fail > - > > Key: FLINK-9395 > URL: https://issues.apache.org/jira/browse/FLINK-9395 > Project: Flink > Issue Type: Bug >Reporter: Ken Geis >Priority: Major > Attachments: JoinTest.java > > > Given a table {{a}} with a single column {{id}}, the following query > {code:sql} > SELECT * FROM (SELECT id, ARRAY[id] AS b FROM a) b > LEFT OUTER JOIN a AS b ON a.id = b.id > {code} > fails with the error: > {noformat} > org.apache.flink.api.common.InvalidProgramException: Selected sort key is not > a sortable type > at > org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145) > at > org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466) > at > org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$parti
[jira] [Comment Edited] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480463#comment-16480463 ] Timo Walther edited comment on FLINK-9091 at 5/18/18 10:20 AM: --- It seems that the versions declared in the {{dependencyManagement}} sections have no impact in other modules from time to time. Maybe it is a bug in the enforcer plugin, I don't know. I could not reproduce this error so far. But I think we can fix it. I have a branch where I use exclusions instead of the {{dependencyManagement}}. [~Zentol] what do you think about these changes? Together with FLINK-8511 we should be good. https://github.com/apache/flink/compare/master...twalthr:FLINK-9091 was (Author: twalthr): It seems that the versions declared in the {{dependencyManagement}} sections have no impact in other modules from time to time. Maybe it is a bug in the enforcer plugin, I don't know. I could not reproduce this error so far. But I think we can fix it. I have a branch where I use exclusions instead of the {{dependencyManagement}}. [~Zentol] what do you think about these changes. Together with FLINK-8511 we should be good. https://github.com/apache/flink/compare/master...twalthr:FLINK-9091 > Failure while enforcing releasability in building flink-json module > --- > > Key: FLINK-9091 > URL: https://issues.apache.org/jira/browse/FLINK-9091 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > Attachments: f-json.out > > > Got the following when building flink-json module: > {code} > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence > failed with message: > Failed while enforcing releasability. See above detailed error message. > ... > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce > (dependency-convergence) on project flink-json: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed. -> > [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480463#comment-16480463 ] Timo Walther commented on FLINK-9091: - It seems that the versions declared in the {{dependencyManagement}} sections have no impact in other modules from time to time. Maybe it is a bug in the enforcer plugin, I don't know. I could not reproduce this error so far. But I think we can fix it. I have a branch where I use exclusions instead of the {{dependencyManagement}}. [~Zentol] what do you think about these changes. Together with FLINK-8511 we should be good. https://github.com/apache/flink/compare/master...twalthr:FLINK-9091 > Failure while enforcing releasability in building flink-json module > --- > > Key: FLINK-9091 > URL: https://issues.apache.org/jira/browse/FLINK-9091 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > Attachments: f-json.out > > > Got the following when building flink-json module: > {code} > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence > failed with message: > Failed while enforcing releasability. See above detailed error message. > ... > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce > (dependency-convergence) on project flink-json: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed. -> > [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 @FredTing we had some offline discussion on how to proceed with this. @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything. The conflict that Stephan mentioned between a "common deserialization schema" interface and surfacing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface. Take for example the Kafka connector's `KeyedDeserializationSchema` - there we try to deserialize the Kafka bytes, as well as provide information such as topic / partition / timestamp etc. to allow the user to enrich their user records for downstream business logic. The first part (deserialization of bytes) should be something common for all connector sources, while the second part is Kafka-specific. Therefore, we should perhaps break this up into two separate interfaces, as follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```
The second interface is something that each connector should have independently, and does not handle deserialization of the record bytes. The name, of course, is still open to discussion. What do you think? ---
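As a rough illustration of how the proposed split could compose inside a connector (all names here, including `ConsumerRecordMetaInfo`, are proposals from this thread rather than existing Flink API):
```java
// Both interfaces are the proposed ones from above, repeated for context.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// Hypothetical meta-info carrier for a Kafka record.
class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }
}

// Sketch of how a Kafka fetcher could combine the two concerns.
class EnrichingDeserializer<T> {
    private final DeserializationSchema<T> schema;
    private final ConsumerRecordMetaInfoProvider<T> enricher;

    EnrichingDeserializer(DeserializationSchema<T> schema,
                          ConsumerRecordMetaInfoProvider<T> enricher) {
        this.schema = schema;
        this.enricher = enricher;
    }

    T deserialize(byte[] bytes, ConsumerRecordMetaInfo metaInfo) {
        T record = schema.deserialize(bytes);      // connector-agnostic part
        return enricher.enrich(record, metaInfo);  // Kafka-specific part
    }
}
```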
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480505#comment-16480505 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 @FredTing we had some offline discussion on how to proceed with this. @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything. The conflict that Stephan mentioned between a "common deserialization schema" interface and surfacing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface. Take for example the Kafka connector's `KeyedDeserializationSchema` - there we try to deserialize the Kafka bytes, as well as provide information such as topic / partition / timestamp etc. to allow the user to enrich their user records for downstream business logic. The first part (deserialization of bytes) should be something common for all connector sources, while the second part is Kafka-specific. Therefore, we should perhaps break this up into two separate interfaces, as follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```
The second interface is something that each connector should have independently, and does not handle deserialization of the record bytes. The name, of course, is still open to discussion. What do you think? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8511) Remove legacy code for the TableType annotation
[ https://issues.apache.org/jira/browse/FLINK-8511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] blues zheng reassigned FLINK-8511: -- Assignee: blues zheng (was: vinoyang) > Remove legacy code for the TableType annotation > --- > > Key: FLINK-8511 > URL: https://issues.apache.org/jira/browse/FLINK-8511 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Critical > > We introduced the very generic TableSource factories that unify the > definition of table sources and are specified using Java service loaders. For > backwards compatibility, the old code paths are still supported but should be > dropped in future Flink versions. > This will touch: > {code} > org.apache.flink.table.annotation.TableType > org.apache.flink.table.catalog.ExternalCatalogTable > org.apache.flink.table.api.NoMatchedTableSourceConverterException > org.apache.flink.table.api.AmbiguousTableSourceConverterException > org.apache.flink.table.catalog.TableSourceConverter > org.apache.flink.table.catalog.ExternalTableSourceUtil > {code} > We can also drop the {{org.reflections}} and {{commons-configuration}} (and > maybe more?) dependencies. > See also FLINK-8240 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5983#discussion_r189244151
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+	/**
+	 * asyncInvoke timeout occurred.
+	 * Here you can complete the result future exceptionally with a timeout exception,
+	 * or complete it with an empty result. You can also retry to complete it with the right results.
+	 *
+	 * @param input element coming from an upstream task
+	 * @param resultFuture to be completed with the result data
+	 * @exception Exception in case of a user code error. An exception will make the task fail and
+	 *            trigger the fail-over process.
+	 */
+	void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --
Wouldn't it be enough to add such a `timeout()` method to `AsyncFunction`, with a default implementation that fails the `ResultFuture`? I mean, instead of adding a new interface and deprecating `AsyncFunction`? ---
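A minimal sketch of what this suggestion amounts to, assuming Java 8 default methods; the `AsyncFunction` and `ResultFuture` shapes below are simplified stand-ins, not the actual Flink declarations:
```java
import java.util.concurrent.TimeoutException;

public class DefaultTimeoutSketch {

    interface ResultFuture<OUT> {
        void complete(Iterable<OUT> result);
        void completeExceptionally(Throwable error);
    }

    interface AsyncFunction<IN, OUT> {
        void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

        // The suggested default: no new interface needed, existing
        // implementations keep compiling, and the previous behavior
        // (fail the future on timeout) is preserved unless overridden.
        default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
            resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
        }
    }
}
```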
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480536#comment-16480536 ] ASF GitHub Bot commented on FLINK-7789: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5983#discussion_r189244151
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+	/**
+	 * asyncInvoke timeout occurred.
+	 * Here you can complete the result future exceptionally with a timeout exception,
+	 * or complete it with an empty result. You can also retry to complete it with the right results.
+	 *
+	 * @param input element coming from an upstream task
+	 * @param resultFuture to be completed with the result data
+	 * @exception Exception in case of a user code error. An exception will make the task fail and
+	 *            trigger the fail-over process.
+	 */
+	void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --
Wouldn't it be enough to add such a `timeout()` method to `AsyncFunction`, with a default implementation that fails the `ResultFuture`? I mean, instead of adding a new interface and deprecating `AsyncFunction`?
> Add handler for Async IO operator timeouts
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Karthik Deivasigamani
> Assignee: blues zheng
> Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. When a request times out, an exception is thrown and the job is restarted. It would be good to pass an AsyncIOTimeoutHandler, which can be implemented by the user and passed in the constructor.
> Here is the discussion from the Apache Flink users mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...
Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/5983#discussion_r189250250
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+	/**
+	 * asyncInvoke timeout occurred.
+	 * Here you can complete the result future exceptionally with a timeout exception,
+	 * or complete it with an empty result. You can also retry to complete it with the right results.
+	 *
+	 * @param input element coming from an upstream task
+	 * @param resultFuture to be completed with the result data
+	 * @exception Exception in case of a user code error. An exception will make the task fail and
+	 *            trigger the fail-over process.
+	 */
+	void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --
Thanks for your review. The deprecated `AsyncFunction` is a Java interface, which cannot have a method body due to Java's grammar. However, your comment made me realize that I had forgotten about the Scala API for `AsyncFunction`, so there is more work to be done. ---
[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480580#comment-16480580 ] ASF GitHub Bot commented on FLINK-7789: --- Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/5983#discussion_r189250250
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+	/**
+	 * asyncInvoke timeout occurred.
+	 * Here you can complete the result future exceptionally with a timeout exception,
+	 * or complete it with an empty result. You can also retry to complete it with the right results.
+	 *
+	 * @param input element coming from an upstream task
+	 * @param resultFuture to be completed with the result data
+	 * @exception Exception in case of a user code error. An exception will make the task fail and
+	 *            trigger the fail-over process.
+	 */
+	void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --
Thanks for your review. The deprecated `AsyncFunction` is a Java interface, which cannot have a method body due to Java's grammar. However, your comment made me realize that I had forgotten about the Scala API for `AsyncFunction`, so there is more work to be done.
> Add handler for Async IO operator timeouts
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Karthik Deivasigamani
> Assignee: blues zheng
> Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle timeouts. When a request times out, an exception is thrown and the job is restarted. It would be good to pass an AsyncIOTimeoutHandler, which can be implemented by the user and passed in the constructor.
> Here is the discussion from the Apache Flink users mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5863: [FLINK-8985][e2etest] initial support for End-to-end CLI ...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5863 Thanks @tzulitai for the review. I will update asap. I am not 100% sure whether I should verify the CLI return values, but I will definitely add checks for them. ---
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480664#comment-16480664 ] ASF GitHub Bot commented on FLINK-8985: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5863 Thanks @tzulitai for the review. I will update asap. I am not 100% sure whether I should verify the CLI return values, but I will definitely add checks for them.
> End-to-end test: CLI
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
> Issue Type: Sub-task
> Components: Client, Tests
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Rong Rong
> Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are working correctly.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5849 Hi @tzulitai . I've actually created a new version of the test based on @zentol 's comment on this PR: https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test But it actually depends on https://github.com/apache/flink/pull/5863 as I reused the periodic stream job for testing. Is it possible to create a PR on top of another currently pending PR? ---
[jira] [Commented] (FLINK-8986) End-to-end test: REST
[ https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480666#comment-16480666 ] ASF GitHub Bot commented on FLINK-8986: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5849 Hi @tzulitai . I've actually created a new version of the test based on @zentol 's comment on this PR: https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test But it actually depends on https://github.com/apache/flink/pull/5863 as I reused the periodic stream job for testing. Is it possible to create a PR on top of another currently pending PR? > End-to-end test: REST > - > > Key: FLINK-8986 > URL: https://issues.apache.org/jira/browse/FLINK-8986 > Project: Flink > Issue Type: Sub-task > Components: REST, Tests >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should add an end-to-end test which verifies that we can use the REST > interface to obtain information about a running job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state
[ https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480670#comment-16480670 ] Rong Rong commented on FLINK-9398: -- Hi [~yanghua], have you started working on this task? I was hoping to get this in and have it work together with FLINK-8985. If you haven't started working on it, can I assign it to myself? Thanks, Rong
> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Rong Rong
> Assignee: vinoyang
> Priority: Major
>
> See: https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in *CREATE* state, which conflicts with the CLI description: *Running/Restarting Jobs*.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
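A hedged sketch of the mismatch being reported: `JobStatus` is the real Flink enum, but the two filter methods below are illustrative stand-ins, not the actual `CliFrontend` code at the linked lines.
```java
import org.apache.flink.runtime.jobgraph.JobStatus;

public class RunningJobFilterSketch {

    // What the linked lines effectively do today: treat every status
    // except CREATED as "running".
    static boolean currentFilter(JobStatus status) {
        return status != JobStatus.CREATED;
    }

    // What the CLI description "Running/Restarting Jobs" suggests instead.
    static boolean expectedFilter(JobStatus status) {
        return status == JobStatus.RUNNING || status == JobStatus.RESTARTING;
    }
}
```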
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5955 ---
[jira] [Closed] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8659. - Resolution: Fixed Merged on master with 4973a2ae49d805f56af746659c3eae48b512f983 and on 1.5 with 0c06852b3cecd414aac623ffd155ebc2a2a31336 > Add migration tests for Broadcast state. > > > Key: FLINK-8659 > URL: https://issues.apache.org/jira/browse/FLINK-8659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480731#comment-16480731 ] ASF GitHub Bot commented on FLINK-8659: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5955 > Add migration tests for Broadcast state. > > > Key: FLINK-8659 > URL: https://issues.apache.org/jira/browse/FLINK-8659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state
[ https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480791#comment-16480791 ] vinoyang commented on FLINK-9398: - hi [~walterddr] I have not started on this issue and have released the assignment, so you can take this task.
> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Rong Rong
> Assignee: vinoyang
> Priority: Major
>
> See: https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in *CREATE* state, which conflicts with the CLI description: *Running/Restarting Jobs*.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state
[ https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9398: --- Assignee: (was: vinoyang)
> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Rong Rong
> Priority: Major
>
> See: https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in *CREATE* state, which conflicts with the CLI description: *Running/Restarting Jobs*.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9399) TPCHQuery3 fails due to Serialization issues
Gary Yao created FLINK-9399: --- Summary: TPCHQuery3 fails due to Serialization issues Key: FLINK-9399 URL: https://issues.apache.org/jira/browse/FLINK-9399 Project: Flink Issue Type: Bug Components: Examples Affects Versions: 1.5.0 Reporter: Gary Yao Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails because the static class {{ShippingPriorityItem}} has private visibility, and hence cannot be serialized. *Stacktrace* {noformat} Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple. at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) at org.apache.flink.api.java.DataSet.print(DataSet.java:1652) at org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174) Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple. at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Cannot instantiate tuple. at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799) at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98) at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) ... 3 more Caused by: java.lang.IllegalAccessException: Class org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a member of class org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem with modifiers "public" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) at java.lang.Class.newInstance(Class.java:436) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62) ... 9 more {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
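A hedged sketch of the likely fix; the `Tuple4` field types are assumed from the query shape, not copied from the example source:
```java
import org.apache.flink.api.java.tuple.Tuple4;

public class TPCHQuery3Sketch {

    // Before: `private static class ShippingPriorityItem ...` — a private
    // nested class is package-private at the class-file level, so
    // Class.newInstance() invoked from TupleSerializer (which lives in a
    // different package) fails with IllegalAccessException.

    // After: public visibility plus a public no-arg constructor makes
    // reflective instantiation work from any package.
    public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
        public ShippingPriorityItem() {}
    }
}
```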
[jira] [Updated] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues
[ https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-9399: Summary: TPCHQuery3 example fails due to Serialization issues (was: TPCHQuery3 fails due to Serialization issues) > TPCHQuery3 example fails due to Serialization issues > > > Key: FLINK-9399 > URL: https://issues.apache.org/jira/browse/FLINK-9399 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.5.0 >Reporter: Gary Yao >Priority: Major > > Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails > because the static class {{ShippingPriorityItem}} has private visibility, and > hence cannot be serialized. > *Stacktrace* > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: > The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) > -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot > instantiate tuple. > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) > at > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234) > at > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1652) > at > org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174) > Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join > (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at > main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: Cannot instantiate tuple. > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98) > at > org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) > ... 3 more > Caused by: java.lang.IllegalAccessException: Class > org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a > member of class > org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem > with modifiers "public" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at java.lang.Class.newInstance(Class.java:436) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62) > ... 9 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9400) change import statement for flink-scala
thinkerou created FLINK-9400: Summary: change import statement for flink-scala Key: FLINK-9400 URL: https://issues.apache.org/jira/browse/FLINK-9400 Project: Flink Issue Type: Improvement Components: Scala API Affects Versions: 1.4.1 Reporter: thinkerou Fix For: 1.6.0
The `flink-scala` project has the following import statement:
```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```
So I want to open a pull request to fix it.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state
[ https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-9398: Assignee: Rong Rong
> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Rong Rong
> Assignee: Rong Rong
> Priority: Major
>
> See: https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in *CREATE* state, which conflicts with the CLI description: *Running/Restarting Jobs*.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues
[ https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-9399: --- Assignee: Timo Walther > TPCHQuery3 example fails due to Serialization issues > > > Key: FLINK-9399 > URL: https://issues.apache.org/jira/browse/FLINK-9399 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Timo Walther >Priority: Major > > Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails > because the static class {{ShippingPriorityItem}} has private visibility, and > hence cannot be serialized. > *Stacktrace* > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: > The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) > -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot > instantiate tuple. > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) > at > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234) > at > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1652) > at > org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174) > Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join > (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at > main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: Cannot instantiate tuple. > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98) > at > org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) > ... 3 more > Caused by: java.lang.IllegalAccessException: Class > org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a > member of class > org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem > with modifiers "public" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at java.lang.Class.newInstance(Class.java:436) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62) > ... 9 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues
[ https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480822#comment-16480822 ] Timo Walther commented on FLINK-9399: - This is related to a couple of wrong type extraction checks. I fixed it [here|https://github.com/apache/flink/pull/5097] already. I will fix this issue as well. > TPCHQuery3 example fails due to Serialization issues > > > Key: FLINK-9399 > URL: https://issues.apache.org/jira/browse/FLINK-9399 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Timo Walther >Priority: Major > > Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails > because the static class {{ShippingPriorityItem}} has private visibility, and > hence cannot be serialized. > *Stacktrace* > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: > The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) > -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot > instantiate tuple. > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) > at > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234) > at > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1652) > at > org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174) > Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join > (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at > main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: Cannot instantiate tuple. > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98) > at > org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) > ... 3 more > Caused by: java.lang.IllegalAccessException: Class > org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a > member of class > org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem > with modifiers "public" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at java.lang.Class.newInstance(Class.java:436) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62) > ... 9 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480842#comment-16480842 ] Ted Yu commented on FLINK-9091: --- Hit the above error when running tests for 1.5.0 RC. > Failure while enforcing releasability in building flink-json module > --- > > Key: FLINK-9091 > URL: https://issues.apache.org/jira/browse/FLINK-9091 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > Attachments: f-json.out > > > Got the following when building flink-json module: > {code} > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence > failed with message: > Failed while enforcing releasability. See above detailed error message. > ... > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce > (dependency-convergence) on project flink-json: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed. -> > [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6042: [FLINK-9400] change import statement for flink-sca...
GitHub user thinkerou opened a pull request: https://github.com/apache/flink/pull/6042 [FLINK-9400] change import statement for flink-scala
**(The sections below can be removed for hotfixes of typos)**
## What is the purpose of the change
While reading the `flink-scala` code, I found code like the following:
```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```
So I want to fix it to use the following:
```scala
import org.apache.flink.api.common.operators.Keys.ExpressionKeys
```
## Brief change log
- normalize a Scala import statement
## Verifying this change
This change is already covered by existing tests; tests pass.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (don't know)
- The runtime per-record code paths (performance sensitive): (don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
- The S3 file system connector: (don't know)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/thinkerou/flink im Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6042.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6042 commit 5ded44fb0ecc27007bce837c519b8e140504f879 Author: thinkerou Date: 2018-05-18T16:02:01Z [FLINK-9400] change import statement for flink-scala ---
[jira] [Commented] (FLINK-9400) change import statement for flink-scala
[ https://issues.apache.org/jira/browse/FLINK-9400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480866#comment-16480866 ] ASF GitHub Bot commented on FLINK-9400: --- GitHub user thinkerou opened a pull request: https://github.com/apache/flink/pull/6042 [FLINK-9400] change import statement for flink-scala
**(The sections below can be removed for hotfixes of typos)**
## What is the purpose of the change
While reading the `flink-scala` code, I found code like the following:
```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```
So I want to fix it to use the following:
```scala
import org.apache.flink.api.common.operators.Keys.ExpressionKeys
```
## Brief change log
- normalize a Scala import statement
## Verifying this change
This change is already covered by existing tests; tests pass.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (don't know)
- The runtime per-record code paths (performance sensitive): (don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
- The S3 file system connector: (don't know)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/thinkerou/flink im Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6042.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6042 commit 5ded44fb0ecc27007bce837c519b8e140504f879 Author: thinkerou Date: 2018-05-18T16:02:01Z [FLINK-9400] change import statement for flink-scala
> change import statement for flink-scala
> ---
>
> Key: FLINK-9400
> URL: https://issues.apache.org/jira/browse/FLINK-9400
> Project: Flink
> Issue Type: Improvement
> Components: Scala API
> Affects Versions: 1.4.1
> Reporter: thinkerou
> Priority: Trivial
> Labels: easyfix
> Fix For: 1.6.0
>
> The `flink-scala` project has the following import statement:
>
> ```scala
> import org.apache.flink.api.common.operators.Keys
> import Keys.ExpressionKeys
> ```
> So I want to open a pull request to fix it.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module
[ https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481092#comment-16481092 ] Chesnay Schepler commented on FLINK-9387: - No, it didn't miss a placeholder. There's an implicit placeholder for an exception, which causes the stack trace to be printed. With this change, only the exception message is printed, since {{toString}} is now called.
> Several log message errors in queryable-state module
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
> Issue Type: Improvement
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
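For readers unfamiliar with this SLF4J behavior, a small self-contained illustration (logger name and messages are made up):
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImplicitThrowablePlaceholder {

    private static final Logger LOG =
        LoggerFactory.getLogger(ImplicitThrowablePlaceholder.class);

    public static void main(String[] args) {
        Exception e = new IllegalStateException("boom");

        // One placeholder, two arguments: SLF4J treats a trailing Throwable
        // specially and prints the full stack trace after the message.
        LOG.warn("Query for job {} failed", "someJobId", e);

        // Adding a placeholder for the exception consumes it as an ordinary
        // argument: only e.toString() is printed, and the stack trace is lost.
        LOG.warn("Query for job {} failed: {}", "someJobId", e);
    }
}
```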
[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...
GitHub user cjolif opened a pull request: https://github.com/apache/flink/pull/6043 [FLINK-7386] evolve RequestIndexer API to make it work with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation
## What is the purpose of the change
*The purpose of this PR is to make sure the current Elasticsearch implementation is compatible with Elasticsearch 5.3+, fixing [FLINK-7386], and is also open to a future RestHighLevelClient implementation that could be used to provide Elasticsearch 6 compatibility [FLINK-8101].*
## Brief change log
* add `add` methods specific to IndexRequest, UpdateRequest and DeleteRequest on RequestIndexer so that it is compatible with both the 5.2- and 5.3+ APIs (in 5.3+, Elasticsearch no longer accepts ActionRequest in BulkProcessor).
* make sure the existing ActionRequest method on RequestIndexer calls the new type-specific method based on the actual type.
* throw an exception for other types.
* Change the return type of the createClient method in ElasticsearchApiCallBridge. As TransportClient and RestHighLevelClient have only the AutoCloseable interface in common, this is what the method returns now.
* Make ElasticsearchSinkBase agnostic to whether it is using a TransportClient or RestClient by adding a createBulkProcessorBuilder method on ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this method on all bridges.
## Verifying this change
This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked a little bit to make it compatible with the changes.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: a `@PublicEvolving` interface is now an abstract class. However, typically the user does not extend/implement it but just calls methods on it.
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs & javadocs
You can merge this pull request into a Git repository by running: $ git pull https://github.com/cjolif/flink es-5.3-apis Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6043.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6043 commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8 Author: Christophe Jolif Date: 2018-05-17T22:17:04Z [FLINK-7386] evolve RequestIndexer API to make it work with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation. ---
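A hedged sketch of the `RequestIndexer` evolution those bullet points describe. It is rendered here as an interface with a default method for brevity; per the PR notes, the real change turns a `@PublicEvolving` interface into an abstract class, and the exact Flink signatures may differ:
```java
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

public interface RequestIndexerSketch {

    // New type-specific methods, each accepted by BulkProcessor in 5.3+.
    void add(IndexRequest... requests);
    void add(UpdateRequest... requests);
    void add(DeleteRequest... requests);

    // Legacy entry point: dispatch on the concrete type, reject the rest.
    default void add(ActionRequest... requests) {
        for (ActionRequest request : requests) {
            if (request instanceof IndexRequest) {
                add((IndexRequest) request);
            } else if (request instanceof UpdateRequest) {
                add((UpdateRequest) request);
            } else if (request instanceof DeleteRequest) {
                add((DeleteRequest) request);
            } else {
                throw new IllegalArgumentException(
                    "Unsupported request type: " + request.getClass().getName());
            }
        }
    }
}
```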
[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481116#comment-16481116 ] ASF GitHub Bot commented on FLINK-7386: --- GitHub user cjolif opened a pull request: https://github.com/apache/flink/pull/6043 [FLINK-7386] evolve RequestIndexer API to make it work with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation
## What is the purpose of the change
*The purpose of this PR is to make sure the current Elasticsearch implementation is compatible with Elasticsearch 5.3+, fixing [FLINK-7386], and is also open to a future RestHighLevelClient implementation that could be used to provide Elasticsearch 6 compatibility [FLINK-8101].*
## Brief change log
* add `add` methods specific to IndexRequest, UpdateRequest and DeleteRequest on RequestIndexer so that it is compatible with both the 5.2- and 5.3+ APIs (in 5.3+, Elasticsearch no longer accepts ActionRequest in BulkProcessor).
* make sure the existing ActionRequest method on RequestIndexer calls the new type-specific method based on the actual type.
* throw an exception for other types.
* Change the return type of the createClient method in ElasticsearchApiCallBridge. As TransportClient and RestHighLevelClient have only the AutoCloseable interface in common, this is what the method returns now.
* Make ElasticsearchSinkBase agnostic to whether it is using a TransportClient or RestClient by adding a createBulkProcessorBuilder method on ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this method on all bridges.
## Verifying this change
This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked a little bit to make it compatible with the changes.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: a `@PublicEvolving` interface is now an abstract class. However, typically the user does not extend/implement it but just calls methods on it.
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs & javadocs
You can merge this pull request into a Git repository by running: $ git pull https://github.com/cjolif/flink es-5.3-apis Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6043.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6043 commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8 Author: Christophe Jolif Date: 2018-05-17T22:17:04Z [FLINK-7386] evolve RequestIndexer API to make it work with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a possible RestHighLevelClient implementation.
> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
> Issue Type: Improvement
> Components: ElasticSearch Connector
> Reporter: Dawid Wysakowicz
> Assignee: Fang Yong
> Priority: Critical
> Fix For: 1.5.0
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6044: [FLINK-1044] Website: Offer a zip archive with a p...
GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6044 [FLINK-1044] Website: Offer a zip archive with a pre-setup user project
## What is the purpose of the change
This PR will run two tests to build the Java and Scala quickstart packages. After compiling and packaging the jar files, it submits the WordCount class to a cluster and validates the output. At the very end it should upload the jar files to the `https://flink.apache.org/q` server. The last part is not working, as I need more details on how to upload the .jar files to the Flink server; I am looking for help from reviewers to point me in the right direction.
## Brief change log
- *add two new e2e tests*
- *Modify run-pre-commit-tests*
## Verifying this change
This change added tests and can be verified as follows: Run test_build_quickstart_java.sh and test_build_quickstart_scala.sh to verify this change.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-1044 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6044.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6044 commit dd59a8df1662bd1fa06dbfa220802e20a12945c3 Author: Yadan.JS Date: 2018-05-18T22:11:40Z [FLINK-1044] Website: Offer a zip archive with a pre-setup user project ---
[jira] [Commented] (FLINK-1044) Website: Offer a zip archive with a pre-setup user project
[ https://issues.apache.org/jira/browse/FLINK-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481317#comment-16481317 ] ASF GitHub Bot commented on FLINK-1044: --- GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6044 [FLINK-1044] Website: Offer a zip archive with a pre-setup user project
## What is the purpose of the change
This PR will run two tests to build the Java and Scala quickstart packages. After compiling and packaging the jar files, it submits the WordCount class to a cluster and validates the output. At the very end it should upload the jar files to the `https://flink.apache.org/q` server. The last part is not working, as I need more details on how to upload the .jar files to the Flink server; I am looking for help from reviewers to point me in the right direction.
## Brief change log
- *add two new e2e tests*
- *Modify run-pre-commit-tests*
## Verifying this change
This change added tests and can be verified as follows: Run test_build_quickstart_java.sh and test_build_quickstart_scala.sh to verify this change.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-1044 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6044.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6044 commit dd59a8df1662bd1fa06dbfa220802e20a12945c3 Author: Yadan.JS Date: 2018-05-18T22:11:40Z [FLINK-1044] Website: Offer a zip archive with a pre-setup user project
> Website: Offer a zip archive with a pre-setup user project
> --
>
> Key: FLINK-1044
> URL: https://issues.apache.org/jira/browse/FLINK-1044
> Project: Flink
> Issue Type: Improvement
> Components: Project Website
> Reporter: Stephan Ewen
> Priority: Minor
> Labels: starter
> Attachments: flink-project.zip
>
> This is basically a shortcut for those that are not familiar with maven archetypes or do not have maven installed (other than as part of the Eclipse IDE or so).
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module
[ https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481354#comment-16481354 ] vinoyang commented on FLINK-9387: - [~Zentol] sorry, you are right, it really did not miss a placeholder. So how should we fix this? Open a new issue? If you agree, I will do it.
> Several log message errors in queryable-state module
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
> Issue Type: Improvement
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint
Sihua Zhou created FLINK-9401: - Summary: Data lost when rescaling the job from incremental checkpoint Key: FLINK-9401 URL: https://issues.apache.org/jira/browse/FLINK-9401 Project: Flink Issue Type: Bug Affects Versions: 1.4.2, 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou
We may lose data when rescaling a job from an incremental checkpoint because of the following code.
{code:java}
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

	int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
	byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
	for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
		startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
	}

	iterator.seek(startKeyGroupPrefixBytes);

	while (iterator.isValid()) {

		int keyGroup = 0;
		for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
			keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
		}

		if (stateBackend.keyGroupRange.contains(keyGroup)) {
			stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
		}

		iterator.next();
	}
}
{code}
For every state handle, to fetch the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's _start key group_ is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost...
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-9401. - Resolution: Invalid
> Data lost when rescaling the job from incremental checkpoint
>
> Key: FLINK-9401
> URL: https://issues.apache.org/jira/browse/FLINK-9401
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
>
> We may lose data when rescaling a job from an incremental checkpoint because of the following code.
> {code:java}
> try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
>	int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
>	byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
>	for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>		startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
>	}
>	iterator.seek(startKeyGroupPrefixBytes);
>	while (iterator.isValid()) {
>		int keyGroup = 0;
>		for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>			keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>		}
>		if (stateBackend.keyGroupRange.contains(keyGroup)) {
>			stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
>		}
>		iterator.next();
>	}
> }
> {code}
> For every state handle, to fetch the target data we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID immediately if the state handle's _start key group_ is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost...
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
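The ticket does not record why it was closed as invalid; a plausible explanation (an assumption, not taken from the ticket) is RocksDB's seek contract: `seek(target)` positions the iterator at the first key greater than or equal to `target`, so a handle whose keys start beyond the backend's start key group is still reached. A toy model of that contract:
```java
import java.util.TreeMap;

public class SeekSemanticsModel {

    public static void main(String[] args) {
        // Model a sorted key space like RocksDB's; TreeMap.ceilingKey mirrors
        // seek(target): it returns the first key >= target.
        TreeMap<Integer, String> db = new TreeMap<>();
        db.put(7, "first key of this state handle"); // handle starts at key group 7

        int backendStartKeyGroup = 4; // backend range starts earlier

        // Prints 7: the "iterator" lands on the handle's first key instead of
        // becoming invalid, so no data is skipped.
        System.out.println(db.ceilingKey(backendStartKeyGroup));
    }
}
```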
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r189417750

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
     		return configProps;
     	}

    +	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +		if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {

    --- End diff --

    Done.

---
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481373#comment-16481373 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r189417750

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
     		return configProps;
     	}

    +	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +		if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {

    --- End diff --

    Done.

> Use ListShards for shard discovery in the flink kinesis connector
> ------------------------------------------------------------------
>
> Key: FLINK-8944
> Project: Flink
> Issue Type: Improvement
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently the DescribeStream AWS API used to get the list of shards has restricted rate limits on AWS (5 requests per second per account). This is problematic when running multiple Flink jobs on the same account, since each subtask calls DescribeStream. Changing this to ListShards will provide more flexibility on rate limits, as ListShards allows 100 requests per second per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
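To make the proposed switch concrete, here is a rough sketch of paging through all shards of a stream with the ListShards API of the AWS Java SDK. The client setup and surrounding method are assumptions of this sketch, not the connector's actual code:

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;

public class ListShardsSketch {

	public static List<Shard> listAllShards(AmazonKinesis kinesis, String streamName) {
		List<Shard> shards = new ArrayList<>();
		String nextToken = null;
		do {
			ListShardsRequest request = new ListShardsRequest();
			if (nextToken == null) {
				// The stream name may only be set on the first request;
				// later pages are addressed solely via the next token.
				request.setStreamName(streamName);
			} else {
				request.setNextToken(nextToken);
			}
			ListShardsResult result = kinesis.listShards(request);
			shards.addAll(result.getShards());
			nextToken = result.getNextToken();
		} while (nextToken != null);
		return shards;
	}

	public static void main(String[] args) {
		AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
		System.out.println(listAllShards(kinesis, "my-stream").size());
	}
}
{code}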
[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message
[ https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481438#comment-16481438 ]

Triones Deng commented on FLINK-9390:
-------------------------------------

[~StephanEwen] Notice that Flink now makes use of TaskInterrupter to cancel a running task, so it is hard to tell whether an InterruptedException is due to cancellation or a real error. When the user tries to cancel the application,

{code:java}
Task.cancelOrFailAndCancelInvokable()
{code}

is called. Here we could give the user a hint, e.g. a log message stating that the InterruptedException is due to cancellation, so the user can ignore the InterruptedException log below.

> Shutdown of KafkaProducer causes confusing log message
> ------------------------------------------------------
>
> Key: FLINK-9390
> URL: https://issues.apache.org/jira/browse/FLINK-9390
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Stefan Richter
> Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in the context of the Kafka Producer. Those exceptions are most certainly not a real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
> 	at java.lang.Object.wait(Native Method)
> 	at java.lang.Thread.join(Thread.java:1260)
> 	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
> 	... 9 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
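The suggestion above boils down to recording that a cancellation is in progress before the interrupt is issued, so a later InterruptedException can be logged as expected rather than as an error. A generic sketch of that pattern (hypothetical code, not the actual Task implementation):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationAwareWorker implements Runnable {

	// Set before the worker thread is interrupted as part of a cancellation.
	private final AtomicBoolean canceled = new AtomicBoolean(false);

	public void cancel(Thread workerThread) {
		canceled.set(true);
		workerThread.interrupt();
	}

	@Override
	public void run() {
		try {
			// Stand-in for the actual work of the task.
			Thread.sleep(10_000);
		} catch (InterruptedException e) {
			if (canceled.get()) {
				// Expected: the interrupt was caused by cancellation; a hint
				// like this lets users safely ignore the exception.
				System.out.println("Interrupted because the task was canceled; this is expected.");
			} else {
				// Unexpected interruption: surface it as a real failure.
				throw new RuntimeException("Worker interrupted unexpectedly", e);
			}
		}
	}
}
{code}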
[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs
[ https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481456#comment-16481456 ]

Yazdan Shirvany commented on FLINK-8654:
----------------------------------------

[~Zentol] Is this still valid? If yes, I would like to assign it to myself and work on it. Should the CLI part go here: [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html], or should we add a new page under Project Setup, after IDE Setup?

> Extend quickstart docs on how to submit jobs
> --------------------------------------------
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Quickstarts
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Priority: Major
>
> The quickstart documentation explains how to set up the project, build the jar, and run things in the IDE, but it neither explains how to submit the jar to a cluster nor points the user to where this information can be found (like the CLI docs).
> Additionally, the quickstart poms should also contain the commands for submitting the jar to a cluster, in particular how to select a main class if it wasn't set in the pom (the -c CLI flag).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
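For reference, the commands such a docs extension would cover look roughly like this; the jar path and main class below are placeholders:

{code}
# Submit the packaged job to a running cluster.
./bin/flink run target/my-job-1.0.jar

# If no main class is recorded in the jar's manifest (i.e. it wasn't
# set in the pom), select it explicitly with the -c flag.
./bin/flink run -c org.example.MyJob target/my-job-1.0.jar
{code}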
[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry
[ https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481458#comment-16481458 ]

Yazdan Shirvany commented on FLINK-8983:
----------------------------------------

[~till.rohrmann] Is it ok to assign this to myself?

> End-to-end test: Confluent schema registry
> -------------------------------------------
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector, Tests
> Reporter: Till Rohrmann
> Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able to work together with the Confluent schema registry. In order to do that, we have to set up a Kafka cluster and write a Flink job which reads from the Confluent schema registry, producing an Avro type.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
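One way such a test's job could consume registry-encoded Avro records is to wrap Confluent's {{KafkaAvroDeserializer}} in a Flink {{DeserializationSchema}}. The sketch below is hypothetical glue code under that assumption, not an API that ships with Flink:

{code:java}
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Collections;

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

	private final String registryUrl;

	// Confluent's deserializer is not serializable, so create it lazily on the task.
	private transient KafkaAvroDeserializer inner;

	public ConfluentAvroDeserializationSchema(String registryUrl) {
		this.registryUrl = registryUrl;
	}

	@Override
	public GenericRecord deserialize(byte[] message) {
		if (inner == null) {
			inner = new KafkaAvroDeserializer();
			inner.configure(Collections.singletonMap("schema.registry.url", registryUrl), false);
		}
		// The schema id is embedded in the record's wire format, so the
		// topic argument is only used for error reporting here.
		return (GenericRecord) inner.deserialize("unused-topic", message);
	}

	@Override
	public boolean isEndOfStream(GenericRecord nextElement) {
		return false;
	}

	@Override
	public TypeInformation<GenericRecord> getProducedType() {
		return TypeInformation.of(GenericRecord.class);
	}
}
{code}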
[GitHub] flink issue #6044: [FLINK-1044] Website: Offer a zip archive with a pre-setu...
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6044

    @StephanEwen I would appreciate it if you could review this.

---
[jira] [Commented] (FLINK-1044) Website: Offer a zip archive with a pre-setup user project
[ https://issues.apache.org/jira/browse/FLINK-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481459#comment-16481459 ]

ASF GitHub Bot commented on FLINK-1044:
---------------------------------------

Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6044

    @StephanEwen I would appreciate it if you could review this.

> Website: Offer a zip archive with a pre-setup user project
> -----------------------------------------------------------
>
> Key: FLINK-1044
> URL: https://issues.apache.org/jira/browse/FLINK-1044
> Project: Flink
> Issue Type: Improvement
> Components: Project Website
> Reporter: Stephan Ewen
> Priority: Minor
> Labels: starter
> Attachments: flink-project.zip
>
> This is basically a shortcut for those who are not familiar with Maven archetypes or do not have Maven installed (other than as part of the Eclipse IDE or so).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)