[jira] [Created] (FLINK-8618) Add S3 to the list of sinks on the delivery guarantees page

2018-02-09 Thread chris snow (JIRA)
chris snow created FLINK-8618:
-

 Summary: Add S3 to the list of sinks on the delivery guarantees 
page
 Key: FLINK-8618
 URL: https://issues.apache.org/jira/browse/FLINK-8618
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: chris snow


It would be good to add S3 to the list of sinks.

Maybe S3 inherits the delivery guarantee properties from HDFS, in which case it
could be added next to HDFS? E.g.

HDFS/S3 rolling sink | exactly once | Implementation depends on Hadoop version

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-09 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167162743
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
--- End diff --

This is not exactly what I had in mind by deduplication of 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest`. Both of those tests 
are still pretty much copies of one another, and those static methods are only 
a fraction of the duplication.

Look, for example, at `testSingleChannelNoBarriers()`: the two versions are 99% 
identical. All of its code could be moved to `BarrierBufferTestBase`. 
`BarrierBufferTestBase` would only need to define an abstract method 
`CheckpointBarrierHandler createBarrierHandler()`, which would be defined 
differently in `BarrierBufferTest` and `CreditBasedBarrierBufferTest`. One 
minor thing is that `BarrierBufferTest` would need `checkNoTempFilesRemain()` 
added as an `@After` test hook. The same applies to all of the other tests.
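
(For illustration, a rough sketch of that structure; apart from 
`createBarrierHandler()`, `checkNoTempFilesRemain()` and the test name, the 
names here are assumptions rather than the actual PR code:)

{code}
package org.apache.flink.streaming.runtime.io;

import org.junit.After;
import org.junit.Test;

/**
 * Sketch of the proposed deduplication: the formerly duplicated test bodies
 * live here once; only the handler construction is left to the subclasses.
 */
public abstract class BarrierBufferTestBase {

	/** Implemented differently by BarrierBufferTest and CreditBasedBarrierBufferTest. */
	abstract CheckpointBarrierHandler createBarrierHandler() throws Exception;

	/** No-op by default; BarrierBufferTest would override it to run checkNoTempFilesRemain(). */
	@After
	public void cleanupCheck() throws Exception {
	}

	@Test
	public void testSingleChannelNoBarriers() throws Exception {
		CheckpointBarrierHandler handler = createBarrierHandler();
		// ... the previously duplicated assertions, written once against 'handler' ...
	}
}
{code}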


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-09 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167163798
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
+
+   private static final Random RND = new Random();
+
+   private static int sizeCounter = 1;
+
+   public static BufferOrEvent createBarrier(long checkpointId, int 
channel) {
+   return new BufferOrEvent(new CheckpointBarrier(
+   checkpointId, System.currentTimeMillis(), 
CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
+   }
+
+   public static BufferOrEvent createCancellationBarrier(long 
checkpointId, int channel) {
--- End diff --

Instead of using static methods, please use inheritance: make 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend 
`BarrierBufferTestBase`. The name `*Base` already suggests that.
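
(Continuing the sketch from the previous comment, a concrete subclass would 
then shrink to roughly this; the construction details are placeholders:)

{code}
package org.apache.flink.streaming.runtime.io;

/**
 * Sketch: the concrete test only chooses the handler under test and inherits
 * every test method from BarrierBufferTestBase.
 */
public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {

	@Override
	CheckpointBarrierHandler createBarrierHandler() throws Exception {
		// The real version would construct the credit-based (non-spilling)
		// handler with its mocked input gate; omitted in this sketch.
		throw new UnsupportedOperationException("handler setup omitted in this sketch");
	}
}
{code}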


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358128#comment-16358128
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167163798
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
+
+   private static final Random RND = new Random();
+
+   private static int sizeCounter = 1;
+
+   public static BufferOrEvent createBarrier(long checkpointId, int 
channel) {
+   return new BufferOrEvent(new CheckpointBarrier(
+   checkpointId, System.currentTimeMillis(), 
CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
+   }
+
+   public static BufferOrEvent createCancellationBarrier(long 
checkpointId, int channel) {
--- End diff --

Instead of using static methods, please use inheritance: make 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend 
`BarrierBufferTestBase`. The name `*Base` already suggests that.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams, which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill data to disk files in order to 
> recycle the buffers of blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data of blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.

[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358129#comment-16358129
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r167162743
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class containing common methods for testing
+ * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
+ */
+public class BarrierBufferTestBase {
--- End diff --

This is not exactly what I had in mind by deduplication of 
`BarrierBufferTest` and `CreditBasedBarrierBufferTest`. Both of those tests 
are still pretty much copies of one another, and those static methods are only 
a fraction of the duplication.

Look, for example, at `testSingleChannelNoBarriers()`: the two versions are 99% 
identical. All of its code could be moved to `BarrierBufferTestBase`. 
`BarrierBufferTestBase` would only need to define an abstract method 
`CheckpointBarrierHandler createBarrierHandler()`, which would be defined 
differently in `BarrierBufferTest` and `CreditBasedBarrierBufferTest`. One 
minor thing is that `BarrierBufferTest` would need `checkNoTempFilesRemain()` 
added as an `@After` test hook. The same applies to all of the other tests.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams, which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill data to disk files in order to 
> recycle the buffers of blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data of blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.

[jira] [Created] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Lynch Lee (JIRA)
Lynch Lee created FLINK-8619:


 Summary: Some thing about Flink SQL distinct, I need help
 Key: FLINK-8619
 URL: https://issues.apache.org/jira/browse/FLINK-8619
 Project: Flink
  Issue Type: Wish
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: Lynch Lee
 Fix For: 1.4.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8619:
-
Description: 
I did some tests with DISTINCT on MySQL, shown below:

 

 

mysql> CREATE TABLE `rpt_tt` (

    ->   `target_id` varchar(50) NOT NULL DEFAULT '',

    ->   `target_type` varchar(50) NOT NULL DEFAULT '',

    ->   `amt_pay` bigint(20) DEFAULT NULL,

    ->   `down_payment` bigint(20) DEFAULT NULL,

    ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)

    ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Query OK, 0 rows affected (0.01 sec)

 

mysql> insert into rpt_tt values("1","5","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("3","5","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("2","6","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("3","7","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;

+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+

4 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt 
group by target_type;

+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+

3 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt 
group by target_type,target_id,amt_pay,down_payment;

+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+

4 rows in set (0.01 sec)

 

But now, I want to run similar queries in Flink SQL; the code is here:


import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.god.hala.flink.convertors.RowIntoJson;
import com.god.hala.flink.sources.DataSources;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.UUID;

public class KafkaConn2Topics1 {

 public static void main(String[] args) throws Exception {
 String inputTopic = "input-case01-test02";
 String outputTopic = "output-case01-test02";

 Properties props = new Properties();
 props.setProperty("bootstrap.servers", "data-node5:9092");
 props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", 
""));

 LocalStreamEnvironment streamEnv = 
StreamExecutionEnvironment.createLocalEnvi

[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-09 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for the suggestions; I totally agree. That abstraction 
indeed makes the code simpler. I will update the code ASAP.


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358151#comment-16358151
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for the suggestions; I totally agree. That abstraction 
indeed makes the code simpler. I will update the code ASAP.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams, which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill data to disk files in order to 
> recycle the buffers of blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> there is no need to spill data to avoid deadlocks. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data of blocked channels, 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be enabled or disabled via 
> configuration, in order to roll back to the original mode in case of 
> unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8619:
-
Description: 
I did some tests with DISTINCT on MySQL, shown below:

 

 

mysql> CREATE TABLE `rpt_tt` (

    ->   `target_id` varchar(50) NOT NULL DEFAULT '',

    ->   `target_type` varchar(50) NOT NULL DEFAULT '',

    ->   `amt_pay` bigint(20) DEFAULT NULL,

    ->   `down_payment` bigint(20) DEFAULT NULL,

    ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)

    ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Query OK, 0 rows affected (0.01 sec)

 

mysql> insert into rpt_tt values("1","5","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("3","5","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("2","6","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> insert into rpt_tt values("3","7","1","1");

Query OK, 1 row affected (0.00 sec)

 

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;

+-----------+---------+-------+------------+
|target_type|target_id|amt_pay|down_payment|
+-----------+---------+-------+------------+
|5          |1        |      1|           1|
|6          |2        |      1|           1|
|5          |3        |      1|           1|
|7          |3        |      1|           1|
+-----------+---------+-------+------------+

4 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt 
group by target_type;

+-----------+---------+-------+------------+
|target_type|target_id|amt_pay|down_payment|
+-----------+---------+-------+------------+
|5          |1        |      1|           1|
|6          |2        |      1|           1|
|7          |3        |      1|           1|
+-----------+---------+-------+------------+

3 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt 
group by target_type,target_id,amt_pay,down_payment;

+-----------+---------+-------+------------+
|target_type|target_id|amt_pay|down_payment|
+-----------+---------+-------+------------+
|5          |1        |      1|           1|
|5          |3        |      1|           1|
|6          |2        |      1|           1|
|7          |3        |      1|           1|
+-----------+---------+-------+------------+

4 rows in set (0.01 sec)

 

But now, I want to run similar queries in Flink SQL; the code is here:

import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.god.hala.flink.convertors.RowIntoJson;
 import com.god.hala.flink.sources.DataSources;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
 import java.util.Properties;
 import java.util.UUID;

public class KafkaConn2Topics1 {

public static void main(String[] args) throws Exception {

    String inputTopic = "input-case01-test02";
    String outputTopic = "output-case01-test02";

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "data-node5:9092");
    props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", ""));

    LocalStreamEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment();
    streamEnv.setParallelism(1);
    streamEnv.setStreamTimeCharacteristic(TimeChar

[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL

2018-02-09 Thread Lynch Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358160#comment-16358160
 ] 

Lynch Lee commented on FLINK-6428:
--

[~fhueske] I want to use Flink SQL in my product, but I need some suggestions 
from you, thanks.

For this SQL:   SELECT distinct a, b, c FROM t GROUP BY a, b, c

Why must we put the fields b and c into the GROUP BY keys when the DISTINCT is 
on field a?

> Add support DISTINCT in dataStream SQL
> --
>
> Key: FLINK-6428
> URL: https://issues.apache.org/jira/browse/FLINK-6428
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> Add support DISTINCT in dataStream SQL as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable"
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy() // KeyBy on all fields
>   .flatMap() //  Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?
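
(For illustration, a minimal DataStream version of the keyBy/flatMap 
translation quoted above, using keyed state to drop duplicates; a sketch 
against the 1.x Java API, with the class and field names made up:)

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class DistinctSketch {

	public static DataStream<Integer> distinct(DataStream<Integer> ages) {
		return ages
			// key by the full (single-field) record
			.keyBy(new KeySelector<Integer, Integer>() {
				@Override
				public Integer getKey(Integer value) {
					return value;
				}
			})
			// keyed state remembers whether this key was already emitted
			.flatMap(new RichFlatMapFunction<Integer, Integer>() {
				private transient ValueState<Boolean> seen;

				@Override
				public void open(Configuration parameters) {
					seen = getRuntimeContext().getState(
						new ValueStateDescriptor<>("seen", Boolean.class));
				}

				@Override
				public void flatMap(Integer age, Collector<Integer> out) throws Exception {
					if (seen.value() == null) { // first occurrence of this key
						seen.update(true);
						out.collect(age);
					}
				}
			});
	}
}
{code}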



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-8619.
---
Resolution: Won't Fix

Hi [~lynchlee], Jira is used for tracking bugs or proposing new features. If 
you have questions like this, please use our mailing lists 
(http://flink.apache.org/community.html#mailing-lists) and we are happy to help 
you.

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |5          |3        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |5          |3        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run similar queries in Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org

[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-09 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
Didn't merge yet, because there is an issue with the buildbot environment.


---


[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358173#comment-16358173
 ] 

Timo Walther commented on FLINK-8619:
-

[~lynchlee] to answer your question: I don't know if {{SELECT DISTINCT(value)}} 
is actually standard SQL, but what I know is that it is translated into {{SELECT 
DISTINCT value}}. What you actually want is {{SELECT DISTINCT value, 
FIRST_VALUE(col1), FIRST_VALUE(col2)}}. {{FIRST_VALUE}} would be an aggregation 
function that is also asked for in the exception. However, it is not supported 
yet (see FLINK-6465). I think you have to write your own aggregate function for 
now (or copy the code from the open pull request).
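
(For reference, a minimal sketch of such a user-defined aggregate against the 
1.4 Table API; the class and accumulator names are made up, and "first" here 
simply means first in arrival order:)

{code}
import org.apache.flink.table.functions.AggregateFunction;

/** Keeps the first String it observes per group; later values are ignored. */
public class FirstValue extends AggregateFunction<String, FirstValue.Acc> {

	/** Accumulator remembering whether a value has been captured yet. */
	public static class Acc {
		public String value;
		public boolean set;
	}

	@Override
	public Acc createAccumulator() {
		return new Acc();
	}

	/** Invoked by the runtime once per input row. */
	public void accumulate(Acc acc, String input) {
		if (!acc.set) {
			acc.value = input;
			acc.set = true;
		}
	}

	@Override
	public String getValue(Acc acc) {
		return acc.value;
	}
}
{code}

It would be registered via {{tableEnv.registerFunction("FIRST_VALUE", new 
FirstValue())}} and then used as the aggregate in the GROUP BY query above.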

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |5          |3        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |5          |3        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run similar queries in Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358174#comment-16358174
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
Didn't merge yet, because there is an issue with the buildbot environment.


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a  security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358175#comment-16358175
 ] 

Fabian Hueske commented on FLINK-8619:
--

Hi [~lynchlee], 

we use JIRA as a bug tracker and not as a question/answer forum like Stack 
Overflow.
Please post such questions to the user mailing list or Stack Overflow.

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |5          |3        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |5          |3        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run similar queries in Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flin

[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358184#comment-16358184
 ] 

Timo Walther commented on FLINK-8619:
-

Sorry I meant {{SELECT value, FIRST_VALUE(col1), FIRST_VALUE(col2) GROUP BY 
value}}.

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |5          |3        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+------------+
> |target_type|target_id|amt_pay|down_payment|
> +-----------+---------+-------+------------+
> |5          |1        |      1|           1|
> |5          |3        |      1|           1|
> |6          |2        |      1|           1|
> |7          |3        |      1|           1|
> +-----------+---------+-------+------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run similar queries in Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.

[jira] [Created] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-09 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-8620:
---

 Summary: Enable shipping custom artifacts to BlobStore and 
accessing them through DistributedCache
 Key: FLINK-8620
 URL: https://issues.apache.org/jira/browse/FLINK-8620
 Project: Flink
  Issue Type: New Feature
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


We should be able to distribute custom files to TaskManagers. To do that, we can 
store those files in the BlobStore and later access them on the TaskManagers 
through the DistributedCache.
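
(For context, a sketch of how the existing DataSet-side DistributedCache API is 
used today; this feature would let the same pattern work with files shipped 
through the BlobStore. The path and names below are illustrative:)

{code}
import java.io.File;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class CachedFileSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Register the artifact once; it is shipped to every TaskManager.
		env.registerCachedFile("hdfs:///path/to/dictionary.txt", "dictionary");

		env.fromElements("a", "b")
			.map(new RichMapFunction<String, String>() {
				private File dictionary;

				@Override
				public void open(Configuration parameters) {
					// Resolved to a local copy on the TaskManager.
					dictionary = getRuntimeContext()
						.getDistributedCache().getFile("dictionary");
				}

				@Override
				public String map(String value) {
					return value + " -> " + dictionary.getAbsolutePath();
				}
			})
			.print();
	}
}
{code}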



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8621:


 Summary: 
PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
Travis
 Key: FLINK-8621
 URL: https://issues.apache.org/jira/browse/FLINK-8621
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.5.0


{{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails on 
Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8623) ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis

2018-02-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8623:


 Summary: 
ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis
 Key: FLINK-8623
 URL: https://issues.apache.org/jira/browse/FLINK-8623
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.5.0


{{ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics}} fails on 
Travis: https://travis-ci.org/apache/flink/jobs/33932



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Bhumika Bayani (JIRA)
Bhumika Bayani created FLINK-8622:
-

 Summary: flink-mesos: High memory usage of scheduler + job 
manager. GC never kicks in.
 Key: FLINK-8622
 URL: https://issues.apache.org/jira/browse/FLINK-8622
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.2, 1.4.0
Reporter: Bhumika Bayani


We are deploying a 1-jobmanager + 6-taskmanager Flink cluster on Mesos.

We have observed that the memory usage for 'jobmanager' is high. In spite of 
allocating more and more memory resources to it, it hits the limit within 
minutes.

We started with 1.5 GB RAM and a 1 GB heap. Currently we have allocated 4 GB 
RAM and a 3 GB heap to the jobmanager-cum-scheduler. We also tried allocating 
8 GB RAM with the same 3 GB heap; in that case, too, the memory graph was identical.

As per the graph below, the scheduler almost always runs with maximum memory 
resources.

!flink-mem-usage-graph-for-jira.png!

 

Throughout the run of the scheduler, we do not see memory usage going down 
unless it is killed due to OOM. We therefore infer that garbage collection 
never happens.

We have tried both Flink versions 1.4 and 1.3 and see the same issue on both.

 

Is there any way we can find out where and how memory is being used? 

Are there any Flink config options for the jobmanager, or JVM parameters, which 
can help us restrict the memory usage, force garbage collection, and prevent it 
from crashing? 

Please let us know if there are any resource recommendations from Flink for 
running Flink on Mesos at scale.
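
(For reference, the heap size and JVM flags discussed above map to 
flink-conf.yaml entries like the following; a sketch for Flink 1.4, with the 
GC flags shown purely as an example of how one might investigate the 
retention, not as a recommendation:)

{code}
# flink-conf.yaml
jobmanager.heap.mb: 3072

# Extra JVM options applied to the JobManager only, e.g. to switch the
# collector and capture GC logs while diagnosing the growth described above.
env.java.opts.jobmanager: "-XX:+UseG1GC -XX:+PrintGCDetails -Xloggc:/var/log/flink/jobmanager-gc.log"
{code}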

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8621:
-
Component/s: Tests

> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8622:
-
Component/s: Mesos

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Bhumika Bayani
>Priority: Major
>
> We are deploying a 1-jobmanager + 6-taskmanager Flink cluster on Mesos.
> We have observed that the memory usage of the jobmanager is high. In spite of 
> allocating more and more memory resources to it, it hits the limit within 
> minutes.
> We started with 1.5 GB RAM and a 1 GB heap. Currently we have allocated 4 GB 
> RAM and a 3 GB heap to the jobmanager-cum-scheduler. We also tried allocating 
> 8 GB RAM with the same 3 GB heap; in that case, too, the memory graph was 
> identical.
> As per the graph below, the scheduler almost always runs with maximum memory 
> resources.
> !flink-mem-usage-graph-for-jira.png!
>  
> Throughout the run of the scheduler, we do not see memory usage going down 
> unless it is killed due to OOM. We therefore infer that garbage collection 
> never happens.
> We have tried both Flink versions 1.4 and 1.3 and see the same issue on both.
>  
> Is there any way we can find out where and how memory is being used? 
> Are there any Flink config options for the jobmanager, or JVM parameters, which 
> can help us restrict the memory usage, force garbage collection, and prevent 
> it from crashing? 
> Please let us know if there are any resource recommendations from Flink for 
> running Flink on Mesos at scale.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358201#comment-16358201
 ] 

Fabian Hueske commented on FLINK-6428:
--

Hi [~lynchlee]

The query {{SELECT DISTINCT a, b, c FROM t GROUP BY a}} is not working because 
it is ill-defined.
What should be the result of the query for the following input?

{code}
a | b | c
---------
1 | 1 | 1
1 | 2 | 2
{code}

Clearly, we can only return a single row, because we group on {{a}} and there 
is only one distinct value for {{a}}. But which values should be returned for 
{{b}} and {{c}} in that row?
We cannot return all values, so we have to pick one. That's an arbitrary choice 
and hence a random result. 
Apache Calcite (which Flink uses as a SQL parser and optimizer) does not 
support it and IMO that's correct.
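
For the record, the well-defined variants of that query look like this (a
sketch against the same table {{t}}): either let DISTINCT range over all
selected columns, or aggregate the non-grouped columns explicitly.

{code}
-- well-defined: DISTINCT over all selected columns
SELECT DISTINCT a, b, c FROM t;
-- equivalent formulation with GROUP BY
SELECT a, b, c FROM t GROUP BY a, b, c;

-- also well-defined: choose b and c via explicit aggregates
SELECT a, MAX(b), MAX(c) FROM t GROUP BY a;
{code}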


> Add support DISTINCT in dataStream SQL
> --
>
> Key: FLINK-6428
> URL: https://issues.apache.org/jira/browse/FLINK-6428
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> Add support for DISTINCT in DataStream SQL, as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable"
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy() // KeyBy on all fields
>   .flatMap() //  Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?
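
To make the keyBy/flatMap sketch above concrete, here is a minimal version (my
own illustration, assuming a DataStream<Integer> of ages named {{ages}}; the
dedup via ValueState is one possible implementation, not part of the proposal):

{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Key by the record itself, then emit each value only the first time it is seen.
DataStream<Integer> distinctAges = ages
    .keyBy(value -> value)
    .flatMap(new RichFlatMapFunction<Integer, Integer>() {
        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (seen.value() == null) { // first occurrence of this key
                seen.update(true);
                out.collect(value);
            }
        }
    });
{code}

Note that the {{seen}} state grows with the number of distinct values and is
never cleaned up in this sketch.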



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8500:

Fix Version/s: (was: 1.4.2)

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...

2018-02-09 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5439
  
CC @aljoscha @tzulitai 


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358212#comment-16358212
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/5439

[FLINK-8571] [DataStream] [Backport] Introduce utility function that 
reinterprets a data stream as keyed stream

This PR is a backport of #5424 to Flink 1.4.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
key-partitioned-source-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5439.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5439


commit bd1c83f90234a03c4cab5ce98d705fa45daa34f5
Author: Stefan Richter 
Date:   2018-02-09T10:30:37Z

[FLINK-8571] [DataStream] Introduce utility function that reinterprets a 
data stream as keyed stream (backport from 1.5 branch)
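
For reviewers who want to try it, usage looks roughly like this (a sketch based
on the method signature exercised in the new ITCase, with the package location
taken from the backport's flink-contrib module; with a parallelism-1 example
source the "already partitioned" precondition holds trivially):

{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Stand-in for a source that is already partitioned by key, e.g. a keyed Kafka topic.
DataStream<Tuple2<String, Long>> prePartitioned =
    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

// Reinterpret the existing partitioning as a KeyedStream without a shuffle; the
// caller is responsible for the data actually being partitioned this way.
KeyedStream<Tuple2<String, Long>, String> keyed =
    DataStreamUtils.reinterpretAsKeyedStream(
        prePartitioned,
        new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) {
                return value.f0;
            }
        });
{code}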




> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...

2018-02-09 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/5439

[FLINK-8571] [DataStream] [Backport] Introduce utility function that 
reinterprets a data stream as keyed stream

This PR is a backport of #5424 to Flink 1.4.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
key-partitioned-source-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5439.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5439


commit bd1c83f90234a03c4cab5ce98d705fa45daa34f5
Author: Stefan Richter 
Date:   2018-02-09T10:30:37Z

[FLINK-8571] [DataStream] Introduce utility function that reinterprets a 
data stream as keyed stream (backport from 1.5 branch)




---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358214#comment-16358214
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5439
  
CC @aljoscha @tzulitai 


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358222#comment-16358222
 ] 

Fabian Hueske commented on FLINK-8601:
--

Thanks for the detailed design document [~sihuazhou]! 
Now I have a pretty good understanding of the proposal. 

I think this would be a nice feature, but I'm not sure if it is generic enough 
to go into the base classes that are used by all functions or whether it would 
make more sense to expose it only to specific functions like the 
{{ProcessFunction}}.

[~aljoscha], what do you think about the proposal?

> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h3. Background
> A bloom filter is useful in many situations, for example:
>  * 1. Approximate calculation: deduplication (e.g. UV calculation)
>  * 2. Performance optimization: e.g. [runtime filter 
> join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>By using a BF, we can greatly reduce the number of queries for state 
> data in a stream join; the filtered-out queries would eventually have failed 
> to find any results anyway, which is poor for performance with rocksdb-based 
> state due to traversing ```sst``` files on disk. 
> However, based on the current state support provided by flink, it is hard to 
> use a bloom filter, for the following reasons:
>  * 1. Serialization problem: Bloom filter state can be large (for example: 
> 100M). If it is implemented on top of RocksDB state, the state data will need 
> to be serialized each time it is queried and updated, and the performance 
> will be very poor.
>  * 2. Data skew: Data in different key groups can be skewed, and the skew 
> cannot be accurately predicted before the program runs. Therefore, it is 
> impossible to determine how many resources the bloom filter should allocate. 
> One way to handle this is to allocate the space needed for the most skewed 
> case, but this can lead to a very serious waste of resources.
> h3. Requirement
> Therefore, I introduce the PartitionedBloomFilter for flink, which at least 
> needs to meet the following requirements:
>  * 1. Support for changing parallelism
>  * 2. Only serialize when necessary: when performing a checkpoint
>  * 3. Can deal with the data skew problem: users only need to specify a 
> PartitionedBloomFilter with the desired input and fpp; the system will 
> allocate resources dynamically.
>  * 4. Does not conflict with other state: users can use KeyedState and 
> OperatorState while using this bloom filter.
>  * 5. Support relaxed TTL (i.e. the data survives at least as long as the 
> specified time)
> Design doc:  [design 
> doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]
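
To make the runtime-filter motivation concrete, the probe-side gating looks
like this with a plain bloom filter (a minimal sketch using Guava's BloomFilter
as a stand-in; the proposed PartitionedBloomFilter API itself is specified only
in the design doc):

{code:java}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

// Stand-in for the proposed filter: a plain in-memory Guava bloom filter.
BloomFilter<String> buildSideKeys = BloomFilter.create(
    Funnels.stringFunnel(StandardCharsets.UTF_8),
    10_000_000, // expected number of distinct keys
    0.01);      // desired false positive probability (fpp)

// Build side of the join: record every key that has state.
buildSideKeys.put("some-key");

// Probe side: skip the expensive RocksDB lookup when the filter says the key
// cannot be present. False negatives are impossible; a false positive merely
// costs one unnecessary state lookup.
String probeKey = "another-key";
if (buildSideKeys.mightContain(probeKey)) {
    // ... query keyed state / RocksDB for probeKey ...
}
{code}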



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...

2018-02-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5439#discussion_r167196102
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, 
KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* FLINK-8571
--- End diff --

Can maybe remove this. I don't see a need to explicitly tag the test.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358238#comment-16358238
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5439#discussion_r167196102
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, 
KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* FLINK-8571
--- End diff --

Can maybe remove this. I don't see a need to explicitly tag the test.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...

2018-02-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5439#discussion_r167197709
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, 
KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* FLINK-8571
--- End diff --

@StefanRRichter Yes, this can then probably also be removed from the 
original PR before merging.


---


[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...

2018-02-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5439
  
The changes look good! But as @tzulitai mentioned the missing Scala tests 
seem strange.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358249#comment-16358249
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5439#discussion_r167197709
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, 
KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* FLINK-8571
--- End diff --

@StefanRRichter Yes, this can then probably also be removed from the 
original PR before merging.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358254#comment-16358254
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5439
  
The changes look good! But as @tzulitai mentioned the missing Scala tests 
seem strange.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5424: [FLINK-8571] [DataStream] Introduce utility functio...

2018-02-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5424#discussion_r167198202
  
--- Diff: docs/dev/api_concepts.md ---
@@ -896,11 +896,16 @@ result type ```R``` for the final result. E.g. for a 
histogram, ```V``` is a num
 
 {% top %}
 
-## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed 
stream
+Experimental features
--- End diff --

Ah, what I meant is in a completely separate section of the doc, because 
this part of the doc is about general API concepts and not specific to the 
streaming API.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358255#comment-16358255
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5424#discussion_r167198202
  
--- Diff: docs/dev/api_concepts.md ---
@@ -896,11 +896,16 @@ result type ```R``` for the final result. E.g. for a 
histogram, ```V``` is a num
 
 {% top %}
 
-## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed 
stream
+Experimental features
--- End diff --

Ah, what I meant is in a completely separate section of the doc, because 
this part of the doc is about general API concepts and not specific to the 
streaming API.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5424: [FLINK-8571] [DataStream] Introduce utility function that ...

2018-02-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5424
  
Still looks good, except that I'd like to see the new doc in a completely 
separate section of the doc.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358257#comment-16358257
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5424
  
Still looks good, except that I'd like to see the new doc in a completely 
separate section of the doc.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would help make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and also 
> the fine-grained recovery semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive

2018-02-09 Thread Bhumika Bayani (JIRA)
Bhumika Bayani created FLINK-8624:
-

 Summary: flink-mesos: The flink rest-api sometimes becomes 
unresponsive
 Key: FLINK-8624
 URL: https://issues.apache.org/jira/browse/FLINK-8624
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.2
Reporter: Bhumika Bayani


Sometimes the flink-mesos-scheduler fails or gets killed, and Marathon brings
it up again on some other node. Sometimes we have observed that the rest-api of
the newly created flink instance becomes unresponsive.

Even if we execute api calls manually with curl, such as

http://:/overview or http://:/config

we do not receive any response.

We submit and execute all our flink-jobs using the rest-api only, so if the
rest api becomes unresponsive, we cannot run any of the flink jobs and no
stream processing happens.

We tried enabling flink debug logs, but we did not observe anything specific
that indicates why the rest api is failing or unresponsive.

We see the exceptions below in the logs, but they are not specific to the case
when the flink-api is hung; we see them in a healthy flink-scheduler too:

 
{code:java}
Timestamp=2018-02-08 05:43:49,175 LogLevel=INFO
        ThreadId=[Checkpoint Timer] Class=o.a.f.r.c.CheckpointCoordinator 
Msg=Triggering checkpoint 10181 @ 1518068629174
Timestamp=2018-02-08 05:43:49,183 LogLevel=DEBUG
        ThreadId=[nioEventLoopGroup-5-3] Class=o.a.f.r.w.WebRuntimeMonitor 
Msg=Unhandled exception: {}
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager#753807801]] after [1 ms]
        at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
 ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
 ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) 
~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
{code}
 

During the time the rest api is unresponsive, we have observed that the flink
web UI does not load or show any information either.

Restarting the flink-scheduler solves this issue sometimes. 
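
As a data point for triage: the AskTimeoutException above is the web monitor's
internal ask to the jobmanager actor timing out. The timeout itself can be
raised via flink-conf.yaml (a sketch; this only works around the symptom and
does not explain why the actor stops answering):

{code}
# Raise the internal actor ask timeout used by e.g. the web/REST handlers.
akka.ask.timeout: 60 s
{code}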

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5411: [FLINK-8556] [Kinesis Connector] Add proxy feature to the...

2018-02-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5411
  
Thanks for the contribution @pduveau.
I'll try to take a look at this soon.


---


[jira] [Commented] (FLINK-8556) Add proxy feature to Kinesis Connector to acces its endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358261#comment-16358261
 ] 

ASF GitHub Bot commented on FLINK-8556:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5411
  
Thanks for the contribution @pduveau.
I'll try to take a look at this soon.


> Add proxy feature to Kinesis Connector to acces its endpoint
> 
>
> Key: FLINK-8556
> URL: https://issues.apache.org/jira/browse/FLINK-8556
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>  Labels: features
>
> The connector cannot be configured to use a proxy to access the Kinesis 
> endpoint. This feature is required on EC2 instances which can access the 
> internet only through a proxy. VPC Kinesis endpoints are currently available 
> in only a few AWS regions.
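
For context, the underlying AWS SDK already supports proxies through its
ClientConfiguration, so the feature presumably wires connector properties
through to something like the following (a sketch of the plain SDK v1
mechanism, not of this PR's actual configuration keys; the proxy host and port
are hypothetical):

{code:java}
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

ClientConfiguration clientConfig = new ClientConfiguration();
clientConfig.setProxyHost("proxy.internal.example.com"); // hypothetical proxy
clientConfig.setProxyPort(3128);

// AWS SDK v1 style client construction with an explicit client configuration.
AmazonKinesisClient kinesis = new AmazonKinesisClient(clientConfig);
{code}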



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8624:

Fix Version/s: 1.5.0

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>Priority: Major
> Fix For: 1.5.0
>
>
> Sometimes the flink-mesos-scheduler fails or gets killed, and Marathon brings
> it up again on some other node. Sometimes we have observed that the rest-api
> of the newly created flink instance becomes unresponsive.
> Even if we execute api calls manually with curl, such as 
> http://:/overview or http://:/config
> we do not receive any response. 
> We submit and execute all our flink-jobs using the rest-api only, so if the
> rest api becomes unresponsive, we cannot run any of the flink jobs and no
> stream processing happens. 
> We tried enabling flink debug logs, but we did not observe anything specific 
> that indicates why the rest api is failing or unresponsive.
> We see the exceptions below in the logs, but they are not specific to the case
> when the flink-api is hung; we see them in a healthy flink-scheduler too: 
>  
> {code:java}
> Timestamp=2018-02-08 05:43:49,175 LogLevel=INFO
>         ThreadId=[Checkpoint Timer] Class=o.a.f.r.c.CheckpointCoordinator 
> Msg=Triggering checkpoint 10181 @ 1518068629174
> Timestamp=2018-02-08 05:43:49,183 LogLevel=DEBUG
>         ThreadId=[nioEventLoopGroup-5-3] Class=o.a.f.r.w.WebRuntimeMonitor 
> Msg=Unhandled exception: {}
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager#753807801]] after [1 ms]
>         at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
> {code}
>  
> During the time the rest api is unresponsive, we have observed that the flink
> web UI does not load or show any information either. 
> Restarting the flink-scheduler solves this issue sometimes. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8624:

Component/s: REST
 Distributed Coordination

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Sometimes the flink-mesos-scheduler fails or gets killed, and Marathon brings
> it up again on some other node. Sometimes we have observed that the rest-api
> of the newly created flink instance becomes unresponsive.
> Even if we execute api calls manually with curl, such as 
> http://:/overview or http://:/config
> we do not receive any response. 
> We submit and execute all our flink-jobs using the rest-api only, so if the
> rest api becomes unresponsive, we cannot run any of the flink jobs and no
> stream processing happens. 
> We tried enabling flink debug logs, but we did not observe anything specific 
> that indicates why the rest api is failing or unresponsive.
> We see the exceptions below in the logs, but they are not specific to the case
> when the flink-api is hung; we see them in a healthy flink-scheduler too: 
>  
> {code:java}
> Timestamp=2018-02-08 05:43:49,175 LogLevel=INFO
>         ThreadId=[Checkpoint Timer] Class=o.a.f.r.c.CheckpointCoordinator 
> Msg=Triggering checkpoint 10181 @ 1518068629174
> Timestamp=2018-02-08 05:43:49,183 LogLevel=DEBUG
>         ThreadId=[nioEventLoopGroup-5-3] Class=o.a.f.r.w.WebRuntimeMonitor 
> Msg=Unhandled exception: {}
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager#753807801]] after [1 ms]
>         at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
> {code}
>  
> During the time the rest api is unresponsive, we have observed that the flink
> web UI does not load or show any information either. 
> Restarting the flink-scheduler solves this issue sometimes. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8622:

Fix Version/s: 1.5.0

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Bhumika Bayani
>Priority: Major
> Fix For: 1.5.0
>
>
> We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos.
> We have observed that the memory usage of the 'jobmanager' is high. In spite of
> allocating more and more memory resources to it, it hits the limit within
> minutes.
> We started with 1.5 GB RAM and a 1 GB heap. Currently we have allocated 4
> GB RAM and a 3 GB heap to the jobmanager cum scheduler. We also tried allocating
> 8 GB RAM with the same 3 GB heap; in that case too, the memory graph was
> identical.
> As per the graph below, the scheduler almost always runs at its maximum memory
> resources.
> !flink-mem-usage-graph-for-jira.png!
>  
> Throughout the run of the scheduler, we never see memory usage go down
> unless the process is killed due to OOM, so we infer that garbage collection
> never happens.
> We have tried both flink versions 1.4 and 1.3 and see the same issue
> on both versions.
>  
> Is there any way we can find out where and how memory is being used?
> Are there any flink config options for the jobmanager, or JVM parameters,
> which can help us restrict memory usage, force garbage collection, and
> prevent it from crashing?
> Please let us know if there are any resource recommendations from Flink for
> running Flink on mesos at scale.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8624:

Priority: Blocker  (was: Major)

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.3.2
>Reporter: Bhumika Bayani
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Sometimes the flink-mesos-scheduler fails or gets killed, and Marathon brings
> it up again on some other node. Sometimes we have observed that the rest-api
> of the newly created flink instance becomes unresponsive.
> Even if we execute api calls manually with curl, such as 
> http://:/overview or http://:/config
> we do not receive any response. 
> We submit and execute all our flink-jobs using the rest-api only, so if the
> rest api becomes unresponsive, we cannot run any of the flink jobs and no
> stream processing happens. 
> We tried enabling flink debug logs, but we did not observe anything specific 
> that indicates why the rest api is failing or unresponsive.
> We see the exceptions below in the logs, but they are not specific to the case
> when the flink-api is hung; we see them in a healthy flink-scheduler too: 
>  
> {code:java}
> Timestamp=2018-02-08 05:43:49,175 LogLevel=INFO
>         ThreadId=[Checkpoint Timer] Class=o.a.f.r.c.CheckpointCoordinator 
> Msg=Triggering checkpoint 10181 @ 1518068629174
> Timestamp=2018-02-08 05:43:49,183 LogLevel=DEBUG
>         ThreadId=[nioEventLoopGroup-5-3] Class=o.a.f.r.w.WebRuntimeMonitor 
> Msg=Unhandled exception: {}
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager#753807801]] after [1 ms]
>         at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>  ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
> {code}
>  
> During the time the rest api is unresponsive, we have observed that the flink
> web UI does not load or show any information either. 
> Restarting the flink-scheduler solves this issue sometimes. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8622:

Component/s: ResourceManager
 Distributed Coordination

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Mesos, ResourceManager
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Bhumika Bayani
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos.
> We have observed that the memory usage of the 'jobmanager' is high. In spite of
> allocating more and more memory resources to it, it hits the limit within
> minutes.
> We started with 1.5 GB RAM and a 1 GB heap. Currently we have allocated 4
> GB RAM and a 3 GB heap to the jobmanager cum scheduler. We also tried allocating
> 8 GB RAM with the same 3 GB heap; in that case too, the memory graph was
> identical.
> As per the graph below, the scheduler almost always runs at its maximum memory
> resources.
> !flink-mem-usage-graph-for-jira.png!
>  
> Throughout the run of the scheduler, we never see memory usage go down
> unless the process is killed due to OOM, so we infer that garbage collection
> never happens.
> We have tried both flink versions 1.4 and 1.3 and see the same issue
> on both versions.
>  
> Is there any way we can find out where and how memory is being used?
> Are there any flink config options for the jobmanager, or JVM parameters,
> which can help us restrict memory usage, force garbage collection, and
> prevent it from crashing?
> Please let us know if there are any resource recommendations from Flink for
> running Flink on mesos at scale.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8619:
-

Reopen to remove fix version

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |5          |3        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |5          |3        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 4 rows in set (0.01 sec)
>  
> But now,
> I want to do some queries with Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.apache.flink.util.Collector;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> import jav

[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8622:

Priority: Blocker  (was: Major)

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Mesos, ResourceManager
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Bhumika Bayani
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos.
> We have observed that the memory usage of the 'jobmanager' is high. In spite of
> allocating more and more memory resources to it, it hits the limit within
> minutes.
> We started with 1.5 GB RAM and a 1 GB heap. Currently we have allocated 4
> GB RAM and a 3 GB heap to the jobmanager cum scheduler. We also tried allocating
> 8 GB RAM with the same 3 GB heap; in that case too, the memory graph was
> identical.
> As per the graph below, the scheduler almost always runs at its maximum memory
> resources.
> !flink-mem-usage-graph-for-jira.png!
>  
> Throughout the run of the scheduler, we never see memory usage go down
> unless the process is killed due to OOM, so we infer that garbage collection
> never happens.
> We have tried both flink versions 1.4 and 1.3 and see the same issue
> on both versions.
>  
> Is there any way we can find out where and how memory is being used?
> Are there any flink config options for the jobmanager, or JVM parameters,
> which can help us restrict memory usage, force garbage collection, and
> prevent it from crashing?
> Please let us know if there are any resource recommendations from Flink for
> running Flink on mesos at scale.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8619.
---
Resolution: Not A Bug

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |5          |3        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-----------+---------+-------+-------------+
> |target_type|target_id|amt_pay|down_payment |
> +-----------+---------+-------+-------------+
> |5          |1        |      1|            1|
> |5          |3        |      1|            1|
> |6          |2        |      1|            1|
> |7          |3        |      1|            1|
> +-----------+---------+-------+-------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run some queries with Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.apache.flink.util.Collector;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> import java.nio.charset.Charset;
>  import java.u

[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8619:

Fix Version/s: (was: 1.4.0)

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run some queries with Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.apache.flink.util.Collector;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> import java.nio.charset.Charset;
>  i

[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358286#comment-16358286
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr], I think we need a converter to transform a standard JSON schema 
to a {{TypeInformation}}, right?
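
For illustration, a minimal sketch of such a converter (the class and method 
names here are assumptions, and only primitive schema types are covered):

{code}
// A minimal sketch (an assumption, not the eventual Flink converter) of
// mapping JSON schema primitive type names to Flink TypeInformation.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class JsonSchemaConverter {

	public static TypeInformation<?> convertPrimitive(String jsonType) {
		switch (jsonType) {
			case "string":  return BasicTypeInfo.STRING_TYPE_INFO;
			case "boolean": return BasicTypeInfo.BOOLEAN_TYPE_INFO;
			case "integer": return BasicTypeInfo.LONG_TYPE_INFO;
			// how to map "number" is the open question discussed in this thread
			case "number":  return BasicTypeInfo.BIG_DEC_TYPE_INFO;
			default: throw new IllegalArgumentException("Unsupported JSON type: " + jsonType);
		}
	}

	/** Converts an "object" schema (field names + primitive types) to a RowTypeInfo. */
	public static RowTypeInfo convertObject(String[] fieldNames, String[] jsonTypes) {
		TypeInformation<?>[] fieldTypes = new TypeInformation<?>[jsonTypes.length];
		for (int i = 0; i < jsonTypes.length; i++) {
			fieldTypes[i] = convertPrimitive(jsonTypes[i]);
		}
		return new RowTypeInfo(fieldTypes, fieldNames);
	}
}
{code}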

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358295#comment-16358295
 ] 

Timo Walther commented on FLINK-8538:
-

[~xccui] yes, I think this is necessary. We also have to think about how to 
handle JSON-specific types. E.g. the JSON standard declares a "Number" type but 
we have to map it to some Java primitive. It may also declare union types. We 
have the following options:

Option 1: We infer the type using information from the {{TableSchema}} (but 
this would be Table API specific, formats are intended for all APIs).

Option 2: We make this configurable: number as double, number as BigDecimal etc.

Option 3: We introduce a new TypeInformation.

If we really want to support JSON once and for all, we have to think about how 
to handle those cases. I just read a discussion on the Beam ML about this:
https://lists.apache.org/thread.html/ee6843859f1ddb1d4544c32d255fe88a3bf3aec97d3afc3e3d47c701@%3Cdev.beam.apache.org%3E
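
To make Option 2 concrete, here is a minimal sketch (an assumption, not an 
existing Flink API) of a globally configurable mapping for the JSON "Number" 
type:

{code}
// A minimal sketch of Option 2: the caller chooses once, for the whole
// format, how JSON "number" is mapped. Class and enum names are assumptions.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class JsonNumberMapping {

	public enum Mode { AS_DOUBLE, AS_BIG_DECIMAL }

	public static TypeInformation<?> numberType(Mode mode) {
		// a single global decision instead of per-field inference
		return mode == Mode.AS_DOUBLE
			? BasicTypeInfo.DOUBLE_TYPE_INFO
			: BasicTypeInfo.BIG_DEC_TYPE_INFO;
	}
}
{code}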

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-09 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
ping @aljoscha 


---


[jira] [Commented] (FLINK-8477) Add api to support for user to skip the first incomplete window data

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358297#comment-16358297
 ] 

ASF GitHub Bot commented on FLINK-8477:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
ping @aljoscha 


> Add api to support for user to skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6160) Retry JobManager/ResourceManager connection in case of timeout

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358310#comment-16358310
 ] 

ASF GitHub Bot commented on FLINK-6160:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5440

[FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in …

## What is the purpose of the change

When a timeout occurs, retry the JobManager/ResourceManager connection instead 
of failing immediately.

## Brief change log
When a timeout fires, invoke ```requestHeartbeat``` in the HeartbeatMonitor 
thread instead of directly invoking ```notifyHeartbeatTimeout``` and closing 
the connection.
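
For illustration, a minimal, self-contained sketch of the retry-on-timeout 
idea (class and method names are assumptions, not the actual HeartbeatMonitor 
internals):

```
// A sketch of retry-on-timeout: when the timeout fires, request another
// heartbeat first and only report the timeout once the retry budget is gone.
public class RetryingHeartbeatMonitor {

	private final int maxRetries;
	private int retries;

	public RetryingHeartbeatMonitor(int maxRetries) {
		this.maxRetries = maxRetries;
	}

	/** Called when the heartbeat timeout fires. */
	public void onTimeout() {
		if (retries < maxRetries) {
			retries++;
			requestHeartbeat();       // retry instead of closing the connection
		} else {
			notifyHeartbeatTimeout(); // give up only after all retries failed
		}
	}

	/** Called when a heartbeat response arrives; resets the retry budget. */
	public void onHeartbeatReceived() {
		retries = 0;
	}

	private void requestHeartbeat() {
		// send a heartbeat request to the remote JobManager/ResourceManager
	}

	private void notifyHeartbeatTimeout() {
		// report the timeout and close the connection
	}
}
```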

## Verifying this change

This change is already covered by existing tests, with minor changes: in 
TaskExecutorTest.java, ```testHeartbeatTimeoutWithResourceManager``` was 
changed so that a timeout does not immediately invoke ```disconnectTaskManager```.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5440


commit 5319abdf503c757baf7afde9913ab2fb6fb61b60
Author: zhangminglei 
Date:   2018-02-09T11:52:50Z

[FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in case 
of timeout




>  Retry JobManager/ResourceManager connection in case of timeout
> ---
>
> Key: FLINK-6160
> URL: https://issues.apache.org/jira/browse/FLINK-6160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> In case of a heartbeat timeout, the {{TaskExecutor}} closes the connection to 
> the remote component. Furthermore, it assumes that the component has actually 
> failed and, thus, it will only start trying to connect to the component if it 
> is notified about a new leader address and leader session id. This is 
> brittle, because the heartbeat could also time out without the component 
> having crashed. Thus, we should add an automatic retry to the latest known 
> leader address information in case of a timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5440: [FLINK-6160] [flip-6] Retry JobManager/ResourceMan...

2018-02-09 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5440

[FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in …

## What is the purpose of the change

When a timeout occurs, retry the JobManager/ResourceManager connection instead 
of failing immediately.

## Brief change log
When a timeout fires, invoke ```requestHeartbeat``` in the HeartbeatMonitor 
thread instead of directly invoking ```notifyHeartbeatTimeout``` and closing 
the connection.

## Verifying this change

This change is already covered by existing tests, with minor changes: in 
TaskExecutorTest.java, ```testHeartbeatTimeoutWithResourceManager``` was 
changed so that a timeout does not immediately invoke ```disconnectTaskManager```.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5440


commit 5319abdf503c757baf7afde9913ab2fb6fb61b60
Author: zhangminglei 
Date:   2018-02-09T11:52:50Z

[FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in case 
of timeout




---


[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help

2018-02-09 Thread Lynch Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358312#comment-16358312
 ] 

Lynch Lee commented on FLINK-8619:
--

Thanks to all.  I got it.

> Some thing about Flink SQL distinct, I need help
> 
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
>  Issue Type: Wish
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
>
> I did some tests with DISTINCT on MySQL, shown below:
>  
>  
> mysql> CREATE TABLE `rpt_tt` (
>     ->   `target_id` varchar(50) NOT NULL DEFAULT '',
>     ->   `target_type` varchar(50) NOT NULL DEFAULT '',
>     ->   `amt_pay` bigint(20) DEFAULT NULL,
>     ->   `down_payment` bigint(20) DEFAULT NULL,
>     ->   PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     ->   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>  
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 3 rows in set (0.00 sec)
>  
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from 
> rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.01 sec)
>  
> But now, I want to run some queries with Flink SQL; the code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
>  import com.fasterxml.jackson.databind.JsonNode;
>  import com.fasterxml.jackson.databind.ObjectMapper;
>  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
>  import com.fasterxml.jackson.databind.node.ObjectNode;
>  import com.god.hala.flink.convertors.RowIntoJson;
>  import com.god.hala.flink.sources.DataSources;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>  import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>  import org.apache.flink.api.common.time.Time;
>  import org.apache.flink.api.common.typeinfo.TypeInformation;
>  import org.apache.flink.api.java.tuple.Tuple2;
>  import org.apache.flink.api.java.typeutils.RowTypeInfo;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.TimeCharacteristic;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.apache.flink.streaming.api.functions.ProcessFunction;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>  import 
> org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
>  import org.apache.flink.table.api.StreamQueryConfig;
>  import org.apache.flink.table.api.Table;
>  import org.apache.flink.table.api.Types;
>  import org.apache.flink.table.api.java.StreamTableEnvironment;
>  import org.apache.flink.types.Row;
>  import org.apache.flink.util.Collector;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> imp

[jira] [Assigned] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-09 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-8423:
---

Assignee: mingleizhang

> OperatorChain#pushToOperator catch block may fail with NPE
> --
>
> Key: FLINK-8423
> URL: https://issues.apache.org/jira/browse/FLINK-8423
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>Priority: Critical
>
> {code}
> @Override
> protected  void pushToOperator(StreamRecord record) {
>   try {
>   // we know that the given outputTag matches our OutputTag so 
> the record
>   // must be of the type that our operator (and Serializer) 
> expects.
>   @SuppressWarnings("unchecked")
>   StreamRecord castRecord = (StreamRecord) record;
>   numRecordsIn.inc();
>   StreamRecord copy = 
> castRecord.copy(serializer.copy(castRecord.getValue()));
>   operator.setKeyContextElement1(copy);
>   operator.processElement(copy);
>   } catch (ClassCastException e) {
>   // Enrich error message
>   ClassCastException replace = new ClassCastException(
>   String.format(
>   "%s. Failed to push OutputTag with id '%s' to 
> operator. " +
>   "This can occur when multiple OutputTags with 
> different types " +
>   "but identical names are being used.",
>   e.getMessage(),
>   outputTag.getId()));
>   throw new ExceptionInChainedOperatorException(replace);
>   } catch (Exception e) {
>   throw new ExceptionInChainedOperatorException(e);
>   }
> }
> {code}
> If outputTag is null (as is the case when no sideOutput was defined) the 
> catch block will crash with a NullPointerException. This may happen if 
> {{operator.processElement}} throws a ClassCastException.
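
One possible shape of a fix (a sketch only, not the merged change) is to guard 
the message enrichment with a null check on outputTag so the catch block 
itself cannot throw an NPE:

{code}
// Sketch: modified catch block of the pushToOperator method quoted above.
} catch (ClassCastException e) {
	ClassCastException replace;
	if (outputTag != null) {
		// enrich the error message only when a side output tag is present
		replace = new ClassCastException(
			String.format(
				"%s. Failed to push OutputTag with id '%s' to operator. " +
				"This can occur when multiple OutputTags with different types " +
				"but identical names are being used.",
				e.getMessage(),
				outputTag.getId()));
	} else {
		replace = e; // no tag to report; rethrow the original exception
	}
	throw new ExceptionInChainedOperatorException(replace);
}
{code}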



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-09 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358322#comment-16358322
 ] 

mingleizhang commented on FLINK-8423:
-

I will fix this issue.

> OperatorChain#pushToOperator catch block may fail with NPE
> --
>
> Key: FLINK-8423
> URL: https://issues.apache.org/jira/browse/FLINK-8423
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>Priority: Critical
>
> {code}
> @Override
> protected  void pushToOperator(StreamRecord record) {
>   try {
>   // we know that the given outputTag matches our OutputTag so 
> the record
>   // must be of the type that our operator (and Serializer) 
> expects.
>   @SuppressWarnings("unchecked")
>   StreamRecord castRecord = (StreamRecord) record;
>   numRecordsIn.inc();
>   StreamRecord copy = 
> castRecord.copy(serializer.copy(castRecord.getValue()));
>   operator.setKeyContextElement1(copy);
>   operator.processElement(copy);
>   } catch (ClassCastException e) {
>   // Enrich error message
>   ClassCastException replace = new ClassCastException(
>   String.format(
>   "%s. Failed to push OutputTag with id '%s' to 
> operator. " +
>   "This can occur when multiple OutputTags with 
> different types " +
>   "but identical names are being used.",
>   e.getMessage(),
>   outputTag.getId()));
>   throw new ExceptionInChainedOperatorException(replace);
>   } catch (Exception e) {
>   throw new ExceptionInChainedOperatorException(e);
>   }
> }
> {code}
> If outputTag is null (as is the case when no sideOutput was defined) the 
> catch block will crash with a NullPointerException. This may happen if 
> {{operator.processElement}} throws a ClassCastException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.

2018-02-09 Thread Bhumika Bayani (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhumika Bayani updated FLINK-8622:
--
Attachment: flink-mem-usage-graph-for-jira.png

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Mesos, ResourceManager
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Bhumika Bayani
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: flink-mem-usage-graph-for-jira.png
>
>
> We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos.
> We have observed that the memory usage for 'jobmanager' is high. In spite of 
> allocating more and more memory resources to it, it hits the limit within 
> minutes.
> We had started with 1.5 GB RAM and 1 GB heap. Currently we have allocated 4 
> GB RAM, 3 GB heap to the jobmanager cum scheduler. We also tried allocating 
> 8 GB RAM while keeping the same 3 GB heap; in that case too, the memory 
> graph was identical.
> As per the graph below, the scheduler almost always runs with maximum memory 
> resources.
> !flink-mem-usage-graph-for-jira.png!
>  
> Throughout the run of the scheduler, we never see memory usage go down 
> unless the process is killed due to OOM. From this we infer that garbage 
> collection never happens.
> We have tried Flink versions 1.4 and 1.3 and see the same issue on both.
>  
> Is there any way we can find out where and how memory is being used? 
> Are there any Flink config options for the jobmanager, or JVM parameters, 
> which can help us restrict memory usage, force garbage collection, and 
> prevent it from crashing? 
> Please let us know if there are any resource recommendations from Flink for 
> running Flink on Mesos at scale.
>  
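
As a hedged starting point for such diagnosis (not an official 
recommendation; the option names follow the Flink configuration 
documentation, and the values are placeholders), flink-conf.yaml can bound 
the JobManager heap and log GC activity:

{code}
# cap the JVM heap of the JobManager process
jobmanager.heap.mb: 3072
# enable GC logging for the JobManager to see whether collections run at all
env.java.opts.jobmanager: -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/flink/jobmanager-gc.log
{code}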



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8625) Move OutputFlasherThread to Netty scheduled executor

2018-02-09 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8625:
-

 Summary: Move OutputFlasherThread to Netty scheduled executor
 Key: FLINK-8625
 URL: https://issues.apache.org/jira/browse/FLINK-8625
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: Piotr Nowojski


This will allow us to trigger/schedule the next flush only if we are not 
currently busy. 
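
A minimal sketch of the idea (plain Netty imports are used here instead of 
Flink's shaded ones; this is not the actual implementation):

{code}
// Sketch: each flush re-schedules the next one on the channel's own event
// loop, so no dedicated flusher thread is needed and a flush is only
// triggered when the executor gets around to it.
import io.netty.channel.Channel;
import java.util.concurrent.TimeUnit;

public class ScheduledFlusher {

	private final Channel channel;
	private final long flushIntervalMillis;

	public ScheduledFlusher(Channel channel, long flushIntervalMillis) {
		this.channel = channel;
		this.flushIntervalMillis = flushIntervalMillis;
	}

	public void scheduleNextFlush() {
		channel.eventLoop().schedule(() -> {
			channel.flush();     // runs on the event loop, no extra thread
			scheduleNextFlush(); // schedule the following flush
		}, flushIntervalMillis, TimeUnit.MILLISECONDS);
	}
}
{code}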



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5383: [hotfix][kafka-tests] Do not hide original excepti...

2018-02-09 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5383#discussion_r167224057
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 ---
@@ -592,24 +595,21 @@ private Properties createProperties() {
return properties;
}
 
-   private void assertIsCausedBy(Class clazz, Throwable ex) {
+   private static  Optional isCausedBy(Class clazz, Throwable ex) 
{
--- End diff --

Changed to `findThrowable` :)


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358372#comment-16358372
 ] 

Fabian Hueske commented on FLINK-8538:
--

Regarding the options:

* Option 1: This was my initial idea. But Timo is right, we would tie the 
format to the Table API which is not a good choice, IMO.
* Option 2: I don't think this will work. What if a schema contains two NUMBER 
types?
* Option 3: How would this help? We would have a custom number type that nobody 
can properly handle.

I think the best approach to support JSON Schema would be to always return 
BigDecimal types.

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5425: [FLINK-8456] Add Scala API for Connected Streams with Bro...

2018-02-09 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5425
  
@aljoscha Updated the PR. Please have a look!


---


[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358384#comment-16358384
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5425
  
@aljoscha Updated the PR. Please have a look!


> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL

2018-02-09 Thread Lynch Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358399#comment-16358399
 ] 

Lynch Lee commented on FLINK-6428:
--

OK, [~fhueske], thanks for your explanation. I understand it clearly now.

> Add support DISTINCT in dataStream SQL
> --
>
> Key: FLINK-6428
> URL: https://issues.apache.org/jira/browse/FLINK-6428
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> Add support for DISTINCT in DataStream SQL as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable"
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy() // KeyBy on all fields
>   .flatMap() //  Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?
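
For illustration, a self-contained sketch of that keyBy + flatMap approach on 
the data above (the class name and state layout are assumptions, not the 
proposed implementation):

{code}
// Sketch: emit each age only the first time it is seen, using keyed state
// to remember which keys have already been emitted.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DistinctAges {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Integer> ages = env.fromElements(28, 6, 6);
		ages
			.keyBy(new KeySelector<Integer, Integer>() {
				@Override
				public Integer getKey(Integer age) {
					return age; // key on the value itself
				}
			})
			.flatMap(new RichFlatMapFunction<Integer, Integer>() {
				private transient ValueState<Boolean> seen;

				@Override
				public void open(Configuration parameters) {
					seen = getRuntimeContext().getState(
						new ValueStateDescriptor<>("seen", Boolean.class));
				}

				@Override
				public void flatMap(Integer age, Collector<Integer> out) throws Exception {
					if (seen.value() == null) { // first occurrence of this key
						seen.update(true);
						out.collect(age);       // emits 28 and 6 once each
					}
				}
			})
			.print();
		env.execute("distinct ages");
	}
}
{code}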



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...

2018-02-09 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5439
  
Added the test.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358451#comment-16358451
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5439
  
Added the test.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g. Kafka with keyed topics). It would make the job graph 
> embarrassingly parallel while leveraging the RocksDB state backend and the 
> fine-grained recovery semantics.
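
A hedged sketch of how such an API might be used (the utility name 
DataStreamUtils.reinterpretAsKeyedStream is an assumption based on the pull 
request linked to this issue; check the merged code for the final signature):

{code}
// Sketch: declare an already-partitioned stream as keyed, so keyed state and
// timers become available without inserting a network shuffle.
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// stands in for a source that is already partitioned by key (e.g. a keyed Kafka topic)
		DataStream<String> prePartitioned = env.fromElements("a", "b", "a");
		KeyedStream<String, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
			prePartitioned,
			new KeySelector<String, String>() {
				@Override
				public String getKey(String value) {
					return value;
				}
			});
		keyed.print();
		env.execute("reinterpret as keyed stream");
	}
}
{code}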



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5438: [FLINK-8617][TableAPI & SQL] Fix code generation bug whil...

2018-02-09 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5438
  
I've removed the ITCase and added some tests to 
```org.apache.flink.table.expressions.MapTypeTest```


---


[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358470#comment-16358470
 ] 

ASF GitHub Bot commented on FLINK-8617:
---

Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5438
  
I've removed the ITCase and added some tests to 
```org.apache.flink.table.expressions.MapTypeTest```


> Fix code generation bug while accessing Map type
> 
>
> Key: FLINK-8617
> URL: https://issues.apache.org/jira/browse/FLINK-8617
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> There's a code generation bug in {code}ScalarOperators.generateMapGet{code}.
> And there are two more bugs in {code}ScalarOperators.generateIsNull{code} 
> and {code}ScalarOperators.generateIsNotNull{code}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-09 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5441

[FLINK-8607] [table] Add a basic embedded SQL CLI client

## What is the purpose of the change

This PR implements the first part of the implementation plan described in 
FLIP-24.

```
Goal: Add the basic features to play around with Flink's streaming SQL.

- Add CLI component that reads the configuration files
- "Pre-registered table sources"
- "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI 
SQL parser
- SHOW TABLES
- DESCRIBE TABLE
- EXPLAIN
- Add streaming append query submission to executor
- Submit jars and run SELECT query using the ClusterClient
- Collect results on heap and serve them on the CLI side (Internal Mode 
with SELECT)
- SOURCE (for executing a SQL statement stored in a local file)
```

Additionally, this PR also supports retraction queries and the SET 
operation for setting properties.

The client can be started using `./bin/sql-client.sh embedded`.

A table source must be defined in `./conf/sql-client-defaults.yaml` (for 
example a CSV table source, an example can be found in the test resources 
directory).

The client supports two modes for viewing results. A `changelog` mode or 
`table` mode. They can be selected by setting the `execution.result-mode` 
property.
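
For illustration, a hedged example session (the prompt, the table name, and 
the exact SET syntax are assumptions; the statements themselves come from the 
goals listed above):

```
$ ./bin/sql-client.sh embedded
> SHOW TABLES;
> DESCRIBE MyCsvTable;
> EXPLAIN SELECT * FROM MyCsvTable;
> SET execution.result-mode=table;
> SELECT * FROM MyCsvTable;
```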

The code is still a work in progress. There are a couple of things that can 
be improved:

- Add more logging instead of swallowing exceptions
- Use Flink's ConfigOptions where applicable (e.g. for better default value 
handling and validation)
- Maybe make the record result retrieval blocking?
- Tests for the LocalExecutor
- Some basic tests for other classes (maybe for CLI classes as well?)
- More deployment options (support for YARN, FLIP-6?)
- Documentation


## Brief change log

- New module `flink-sql-client`
- New executable script in `flink/bin`
- Minor visibility changes in other modules

## Verifying this change

Manually verified. Further tests will follow.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5441.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5441


commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac
Author: twalthr 
Date:   2017-12-07T12:46:31Z

[FLINK-8607] [table] Add a basic embedded SQL CLI client




---


[jira] [Commented] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358472#comment-16358472
 ] 

ASF GitHub Bot commented on FLINK-8607:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5441

[FLINK-8607] [table] Add a basic embedded SQL CLI client

## What is the purpose of the change

This PR implements the first part of the implementation plan described in 
FLIP-24.

```
Goal: Add the basic features to play around with Flink's streaming SQL.

- Add CLI component that reads the configuration files
- "Pre-registered table sources"
- "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI 
SQL parser
- SHOW TABLES
- DESCRIBE TABLE
- EXPLAIN
- Add streaming append query submission to executor
- Submit jars and run SELECT query using the ClusterClient
- Collect results on heap and serve them on the CLI side (Internal Mode 
with SELECT)
- SOURCE (for executing a SQL statement stored in a local file)
```

Additionally, this PR also supports retraction queries and the SET 
operation for setting properties.

The client can be started using `./bin/sql-client.sh embedded`.

A table source must be defined in `./conf/sql-client-defaults.yaml` (for 
example a CSV table source, an example can be found in the test resources 
directory).

The client supports two modes for viewing results. A `changelog` mode or 
`table` mode. They can be selected by setting the `execution.result-mode` 
property.

The code is still a work in progress. There are a couple of things that can 
be improved:

- Add more logging instead of swallowing exceptions
- Use Flink's ConfigOptions where applicable (e.g. for better default value 
handling and validation)
- Maybe make the record result retrieval blocking?
- Tests for the LocalExecutor
- Some basic tests for other classes (maybe for CLI classes as well?)
- More deployment options (support for YARN, FLIP-6?)
- Documentation


## Brief change log

- New module `flink-sql-client`
- New executable script in `flink/bin`
- Minor visibility changes in other modules

## Verifying this change

Manually verified. Further tests will follow.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5441.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5441


commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac
Author: twalthr 
Date:   2017-12-07T12:46:31Z

[FLINK-8607] [table] Add a basic embedded SQL CLI client




> Add a basic embedded SQL CLI client
> ---
>
> Key: FLINK-8607
> URL: https://issues.apache.org/jira/browse/FLINK-8607
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> This issue describes the Implementation Plan 1 of FLIP-24.
> Goal: Add the basic features to play around with Flink's streaming SQL.
> {code}
> - Add CLI component that reads the configuration files
> - "Pre-registered table sources"
> - "Job parameters"
> - Add executor for retrieving pre-flight information and corresponding CLI 
> SQL parser
> - SHOW TABLES
> - DESCRIBE TABLE
> - EXPLAIN
> - Add streaming append query submission to executor
> - Submit jars and run SELECT query using the ClusterClient
> - Collect results on heap and serve them on the CLI side (Internal Mode with 
> SELECT)
> - SOURCE (for executing a SQL statement stored in a local file)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor

2018-02-09 Thread Ken Krugler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ken Krugler updated FLINK-8625:
---
Summary: Move OutputFlusher thread to Netty scheduled executor  (was: Move 
OutputFlasherThread to Netty scheduled executor)

> Move OutputFlusher thread to Netty scheduled executor
> -
>
> Key: FLINK-8625
> URL: https://issues.apache.org/jira/browse/FLINK-8625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Piotr Nowojski
>Priority: Major
>
> This will allow us to trigger/schedule the next flush only if we are not 
> currently busy. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-09 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358494#comment-16358494
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the comments [~twalthr] and [~fhueske]. Actually, I've encountered 
the problem of parsing the NUMBER type before; I just added an extra field to 
indicate the numeric type, since the JSON schema in my application was only 
used internally. I think Option 2 that Timo raised refers to a global config, 
i.e., we always return a specific type for the JSON NUMBER type, which could 
be configured to be double or BigDecimal. I'll create a ticket for the JSON 
schema converter. Thanks!

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5442

[FLINK-7713][flip6] Implement JarUploadHandler

## What is the purpose of the change

*Allow uploading jars through HTTP to enable job submissions from the web 
ui.*

cc: @tillrohrmann  

## Brief change log

  - *Allow uploading jars through HTTP.*
  - *Implement and register JarUploadHandler.*

## Verifying this change

This change added tests and can be verified as follows:
  - *Added integration tests for uploading files.*
  - *Started cluster locally and uploaded jars using* 
```
curl -v -X POST -H "Expect:" -F 
"jarfile=@examples/streaming/Kafka010Example.jar" 
http://127.0.0.1:9065/jars/upload
```
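
For reference, a hedged plain-Java equivalent of the curl call above (the 
multipart field name "jarfile" and the /jars/upload endpoint are taken from 
this description; everything else is illustrative):

```
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JarUploadClient {
	public static void main(String[] args) throws IOException {
		Path jar = Paths.get("examples/streaming/Kafka010Example.jar");
		String boundary = "----flink-jar-upload";
		HttpURLConnection conn = (HttpURLConnection)
			new URL("http://127.0.0.1:9065/jars/upload").openConnection();
		conn.setRequestMethod("POST");
		conn.setDoOutput(true);
		conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
		try (OutputStream out = conn.getOutputStream()) {
			out.write(("--" + boundary + "\r\n"
				+ "Content-Disposition: form-data; name=\"jarfile\"; filename=\""
				+ jar.getFileName() + "\"\r\n"
				+ "Content-Type: application/octet-stream\r\n\r\n")
				.getBytes(StandardCharsets.UTF_8));
			Files.copy(jar, out);                  // stream the jar bytes
			out.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
		}
		System.out.println("HTTP " + conn.getResponseCode()); // 200 on success
	}
}
```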

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7713

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5442


commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao 
Date:   2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler




---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358498#comment-16358498
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5442

[FLINK-7713][flip6] Implement JarUploadHandler

## What is the purpose of the change

*Allow uploading jars through HTTP to enable job submissions from the web 
ui.*

cc: @tillrohrmann  

## Brief change log

  - *Allow uploading jars through HTTP.*
  - *Implement and register JarUploadHandler.*

## Verifying this change

This change added tests and can be verified as follows:
  - *Added integration tests for uploading files.*
  - *Started cluster locally and uploaded jars using* 
```
curl -v -X POST -H "Expect:" -F 
"jarfile=@examples/streaming/Kafka010Example.jar" 
http://127.0.0.1:9065/jars/upload
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7713

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5442


commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao 
Date:   2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler




> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249285
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
--- End diff --

Tests missing.


---


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249371
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} for {@link JarUploadHandler}.
+ */
+public class JarUploadResponseBody implements ResponseBody {
--- End diff --

Marshalling test missing.


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358500#comment-16358500
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249371
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} for {@link JarUploadHandler}.
+ */
+public class JarUploadResponseBody implements ResponseBody {
--- End diff --

Marshalling test missing.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358499#comment-16358499
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249285
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
--- End diff --

Tests missing.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Method name is not accurate. There will be more handlers.


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358503#comment-16358503
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Method name is not accurate. There will be more handlers.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no 
flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
+
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
+   final Class clazz = Class.forName(classname);
+   final Constructor constructor = clazz.getConstructor(
+   CompletableFuture.class,
+   GatewayRetriever.class,
+   Time.class,
+   Map.class,
+   MessageHeaders.class,
+   java.nio.file.Path.class,
+   Executor.class);
+
+   final MessageHeaders jarUploadMessageHeaders =
+   (MessageHeaders) Class
+   
.forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
+   .newInstance();
+
+   return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, 
(ChannelInboundHandler) constructor.newInstance(
+   restAddressFuture,
+   leaderRetriever,
+   timeout,
+   Collections.emptyMap(),
--- End diff --

should use headers defined by `restConfiguration.getResponseHeaders()`
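
Concretely, the fourth `newInstance` argument above would become
something like the following. This is a sketch: it assumes a
`RestHandlerConfiguration` instance named `restConfiguration` is passed
into this method, and the trailing arguments are inferred from the
`getConstructor` parameter list above.

```java
return Arrays.asList(Tuple2.of(jarUploadMessageHeaders,
    (ChannelInboundHandler) constructor.newInstance(
        restAddressFuture,
        leaderRetriever,
        timeout,
        restConfiguration.getResponseHeaders(), // instead of Collections.emptyMap()
        jarUploadMessageHeaders,
        uploadDir,
        executor)));
```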


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358504#comment-16358504
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no 
flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
+
+   try {
+   final String classname = 
"org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
+   final Class clazz = Class.forName(classname);
+   final Constructor constructor = clazz.getConstructor(
+   CompletableFuture.class,
+   GatewayRetriever.class,
+   Time.class,
+   Map.class,
+   MessageHeaders.class,
+   java.nio.file.Path.class,
+   Executor.class);
+
+   final MessageHeaders jarUploadMessageHeaders =
+   (MessageHeaders) Class
+   
.forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
+   .newInstance();
+
+   return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, 
(ChannelInboundHandler) constructor.newInstance(
+   restAddressFuture,
+   leaderRetriever,
+   timeout,
+   Collections.emptyMap(),
--- End diff --

should use headers defined by `restConfiguration.getResponseHeaders()`


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler

2018-02-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167250379
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
--- End diff --

The legacy handlers could be moved to a `legacy` package.


---


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358506#comment-16358506
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167250379
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
--- End diff --

The legacy handlers could be moved to a `legacy` package.


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8626) Introduce BackPressureStatsTracker interface

2018-02-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8626:


 Summary: Introduce BackPressureStatsTracker interface
 Key: FLINK-8626
 URL: https://issues.apache.org/jira/browse/FLINK-8626
 Project: Flink
  Issue Type: Improvement
  Components: REST, Tests
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to better test components like the {{JobMaster}}, we should introduce
a {{BackPressureStatsTracker}} interface and rename the current
{{BackPressureStatsTracker}} class to {{BackPressureStatsTrackerImpl}}. This
will simplify testing, where we otherwise have to set up all of these
components.
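
A minimal sketch of the extraction (the method set below is illustrative
and should mirror whatever the current class actually exposes):

```java
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;

import java.util.Optional;

/** Hypothetical extracted interface; the methods are illustrative. */
public interface BackPressureStatsTracker {

    /** Returns back pressure statistics for the given vertex, if available. */
    Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex);

    /** Shuts the tracker down. */
    void shutDown();
}
```

The existing class would then be renamed to {{BackPressureStatsTrackerImpl}}
and implement this interface, so tests can supply lightweight stand-ins
instead of the full tracker.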



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...

2018-02-09 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5443

[FLINK-8626] Introduce BackPressureStatsTracker interface

## What is the purpose of the change

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
introduceBackPressureStatsTrackerInterface

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5443


commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.




---


[jira] [Commented] (FLINK-8626) Introduce BackPressureStatsTracker interface

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358509#comment-16358509
 ] 

ASF GitHub Bot commented on FLINK-8626:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5443

[FLINK-8626] Introduce BackPressureStatsTracker interface

## What is the purpose of the change

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
introduceBackPressureStatsTrackerInterface

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5443


commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.




> Introduce BackPressureStatsTracker interface
> 
>
> Key: FLINK-8626
> URL: https://issues.apache.org/jira/browse/FLINK-8626
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.5.0
>
>
> In order to better test components like the {{JobMaster}}, we should introduce
> a {{BackPressureStatsTracker}} interface and rename the current
> {{BackPressureStatsTracker}} class to {{BackPressureStatsTrackerImpl}}.
> This will simplify testing, where we otherwise have to set up all of these
> components.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5421: [FLINK-8573] [client] Add more information for printing J...

2018-02-09 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5421
  
The CI error is not relevant to this PR: the
```BackPressureStatsTrackerITCase``` test failed.


---


[jira] [Commented] (FLINK-8573) Print JobID for failed jobs

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358511#comment-16358511
 ] 

ASF GitHub Bot commented on FLINK-8573:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5421
  
The CI error is not relevant to this PR: the
```BackPressureStatsTrackerITCase``` test failed.


> Print JobID for failed jobs
> ---
>
> Key: FLINK-8573
> URL: https://issues.apache.org/jira/browse/FLINK-8573
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> When a job is successfully run, the client prints something along the lines 
> of "Job with  successfully run". If the job fails, however, we only 
> print the exception but not the JobID.
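
For illustration, the failure path could print the JobID as well. This
is a sketch only, assuming a `client`, a `jobGraph`, and a `classLoader`
are in scope; it is not the actual client code.

```java
try {
    client.run(jobGraph, classLoader);
    System.out.println("Job with JobID " + jobGraph.getJobID() + " has finished.");
} catch (Exception e) {
    // Previously only the exception was printed here.
    System.err.println("Job with JobID " + jobGraph.getJobID() + " has failed.");
    throw e;
}
```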



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5444: [FLINK-8546] [flip6] Respect savepoints and restor...

2018-02-09 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5444

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

## What is the purpose of the change

Let the JobMaster respect checkpoints and savepoints. The JobMaster will
always try to restore the latest checkpoint if one is available. Next it
will check whether savepoint restore settings have been set. If so, it
will try to restore the savepoint. Only if these settings are not set
will the job be started from scratch.

This PR is based on #5443.

## Brief change log

- Check in JobMaster if the `CheckpointCoordinator` has been set
- If so, then check if there is a checkpoint to recover
- If not, then check whether we can restore from a savepoint
- If not, then start the job without any recovered state (see the sketch below)
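
A hedged sketch of that precedence (method and variable names are
illustrative, not the actual JobMaster code):

```java
private void tryRestoreExecutionGraphState(
        ExecutionGraph executionGraph,
        SavepointRestoreSettings savepointRestoreSettings,
        ClassLoader userCodeLoader) throws Exception {

    final CheckpointCoordinator checkpointCoordinator =
        executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        return; // checkpointing disabled: nothing to restore
    }

    // 1. Always prefer the latest checkpoint if one exists.
    final boolean restored = checkpointCoordinator.restoreLatestCheckpointedState(
        executionGraph.getAllVertices(), false, false);

    // 2. Otherwise fall back to an explicitly configured savepoint.
    if (!restored && savepointRestoreSettings.restoreSavepoint()) {
        checkpointCoordinator.restoreSavepoint(
            savepointRestoreSettings.getRestorePath(),
            savepointRestoreSettings.allowNonRestoredState(),
            executionGraph.getAllVertices(),
            userCodeLoader);
    }

    // 3. Neither available: the job simply starts from scratch.
}
```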

## Verifying this change

- Added `JobMasterTest#testRestoringFromSavepoint` and 
`JobMasterTest#testCheckpointPrecedesSavepointRecovery`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink respectSavepoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5444.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5444


commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.

commit 8380146fb2f85d4e8d9d41b84ba0ad435c242984
Author: Till Rohrmann 
Date:   2018-02-09T13:18:11Z

[hotfix] [tests] Simplify JobMasterTest

commit 09d36a47ed78d7fae0cae0229823114bbb6d45be
Author: Till Rohrmann 
Date:   2018-02-01T15:14:53Z

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will
always try to restore the latest checkpoint if one is available. Next it
will check whether savepoint restore settings have been set. If so, it
will try to restore the savepoint. Only if these settings are not set
will the job be started from scratch.




---


[jira] [Commented] (FLINK-8546) Respect savepoint settings and recover from latest checkpoint in Flip-6

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358518#comment-16358518
 ] 

ASF GitHub Bot commented on FLINK-8546:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5444

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

## What is the purpose of the change

Let the JobMaster respect checkpoints and savepoints. The JobMaster will
always try to restore the latest checkpoint if one is available. Next it
will check whether savepoint restore settings have been set. If so, it
will try to restore the savepoint. Only if these settings are not set
will the job be started from scratch.

This PR is based on #5443.

## Brief change log

- Check in JobMaster if the `CheckpointCoordinator` has been set
- If so, then check if there is a checkpoint to recover
- If not, then check whether we can restore from a savepoint
- If not, then start the job without any recovered state

## Verifying this change

- Added `JobMasterTest#testRestoringFromSavepoint` and 
`JobMasterTest#testCheckpointPrecedesSavepointRecovery`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink respectSavepoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5444.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5444


commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544
Author: Till Rohrmann 
Date:   2018-02-09T13:07:31Z

[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker to BackPressureStatsTrackerImpl and
introduces a BackPressureStatsTracker interface. This will make testing
easier since we don't have to set up all the different components.

commit 8380146fb2f85d4e8d9d41b84ba0ad435c242984
Author: Till Rohrmann 
Date:   2018-02-09T13:18:11Z

[hotfix] [tests] Simplify JobMasterTest

commit 09d36a47ed78d7fae0cae0229823114bbb6d45be
Author: Till Rohrmann 
Date:   2018-02-01T15:14:53Z

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will
always try to restore the latest checkpoint if one is available. Next it
will check whether savepoint restore settings have been set. If so, it
will try to restore the savepoint. Only if these settings are not set
will the job be started from scratch.




> Respect savepoint settings and recover from latest checkpoint in Flip-6
> ---
>
> Key: FLINK-8546
> URL: https://issues.apache.org/jira/browse/FLINK-8546
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{JobMaster}} should respect savepoints and recover from the latest 
> checkpoint if possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...

2018-02-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5439
  
LGTM once Travis is green.
We should also remove the redundant mention of FLINK-8571 in 
`ReinterpretAsKeyedStreamITCase` while merging.


---


[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358559#comment-16358559
 ] 

ASF GitHub Bot commented on FLINK-8571:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5439
  
LGTM once Travis is green.
We should also remove the redundant mention of FLINK-8571 in 
`ReinterpretAsKeyedStreamITCase` while merging.


> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> 
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
>  Issue Type: Improvement
>Reporter: Nagarjun Guraja
>Assignee: Stefan Richter
>Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input 
> sources (e.g., Kafka with keyed topics). It would help in making the job 
> graph embarrassingly parallel while leveraging the RocksDB state backend and 
> also the fine-grained recovery semantics.
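
For illustration, the utility this issue is about can be used roughly as
below; the Kafka source, the tuple type, and the key field are
assumptions here, not part of the issue.

```java
DataStream<Tuple2<String, Integer>> prePartitioned = env.addSource(kafkaSource);

// Treat the already-partitioned stream as keyed without a shuffle. The
// caller must guarantee the input is partitioned exactly as keyBy would.
KeyedStream<Tuple2<String, Integer>, String> keyed =
    DataStreamUtils.reinterpretAsKeyedStream(
        prePartitioned,
        record -> record.f0,
        TypeInformation.of(String.class));

keyed.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)).print();
```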



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5425: [FLINK-8456] Add Scala API for Connected Streams with Bro...

2018-02-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5425
  
I think this looks good now.


---


[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358582#comment-16358582
 ] 

ASF GitHub Bot commented on FLINK-8456:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5425
  
I think this looks good now.


> Add Scala API for Connected Streams with Broadcast State.
> -
>
> Key: FLINK-8456
> URL: https://issues.apache.org/jira/browse/FLINK-8456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...

2018-02-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5443#discussion_r167268048
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
 ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.Optional;
+
+/**
+ * {@link BackPressureStatsTracker} implementation which returns always no 
back pressure statistics.
--- End diff --

switch "returns" and "always"?


---

