[jira] [Commented] (FLINK-9698) "case class must be static and globally accessible" is too constrained

2018-07-01 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529406#comment-16529406
 ] 

Jeff Zhang commented on FLINK-9698:
---

Could anyone let me know why flink require case class must be static and 
globally accessible ? Because this similar code can work in spark, so I beleive 
it should be the same for flink. As both of them require to serialize these 
class to remote host and execute in remote side. 

> "case class must be static and globally accessible" is too constrained
> --
>
> Key: FLINK-9698
> URL: https://issues.apache.org/jira/browse/FLINK-9698
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>
> The following code can reproduce this issue. 
> {code}
> object BatchJob {
>   def main(args: Array[String]) {
> // set up the batch execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tenv = TableEnvironment.getTableEnvironment(env)
> case class Person(id:Int, name:String)
> val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy"))
> tenv.registerDataSet("table_1", ds);
>   }
> }
> {code}
> Although I have workaround to declare case class outside of the main method, 
> this workaround won't work in scala-shell. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9698) "case class must be static and globally accessible" is too constrained

2018-07-01 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-9698:
-

 Summary: "case class must be static and globally accessible" is 
too constrained
 Key: FLINK-9698
 URL: https://issues.apache.org/jira/browse/FLINK-9698
 Project: Flink
  Issue Type: Improvement
Reporter: Jeff Zhang


The following code can reproduce this issue. 
{code}
object BatchJob {
  def main(args: Array[String]) {
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tenv = TableEnvironment.getTableEnvironment(env)
case class Person(id:Int, name:String)
val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy"))
tenv.registerDataSet("table_1", ds);
  }
}
{code}

Although I have workaround to declare case class outside of the main method, 
this workaround won't work in scala-shell. 




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529329#comment-16529329
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995
  
What about implementing a `KeyedDeserializationSchema` for Avro?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-07-01 Thread cricket007
Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995
  
What about implementing a `KeyedDeserializationSchema` for Avro?


---


[jira] [Closed] (FLINK-9670) Introduce slot manager factory

2018-07-01 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu closed FLINK-9670.
-
Resolution: Invalid

> Introduce slot manager factory
> --
>
> Key: FLINK-9670
> URL: https://issues.apache.org/jira/browse/FLINK-9670
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529321#comment-16529321
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

Github user Clark closed the pull request at:

https://github.com/apache/flink/pull/6192


> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not 
> release task manager containers in some specific case. In the worst case, I 
> had a job configured to 5 task managers, but possess more than 100 containers 
> in the end. Although the task didn't failed, but it affect other jobs in the 
> Yarn Cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. As the container was killed before restart, but it 
> has not received the callback of *onContainerComplete* in 
> *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. 
> After restart, as we can see in line 347 of FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection of container 24 which is on bd-r1hdp69 machine. 
> When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it 
> did not has the connection to TaskManager on container 24, so it just ignore 
> the close of TaskManger.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
>  However, bafore calling *closeTaskManagerConnection,* it already called 
> *requestYarnContainer* which lead to *numPendingContainerRequests variable 
> in* *YarnResourceManager* increased by 1.
> As the excessive container return is determined by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot 
> return this container although it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for Task Managers, Flink will 
> possess the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 are running 
> TaskManagers.
> ps: Another strange thing I found is that when sometimes request for a yarn 
> container, it will return much more than requested. Is it a normal scenario 
> for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...

2018-07-01 Thread Clarkkkkk
Github user Clark closed the pull request at:

https://github.com/apache/flink/pull/6192


---


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529282#comment-16529282
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199366673
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Thank you for that catch. 
blank line removed


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +-+
> | EXPR$0  |
> +-+
> | 0.4636476090008061 |
> +-+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199366673
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Thank you for that catch. 
blank line removed


---


[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529278#comment-16529278
 ] 

ASF GitHub Bot commented on FLINK-9688:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199365966
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Remove this blank line would be better~ 


> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +-+
> | EXPR$0  |
> +-+
> | 0.4636476090008061 |
> +-+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support

2018-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9688:
--
Labels: pull-request-available  (was: )

