[jira] [Commented] (FLINK-9698) "case class must be static and globally accessible" is too constrained
[ https://issues.apache.org/jira/browse/FLINK-9698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529406#comment-16529406 ] Jeff Zhang commented on FLINK-9698: --- Could anyone let me know why flink require case class must be static and globally accessible ? Because this similar code can work in spark, so I beleive it should be the same for flink. As both of them require to serialize these class to remote host and execute in remote side. > "case class must be static and globally accessible" is too constrained > -- > > Key: FLINK-9698 > URL: https://issues.apache.org/jira/browse/FLINK-9698 > Project: Flink > Issue Type: Improvement >Reporter: Jeff Zhang >Priority: Major > > The following code can reproduce this issue. > {code} > object BatchJob { > def main(args: Array[String]) { > // set up the batch execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tenv = TableEnvironment.getTableEnvironment(env) > case class Person(id:Int, name:String) > val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy")) > tenv.registerDataSet("table_1", ds); > } > } > {code} > Although I have workaround to declare case class outside of the main method, > this workaround won't work in scala-shell. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9698) "case class must be static and globally accessible" is too constrained
Jeff Zhang created FLINK-9698: - Summary: "case class must be static and globally accessible" is too constrained Key: FLINK-9698 URL: https://issues.apache.org/jira/browse/FLINK-9698 Project: Flink Issue Type: Improvement Reporter: Jeff Zhang The following code can reproduce this issue. {code} object BatchJob { def main(args: Array[String]) { // set up the batch execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tenv = TableEnvironment.getTableEnvironment(env) case class Person(id:Int, name:String) val ds = env.fromElements(Person(1,"jeff"), Person(2, "andy")) tenv.registerDataSet("table_1", ds); } } {code} Although I have workaround to declare case class outside of the main method, this workaround won't work in scala-shell. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529329#comment-16529329 ] ASF GitHub Bot commented on FLINK-9337: --- Github user cricket007 commented on the issue: https://github.com/apache/flink/pull/5995 What about implementing a `KeyedDeserializationSchema` for Avro? > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user cricket007 commented on the issue: https://github.com/apache/flink/pull/5995 What about implementing a `KeyedDeserializationSchema` for Avro? ---
[jira] [Closed] (FLINK-9670) Introduce slot manager factory
[ https://issues.apache.org/jira/browse/FLINK-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renjie Liu closed FLINK-9670. - Resolution: Invalid > Introduce slot manager factory > -- > > Key: FLINK-9670 > URL: https://issues.apache.org/jira/browse/FLINK-9670 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529321#comment-16529321 ] ASF GitHub Bot commented on FLINK-9567: --- Github user Clark closed the pull request at: https://github.com/apache/flink/pull/6192 > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt > > > After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not > release task manager containers in some specific case. In the worst case, I > had a job configured to 5 task managers, but possess more than 100 containers > in the end. Although the task didn't failed, but it affect other jobs in the > Yarn Cluster. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. As the container was killed before restart, but it > has not received the callback of *onContainerComplete* in > *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. > After restart, as we can see in line 347 of FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection of container 24 which is on bd-r1hdp69 machine. > When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it > did not has the connection to TaskManager on container 24, so it just ignore > the close of TaskManger. > 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. > However, bafore calling *closeTaskManagerConnection,* it already called > *requestYarnContainer* which lead to *numPendingContainerRequests variable > in* *YarnResourceManager* increased by 1. > As the excessive container return is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot > return this container although it is not required. Meanwhile, the restart > logic has already allocated enough containers for Task Managers, Flink will > possess the extra container for a long time for nothing. > In the full log, the job ended with 7 containers while only 3 are running > TaskManagers. > ps: Another strange thing I found is that when sometimes request for a yarn > container, it will return much more than requested. Is it a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6192: [FLINK-9567][runtime][yarn] Fix the bug that Flink...
Github user Clark closed the pull request at: https://github.com/apache/flink/pull/6192 ---
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529282#comment-16529282 ] ASF GitHub Bot commented on FLINK-9688: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199366673 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Thank you for that catch. blank line removed > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199366673 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Thank you for that catch. blank line removed ---
[jira] [Commented] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529278#comment-16529278 ] ASF GitHub Bot commented on FLINK-9688: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199365966 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Remove this blank line would be better~ > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9688) ATAN2 Sql Function support
[ https://issues.apache.org/jira/browse/FLINK-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9688: -- Labels: pull-request-available (was: ) > ATAN2 Sql Function support > -- > > Key: FLINK-9688 > URL: https://issues.apache.org/jira/browse/FLINK-9688 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Minor > Labels: pull-request-available > > simple query fails {code} > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, > config()); > DataSet> ds = > CollectionDataSets.get3TupleDataSet(env); > tableEnv.registerDataSet("t1", ds, "x, y, z"); > String sqlQuery = "SELECT atan2(1,2)"; > Table result = tableEnv.sqlQuery(sqlQuery); > {code} > while at the same time Calcite supports it and in Calcite's sqlline it works > like {noformat} > 0: jdbc:calcite:model=target/test-classes/mod> select atan2(1,2); > +-+ > | EXPR$0 | > +-+ > | 0.4636476090008061 | > +-+ > 1 row selected (0.173 seconds) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199365966 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -,6 +,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { math.atan(-0.123123132132132).toString) } + @Test + def testAtan2(): Unit = { +testAllApis( + 'f25.atan2('f26), + "f25.atan2(f26)", + "ATAN2(f25, f26)", + math.atan2(0.42.toByte, 0.toByte).toString) + + --- End diff -- Remove this blank line would be better~ ---
[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6234#discussion_r199365531 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) { /** * Prunes states assuming there will be no events with timestamp lower than the given one. -* It cleares the sharedBuffer and also emits all timed out partial matches. +* It clears the sharedBuffer and also emits all timed out partial matches. * * @param sharedBuffer the SharedBuffer object that we need to work upon while processing * @param nfaState The NFAState object that we need to affect while processing * @param timestamptimestamp that indicates that there will be no more events with lower timestamp * @return all timed outed partial matches * @throws Exception Thrown if the system cannot access the state. */ - public Collection>, Long>> advanceTime( + public Tuple2>, Long>>, Collection>>> advanceTime( final SharedBuffer sharedBuffer, final NFAState nfaState, - final long timestamp) throws Exception { + final long timestamp, + final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { --- End diff -- please add parameter explication for new parameter `afterMatchSkipStrategy ` of the method. ---
[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529276#comment-16529276 ] ASF GitHub Bot commented on FLINK-9431: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6234#discussion_r199365531 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -224,23 +226,26 @@ private boolean isFinalState(ComputationState state) { /** * Prunes states assuming there will be no events with timestamp lower than the given one. -* It cleares the sharedBuffer and also emits all timed out partial matches. +* It clears the sharedBuffer and also emits all timed out partial matches. * * @param sharedBuffer the SharedBuffer object that we need to work upon while processing * @param nfaState The NFAState object that we need to affect while processing * @param timestamptimestamp that indicates that there will be no more events with lower timestamp * @return all timed outed partial matches * @throws Exception Thrown if the system cannot access the state. */ - public Collection>, Long>> advanceTime( + public Tuple2>, Long>>, Collection>>> advanceTime( final SharedBuffer sharedBuffer, final NFAState nfaState, - final long timestamp) throws Exception { + final long timestamp, + final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { --- End diff -- please add parameter explication for new parameter `afterMatchSkipStrategy ` of the method. > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now flink cep have no support to reach a Final State upon past some time. if > i use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element > be emitted after 5minutes, i have no way. > I want to introduce a timeEnd State to work with notFollowedBy to figure out > with this scenior. > It can be used like this > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code}, > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529266#comment-16529266 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6233 Hello @hequn8128! Thank you for your review and comments. About PR template - I did changes based on proposed #5811. Please let me know if it is acceptable or not. About `rowToString` agree. I think it makes sense and I added such tests. However I faced with some strange behavior (I do not know if it is bug or whatever else). Commented on the code about that. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6233 Hello @hequn8128! Thank you for your review and comments. About PR template - I did changes based on proposed #5811. Please let me know if it is acceptable or not. About `rowToString` agree. I think it makes sense and I added such tests. However I faced with some strange behavior (I do not know if it is bug or whatever else). Commented on the code about that. ---
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529265#comment-16529265 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364581 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); + assertEquals(Arrays.toString(CliUtils.rowToString(result)), + "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 2018-11-12, " + + "[1, 2], (1,123,null)]"); --- End diff -- If having tuple here is ok then the next strange thing is null handling inside tuples (it is printed in lowercase and without brackets). So there are at least 2 different types of null handling: inside tuples and all others. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364538 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); --- End diff -- Is it a real case to have tuple here for SqlClient? API allows to do that but not sure about real cases. ---
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529264#comment-16529264 ] ASF GitHub Bot commented on FLINK-9696: --- Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364538 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); --- End diff -- Is it a real case to have tuple here for SqlClient? API allows to do that but not sure about real cases. > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types
Github user snuyanzin commented on a diff in the pull request: https://github.com/apache/flink/pull/6233#discussion_r199364581 --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CliUtils}. + */ +public class CliUtilsTest { + + @Test + public void testRowToString() throws IOException { + Row result = new Row(10); + result.setField(0, null); + result.setField(1, "String"); + result.setField(2, 'c'); + result.setField(3, false); + result.setField(4, 12345.67f); + result.setField(5, 12345.67d); + result.setField(6, 12345L); + result.setField(7, java.sql.Date.valueOf("2018-11-12")); + result.setField(8, new int[]{1, 2}); + result.setField(9, new Tuple3<>(1, "123", null)); + assertEquals(Arrays.toString(CliUtils.rowToString(result)), + "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 2018-11-12, " + + "[1, 2], (1,123,null)]"); --- End diff -- If having tuple here is ok then the next strange thing is null handling inside tuples (it is printed in lowercase and without brackets). So there are at least 2 different types of null handling: inside tuples and all others. ---
[jira] [Closed] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9456. Resolution: Fixed Fixed via 1.6.0: 89cfeaa882f9e68df2bd215563622b48c29a9ec9 50c0ea8c9fe17278d45aba476a95791152a1420b 1.5.1: a2f43b4cc081d360cd59ce3e7fb875e4b5fd243f 627412c4d2ea655271fe5da67a55ac936a1a060e > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529253#comment-16529253 ] ASF GitHub Bot commented on FLINK-9456: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6132 > Let ResourceManager notify JobManager about failed/killed TaskManagers > -- > > Key: FLINK-9456 > URL: https://issues.apache.org/jira/browse/FLINK-9456 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Often, the {{ResourceManager}} learns faster about TaskManager > failures/killings because it directly communicates with the underlying > resource management framework. Instead of only relying on the > {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we > should additionally send a signal from the {{ResourceManager}} to the > {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster > to {{TaskManager}} failures and recover our running job/s. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6132 ---
[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529134#comment-16529134 ] ASF GitHub Bot commented on FLINK-9633: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @yanghua Thanks for the review, I rebased the PR. > Flink doesn't use the Savepoint path's filesystem to create the OuptutStream > on Task. > - > > Key: FLINK-9633 > URL: https://issues.apache.org/jira/browse/FLINK-9633 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Currently, flink use the Savepoint's filesystem to create the meta output > stream in CheckpointCoordinator(JM side), but in StreamTask(TM side) it uses > the Checkpoint's filesystem to create the checkpoint data output stream. When > the Savepoint & Checkpoint in different filesystem this will lead to > problematic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @yanghua Thanks for the review, I rebased the PR. ---
[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529124#comment-16529124 ] ASF GitHub Bot commented on FLINK-9431: --- GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6234 [FLINK-9431]Introduce time bounded condition to cep ## What is the purpose of the change In cep the event is now driving the transformation of the NFA, I think the time factor should also be taken into account in some senior. When a key's data is not endless, and if we want to match the following pattern after we match the `AB` after `B` has appeared for ten seconds. ``` Pattern.begin("A").followedBy("B").notFollowedBy("C") ``` We can not emit the result because there is no branch can lead to the `Final State`, And i think we can add a `TimeEnd` state to describe a pattern that accepts a time condition evaluated by processing time / event time to compare the timestamp in the element we have meant before. As described in the issue link, there are two main reason why i introduce this feature 1. the `notFollowedBy` cant be at the end of the pattern 2. the `within` just compare with the element at start, and some key's data may not endless, so we have to evaluate condition not also on event but also on time ## Brief change log 1. Add the method to distinguish the event driven condition or time drivern condition in `IterativeCondition` 2. when `advanceTime`, we not only prune the expire element, but also look the time bounded condition ## Verifying this change This change is already covered by existing cep tests, may be it need a little more about the new api. This change added tests and can be verified as follows: ## Documentation - Does this pull request introduce a new feature? (yes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink timeEnd-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6234 commit b1aa992a97c8eac818e57c3d2f82be76957052d0 Author: minwenjun Date: 2018-07-01T14:41:44Z [FLINK-9431]Introduce time bounded condition to cep > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now flink cep have no support to reach a Final State upon past some time. if > i use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element > be emitted after 5minutes, i have no way. > I want to introduce a timeEnd State to work with notFollowedBy to figure out > with this scenior. > It can be used like this > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code}, > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9431) Introduce TimeEnd State to flink cep
[ https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9431: -- Labels: pull-request-available (was: ) > Introduce TimeEnd State to flink cep > > > Key: FLINK-9431 > URL: https://issues.apache.org/jira/browse/FLINK-9431 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > Now flink cep have no support to reach a Final State upon past some time. if > i use a pattern like > {code:java}Pattern.begin('A').notFollowedBy("B"){code}, if i want A element > be emitted after 5minutes, i have no way. > I want to introduce a timeEnd State to work with notFollowedBy to figure out > with this scenior. > It can be used like this > {code:java}Pattern.begin('A').notFollowedBy("B").timeEnd("end").{code}, > [~dawidwys] [~kkl0u] Is this meaningful? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6234: [FLINK-9431]Introduce time bounded condition to ce...
GitHub user Aitozi opened a pull request: https://github.com/apache/flink/pull/6234 [FLINK-9431]Introduce time bounded condition to cep ## What is the purpose of the change In cep the event is now driving the transformation of the NFA, I think the time factor should also be taken into account in some senior. When a key's data is not endless, and if we want to match the following pattern after we match the `AB` after `B` has appeared for ten seconds. ``` Pattern.begin("A").followedBy("B").notFollowedBy("C") ``` We can not emit the result because there is no branch can lead to the `Final State`, And i think we can add a `TimeEnd` state to describe a pattern that accepts a time condition evaluated by processing time / event time to compare the timestamp in the element we have meant before. As described in the issue link, there are two main reason why i introduce this feature 1. the `notFollowedBy` cant be at the end of the pattern 2. the `within` just compare with the element at start, and some key's data may not endless, so we have to evaluate condition not also on event but also on time ## Brief change log 1. Add the method to distinguish the event driven condition or time drivern condition in `IterativeCondition` 2. when `advanceTime`, we not only prune the expire element, but also look the time bounded condition ## Verifying this change This change is already covered by existing cep tests, may be it need a little more about the new api. This change added tests and can be verified as follows: ## Documentation - Does this pull request introduce a new feature? (yes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aitozi/flink timeEnd-state Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6234 commit b1aa992a97c8eac818e57c3d2f82be76957052d0 Author: minwenjun Date: 2018-07-01T14:41:44Z [FLINK-9431]Introduce time bounded condition to cep ---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6194 +1, there is a conflicting file~ cc @sihuazhou ---
[jira] [Commented] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OuptutStream on Task.
[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529123#comment-16529123 ] ASF GitHub Bot commented on FLINK-9633: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6194 +1, there is a conflicting file~ cc @sihuazhou > Flink doesn't use the Savepoint path's filesystem to create the OuptutStream > on Task. > - > > Key: FLINK-9633 > URL: https://issues.apache.org/jira/browse/FLINK-9633 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > Currently, flink use the Savepoint's filesystem to create the meta output > stream in CheckpointCoordinator(JM side), but in StreamTask(TM side) it uses > the Checkpoint's filesystem to create the checkpoint data output stream. When > the Savepoint & Checkpoint in different filesystem this will lead to > problematic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199352237 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for this approach that directly specifies the interval literals. Regarding Quarter. It seems like a very old implementation and we should probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it consistent with all other time unit extractions. What do you guys think? I just tried it out by modifying the `Extract` method and it seems working perfectly. ---
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529122#comment-16529122 ] ASF GitHub Bot commented on FLINK-6846: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199352237 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for this approach that directly specifies the interval literals. Regarding Quarter. It seems like a very old implementation and we should probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it consistent with all other time unit extractions. What do you guys think? I just tried it out by modifying the `Extract` method and it seems working perfectly. > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9654) Internal error while deserializing custom Scala TypeSerializer instances
[ https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529121#comment-16529121 ] ASF GitHub Bot commented on FLINK-9654: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6206 hi @zsolt-donca I have seen the Travis build error log, the failed reason is not because of your code. This PR looks good, but if you can add a test for `isAnonymousClass` method, that would be better. cc @tillrohrmann > Internal error while deserializing custom Scala TypeSerializer instances > > > Key: FLINK-9654 > URL: https://issues.apache.org/jira/browse/FLINK-9654 > Project: Flink > Issue Type: Bug >Reporter: Zsolt Donca >Priority: Major > Labels: pull-request-available > > When you are using custom `TypeSerializer` instances implemented in Scala, > the Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can > manifest itself when a Flink job is restored from checkpoint or started with > a savepoint. > The reason is that in such a restore from checkpoint or savepoint, Flink uses > `InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type > serializers and their configurations. The deserialization walks through the > entire object graph corresponding, and for each class it calls > `isAnonymousClass`, which, in turn, calls `getSimpleName` (mechanism in place > for FLINK-6869). If there is an internal class defined in a Scala object for > which `getSimpleName` fails (see the Scala issue), then a > `java.lang.InternalError` is thrown which causes the task manager to restart. > In this case, Flink tries to restart the job on another task manager, causing > all the task managers to restart, wreaking havoc on the entire Flink cluster. > There are some alternative type information derivation mechanisms that rely > on anonymous classes and, most importantly, classes generated by macros, that > can easily trigger the above problem. I am personally working on > [https://github.com/zsolt-donca/flink-alt], and there is also > [https://github.com/joroKr21/flink-shapeless] > I prepared a pull request that fixes the issue. > > Edit: added a stack trace to help demonstrate the issue. > 2018-06-21 13:08:07.829 [today-stats (2/2)] ERROR > org.apache.flink.runtime.taskmanager.Task - Encountered fatal error > java.lang.InternalError - terminating the JVM > java.lang.InternalError: Malformed class name > at java.lang.Class.getSimpleName(Class.java:1330) ~[na:1.8.0_171] > at java.lang.Class.isAnonymousClass(Class.java:1411) ~[na:1.8.0_171] > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:206) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1855) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) > ~[na:1.8.0_171] > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > ~[na:1.8.0_171] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > ~[na:1.8.0_171] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > ~[flink-dist_2.11-1.4.2.jar:1.4.2] > at >
[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6206 hi @zsolt-donca I have seen the Travis build error log, the failed reason is not because of your code. This PR looks good, but if you can add a test for `isAnonymousClass` method, that would be better. cc @tillrohrmann ---
[jira] [Updated] (FLINK-9695) Add option for Mesos executor to forcefully pull Docker images
[ https://issues.apache.org/jira/browse/FLINK-9695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonid Ishimnikov updated FLINK-9695: - Description: It would be useful to have an option to forcefully pull Docker images for tasks, rather than reuse a previously cached image. Such option exists in many Mesos frameworks, and it significantly simplifies debugging. I propose adding a new {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option. (was: It would be useful to have an option to forcefully pull Docker images for tasks, rather than reuse a previously cached image. Such option exists in many Mesos frameworks, and it significantly simplifies debugging. I propose adding a new {{mesos.resourcemanager.tasks.container.docker.forcePullImage}} option.) > Add option for Mesos executor to forcefully pull Docker images > -- > > Key: FLINK-9695 > URL: https://issues.apache.org/jira/browse/FLINK-9695 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Leonid Ishimnikov >Assignee: Leonid Ishimnikov >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > It would be useful to have an option to forcefully pull Docker images for > tasks, rather than reuse a previously cached image. Such option exists in > many Mesos frameworks, and it significantly simplifies debugging. I propose > adding a new > {{mesos.resourcemanager.tasks.container.docker.}}{{force-pull-image}} option. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529103#comment-16529103 ] ASF GitHub Bot commented on FLINK-9669: --- Github user liurenjie1024 closed the pull request at: https://github.com/apache/flink/pull/6214 > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529102#comment-16529102 ] ASF GitHub Bot commented on FLINK-9669: --- Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6214 @tillrohrmann This is from my initial design, and since the design has changed, we can close this now. > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9669) Introduce task manager assignment store
[ https://issues.apache.org/jira/browse/FLINK-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renjie Liu closed FLINK-9669. - Resolution: Invalid > Introduce task manager assignment store > --- > > Key: FLINK-9669 > URL: https://issues.apache.org/jira/browse/FLINK-9669 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.5.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6214 @tillrohrmann This is from my initial design, and since the design has changed, we can close this now. ---
[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 closed the pull request at: https://github.com/apache/flink/pull/6214 ---
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199349619 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for better consistency. It is good to follow the Table-api style. ---
[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529099#comment-16529099 ] ASF GitHub Bot commented on FLINK-6846: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199349619 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for better consistency. It is good to follow the Table-api style. > Add TIMESTAMPADD supported in TableAPI > -- > > Key: FLINK-6846 > URL: https://issues.apache.org/jira/browse/FLINK-6846 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: pull-request-available, starter > > See FLINK-6811 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9696) Deep toString for arrays/map in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529096#comment-16529096 ] ASF GitHub Bot commented on FLINK-9696: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6233 Hi @snuyanzin , thanks for your PR. The code looks good and the `deepToString()` function returns result correctly. I could not spot any issues with the implementation. To make the PR better, I think we can add a test in `CliUtilsTest` to test the `rowToString` function, since code in the function also has been changed. BTW, the PR template can be done better. See for PR https://github.com/apache/flink/pull/5811 as an example. Best, Hequn > Deep toString for arrays/map in SQL client > --- > > Key: FLINK-9696 > URL: https://issues.apache.org/jira/browse/FLINK-9696 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently SQL client does not show arrays/map in human readable way (please > have a look at examples below). e.g. {code}select map[1,2];{code} is shown as > {noformat} +/-EXPR$0 >+ java.util.HashMap > {noformat} > {code}select array[1,2];{code} is shown as {noformat} > +/-EXPR$0 >+ java.lang.Integer[] > {noformat} > {code}select array[map[1,2],map[2,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.Map[]{noformat} > {code}select map[array[1,2], array[1,2]];{code} is shown as {noformat} +/- > EXPR$0 >+ java.util.HashMap{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/6233 Hi @snuyanzin , thanks for your PR. The code looks good and the `deepToString()` function returns result correctly. I could not spot any issues with the implementation. To make the PR better, I think we can add a test in `CliUtilsTest` to test the `rowToString` function, since code in the function also has been changed. BTW, the PR template can be done better. See for PR https://github.com/apache/flink/pull/5811 as an example. Best, Hequn ---
[GitHub] flink issue #6219: [hotfix] Fixed typo in docs
Github user elbaulp commented on the issue: https://github.com/apache/flink/pull/6219 @tillrohrmann You're welcome :-) ---
[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and display it in the UI
[ https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529025#comment-16529025 ] vinoyang commented on FLINK-9682: - seems a feasible suggestion, what do you think about this? [~till.rohrmann] and [~StephanEwen] > Add setDescription to execution environment and display it in the UI > > > Key: FLINK-9682 > URL: https://issues.apache.org/jira/browse/FLINK-9682 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Webfrontend >Affects Versions: 1.5.0 >Reporter: Elias Levy >Assignee: vinoyang >Priority: Major > > Currently you can provide a job name to {{execute}} in the execution > environment. In an environment where many version of a job may be executing, > such as a development or test environment, identifying which running job is > of a specific version via the UI can be difficult unless the version is > embedded into the job name given the {{execute}}. But the job name is uses > for other purposes, such as for namespacing metrics. Thus, it is not ideal > to modify the job name, as that could require modifying metric dashboards and > monitors each time versions change. > I propose a new method be added to the execution environment, > {{setDescription}}, that would allow a user to pass in an arbitrary > description that would be displayed in the dashboard, allowing users to > distinguish jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9511) Make StateDescriptor configurable with optional TTL
[ https://issues.apache.org/jira/browse/FLINK-9511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529023#comment-16529023 ] vinoyang commented on FLINK-9511: - Hi [~azagrebin] the defined TtlConfig stored in flink-runtime module but StateDescriptor class exists in flink-core, it seems I can not import it. Is there something wrong? > Make StateDescriptor configurable with optional TTL > --- > > Key: FLINK-9511 > URL: https://issues.apache.org/jira/browse/FLINK-9511 > Project: Flink > Issue Type: Sub-task > Components: Java API, State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > Fix For: 1.6.0 > > > TTL can be enabled and configured in the constructor of abstract > StateDescriptor and become available in all subclasses: > | {code:java} > enum StateTtlUpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite } > enum StateTtlCleanupGuarantee { Relaxed, Exact } > enum TtlStateVisibility { Relaxed, Exact } > class TtlConfig { > StateTtlUpdateType updateType; > StateTtlCleanupGuarantee cleanupStrategy; > TtlStateVisibility stateVisibility; > TimeCharacteristic timeCharacteristic; > long ttl; > } > // previous constructor > StateDescriptor(...) { > this.ttlConfig = ttlConfig.DISABLED; > } > // overloaded constructor with TtlConfig > StateDescriptor(..., ttlConfig) { > ... > } > {code} > | > Another option is to consider adding StateDescriptor builder > Queryable state can be unsupported with TTL for now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529020#comment-16529020 ] vinoyang commented on FLINK-9221: - [~joshlemer] I think we should not add this method to `SinkFunction` interface, it seems could be implemented in a util class. What's your opinion? [~till.rohrmann] > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataSet API, DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Assignee: vinoyang >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)