[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
@snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a 
bit flaky, so you can safely ignore it for now. I'll take another look at 
your changes soon. Thanks!


---


[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480262#comment-16480262
 ] 

ASF GitHub Bot commented on FLINK-9349:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
@snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a 
bit flaky, so you can safely ignore it for now. I'll take another look at 
your changes soon. Thanks!


> KafkaConnector Exception while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}
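
The failure mode is easy to reproduce outside of Flink. Below is a minimal, 
self-contained sketch (hypothetical code, not the connector's actual 
implementation) in which one thread iterates a LinkedList while another 
thread adds to it, playing the roles of runFetchLoop and 
addDiscoveredPartitions respectively:

{code:java}
import java.util.LinkedList;
import java.util.List;

public class ComodificationDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> partitions = new LinkedList<>();
        for (int i = 0; i < 1_000_000; i++) {
            partitions.add(i);
        }

        // Plays the role of the partition discoverer calling
        // addDiscoveredPartitions() while the fetch loop is iterating.
        Thread discoverer = new Thread(() -> partitions.add(-1));
        discoverer.start();

        long sum = 0;
        // LinkedList's iterator is fail-fast: if the add above interleaves
        // with this loop, next() throws ConcurrentModificationException.
        for (int p : partitions) {
            sum += p;
        }

        discoverer.join();
        System.out.println(sum);
    }
}
{code}

One common remedy is to iterate over a snapshot, or to back the subscribed 
partition states with a concurrent collection such as 
java.util.concurrent.CopyOnWriteArrayList; which route the fix takes is a 
question for the PR review.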





[GitHub] flink issue #5978: [FLINK-8554] Upgrade AWS SDK

2018-05-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5978
  
Is there anything in `1.11.325` that we desperately need?

If not, I would oppose upgrading the AWS SDK this frequently. It is highly 
likely that we don't need any of the new changes in `1.11.325`. As you can 
see, the current SDK version is `1.11.319`, which was upgraded just a few 
days ago. 

There are a few reasons we should discourage this:

- It doesn't add much value, and we don't really need it
- It creates a lot of unnecessary work for both contributors and the Flink 
community (committers, reviewers, etc.)
- AWS releases its SDK very frequently, at a much faster pace than we can 
possibly keep up with




---


[jira] [Commented] (FLINK-8554) Upgrade AWS SDK

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480264#comment-16480264
 ] 

ASF GitHub Bot commented on FLINK-8554:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5978
  
Is there anything in `1.11.325` that we desperately need?

If not, I would oppose upgrading the AWS SDK this frequently. It is highly 
likely that we don't need any of the new changes in `1.11.325`. As you can 
see, the current SDK version is `1.11.319`, which was upgraded just a few 
days ago. 

There are a few reasons we should discourage this:

- It doesn't add much value, and we don't really need it
- It creates a lot of unnecessary work for both contributors and the Flink 
community (committers, reviewers, etc.)
- AWS releases its SDK very frequently, at a much faster pace than we can 
possibly keep up with




> Upgrade AWS SDK
> ---
>
> Key: FLINK-8554
> URL: https://issues.apache.org/jira/browse/FLINK-8554
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> AWS SDK 1.11.271 fixes a lot of bugs, one of which would exhibit the 
> following:
> {code}
> Caused by: java.lang.NullPointerException
>   at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729)
>   at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67)
>   at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}





[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5374
  
@cjolif since we have now reached a conclusion on where the Elasticsearch 
connector should be improved in the future, could you maybe close this PR? I 
assume a new PR will be opened that subsumes this one.


---


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480277#comment-16480277
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5374
  
@cjolif since we have now reached a conclusion on where the Elasticsearch 
connector should be improved in the future, could you maybe close this PR? I 
assume a new PR will be opened that subsumes this one.


> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released
> The minimum Elasticsearch Java client version compatible with ES 6 is 5.6.0.





[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480280#comment-16480280
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

Github user cjolif closed the pull request at:

https://github.com/apache/flink/pull/5374


> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released
> The minimum Elasticsearch Java client version compatible with ES 6 is 5.6.0.





[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-05-18 Thread cjolif
Github user cjolif closed the pull request at:

https://github.com/apache/flink/pull/5374


---


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480279#comment-16480279
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai sure. done.


> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released
> The minimum Elasticsearch Java client version compatible with ES 6 is 5.6.0.





[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-05-18 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai sure. done.


---


[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5823
  
Thanks @zhangminglei. This looks good; I will proceed to merge it.


---


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480282#comment-16480282
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5823
  
Thanks @zhangminglei. This looks good; I will proceed to merge it.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. a Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file (a sketch of 
> such a check follows below)
> # run the program
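
For step 4, a sketch of the kind of check that is meant (hypothetical; the 
actual test may well implement this as a shell script, and the package 
prefixes below are only illustrative): open the jar produced by the build and 
fail if any Flink core classes were packaged into it.

{code:java}
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarContentCheck {
    public static void main(String[] args) throws IOException {
        // args[0]: path to the jar produced by `mvn clean package -Pbuild-jar`
        try (JarFile jar = new JarFile(args[0])) {
            boolean clean = jar.stream()
                    .map(JarEntry::getName)
                    // Core runtime classes must be provided by the cluster,
                    // not bundled into the user jar.
                    .noneMatch(name -> name.startsWith("org/apache/flink/core/")
                            || name.startsWith("org/apache/flink/runtime/"));
            if (!clean) {
                throw new IllegalStateException("Core Flink classes found in user jar");
            }
            System.out.println("OK: no core dependencies packaged");
        }
    }
}
{code}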





[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module

2018-05-18 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480284#comment-16480284
 ] 

Kostas Kloudas commented on FLINK-9387:
---

Well, there is no change apart from the message wording, right? Both before and 
after it is future.cause(). Or am I missing something?

> Several log message errors in queryable-state module
> 
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>






[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185186
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates an Avro deserialization schema.
+*
+* @param recordClazz class to deserialize to. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema 
reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.reader = reader;
+   if (reader != null) {
+   

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480292#comment-16480292
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185186
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates an Avro deserialization schema.
+*
+* @param recordClazz class to deserialize to. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema 
reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = record

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480293#comment-16480293
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185420
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates an Avro deserialization schema.
+*
+* @param recordClazz class to deserialize to. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema 
reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
--- End diff --

I would skip the initialization in the constructor if you have the 
initialization in `checkAvroInitialized()`. Simpler, and it avoids having two 
places that do the initialization which have to be kept in sync.
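
For reference, a minimal sketch of the lazy-initialization pattern being 
suggested (hypothetical names; in the PR the analogous method is 
`checkAvroInitialized()`): the transient state is built in exactly one place, 
on first use, which also covers re-initialization after the schema object has 
been shipped via Java serialization.

{code:java}
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class LazyInitExample implements Serializable {

    private final String config;

    // Transient state is NOT created in the constructor; it is (re)built
    // lazily, so there is a single initialization path to keep correct
    // (it also arrives as null after Java deserialization).
    private transient Function<byte[], String> decoder;

    LazyInitExample(String config) {
        this.config = config;
        // no transient initialization here -- single init path below
    }

    String deserialize(byte[] message) {
        checkInitialized();
        return decoder.apply(message);
    }

    private void checkInitialized() {
        if (decoder == null) {
            decoder = bytes -> config + ":" + new String(bytes, StandardCharsets.UTF_8);
        }
    }
}
{code}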


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185420
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates an Avro deserialization schema.
+*
+* @param recordClazz class to deserialize to. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema 
reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
--- End diff --

I would skip the initialization in the constructor if you have the 
initialization in `checkAvroInitialized()`. Simpler, and it avoids having two 
places that do the initialization which have to be kept in sync.


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185047
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

There probably should be some verification that the job actually runs with 
DOP=4
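
A sketch of what such a check could look like (hypothetical: it assumes the 
default REST endpoint at localhost:8081 and that the job-details response 
reports a parallelism field per vertex; the real test may prefer a 
shell-based check):

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ParallelismCheck {
    public static void main(String[] args) throws IOException {
        // args[0]: the JobID printed by `flink run`
        URL url = new URL("http://localhost:8081/jobs/" + args[0]);
        StringBuilder json = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                json.append(line);
            }
        }
        // Crude string check instead of real JSON parsing, to stay dependency-free.
        if (!json.toString().contains("\"parallelism\":4")) {
            throw new IllegalStateException("Job does not report parallelism 4");
        }
        System.out.println("OK: job runs with DOP=4");
    }
}
{code}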


---


[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module

2018-05-18 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480296#comment-16480296
 ] 

vinoyang commented on FLINK-9387:
-

This change adds a placeholder in the log message for the future.cause() 
argument, which the original message string was missing.
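
For illustration, a minimal SLF4J sketch of the difference (hypothetical 
message, with a String argument standing in for future.cause(); the actual 
queryable-state diff may look different):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderExample {

    private static final Logger LOG = LoggerFactory.getLogger(PlaceholderExample.class);

    void report(String cause) {
        // Without a "{}" placeholder, a non-Throwable argument is silently
        // dropped from the rendered message.
        LOG.warn("Failed to process request.", cause);

        // With the placeholder, the cause actually appears in the log line.
        LOG.warn("Failed to process request: {}", cause);
    }
}
{code}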

> Several log message errors in queryable-state module
> 
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>






[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185689
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -s"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -r"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test operation on running streaming jobs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+RETURN=`$FLINK_DIR/bin/flink run -d \
+$PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result`
+echo "job submission returns: $RETURN"
+JOB_ID=`extract_job_id_from_job_submission_return "$RETURN"`
+eval "$FLINK_DIR/bin/flink cancel ${JOB_ID}"
+EXIT_CODE=$?
+fi
+
+printf 
"\n

[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480298#comment-16480298
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185232
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
--- End diff --

Should we verify the output of `info`?


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.





[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185232
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
--- End diff --

Should we verify the output of `info`?


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185140
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Since this is a detached execution, we probably want to wait until this job 
completes before continuing?


---


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480301#comment-16480301
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185047
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

There probably should be some verification that the job actually runs with 
DOP=4


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.





[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185253
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
--- End diff --

Should we verify the output of `list`?


---


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480297#comment-16480297
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185253
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
--- End diff --

Should we verify the output of `list`?


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.





[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480300#comment-16480300
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185140
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# Verify only the return code, not the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Since this is a detached execution, we probably want to wait until this job 
completes before continuing?


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480299#comment-16480299
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185689
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code, not the content correctness, of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf "\n==============================\n"
+printf "Test default job launch with non-detach mode\n"
+printf "==============================\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf "\n==============================\n"
+printf "Test run with complex parameter set\n"
+printf "==============================\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf "\n==============================\n"
+printf "Test information APIs\n"
+printf "==============================\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -s"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -r"
+EXIT_CODE=$?
+fi
+
+printf "\n==============================\n"
+printf "Test operation on running streaming jobs\n"
+printf "==============================\n"
+if [ $EXIT_CODE == 0 ]; then
+RETURN=`$FLINK_DIR/bin/flink run -d \
+$PERIODIC_JOB_JAR --outputPath file:///${TEST_DATA_DIR}/out/result`
+echo "job submission returns: $

[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @walterddr, what is the status of this PR? Would be nice if we can move 
forward with this PR (and also the CLI e2e test PR that you also opened).


---


[jira] [Commented] (FLINK-8986) End-to-end test: REST

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480304#comment-16480304
 ] 

ASF GitHub Bot commented on FLINK-8986:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @walterddr, what is the status of this PR? Would be nice if we can move 
forward with this PR (and also the CLI e2e test PR that you also opened).


> End-to-end test: REST
> -
>
> Key: FLINK-8986
> URL: https://issues.apache.org/jira/browse/FLINK-8986
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Tests
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that we can use the REST 
> interface to obtain information about a running job.
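
As a rough illustration of what such a check could exercise, assuming the default REST address {{localhost:8081}} and the {{/jobs}} endpoint (this is not the test's actual implementation, just a hedged sketch):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestSmokeCheck {
	public static void main(String[] args) throws Exception {
		URL url = new URL("http://localhost:8081/jobs"); // assumed default REST address
		HttpURLConnection connection = (HttpURLConnection) url.openConnection();
		connection.setRequestMethod("GET");

		if (connection.getResponseCode() != 200) {
			throw new IllegalStateException("REST API returned " + connection.getResponseCode());
		}
		try (BufferedReader reader =
				new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
			// The response is a JSON overview of jobs; a real test would parse it
			// and assert on the job id and state.
			reader.lines().forEach(System.out::println);
		}
	}
}
{code}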



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6041: [FLINK-9326] TaskManagerOptions.NUM_TASK_SLOTS does not w...

2018-05-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6041
  
cc @tillrohrmann 


---


[jira] [Commented] (FLINK-9326) TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480307#comment-16480307
 ] 

ASF GitHub Bot commented on FLINK-9326:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6041
  
cc @tillrohrmann 


> TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode
> ---
>
> Key: FLINK-9326
> URL: https://issues.apache.org/jira/browse/FLINK-9326
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
> Environment: Linux 64bit
> Flink branch release-1.5
>Reporter: Samuel Doyle
>Assignee: vinoyang
>Priority: Major
>
> When attempting to set the number of task slots via the API, such as
> {code:java}
> configuration = new Configuration();
> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16);
> configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);
> {code}
> I will always end up with the default slot setting based on the number of 
> cores I have where my standalone instance is running; it doesn't matter what 
> I set NUM_TASK_SLOTS to, it has no effect.
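
For context, a minimal sketch of how such a configuration is typically handed to a local/embedded environment; whether the slot count is then honored is exactly what this issue is about:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSlotsExample {
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16);
		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);

		// The configuration is passed to the local environment here; per this
		// report, NUM_TASK_SLOTS did not take effect in local/embedded mode.
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

		env.fromElements(1, 2, 3).print();
		env.execute("local-slots-example");
	}
}
{code}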



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.

2018-05-18 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6039
  
Thanks, I'll also commit an empty release notes page to the 1.6 / master 
docs.


---


[GitHub] flink issue #5798: [FLINK-7917] The return of taskInformationOrBlobKey shoul...

2018-05-18 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5798
  
cc @kl0u 


---


[jira] [Commented] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480308#comment-16480308
 ] 

ASF GitHub Bot commented on FLINK-7917:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5798
  
cc @kl0u 


> The return of taskInformationOrBlobKey should be placed inside synchronized 
> in ExecutionJobVertex
> -
>
> Key: FLINK-7917
> URL: https://issues.apache.org/jira/browse/FLINK-7917
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Currently in ExecutionJobVertex#getTaskInformationOrBlobKey:
> {code}
> }
> return taskInformationOrBlobKey;
> {code}
> The return should be placed inside synchronized block.
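
As a hedged illustration of the suggested shape (the field and helper names below are hypothetical, not the actual ExecutionJobVertex members), the read and the return of the lazily initialized value stay under the same lock:

{code:java}
// Sketch only: the value is computed and returned under the same lock,
// so the caller never sees a reference read outside the synchronized block.
private final Object stateMonitor = new Object();
private Object taskInformationOrBlobKey;

private Object getTaskInformationOrBlobKey() throws IOException {
	synchronized (stateMonitor) {
		if (taskInformationOrBlobKey == null) {
			taskInformationOrBlobKey = computeTaskInformationOrBlobKey(); // hypothetical helper
		}
		return taskInformationOrBlobKey;
	}
}
{code}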



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9393) LocatableInputSplit#hashCode should take hostnames into account

2018-05-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480310#comment-16480310
 ] 

Stephan Ewen commented on FLINK-9393:
-

The leaking of the hostnames array makes {{equals()}} vulnerable, but it still 
does not violate the equals/hashCode contract. In some sense it is almost an 
argument to not change {{hashCode()}}, unless the argument is "equals is partly 
broken, so hashCode should be broken as well".

The way that "splitNumber" gets initialized, it is a perfect hash code with no 
collisions at all. That is admittedly not guaranteed by the class itself, but 
it is the effect of the way the class is currently used. Adding the hostnames 
to the hash code can introduce some collisions that were not there before. It's 
not going to be a problem, but looking at the bigger picture, things may always 
be a bit different than an individual hashCode function suggests.

I am not opposed to fixing this issue. What I am trying to do is make 
contributors aware of the mechanics and implications of contributing to such a 
large project. We could probably spend two years of everyone's time just 
rewriting parts of the code that are already fine, but could be done differently 
to satisfy some checkstyle rule, default pattern, common guideline, etc.
Every time we do that, we have to be really careful: changing something that is 
not wrong cannot really make anything better, but it could make things worse 
(again, the implications can be subtle). So great care is needed, and often 
more time (from reviewing committers) than it may initially seem.

Please understand that I am only writing this in order to raise some awareness. 
I have seen projects that got into a state where they mainly worked on "moving 
things around" internally and slowed down a lot on adding user/developer 
benefit. So in my opinion we should avoid that.

In that sense, yes, please go ahead, this is fine to be changed and merged. 
Just take the above into account when looking for parts of Flink you want to 
change.


> LocatableInputSplit#hashCode should take hostnames into account
> ---
>
> Key: FLINK-9393
> URL: https://issues.apache.org/jira/browse/FLINK-9393
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>
> Currently:
> {code}
>   public int hashCode() {
> return this.splitNumber;
> {code}
> This is not symmetrical with the {{equals}} method, where hostnames are compared.
> LocatableInputSplit#hashCode should take hostnames into account.
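
For illustration only, a minimal sketch of a hashCode that takes the hostnames into account; this is not necessarily the fix that was merged:

{code:java}
import java.util.Arrays;

@Override
public int hashCode() {
	// Arrays.hashCode tolerates a null hostnames array, and mixing in the
	// hostnames keeps hashCode consistent with an equals() that compares them.
	return 31 * this.splitNumber + Arrays.hashCode(this.hostnames);
}
{code}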



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.

2018-05-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6039


---


[jira] [Closed] (FLINK-9397) Individual Buffer Timeout of 0 incorrectly leads to default timeout

2018-05-18 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9397.
---

> Individual Buffer Timeout of 0 incorrectly leads to default timeout
> ---
>
> Key: FLINK-9397
> URL: https://issues.apache.org/jira/browse/FLINK-9397
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> When configuring the buffer timeout of an individual operation to {{0}}, the 
> StreamGraphGenerator incorrectly uses the default value of the application 
> (typically 100).
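
For reference, a minimal sketch of the user-facing configuration in question; per this issue, the per-operator {{0}} below was silently replaced by the environment-wide default:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setBufferTimeout(100); // application-wide default

		DataStream<String> upper = env
			.fromElements("a", "b", "c")
			.map(String::toUpperCase)
			.setBufferTimeout(0); // per-operator: flush after every record

		upper.print();
		env.execute("buffer-timeout-example");
	}
}
{code}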



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9397) Individual Buffer Timeout of 0 incorrectly leads to default timeout

2018-05-18 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9397.
-
Resolution: Fixed

Fixed in
  - 1.4.3 via 611ead023ff0be995a4cebb42d579e7e39442c3c
  - 1.5.0 via 884c2e39b401fc4f1e0623e856008af53ed5f98e
  - 1.6.0 via f41debef3f98cbbd49107fd02e101baa7c94fc6e

> Individual Buffer Timeout of 0 incorrectly leads to default timeout
> ---
>
> Key: FLINK-9397
> URL: https://issues.apache.org/jira/browse/FLINK-9397
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> When configuring the buffer timeout of an individual operation to {{0}}, the 
> StreamGraphGenerator incorrectly uses the default value of the application 
> (typically 100).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9392) Add @FunctionalInterface annotations to all core functional interfaces

2018-05-18 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9392.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in fafef15eaca1f4277ac1dddb018a8e01eca22817

> Add @FunctionalInterface annotations to all core functional interfaces
> --
>
> Key: FLINK-9392
> URL: https://issues.apache.org/jira/browse/FLINK-9392
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.6.0
>
>
> The {{@FunctionalInterface}} annotation should be added to all SAM interfaces 
> in order to prevent accidentally breaking them (as non SAMs).
> We had a case of that before for the {{SinkFunction}} which was compatible 
> through default methods, but incompatible for users that previously 
> instantiated that interface through a lambda.
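
A short illustration of the safeguard, using a made-up SAM interface rather than an actual Flink one:

{code:java}
// With @FunctionalInterface, adding a second abstract method later becomes a
// compile error, so lambda-based implementations cannot silently break.
@FunctionalInterface
public interface RecordFilter<T> {
	boolean accept(T record);

	// Default methods are allowed; the interface stays a SAM type.
	default RecordFilter<T> negate() {
		return record -> !accept(record);
	}
}
{code}

A user can then still safely instantiate it through a lambda, e.g. {{RecordFilter<String> nonEmpty = s -> !s.isEmpty();}}.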



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9392) Add @FunctionalInterface annotations to all core functional interfaces

2018-05-18 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9392.
---

> Add @FunctionalInterface annotations to all core functional interfaces
> --
>
> Key: FLINK-9392
> URL: https://issues.apache.org/jira/browse/FLINK-9392
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.6.0
>
>
> The {{@FunctionalInterface}} annotation should be added to all SAM interfaces 
> in order to prevent accidentally breaking them (as non SAMs).
> We had a case of that before for the {{SinkFunction}} which was compatible 
> through default methods, but incompatible for users that previously 
> instantiated that interface through a lambda.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189195014
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+   private static final long serialVersionUID = -884738268437806062L;
+
+   /** Provider for schema coder. Used for initializing in each task. */
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+   /** Coder used for reading schema from incoming stream. */
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   checkAvroInitialized();
+   getInputStream().setBuffer(message);
+   Schema writerSchema = schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader<T> datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --

The method `deserialize()` can throw an IOException. That got dropped from 
the signature, and exceptions are now wrapped into a RuntimeException. That 
makes exception stack traces more complicated, and hides the fact that "there 
is a possible exceptional case to handle" from the consumers of that code.

I think this makes for a general rule: whenever using `RuntimeException`, 
take a step back, look at the exception structure and signatures, and see if 
something is not declared well.
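
A hedged sketch of what the suggested shape could look like, keeping the checked exception in the signature (the review comment above confirms `deserialize()` can declare an IOException; only the exception handling differs from the quoted code):

{code:java}
@Override
public T deserialize(byte[] message) throws IOException {
	try {
		checkAvroInitialized();
		getInputStream().setBuffer(message);
		Schema writerSchema = schemaCoder.readSchema(getInputStream());

		GenericDatumReader<T> datumReader = getDatumReader();
		datumReader.setSchema(writerSchema);
		datumReader.setExpected(getReaderSchema());

		return datumReader.read(null, getDecoder());
	} catch (IOException e) {
		throw e; // already the declared type, no wrapping needed
	} catch (Exception e) {
		// Wrap remaining cases into the declared checked exception instead
		// of an undeclared RuntimeException.
		throw new IOException("Failed to deserialize Avro record.", e);
	}
}
{code}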


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480333#comment-16480333
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189195014
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+   private static final long serialVersionUID = -884738268437806062L;
+
+   /** Provider for schema coder. Used for initializing in each task. */
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+   /** Coder used for reading schema from incoming stream. */
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   checkAvroInitialized();
+   getInputStream().setBuffer(message);
+   Schema writerSchema = schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader<T> datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --

The method `deserialize()` can throw an IOException. That got dropped from 
the signature, and exceptions are now wrapped into a RuntimeException. That 
makes exception stack traces more complicated, and hides the fact that "there 
is a possible exceptional case to handle" from the consumers of that code.

I think this makes for a general rule: whenever using `RuntimeException`, 
take a step back, look at the exception structure and signatures, and see if 
something is not declared well.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wys

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197633
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --

Minor comment: I find it helps code structure/readability to move 
static factory methods either to the top or the bottom of the class.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197766
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+   return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
+*
+* @param tClass class of record to be produced
+* @return deserialized record
+*/
+   public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
+   return new Avro

[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test

2018-05-18 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5823
  
Thank you very much to @tzulitai and @zentol for reviewing this PR.


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480344#comment-16480344
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197633
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --

Minor comment: I find it helps code structure/readability to move 
static factory methods either to the top or the bottom of the class.


> Implement AvroDeserialization

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480345#comment-16480345
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197766
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+   return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were

[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480347#comment-16480347
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5823
  
Thank you very much to @tzulitai and @zentol for reviewing this PR.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Added a few more comments, most importantly around exception wrapping.
Otherwise, looking good...


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480350#comment-16480350
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Added a few more comments, most importantly around exception wrapping.
Otherwise, looking good...


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

2018-05-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5979


---


[jira] [Commented] (FLINK-9070) Improve performance of RocksDBMapState.clear()

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480382#comment-16480382
 ] 

ASF GitHub Bot commented on FLINK-9070:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5979


> Improve performance of RocksDBMapState.clear()
> --
>
> Key: FLINK-9070
> URL: https://issues.apache.org/jira/browse/FLINK-9070
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Truong Duc Kien
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, RocksDBMapState.clear() is implemented by iterating over all the 
> keys and dropping them one by one. This iteration can be quite slow with: 
>  * Large maps
>  * High-churn maps with a lot of tombstones
> There are a few methods to speed-up deletion for a range of keys, each with 
> their own caveats:
>  * DeleteRange: still experimental, likely buggy
>  * DeleteFilesInRange + CompactRange: only good for large ranges
>  
> Flink can also keep a list of inserted keys in-memory, then directly delete 
> them without having to iterate over the RocksDB database again. 
>  
> Reference:
>  * [RocksDB article about range 
> deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys]
>  * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug]
>  
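
For illustration, a hedged sketch of the last direction mentioned above: collecting the keys under the map's key prefix and deleting them through a single {{WriteBatch}} instead of issuing one delete per key ({{startsWith}} and the surrounding fields are hypothetical; error handling is elided):

{code:java}
try (RocksIterator iterator = db.newIterator(columnFamily);
		WriteBatch batch = new WriteBatch()) {
	iterator.seek(prefixBytes);
	while (iterator.isValid() && startsWith(iterator.key(), prefixBytes)) {
		// Queue the delete instead of calling db.delete() per key.
		batch.delete(columnFamily, iterator.key());
		iterator.next();
	}
	// Apply all queued deletes in one write.
	db.write(writeOptions, batch);
}
{code}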



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9395) multiple left outer joins to subqueries with array values fail

2018-05-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480447#comment-16480447
 ] 

Fabian Hueske commented on FLINK-9395:
--

Hi [~kgeis], your analysis is correct. The problem is caused by the array type 
on the second outer join. The same problem would arise if table {{a}} had an 
array type, so it is not about the number of joins but about the field types of 
the outer side of a join (I'll update the JIRA to reflect that).

We need to sort (well actually the order is not important, only that values are 
correctly grouped) on all fields of the outer join input, because the outer 
join of the DataSet API only supports equality predicates. Hence, all other 
predicates need to be applied later. If the later applied predicates filter out 
all join results, we still need to ensure that a null-padded result is emitted. To do 
that, we group on all fields of the outer side to check if we have a result for 
an input row or not. There's not really much else we can do with the current 
state of the DataSet API.

To solve this problem, we need to extend the TypeInformation of the array type 
by adding a TypeComparator that can be used to sort the array type. We will 
face similar issues for other types that are not sortable.

> multiple left outer joins to subqueries with array values fail
> --
>
> Key: FLINK-9395
> URL: https://issues.apache.org/jira/browse/FLINK-9395
> Project: Flink
>  Issue Type: Bug
>Reporter: Ken Geis
>Priority: Major
> Attachments: JoinTest.java
>
>
> Where {{a}} is a table with column {{id}}, the following query succeeds:
> {code:sql}
> SELECT * FROM a
>  LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
> {code}
> I add another join:
> {code:sql}
> SELECT * FROM a
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id
> {code}
> This fails with the error:
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Selected sort key is not 
> a sortable type
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
> at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at 
> scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
> at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
> at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
> at 
> org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9395) Outer Joins with array types on the outer join input fail

2018-05-18 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9395:
-
Summary: Outer Joins with array types on the outer join input fail  (was: 
multiple left outer joins to subqueries with array values fail)

> Outer Joins with array types on the outer join input fail
> -
>
> Key: FLINK-9395
> URL: https://issues.apache.org/jira/browse/FLINK-9395
> Project: Flink
>  Issue Type: Bug
>Reporter: Ken Geis
>Priority: Major
> Attachments: JoinTest.java
>
>
> Where {{a}} is a table with column {{id}}, the following query succeeds:
> {code:sql}
> SELECT * FROM a
>  LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
> {code}
> I add another join:
> {code:sql}
> SELECT * FROM a
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id
> {code}
> This fails with the error:
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Selected sort key is not 
> a sortable type
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
> at 
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at 
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at 
> scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
> at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
> at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
> at 
> org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9395) Outer Joins with array types on the outer join input fail

2018-05-18 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-9395:
-
Description: 
Given a table {{a}} with a single column {{id}}, the following query

{code:sql}
SELECT * FROM (SELECT id, ARRAY[id] AS b FROM a) b
 LEFT OUTER JOIN a AS b ON a.id = b.id
{code}

fails with the error:

{noformat}
org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a 
sortable type
at 
org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
at 
org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
at 
org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
{noformat}


  was:
Where {{a}} is a table with column {{id}}, the following query succeeds:

{code:sql}
SELECT * FROM a
 LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
{code}

I add another join:

{code:sql}
SELECT * FROM a
  LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
  LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id
{code}

This fails with the error:

{noformat}
org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a 
sortable type
at 
org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
at 
org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
at 
org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
at 
org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
{noformat}



> Outer Joins with array types on the outer join input fail
> -
>
> Key: FLINK-9395
> URL: https://issues.apache.org/jira/browse/FLINK-9395
> Project: Flink
>  Issue Type: Bug
>Reporter: Ken Geis
>Priority: Major
> Attachments: JoinTest.java
>
>
> Given a table {{a}} with a single column {{id}}, the following query
> {code:sql}
> SELECT * FROM (SELECT id, ARRAY[id] AS b FROM a) b
>  LEFT OUTER JOIN a AS b ON a.id = b.id
> {code}
> fails with the error:
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Selected sort key is not 
> a sortable type
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
> at 
> org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
> at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$parti

[jira] [Comment Edited] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-05-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480463#comment-16480463
 ] 

Timo Walther edited comment on FLINK-9091 at 5/18/18 10:20 AM:
---

It seems that the versions declared in the {{dependencyManagement}} sections 
sometimes have no impact on other modules. Maybe it is a bug in the enforcer 
plugin, I don't know. I could not reproduce this error so far. But I think we 
can fix it. I have a branch where I use exclusions instead of the 
{{dependencyManagement}}. [~Zentol] what do you think about these changes? 
Together with FLINK-8511 we should be good.

https://github.com/apache/flink/compare/master...twalthr:FLINK-9091


was (Author: twalthr):
It seems that the versions declared in the {{dependencyManagement}} sections 
have no impact in other modules from time to time. Maybe it is a bug in the 
enforcer plugin, I don't know. I could not reproduce this error so far. But I 
think we can fix it. I have a branch where I use exclusions instead of the 
{{dependencyManagement}}. [~Zentol] what do you think about these changes. 
Together with FLINK-8511 we should be good.

https://github.com/apache/flink/compare/master...twalthr:FLINK-9091

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-05-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480463#comment-16480463
 ] 

Timo Walther commented on FLINK-9091:
-

It seems that the versions declared in the {{dependencyManagement}} sections 
sometimes have no impact on other modules. Maybe it is a bug in the enforcer 
plugin, I don't know. I could not reproduce this error so far. But I think we 
can fix it. I have a branch where I use exclusions instead of the 
{{dependencyManagement}}. [~Zentol] what do you think about these changes. 
Together with FLINK-8511 we should be good.

https://github.com/apache/flink/compare/master...twalthr:FLINK-9091

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I 
missed anything.

The conflict that Stephan mentioned between a "common deserialization 
schema" interface and surfacing connector-specific information is rooted in 
the fact that both concerns (deserialization and providing connector-specific 
record meta information) are currently coupled in a single interface.

Take for example the Kafka connector's `KeyedDeserializationSchema` - there 
we try to deserialize the Kafka bytes, as well as provide information such as 
topic / partition / timestamp etc. to allow the user to enrich their user 
records for downstream business logic. The first part (deserialization of 
bytes) should be something common for all connector sources, while the second 
part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as 
follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record 
meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have 
independently, and does not handle deserialization of the record bytes. The 
name, of course, is still open to discussion.

What do you think?
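
For illustration, a minimal sketch of how the two proposed interfaces might 
compose in user code. The interface names follow the proposal above; the 
accessors on `ConsumerRecordMetaInfo` and the `MyEvent` record are hypothetical 
stand-ins, not actual Flink API:

```java
import java.nio.charset.StandardCharsets;

// interfaces as proposed above (assumed accessors on the meta-info object)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

interface ConsumerRecordMetaInfo {
    String getTopic();
    int getPartition();
    long getTimestamp();
}

interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// hypothetical user record enriched with Kafka-specific fields
class MyEvent {
    String payload;
    String topic;
    long kafkaTimestamp;
}

// one user class can implement both: deserialization stays connector-agnostic,
// while enrichment is where the Kafka-specific meta information comes in
class MyEventSchema implements DeserializationSchema<MyEvent>,
        ConsumerRecordMetaInfoProvider<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] bytes) {
        MyEvent event = new MyEvent();
        event.payload = new String(bytes, StandardCharsets.UTF_8);
        return event;
    }

    @Override
    public MyEvent enrich(MyEvent event, ConsumerRecordMetaInfo metaInfo) {
        event.topic = metaInfo.getTopic();
        event.kafkaTimestamp = metaInfo.getTimestamp();
        return event;
    }
}
```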


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480505#comment-16480505
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I 
missed anything.

The conflict that Stephan mentioned between a "common deserialization 
schema" interface and surfacing connector-specific information is rooted in 
the fact that both concerns (deserialization and providing connector-specific 
record meta information) are currently coupled in a single interface.

Take for example the Kafka connector's `KeyedDeserializationSchema` - there 
we try to deserialize the Kafka bytes, as well as provide information such as 
topic / partition / timestamp etc. to allow the user to enrich their user 
records for downstream business logic. The first part (deserialization of 
bytes) should be something common for all connector sources, while the second 
part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as 
follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record 
meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have 
independently, and does not handle deserialization of the record bytes. The 
name, of course, is still open to discussion.

What do you think?


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8511) Remove legacy code for the TableType annotation

2018-05-18 Thread blues zheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

blues zheng reassigned FLINK-8511:
--

Assignee: blues zheng  (was: vinoyang)

> Remove legacy code for the TableType annotation
> ---
>
> Key: FLINK-8511
> URL: https://issues.apache.org/jira/browse/FLINK-8511
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Critical
>
> We introduced the very generic TableSource factories that unify the 
> definition of table sources and are specified using Java service loaders. For 
> backwards compatibility, the old code paths are still supported but should be 
> dropped in future Flink versions.
> This will touch:
> {code}
> org.apache.flink.table.annotation.TableType
> org.apache.flink.table.catalog.ExternalCatalogTable
> org.apache.flink.table.api.NoMatchedTableSourceConverterException
> org.apache.flink.table.api.AmbiguousTableSourceConverterException
> org.apache.flink.table.catalog.TableSourceConverter
> org.apache.flink.table.catalog.ExternalTableSourceUtil
> {code}
> We can also drop the {{org.reflections}} and {{commons-configuration}} (and 
> maybe more?) dependencies.
> See also FLINK-8240



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-18 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r189244151
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

Wouldn't it be enough to add such a `timeout()` method to `AsyncFunction` 
with a default implementation that fails the `ResultFuture`? I mean, instead 
of adding a new interface and deprecating `AsyncFunction`?
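
For what it's worth, a minimal sketch of that alternative, with simplified 
stand-ins for the real Flink types (illustration only, not the actual 
interfaces):

```java
import java.util.Collection;
import java.util.concurrent.TimeoutException;

// simplified stand-in for Flink's ResultFuture
interface ResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

interface AsyncFunction<IN, OUT> {

    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    // default implementation preserves today's behavior: a timeout fails the future
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }
}
```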


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480536#comment-16480536
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r189244151
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

Wouldn't it be enough to add such a `timeout()` method to `AsyncFunction` 
with a default implementation that fails the `ResultFuture`? I mean, instead 
of adding a new interface and deprecating `AsyncFunction`?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out it an exception is thrown and job is restarted. It 
> would be good to pass a AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-18 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r189250250
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

Thanks for your review. The deprecated `AsyncFunction` is a Java interface, 
which cannot have a method body under Java's grammar. However, your comment 
made me realize that I just forgot about the Scala API for `AsyncFunction`, so 
there is more work to be done.


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480580#comment-16480580
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r189250250
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

Thanks for your review. The deprecated `AsyncFunction` is a Java interface, 
which cannot have a method body under Java's grammar. However, your comment 
made me realize that I just forgot about the Scala API for `AsyncFunction`, so 
there is more work to be done.


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently Async IO operator does not provide a mechanism to handle timeouts. 
> When a request times out it an exception is thrown and job is restarted. It 
> would be good to pass a AsyncIOTimeoutHandler which can be implemented by the 
> user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5863: [FLINK-8985][e2etest] initial support for End-to-end CLI ...

2018-05-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5863
  
Thanks @tzulitai for the review. I will update asap. 
I am not 100% sure whether I should verify the CLI return values, but I will 
definitely add them.


---


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480664#comment-16480664
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5863
  
Thanks @tzulitai for the review. I will update asap. 
I am not 100% sure whether I should verify the CLI return values, but I will 
definitely add them.


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @tzulitai. I've actually created a new version of the test based on 
@zentol's comment on this PR: 
https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test

But it actually depends on https://github.com/apache/flink/pull/5863 as I 
reused the periodic stream job for testing. Is it possible to create a PR on 
top of another currently pending PR?


---


[jira] [Commented] (FLINK-8986) End-to-end test: REST

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480666#comment-16480666
 ] 

ASF GitHub Bot commented on FLINK-8986:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @tzulitai. I've actually created a new version of the test based on 
@zentol's comment on this PR: 
https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test

But it actually depends on https://github.com/apache/flink/pull/5863 as I 
reused the periodic stream job for testing. Is it possible to create a PR on 
top of another currently pending PR?


> End-to-end test: REST
> -
>
> Key: FLINK-8986
> URL: https://issues.apache.org/jira/browse/FLINK-8986
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Tests
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that we can use the REST 
> interface to obtain information about a running job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state

2018-05-18 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480670#comment-16480670
 ] 

Rong Rong commented on FLINK-9398:
--

Hi [~yanghua], have you started working on this task? 

I was hoping to get this in so it can work together with FLINK-8985. If you 
haven't started working on it, can I assign it to myself?

Thanks,
Rong

> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> See: 
> https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in 
> *CREATE* state, which conflicts with the CLI description: *Running/Restarting 
> Jobs*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
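
For illustration, the stricter filter the issue suggests might look like the 
sketch below. `JobStatus` is Flink's runtime enum; the surrounding wiring is 
hypothetical and not the actual CliFrontend code:

```java
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.runtime.jobgraph.JobStatus;

class RunningJobFilter {

    // keep only jobs that are actually running or restarting, instead of
    // "everything whose status is not CREATED"
    static Collection<JobStatus> keepRunningOrRestarting(Collection<JobStatus> statuses) {
        return statuses.stream()
            .filter(s -> s == JobStatus.RUNNING || s == JobStatus.RESTARTING)
            .collect(Collectors.toList());
    }
}
```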


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5955


---


[jira] [Closed] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-18 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8659.
-
Resolution: Fixed

Merged on master with 4973a2ae49d805f56af746659c3eae48b512f983

and on 1.5 with 0c06852b3cecd414aac623ffd155ebc2a2a31336

> Add migration tests for Broadcast state.
> 
>
> Key: FLINK-8659
> URL: https://issues.apache.org/jira/browse/FLINK-8659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480731#comment-16480731
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5955


> Add migration tests for Broadcast state.
> 
>
> Key: FLINK-8659
> URL: https://issues.apache.org/jira/browse/FLINK-8659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state

2018-05-18 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480791#comment-16480791
 ] 

vinoyang commented on FLINK-9398:
-

Hi [~walterddr], I have not started on this issue; I have released it, so you 
can take over this task.

> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> See: 
> https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in 
> *CREATE* state, which conflicts with the CLI description: *Running/Restarting 
> Jobs*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state

2018-05-18 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9398:
---

Assignee: (was: vinoyang)

> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Rong Rong
>Priority: Major
>
> See: 
> https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in 
> *CREATE* state, which conflicts with the CLI description: *Running/Restarting 
> Jobs*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9399) TPCHQuery3 fails due to Serialization issues

2018-05-18 Thread Gary Yao (JIRA)
Gary Yao created FLINK-9399:
---

 Summary: TPCHQuery3 fails due to Serialization issues
 Key: FLINK-9399
 URL: https://issues.apache.org/jira/browse/FLINK-9399
 Project: Flink
  Issue Type: Bug
  Components: Examples
Affects Versions: 1.5.0
Reporter: Gary Yao


Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails because 
the static class {{ShippingPriorityItem}} has private visibility, and hence 
cannot be serialized.

*Stacktrace*
{noformat}
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: The 
data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) -> 
Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot 
instantiate tuple.
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at 
org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join (Join 
at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at main(TPCHQuery3.java:165)' 
, caused an error: Cannot instantiate tuple.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98)
at 
org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
... 3 more
Caused by: java.lang.IllegalAccessException: Class 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a 
member of class 
org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem with 
modifiers "public"
at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
at java.lang.Class.newInstance(Class.java:436)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62)
... 9 more
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
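
A minimal sketch of the kind of fix the stack trace points at: 
`TupleSerializer.createInstance()` instantiates tuple subclasses reflectively 
via `Class.newInstance()`, so the nested class and its no-arg constructor must 
be public. The enclosing class name and the field types below are illustrative, 
not the exact ones from TPCHQuery3:

```java
import org.apache.flink.api.java.tuple.Tuple3;

public class VisibilityFixSketch {

    // was effectively: private static class ShippingPriorityItem ...
    // public visibility plus a public no-arg constructor allows reflective
    // instantiation by the TupleSerializer
    public static class ShippingPriorityItem extends Tuple3<Long, Double, String> {

        public ShippingPriorityItem() {}

        public ShippingPriorityItem(Long orderKey, Double revenue, String orderDate) {
            super(orderKey, revenue, orderDate);
        }
    }
}
```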


[jira] [Updated] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues

2018-05-18 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9399:

Summary: TPCHQuery3 example fails due to Serialization issues  (was: 
TPCHQuery3 fails due to Serialization issues)

> TPCHQuery3 example fails due to Serialization issues
> 
>
> Key: FLINK-9399
> URL: https://issues.apache.org/jira/browse/FLINK-9399
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Priority: Major
>
> Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails 
> because the static class {{ShippingPriorityItem}} has private visibility, and 
> hence cannot be serialized.
> *Stacktrace*
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: 
> The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) 
> -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot 
> instantiate tuple.
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>   at 
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>   at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at 
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join 
> (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at 
> main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>   ... 3 more
> Caused by: java.lang.IllegalAccessException: Class 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a 
> member of class 
> org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem 
> with modifiers "public"
>   at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>   at java.lang.Class.newInstance(Class.java:436)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62)
>   ... 9 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9400) change import statement for flink-scala

2018-05-18 Thread thinkerou (JIRA)
thinkerou created FLINK-9400:


 Summary: change import statement for flink-scala
 Key: FLINK-9400
 URL: https://issues.apache.org/jira/browse/FLINK-9400
 Project: Flink
  Issue Type: Improvement
  Components: Scala API
Affects Versions: 1.4.1
Reporter: thinkerou
 Fix For: 1.6.0


The `flink-scala` project has the following import statement:

```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```

So I want to open a pull request to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9398) Flink CLI list running job returns all jobs except in CREATE state

2018-05-18 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-9398:


Assignee: Rong Rong

> Flink CLI list running job returns all jobs except in CREATE state
> --
>
> Key: FLINK-9398
> URL: https://issues.apache.org/jira/browse/FLINK-9398
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> See: 
> https://github.com/apache/flink/blob/4922ced71a307a26b9f5070b41f72fd5d93b0ac8/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L437-L445
> Seems like the CLI command *flink list -r* returns all jobs except jobs in 
> *CREATE* state, which conflicts with the CLI description: *Running/Restarting 
> Jobs*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues

2018-05-18 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-9399:
---

Assignee: Timo Walther

> TPCHQuery3 example fails due to Serialization issues
> 
>
> Key: FLINK-9399
> URL: https://issues.apache.org/jira/browse/FLINK-9399
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Timo Walther
>Priority: Major
>
> Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails 
> because the static class {{ShippingPriorityItem}} has private visibility, and 
> hence cannot be serialized.
> *Stacktrace*
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: 
> The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) 
> -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot 
> instantiate tuple.
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>   at 
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>   at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at 
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join 
> (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at 
> main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>   ... 3 more
> Caused by: java.lang.IllegalAccessException: Class 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a 
> member of class 
> org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem 
> with modifiers "public"
>   at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>   at java.lang.Class.newInstance(Class.java:436)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62)
>   ... 9 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9399) TPCHQuery3 example fails due to Serialization issues

2018-05-18 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480822#comment-16480822
 ] 

Timo Walther commented on FLINK-9399:
-

This is related to a couple of wrong type extraction checks. I fixed it 
[here|https://github.com/apache/flink/pull/5097] already. I will fix this issue 
as well.

> TPCHQuery3 example fails due to Serialization issues
> 
>
> Key: FLINK-9399
> URL: https://issues.apache.org/jira/browse/FLINK-9399
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Timo Walther
>Priority: Major
>
> Running {{org.apache.flink.examples.java.relational.TPCHQuery3}} fails 
> because the static class {{ShippingPriorityItem}} has private visibility, and 
> hence cannot be serialized.
> *Stacktrace*
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: java.lang.Exception: 
> The data preparation for task 'CHAIN Join (Join at main(TPCHQuery3.java:155)) 
> -> Combine (SUM(1), at main(TPCHQuery3.java:165)' , caused an error: Cannot 
> instantiate tuple.
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>   at 
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>   at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at 
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:174)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join 
> (Join at main(TPCHQuery3.java:155)) -> Combine (SUM(1), at 
> main(TPCHQuery3.java:165)' , caused an error: Cannot instantiate tuple.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Cannot instantiate tuple.
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:71)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:799)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:487)
>   at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.open(NonReusingBuildFirstHashJoinIterator.java:98)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:207)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>   ... 3 more
> Caused by: java.lang.IllegalAccessException: Class 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer can not access a 
> member of class 
> org.apache.flink.examples.java.relational.TPCHQuery3$ShippingPriorityItem 
> with modifiers "public"
>   at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>   at java.lang.Class.newInstance(Class.java:436)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.createInstance(TupleSerializer.java:62)
>   ... 9 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-05-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480842#comment-16480842
 ] 

Ted Yu commented on FLINK-9091:
---

Hit the above error when running tests for 1.5.0 RC.

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6042: [FLINK-9400] change import statement for flink-sca...

2018-05-18 Thread thinkerou
GitHub user thinkerou opened a pull request:

https://github.com/apache/flink/pull/6042

[FLINK-9400] change import statement for flink-scala

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

While reading the `flink-scala` code, I found code like the following:

```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```

So I want to fix it by using the following code:

```scala
import org.apache.flink.api.common.operators.Keys.ExpressionKeys
```

## Brief change log

- normalize scala import statement

## Verifying this change

This change is already covered by existing tests; the tests pass.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (don't know)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
  - The S3 file system connector: (don't know)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thinkerou/flink im

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6042


commit 5ded44fb0ecc27007bce837c519b8e140504f879
Author: thinkerou 
Date:   2018-05-18T16:02:01Z

[FLINK-9400] change import statement for flink-scala




---


[jira] [Commented] (FLINK-9400) change import statement for flink-scala

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480866#comment-16480866
 ] 

ASF GitHub Bot commented on FLINK-9400:
---

GitHub user thinkerou opened a pull request:

https://github.com/apache/flink/pull/6042

[FLINK-9400] change import statement for flink-scala

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

While reading the `flink-scala` code, I found code like the following:

```scala
import org.apache.flink.api.common.operators.Keys
import Keys.ExpressionKeys
```

So I want to fix it by using the following code:

```scala
import org.apache.flink.api.common.operators.Keys.ExpressionKeys
```

## Brief change log

- normalize scala import statement

## Verifying this change

This change is already covered by existing tests; the tests pass.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (don't know)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (don't know)
  - The S3 file system connector: (don't know)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thinkerou/flink im

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6042


commit 5ded44fb0ecc27007bce837c519b8e140504f879
Author: thinkerou 
Date:   2018-05-18T16:02:01Z

[FLINK-9400] change import statement for flink-scala




> change import statement for flink-scala
> ---
>
> Key: FLINK-9400
> URL: https://issues.apache.org/jira/browse/FLINK-9400
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.4.1
>Reporter: thinkerou
>Priority: Trivial
>  Labels: easyfix
> Fix For: 1.6.0
>
>
> The `flink-scala` project has the following import statement:
> ```scala
> import org.apache.flink.api.common.operators.Keys
> import Keys.ExpressionKeys
> ```
> So I want to open a pull request to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module

2018-05-18 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481092#comment-16481092
 ] 

Chesnay Schepler commented on FLINK-9387:
-

No, it didn't miss a placeholder. There's an implicit placeholder for an 
exception, which causes the stack trace to be printed. With this change, only 
the exception message is printed, as {{toString}} is now called.
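
To make the distinction concrete, a minimal SLF4J sketch (the class and 
messages are illustrative, not code from the queryable-state module):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogExample {
    private static final Logger LOG = LoggerFactory.getLogger(LogExample.class);

    void handle(Exception e) {
        // full stack trace: the trailing Throwable is the implicit placeholder
        LOG.error("Failed to process request.", e);

        // only e.toString(): the exception is formatted into the message
        LOG.error("Failed to process request: {}", e.toString());
    }
}
```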

> Several log message errors in queryable-state module
> 
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-18 Thread cjolif
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/6043

[FLINK-7386] evolve RequestIndexer API to make it work with Elastic 
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a 
possible RestHighLevelClient implementation 

## What is the purpose of the change

*The purpose of this PR is to make sure the current Elasticsearch 
implementation is compatible with Elasticsearch 5.3+, fixing [FLINK-7386], and 
is also open to a future HighLevelRestClient implementation that could be used 
to provide Elasticsearch 6 compatibility [FLINK-8101].*

## Brief change log


* add specific IndexRequest, UpdateRequest and DeleteRequest `add` methods on 
RequestIndexer so that it is compatible with both the 5.2- and 5.3+ APIs 
(since in 5.3+ Elasticsearch no longer accepts ActionRequest in 
BulkProcessor); a sketch of the resulting API follows this message.
* make sure the existing ActionRequest method on RequestIndexer calls the 
new specific methods based on the actual type.
* throw an exception for other types.
* change the return value of the createClient method in 
ElasticsearchApiCallBridge. As TransportClient and HighLevelRestClient have 
only the AutoCloseable interface in common, this is what the method returns now.
* make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or a RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that ElasticsearchSinkBase calls. Implement this 
method on all bridges. 
  
## Verifying this change

This change added tests and can be verified as follows:
* Elasticsearch test base has also been reworked a little bit to make it 
compatible with the changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  a `@PublicEvolving` interface is now an abstract class. 
However, typically the user does not extend/implement it but just calls 
methods on it.
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es-5.3-apis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6043


commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8
Author: Christophe Jolif 
Date:   2018-05-17T22:17:04Z

[FLINK-7386] evolve RequestIndexer API to make it working with Elastic
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible
with a possible RestHighLevelClient implementation.
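
For readers following along, a minimal sketch of the evolved `RequestIndexer` 
described in the change log above. The signatures are illustrative, based on 
that description, and not necessarily the exact merged API:

```java
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

public abstract class RequestIndexer {

    // typed add methods work with both the 5.2- and 5.3+ BulkProcessor APIs
    public abstract void add(IndexRequest... indexRequests);

    public abstract void add(DeleteRequest... deleteRequests);

    public abstract void add(UpdateRequest... updateRequests);

    // legacy entry point: route each ActionRequest to its typed overload,
    // since the 5.3+ BulkProcessor no longer accepts raw ActionRequest
    public void add(ActionRequest... actionRequests) {
        for (ActionRequest request : actionRequests) {
            if (request instanceof IndexRequest) {
                add((IndexRequest) request);
            } else if (request instanceof DeleteRequest) {
                add((DeleteRequest) request);
            } else if (request instanceof UpdateRequest) {
                add((UpdateRequest) request);
            } else {
                throw new IllegalArgumentException(
                    "RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }
}
```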




---


[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481116#comment-16481116
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/6043

[FLINK-7386] evolve RequestIndexer API to make it work with Elastic 
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible with a 
possible RestHighLevelClient implementation 

## What is the purpose of the change

*The purpose of this PR is to make sure the current Elasticsearch 
implementation is compatible with Elasticsearch 5.3+, fixing [FLINK-7386], and 
is also open to a future HighLevelRestClient implementation that could be used 
to provide Elasticsearch 6 compatibility [FLINK-8101].*

## Brief change log


* Add specific IndexRequest, UpdateRequest and DeleteRequest `add` methods on 
RequestIndexer so that it is compatible with both the 5.2- and 5.3+ APIs (in 
5.3+, Elasticsearch no longer accepts ActionRequest in BulkProcessor).
* Make sure the existing ActionRequest method on RequestIndexer calls the 
new specific methods based on the actual request type (see the sketch after 
this list).
* Throw an exception for other request types.
* Change the return type of the createClient method in 
ElasticsearchApiCallBridge: as TransportClient and RestHighLevelClient have 
only the AutoCloseable interface in common, this is what the method returns now.
* Make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or a RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that the ElasticsearchSinkBase calls. Implement this 
method on all bridges.
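
To make the first three bullets concrete, here is a minimal sketch of the 
evolved RequestIndexer; the method signatures follow the description above, 
while the bodies are an illustration and may differ from the merged Flink code:

{code:java}
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

// Sketch only: the real Flink class may differ in details.
public abstract class RequestIndexer {

   // New type-specific methods, compatible with both the 5.2- and 5.3+
   // BulkProcessor APIs.
   public abstract void add(IndexRequest... requests);
   public abstract void add(UpdateRequest... requests);
   public abstract void add(DeleteRequest... requests);

   // The existing generic method now dispatches on the actual request type
   // and rejects anything else.
   public void add(ActionRequest... requests) {
      for (ActionRequest request : requests) {
         if (request instanceof IndexRequest) {
            add((IndexRequest) request);
         } else if (request instanceof UpdateRequest) {
            add((UpdateRequest) request);
         } else if (request instanceof DeleteRequest) {
            add((DeleteRequest) request);
         } else {
            throw new IllegalArgumentException(
               "RequestIndexer only supports Index, Update and Delete requests");
         }
      }
   }
}
{code}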
  
## Verifying this change

This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked slightly to make it 
compatible with the changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: a `@PublicEvolving` interface is now an abstract class. 
However, typically the user does not extend/implement it but just calls methods 
on it.
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es-5.3-apis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6043


commit b1f2abc1d33b39c1fed4f370e5b21cbf477e0aa8
Author: Christophe Jolif 
Date:   2018-05-17T22:17:04Z

[FLINK-7386] evolve RequestIndexer API to make it work with Elastic
5.3+, evolve ElasticsearchApiCallBridge API to make it compatible
with a possible RestHighLevelClient implementation.




> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>Priority: Critical
> Fix For: 1.5.0
>
>
> In Elasticsearch 5.2.0 client the class {{BulkProcessor}} was refactored and 
> has no longer the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6044: [FLINK-1044] Website: Offer a zip archive with a p...

2018-05-18 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6044

[FLINK-1044] Website: Offer a zip archive with a pre-setup user project

## What is the purpose of the change

This PR runs two tests that build the Java and Scala quickstart packages. 
After compiling and packaging the jar files, it submits the WordCount class to 
a cluster and validates the output. At the very end it should upload the jar 
files to the `https://flink.apache.org/q` server. The last part is not working 
yet, as I need more details on how to upload the .jar files to the Flink 
server, and I am looking for help from reviewers to point me in the right 
direction.


## Brief change log

  - *Add two new e2e tests*
  - *Modify run-pre-commit-tests*


## Verifying this change

This change added tests and can be verified as follows:

Run test_build_quickstart_java.sh and test_build_quickstart_scala.sh to 
verify this change

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-1044

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6044


commit dd59a8df1662bd1fa06dbfa220802e20a12945c3
Author: Yadan.JS 
Date:   2018-05-18T22:11:40Z

[FLINK-1044] Website: Offer a zip archive with a pre-setup user project




---


[jira] [Commented] (FLINK-1044) Website: Offer a zip archive with a pre-setup user project

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481317#comment-16481317
 ] 

ASF GitHub Bot commented on FLINK-1044:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6044

[FLINK-1044] Website: Offer a zip archive with a pre-setup user project

## What is the purpose of the change

This PR runs two tests that build the Java and Scala quickstart packages. 
After compiling and packaging the jar files, it submits the WordCount class to 
a cluster and validates the output. At the very end it should upload the jar 
files to the `https://flink.apache.org/q` server. The last part is not working 
yet, as I need more details on how to upload the .jar files to the Flink 
server, and I am looking for help from reviewers to point me in the right 
direction.


## Brief change log

  - *Add two new e2e tests*
  - *Modify run-pre-commit-tests*


## Verifying this change

This change added tests and can be verified as follows:

Run test_build_quickstart_java.sh and test_build_quickstart_scala.sh to 
verify this change

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-1044

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6044


commit dd59a8df1662bd1fa06dbfa220802e20a12945c3
Author: Yadan.JS 
Date:   2018-05-18T22:11:40Z

[FLINK-1044] Website: Offer a zip archive with a pre-setup user project




> Website: Offer a zip archive with a pre-setup user project
> --
>
> Key: FLINK-1044
> URL: https://issues.apache.org/jira/browse/FLINK-1044
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Stephan Ewen
>Priority: Minor
>  Labels: starter
> Attachments: flink-project.zip
>
>
> This is basically a shortcut for those that are not familiar with maven 
> archetypes or do not have maven installed (other than as part of the Eclipse 
> IDE or so).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9387) Several log message errors in queryable-state module

2018-05-18 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481354#comment-16481354
 ] 

vinoyang commented on FLINK-9387:
-

[~Zentol] sorry, it really is not missing a placeholder. So how should we fix 
this? Open a new issue? If you agree, I will do it.
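
For context, the issue tracks log-message errors such as placeholder/argument 
mismatches; a generic SLF4J illustration (not taken from the queryable-state 
module):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderExample {
   private static final Logger LOG = LoggerFactory.getLogger(PlaceholderExample.class);

   public static void main(String[] args) {
      String key = "k1";
      int value = 42;

      // Broken: two arguments but only one '{}' placeholder, so 'value' is
      // silently dropped from the message.
      LOG.info("Updated state for key {}", key, value);

      // Fixed: one placeholder per argument.
      LOG.info("Updated state for key {} to value {}", key, value);
   }
}
{code}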

> Several log message errors in queryable-state module
> 
>
> Key: FLINK-9387
> URL: https://issues.apache.org/jira/browse/FLINK-9387
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint

2018-05-18 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9401:
-

 Summary: Data lost when rescaling the job from incremental 
checkpoint
 Key: FLINK-9401
 URL: https://issues.apache.org/jira/browse/FLINK-9401
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2, 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We may lose data when rescaling a job from an incremental checkpoint because of 
the following code.
{code:java}
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

   // Build the big-endian byte prefix of the backend's first key group.
   int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
   }

   // Position the iterator at the first entry of that key group.
   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

      // Decode the key-group id from the key's prefix bytes.
      int keyGroup = 0;
      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
         keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
      }

      // Copy only entries that fall into this backend's key-group range.
      if (stateBackend.keyGroupRange.contains(keyGroup)) {
         stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
      }

      iterator.next();
   }
}
{code}

For every state handle, to fetch the target data we 
_seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be 
INVALID immediately if the state handle's _start key group_ is bigger than 
_state.keyGroupRange.getStartKeyGroup()_. Then, data is lost...
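
As a side note, the prefix arithmetic above is a plain big-endian encoding of 
the key-group id. A minimal standalone round-trip sketch (my own illustration, 
with an explicit 0xFF mask added on decode to avoid sign extension):

{code:java}
public class KeyGroupPrefix {

   // Encode a key-group id into its big-endian byte prefix.
   static byte[] encode(int keyGroup, int prefixBytes) {
      byte[] prefix = new byte[prefixBytes];
      for (int j = 0; j < prefixBytes; ++j) {
         prefix[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
      }
      return prefix;
   }

   // Decode the key-group id back from the prefix bytes.
   static int decode(byte[] prefix) {
      int keyGroup = 0;
      for (byte b : prefix) {
         keyGroup = (keyGroup << Byte.SIZE) + (b & 0xFF);
      }
      return keyGroup;
   }

   public static void main(String[] args) {
      // With 2 prefix bytes, key group 300 encodes to {0x01, 0x2C}.
      byte[] p = encode(300, 2);
      System.out.println(decode(p)); // prints 300
   }
}
{code}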



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint

2018-05-18 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9401.
-
Resolution: Invalid

> Data lost when rescaling the job from incremental checkpoint
> 
>
> Key: FLINK-9401
> URL: https://issues.apache.org/jira/browse/FLINK-9401
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> We may lose data when rescaling a job from an incremental checkpoint because of 
> the following code.
> {code:java}
> try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, 
> columnFamilyHandle)) {
>int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
>byte[] startKeyGroupPrefixBytes = new 
> byte[stateBackend.keyGroupPrefixBytes];
>for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>   startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
>}
>iterator.seek(startKeyGroupPrefixBytes);
>while (iterator.isValid()) {
>   int keyGroup = 0;
>   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>  keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>   }
>   if (stateBackend.keyGroupRange.contains(keyGroup)) {
>  stateBackend.db.put(targetColumnFamilyHandle,
> iterator.key(), iterator.value());
>   }
>   iterator.next();
>}
> }
> {code}
> For every state handle, to fetch the target data we 
> _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be 
> INVALID immediately if the state handle's _start key group_ is bigger than 
> _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-18 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r189417750
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -181,6 +181,25 @@ public static Properties 
replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties 
configProps) {
+   if 
(configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) 
{
--- End diff --

Done.
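
For readers following along, the method under review rewrites deprecated 
consumer config keys to their replacements; a minimal sketch of that pattern, 
with hypothetical key names (the real constants live in 
ConsumerConfigConstants):

{code:java}
import java.util.Properties;

public class DeprecatedKeyMigration {

   // Hypothetical key names for illustration only.
   static final String DEPRECATED_KEY = "flink.stream.describe.backoff.base";
   static final String REPLACEMENT_KEY = "flink.shard.discovery.backoff.base";

   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
      // If the deprecated key is present, move its value to the new key.
      if (configProps.containsKey(DEPRECATED_KEY)) {
         configProps.setProperty(REPLACEMENT_KEY, configProps.getProperty(DEPRECATED_KEY));
         configProps.remove(DEPRECATED_KEY);
      }
      return configProps;
   }
}
{code}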


---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481373#comment-16481373
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r189417750
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -181,6 +181,25 @@ public static Properties 
replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties 
configProps) {
+   if 
(configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) 
{
--- End diff --

Done.


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh
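
To illustrate the API the description refers to, here is a minimal sketch of 
paginated shard listing with the AWS SDK for Java; the stream name is a 
placeholder, and this is not the connector's actual code:

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;

public class ListShardsExample {

   // Lists all shards of a stream, following NextToken pagination.
   public static List<Shard> listAllShards(AmazonKinesis kinesis, String streamName) {
      List<Shard> shards = new ArrayList<>();
      String nextToken = null;
      do {
         ListShardsRequest request = new ListShardsRequest();
         if (nextToken == null) {
            // The first request identifies the stream by name ...
            request.setStreamName(streamName);
         } else {
            // ... subsequent requests must carry only the pagination token.
            request.setNextToken(nextToken);
         }
         ListShardsResult result = kinesis.listShards(request);
         shards.addAll(result.getShards());
         nextToken = result.getNextToken();
      } while (nextToken != null);
      return shards;
   }

   public static void main(String[] args) {
      AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
      System.out.println(listAllShards(kinesis, "my-stream").size());
   }
}
{code}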



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9390) Shutdown of KafkaProducer causes confusing log message

2018-05-18 Thread Triones Deng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481438#comment-16481438
 ] 

Triones Deng commented on FLINK-9390:
-

[~StephanEwen] notice that Flink now makes use of a TaskInterrupter to cancel a 
running task, so it is hard to tell whether an InterruptedException is due to 
cancellation or a real failure. When the user tries to cancel the application, 
Flink will call
{code:java}
Task.cancelOrFailAndCancelInvokable()
{code}
Here we can give the user a hint, e.g. a log message, that the 
InterruptedException is due to cancellation, so the user can ignore the 
InterruptedException log below.
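
A minimal sketch of the kind of hint proposed here, assuming an illustrative 
cancellation flag (the names are not the actual Flink code, which would consult 
the task's execution state):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloseWithCancellationHint {

   private static final Logger LOG = LoggerFactory.getLogger(CloseWithCancellationHint.class);

   // Illustrative flag; set when the task has been canceled.
   private volatile boolean canceled;

   void closeProducer(AutoCloseable producer) throws Exception {
      try {
         producer.close();
      } catch (Exception e) {
         if (canceled && hasInterruptedCause(e)) {
            // Expected during cancellation: log a hint instead of an alarming ERROR.
            LOG.debug("Producer close was interrupted by task cancellation; this is expected.", e);
            Thread.currentThread().interrupt();
         } else {
            throw e;
         }
      }
   }

   // Walks the cause chain looking for an InterruptedException.
   private static boolean hasInterruptedCause(Throwable t) {
      while (t != null) {
         if (t instanceof InterruptedException) {
            return true;
         }
         t = t.getCause();
      }
      return false;
   }
}
{code}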


> Shutdown of KafkaProducer causes confusing log message
> --
>
> Key: FLINK-9390
> URL: https://issues.apache.org/jira/browse/FLINK-9390
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Priority: Minor
>
> I found some logged exceptions in a user log that occurred during shutdown in 
> the context of the Kafka Producer. Those exceptions are most certainly not a 
> real problem, but can be confusing to users, so maybe we can get rid of them.
> {code}
> 2018-05-16 08:52:16,526 DEBUG 
> org.apache.flink.streaming.api.operators.async.Emitter - Emitter thread got 
> interrupted, shutting down.
> 2018-05-16 08:52:16,527 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal 
> of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:384)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
>  at java.lang.Object.wait(Native Method)
>  at java.lang.Thread.join(Thread.java:1260)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>  ... 9 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-05-18 Thread Yazdan Shirvany (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481456#comment-16481456
 ] 

Yazdan Shirvany commented on FLINK-8654:


[~Zentol] is this still valid? If yes, I would like to assign it to myself and 
work on it.

Should the CLI part go here:

[https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html]

or should we add a new page under Project Setup, after IDE Setup?

> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The quickstart documentation explains how to set up the project, build the jar 
> and run things in the IDE, but it neither explains how to submit the jar to a 
> cluster nor guides the user to where this information can be found (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main class if 
> it wasn't set in the pom (the -c CLI flag).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-05-18 Thread Yazdan Shirvany (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481458#comment-16481458
 ] 

Yazdan Shirvany commented on FLINK-8983:


[~till.rohrmann] is it ok to assign this to myself?

> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to set up a Kafka cluster and write a Flink job which reads an Avro type 
> using the Confluent schema registry.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6044: [FLINK-1044] Website: Offer a zip archive with a pre-setu...

2018-05-18 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6044
  
@StephanEwen I would appreciate it if you could review this


---


[jira] [Commented] (FLINK-1044) Website: Offer a zip archive with a pre-setup user project

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481459#comment-16481459
 ] 

ASF GitHub Bot commented on FLINK-1044:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6044
  
@StephanEwen I would appreciate it if you could review this


> Website: Offer a zip archive with a pre-setup user project
> --
>
> Key: FLINK-1044
> URL: https://issues.apache.org/jira/browse/FLINK-1044
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Stephan Ewen
>Priority: Minor
>  Labels: starter
> Attachments: flink-project.zip
>
>
> This is basically a shortcut for those that are not familiar with maven 
> archetypes or do not have maven installed (other than as part of the Eclipse 
> IDE or so).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)