> ATAN2 Sql Function support
> --
>
> Key: FLINK-9688
> URL: https://issues.apache.org/jira/browse/FLINK-9688
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> simple query fails {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, 
> config());
> DataSet> ds = 
> CollectionDataSets.get3TupleDataSet(env);
> tableEnv.registerDataSet("t1", ds, "x, y, z");
> String sqlQuery = "SELECT atan2(1,2)";
> Table result = tableEnv.sqlQuery(sqlQuery);
> {code}
> while at the same time Calcite supports it and in Calcite's sqlline it works 
> like {noformat}
> 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2);
> +-+
> | EXPR$0  |
> +-+
> | 0.4636476090008061 |
> +-+
> 1 row selected (0.173 seconds)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-01 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199365966
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -,6 +,53 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
   math.atan(-0.123123132132132).toString)
   }
 
+  @Test
+  def testAtan2(): Unit = {
+testAllApis(
+  'f25.atan2('f26),
+  "f25.atan2(f26)",
+  "ATAN2(f25, f26)",
+  math.atan2(0.42.toByte, 0.toByte).toString)
+
+
--- End diff --

Remove this blank line would be better~ 


---


[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...

2018-07-01 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6234#discussion_r199365531
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) {
 
/**
 * Prunes states assuming there will be no events with timestamp 
lower than the given one.
-* It cleares the sharedBuffer and also emits all timed out partial 
matches.
+* It clears the sharedBuffer and also emits all timed out partial 
matches.
 *
 * @param sharedBuffer the SharedBuffer object that we need to work 
upon while processing
 * @param nfaState The NFAState object that we need to affect while 
processing
 * @param timestamptimestamp that indicates that there will be no 
more events with lower timestamp
 * @return all timed outed partial matches
 * @throws Exception Thrown if the system cannot access the state.
 */
-   public Collection>, Long>> advanceTime(
+   public Tuple2>, Long>>, 
Collection>>> advanceTime(
final SharedBuffer sharedBuffer,
final NFAState nfaState,
-   final long timestamp) throws Exception {
+   final long timestamp,
+   final AfterMatchSkipStrategy afterMatchSkipStrategy) 
throws Exception {
--- End diff --

please add parameter explication for new parameter `afterMatchSkipStrategy 
` of the method. 


---


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529276#comment-16529276
 ] 

ASF GitHub Bot commented on FLINK-9431:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6234#discussion_r199365531
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) {
 
/**
 * Prunes states assuming there will be no events with timestamp 
lower than the given one.
-* It cleares the sharedBuffer and also emits all timed out partial 
matches.
+* It clears the sharedBuffer and also emits all timed out partial 
matches.
 *
 * @param sharedBuffer the SharedBuffer object that we need to work 
upon while processing
 * @param nfaState The NFAState object that we need to affect while 
processing
 * @param timestamptimestamp that indicates that there will be no 
more events with lower timestamp
 * @return all timed outed partial matches
 * @throws Exception Thrown if the system cannot access the state.
 */
-   public Collection>, Long>> advanceTime(
+   public Tuple2>, Long>>, 
Collection>>> advanceTime(
final SharedBuffer sharedBuffer,
final NFAState nfaState,
-   final long timestamp) throws Exception {
+   final long timestamp,
+   final AfterMatchSkipStrategy afterMatchSkipStrategy) 
throws Exception {
--- End diff --

please add parameter explication for new parameter `afterMatchSkipStrategy 
` of the method. 


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Now flink cep have no support to reach a Final State upon past some time. if 
> i use a pattern like 
> {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element 
> be emitted after 5minutes, i have no way.
> I want to introduce a timeEnd State to work with notFollowedBy to figure out 
> with this scenior.
> It can be used like this 
> {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code},
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529266#comment-16529266
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hello @hequn8128! Thank you for your review and comments.
About PR template - I did changes based on proposed #5811. Please let me 
know if it is acceptable or not.
About `rowToString` agree. I think it makes sense and I added such tests. 
However I faced with some strange behavior (I do not know if it is bug or 
whatever else). Commented on the code about that. 


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently SQL client does not show arrays/map in human readable way (please 
> have a look at examples below). e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hello @hequn8128! Thank you for your review and comments.
About PR template - I did changes based on proposed #5811. Please let me 
know if it is acceptable or not.
About `rowToString` agree. I think it makes sense and I added such tests. 
However I faced with some strange behavior (I do not know if it is bug or 
whatever else). Commented on the code about that. 


---


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529265#comment-16529265
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364581
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
+   "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 
2018-11-12, " +
+   "[1, 2], (1,123,null)]");
--- End diff --

If having tuple here is ok then the next strange thing is null handling 
inside tuples (it is printed in lowercase and without brackets). So there are 
at least 2 different types of null handling: inside tuples and all others.



> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently SQL client does not show arrays/map in human readable way (please 
> have a look at examples below). e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364538
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
--- End diff --

Is it a real case to have tuple here for SqlClient? API allows to do that 
but not sure about real cases.


---


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529264#comment-16529264
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364538
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
--- End diff --

Is it a real case to have tuple here for SqlClient? API allows to do that 
but not sure about real cases.


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently SQL client does not show arrays/map in human readable way (please 
> have a look at examples below). e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread snuyanzin
Github user snuyanzin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199364581
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
+   "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 
2018-11-12, " +
+   "[1, 2], (1,123,null)]");
--- End diff --

If having tuple here is ok then the next strange thing is null handling 
inside tuples (it is printed in lowercase and without brackets). So there are 
at least 2 different types of null handling: inside tuples and all others.



---


[jira] [Closed] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-07-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9456.

Resolution: Fixed

Fixed via
1.6.0:
89cfeaa882f9e68df2bd215563622b48c29a9ec9
50c0ea8c9fe17278d45aba476a95791152a1420b

1.5.1:
a2f43b4cc081d360cd59ce3e7fb875e4b5fd243f
627412c4d2ea655271fe5da67a55ac936a1a060e

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529253#comment-16529253
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132


> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132


---


[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529134#comment-16529134
 ] 

ASF GitHub Bot commented on FLINK-9633:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@yanghua Thanks for the review, I rebased the PR.


> Flink doesn't use the Savepoint path's filesystem to create the OuptutStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, flink use the Savepoint's filesystem to create the meta output 
> stream in CheckpointCoordinator(JM side), but in StreamTask(TM side) it uses 
> the Checkpoint's filesystem to create the checkpoint data output stream. When 
> the Savepoint & Checkpoint in different filesystem this will lead to 
> problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-01 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@yanghua Thanks for the review, I rebased the PR.


---


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529124#comment-16529124
 ] 

ASF GitHub Bot commented on FLINK-9431:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6234

[FLINK-9431]Introduce time bounded condition to cep

## What is the purpose of the change

In cep the event is now driving the transformation of the NFA, I think the 
time factor should also be taken into account in some senior.

When a key's data is not endless, and if we want to match the following 
pattern after we match the `AB` after `B` has appeared for  ten seconds.

```
Pattern.begin("A").followedBy("B").notFollowedBy("C")
``` 
We can not emit the result because there is no branch can lead to the 
`Final State`, And i think we can add a `TimeEnd` state to describe a pattern 
that accepts a time condition evaluated by processing time / event time to 
compare the timestamp in the element we have meant before.

As described in the issue link,  there are two main reason why i introduce 
this feature

1.  the `notFollowedBy` cant be at the end of the pattern 
2.  the `within` just compare with the element at start, and some key's 
data may not endless, so we have to evaluate condition not also on event but 
also on time

## Brief change log

1.  Add the method to distinguish the event driven condition or time 
drivern condition in `IterativeCondition`
2.  when `advanceTime`, we not only prune the expire element, but also look 
the time bounded condition


## Verifying this change

This change is already covered by existing cep tests, may be it need a 
little more about the new api.

This change added tests and can be verified as follows:


## Documentation

  - Does this pull request introduce a new feature? (yes)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink timeEnd-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6234


commit b1aa992a97c8eac818e57c3d2f82be76957052d0
Author: minwenjun 
Date:   2018-07-01T14:41:44Z

[FLINK-9431]Introduce time bounded condition to cep




> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Now flink cep have no support to reach a Final State upon past some time. if 
> i use a pattern like 
> {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element 
> be emitted after 5minutes, i have no way.
> I want to introduce a timeEnd State to work with notFollowedBy to figure out 
> with this scenior.
> It can be used like this 
> {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code},
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9431:
--
Labels: pull-request-available  (was: )

> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Now flink cep have no support to reach a Final State upon past some time. if 
> i use a pattern like 
> {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element 
> be emitted after 5minutes, i have no way.
> I want to introduce a timeEnd State to work with notFollowedBy to figure out 
> with this scenior.
> It can be used like this 
> {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code},
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...

2018-07-01 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6234

[FLINK-9431]Introduce time bounded condition to cep

## What is the purpose of the change

In cep the event is now driving the transformation of the NFA, I think the 
time factor should also be taken into account in some senior.

When a key's data is not endless, and if we want to match the following 
pattern after we match the `AB` after `B` has appeared for  ten seconds.

```
Pattern.begin("A").followedBy("B").notFollowedBy("C")
``` 
We can not emit the result because there is no branch can lead to the 
`Final State`, And i think we can add a `TimeEnd` state to describe a pattern 
that accepts a time condition evaluated by processing time / event time to 
compare the timestamp in the element we have meant before.

As described in the issue link,  there are two main reason why i introduce 
this feature

1.  the `notFollowedBy` cant be at the end of the pattern 
2.  the `within` just compare with the element at start, and some key's 
data may not endless, so we have to evaluate condition not also on event but 
also on time

## Brief change log

1.  Add the method to distinguish the event driven condition or time 
drivern condition in `IterativeCondition`
2.  when `advanceTime`, we not only prune the expire element, but also look 
the time bounded condition


## Verifying this change

This change is already covered by existing cep tests, may be it need a 
little more about the new api.

This change added tests and can be verified as follows:


## Documentation

  - Does this pull request introduce a new feature? (yes)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink timeEnd-state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6234


commit b1aa992a97c8eac818e57c3d2f82be76957052d0
Author: minwenjun 
Date:   2018-07-01T14:41:44Z

[FLINK-9431]Introduce time bounded condition to cep




---


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-01 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6194
  
+1, there is a conflicting file~ cc @sihuazhou


---


[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529123#comment-16529123
 ] 

ASF GitHub Bot commented on FLINK-9633:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6194
  
+1, there is a conflicting file~ cc @sihuazhou


> Flink doesn't use the Savepoint path's filesystem to create the OuptutStream 
> on Task.
> -
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, flink use the Savepoint's filesystem to create the meta output 
> stream in CheckpointCoordinator(JM side), but in StreamTask(TM side) it uses 
> the Checkpoint's filesystem to create the checkpoint data output stream. When 
> the Savepoint & Checkpoint in different filesystem this will lead to 
> problematic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199352237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for this approach that directly specifies the interval literals. 

Regarding Quarter. It seems like a very old implementation and we should 
probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it 
consistent with all other time unit extractions. What do you guys think?

I just tried it out by modifying the `Extract` method and it seems working 
perfectly.


---


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529122#comment-16529122
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199352237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for this approach that directly specifies the interval literals. 

Regarding Quarter. It seems like a very old implementation and we should 
probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it 
consistent with all other time unit extractions. What do you guys think?

I just tried it out by modifying the `Extract` method and it seems working 
perfectly.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529121#comment-16529121
 ] 

ASF GitHub Bot commented on FLINK-9654:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6206
  
hi @zsolt-donca I have seen the Travis build error log, the failed reason 
is not because of your code. This PR looks good, but if you can add a test for 
`isAnonymousClass` method, that would be better.
cc @tillrohrmann 


> Internal error while deserializing custom Scala TypeSerializer instances
> 
>
> Key: FLINK-9654
> URL: https://issues.apache.org/jira/browse/FLINK-9654
> Project: Flink
>  Issue Type: Bug
>Reporter: Zsolt Donca
>Priority: Major
>  Labels: pull-request-available
>
> When you are using custom `TypeSerializer` instances implemented in Scala, 
> the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can 
> manifest itself when a Flink job is restored from checkpoint or started with 
> a savepoint.
> The reason is that in such a restore from checkpoint or savepoint, Flink uses 
> `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type 
> serializers and their configurations. The deserialization walks through the 
> entire object graph corresponding, and for each class it calls 
> `isAnonymousClass`, which, in turn, calls `getSimpleName` (mechanism in place 
> for FLINK-6869). If there is an internal class defined in a Scala object for 
> which `getSimpleName` fails (see the Scala issue), then a 
> `java.lang.InternalError` is thrown which causes the task manager to restart. 
> In this case, Flink tries to restart the job on another task manager, causing 
> all the task managers to restart, wreaking havoc on the entire Flink cluster.
> There are some alternative type information derivation mechanisms that rely 
> on anonymous classes and, most importantly, classes generated by macros, that 
> can easily trigger the above problem. I am personally working on 
> [https://github.com/zsolt-donca/flink-alt], and there is also 
> [https://github.com/joroKr21/flink-shapeless]
> I prepared a pull request that fixes the issue. 
>  
> Edit: added a stack trace to help demonstrate the issue.
> 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR 
> org.apache.flink.runtime.taskmanager.Task  - Encountered fatal error 
> java.lang.InternalError - terminating the JVM
>  java.lang.InternalError: Malformed class name
>          at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171]
>          at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171]
>          at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) 
> ~[na:1.8.0_171]
>          at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) 
> ~[na:1.8.0_171]
>          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) 
> ~[na:1.8.0_171]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>  ~[flink-dist_2.11-1.4.2.jar:1.4.2]
>          at 
> 

[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...

2018-07-01 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6206
  
hi @zsolt-donca I have seen the Travis build error log, the failed reason 
is not because of your code. This PR looks good, but if you can add a test for 
`isAnonymousClass` method, that would be better.
cc @tillrohrmann 


---


[jira] [Updated] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images

2018-07-01 Thread Leonid Ishimnikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonid Ishimnikov updated FLINK-9695:
-
Description: It would be useful to have an option to forcefully pull Docker 
images for tasks, rather than reuse a previously cached image. Such option 
exists in many Mesos frameworks, and it significantly simplifies debugging. I 
propose adding a new 
{{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.  
(was: It would be useful to have an option to forcefully pull Docker images for 
tasks, rather than reuse a previously cached image.  Such option exists in many 
Mesos frameworks, and it significantly simplifies debugging.  I propose adding 
a new {{mesos.resourcemanager.tasks.container.docker.forcePullImage}} option.)

> Add option for Mesos executor to forcefully pull Docker images
> --
>
> Key: FLINK-9695
> URL: https://issues.apache.org/jira/browse/FLINK-9695
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Leonid Ishimnikov
>Assignee: Leonid Ishimnikov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> It would be useful to have an option to forcefully pull Docker images for 
> tasks, rather than reuse a previously cached image. Such option exists in 
> many Mesos frameworks, and it significantly simplifies debugging. I propose 
> adding a new 
> {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529103#comment-16529103
 ] 

ASF GitHub Bot commented on FLINK-9669:
---

Github user liurenjie1024 closed the pull request at:

https://github.com/apache/flink/pull/6214


> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529102#comment-16529102
 ] 

ASF GitHub Bot commented on FLINK-9669:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6214
  
@tillrohrmann This is from my initial design, and since the design has 
changed, we can close this now.


> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9669) Introduce task manager assignment store

2018-07-01 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu closed FLINK-9669.
-
Resolution: Invalid

> Introduce task manager assignment store
> ---
>
> Key: FLINK-9669
> URL: https://issues.apache.org/jira/browse/FLINK-9669
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6214
  
@tillrohrmann This is from my initial design, and since the design has 
changed, we can close this now.


---


[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 closed the pull request at:

https://github.com/apache/flink/pull/6214


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199349619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for better consistency. It is good to follow the Table-api style.


---


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529099#comment-16529099
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199349619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for better consistency. It is good to follow the Table-api style.


> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529096#comment-16529096
 ] 

ASF GitHub Bot commented on FLINK-9696:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hi @snuyanzin , thanks for your PR. The code looks good and the 
`deepToString()` function returns result correctly. I could not spot any issues 
with the implementation. To make the PR better, I think we can add a test in 
`CliUtilsTest` to test the `rowToString` function, since code in the function 
also has been changed. BTW, the PR template can be done better. See for PR 
https://github.com/apache/flink/pull/5811 as an example.
Best, Hequn


> Deep toString for arrays/map in SQL client 
> ---
>
> Key: FLINK-9696
> URL: https://issues.apache.org/jira/browse/FLINK-9696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Currently SQL client does not show arrays/map in human readable way (please 
> have a look at examples below). e.g. {code}select map[1,2];{code} is shown as 
> {noformat} +/-EXPR$0
>+ java.util.HashMap
> {noformat}
> {code}select array[1,2];{code} is shown as {noformat}
> +/-EXPR$0
>+   java.lang.Integer[]
> {noformat} 
> {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/-   
>  EXPR$0
>+   java.util.Map[]{noformat}
> {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/-
> EXPR$0
>+ java.util.HashMap{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hi @snuyanzin , thanks for your PR. The code looks good and the 
`deepToString()` function returns result correctly. I could not spot any issues 
with the implementation. To make the PR better, I think we can add a test in 
`CliUtilsTest` to test the `rowToString` function, since code in the function 
also has been changed. BTW, the PR template can be done better. See for PR 
https://github.com/apache/flink/pull/5811 as an example.
Best, Hequn


---


[GitHub] flink issue #6219: [hotfix] Fixed typo in docs

2018-07-01 Thread elbaulp
Github user elbaulp commented on the issue:

https://github.com/apache/flink/pull/6219
  
@tillrohrmann You're welcome :-)


---


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529025#comment-16529025
 ] 

vinoyang commented on FLINK-9682:
-

seems a feasible suggestion, what do you think about this? [~till.rohrmann] and 
[~StephanEwen]

> Add setDescription to execution environment and display it in the UI
> 
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many version of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given the {{execute}}.  But the job name is uses 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9511) Make StateDescriptor configurable with optional TTL

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529023#comment-16529023
 ] 

vinoyang commented on FLINK-9511:
-

Hi [~azagrebin] the defined TtlConfig stored in flink-runtime module but 
StateDescriptor class exists in flink-core, it seems I can not import it. Is 
there something wrong?

> Make StateDescriptor configurable with optional TTL
> ---
>
> Key: FLINK-9511
> URL: https://issues.apache.org/jira/browse/FLINK-9511
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API, State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL can be enabled and configured in the constructor of abstract 
> StateDescriptor and become available in all subclasses:
> | {code:java}
> enum StateTtlUpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
> enum StateTtlCleanupGuarantee { Relaxed, Exact }
> enum TtlStateVisibility { Relaxed, Exact }
> class TtlConfig {
>   StateTtlUpdateType updateType;
>   StateTtlCleanupGuarantee cleanupStrategy;
>   TtlStateVisibility stateVisibility;
>   TimeCharacteristic timeCharacteristic;
>   long ttl;
> }
> // previous constructor
> StateDescriptor(...) {
>   this.ttlConfig = ttlConfig.DISABLED;
> }
> // overloaded constructor with TtlConfig
> StateDescriptor(..., ttlConfig) {
>   ...
> }
> {code}
>  |
> Another option is to consider adding StateDescriptor builder
> Queryable state can be unsupported with TTL for now.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]

2018-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529020#comment-16529020
 ] 

vinoyang commented on FLINK-9221:
-

[~joshlemer] I think we should not add this method to `SinkFunction` interface, 
it seems could be implemented in a util class. What's your opinion? 
[~till.rohrmann] 

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> ---
>
> Key: FLINK-9221
> URL: https://issues.apache.org/jira/browse/FLINK-9221
> Project: Flink
>  Issue Type: Task
>  Components: DataSet API, DataStream API
>Affects Versions: 1.5.0
>Reporter: Josh Lemer
>Assignee: vinoyang
>Priority: Minor
>  Labels: flink
>
> Just like it is very useful to use `DataStream[T]` as a sort of Functor or 
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to 
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on 
> `SinkFunctions`, so that you can reuse existing complex sink functions, but 
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] = 
>   new BucketingSink[String]("...")
> .setBucketer(new DateTimeBucketer("-MM-dd-HHmm")
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this, 
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a 
> great place to start!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)