[jira] [Created] (FLINK-8618) Add S3 to the list of sinks on the delivery guarantees page
chris snow created FLINK-8618: - Summary: Add S3 to the list of sinks on the delivery guarantees page Key: FLINK-8618 URL: https://issues.apache.org/jira/browse/FLINK-8618 Project: Flink Issue Type: Improvement Components: Documentation Reporter: chris snow It would be good to add S3 to the list of sinks. Maybe S3 inherits the delivery guarantee properties from hdfs in which case it could be added next to hdfs? E.g. HDFS/S3 rolling sink | exactly once | Implementation depends on Hadoop version -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5400#discussion_r167162743

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.core.memory.MemorySegmentFactory;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
    +import org.apache.flink.runtime.io.network.buffer.Buffer;
    +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
    +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
    +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
    +
    +import org.hamcrest.BaseMatcher;
    +import org.hamcrest.Description;
    +
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Utility class containing common methods for testing
    + * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
    + */
    +public class BarrierBufferTestBase {
    --- End diff --

    This is not exactly what I had in mind by deduplication of `BarrierBufferTest` and `CreditBasedBarrierBufferTest`. Both of those tests are still pretty much copies of one another, and those static methods are only a fraction of the duplication. Look for example at `testSingleChannelNoBarriers()`: the two versions are 99% identical. All of its code could be moved to `BarrierBufferTestBase`. `BarrierBufferTestBase` would only need to define an abstract method `CheckpointBarrierHandler createBarrierHandler()`, which would be defined differently in `BarrierBufferTest` and `CreditBasedBarrierBufferTest`.

    One minor thing is that `BarrierBufferTest` would need `checkNoTempFilesRemain()` added as an `@After` test hook. The same applies to all of the other tests.

---
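The refactoring suggested in this comment can be sketched in plain Java, without JUnit or Flink on the classpath. This is only an illustration of the shape being asked for, under stated assumptions: `Handler`, `HandlerTestBase`, `SpillingHandlerTest`, `InMemoryHandlerTest` and `afterEach()` are hypothetical names invented for the sketch. They stand in for `CheckpointBarrierHandler`, `BarrierBufferTestBase`, `BarrierBufferTest`, `CreditBasedBarrierBufferTest` and the `@After` hook respectively, and do not reproduce any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedTestBaseSketch {

    /** Hypothetical stand-in for the handler under test (not Flink's CheckpointBarrierHandler). */
    interface Handler {
        String next(String input);
    }

    /** Shared base: owns the test bodies; subclasses only decide which handler is created. */
    abstract static class HandlerTestBase {

        /** The single point of variation between the concrete test classes. */
        abstract Handler createHandler();

        /** Cleanup hook playing the role of an @After method; a no-op unless overridden. */
        void afterEach() {
        }

        /** A shared test body, written once and inherited by every subclass. */
        final void testPassThrough() {
            Handler handler = createHandler();
            try {
                check("a".equals(handler.next("a")), "first record must pass through unchanged");
                check("b".equals(handler.next("b")), "second record must pass through unchanged");
            } finally {
                afterEach();
            }
        }
    }

    /** Variant whose handler touches (simulated) temp files and must clean them up. */
    static final class SpillingHandlerTest extends HandlerTestBase {
        private final List<String> tempFiles = new ArrayList<>();

        @Override
        Handler createHandler() {
            return input -> {
                String file = "spill-" + input;
                tempFiles.add(file);     // simulate writing a temp file
                tempFiles.remove(file);  // simulate deleting it again
                return input;
            };
        }

        @Override
        void afterEach() {
            check(tempFiles.isEmpty(), "no temp files may remain");
        }
    }

    /** Variant whose handler never leaves memory, so it needs no extra cleanup check. */
    static final class InMemoryHandlerTest extends HandlerTestBase {
        @Override
        Handler createHandler() {
            return input -> input;
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    /** Runs the inherited test on both variants and returns how many suites ran. */
    public static int runAll() {
        HandlerTestBase[] suites = {new SpillingHandlerTest(), new InMemoryHandlerTest()};
        for (HandlerTestBase suite : suites) {
            suite.testPassThrough();
        }
        return suites.length;
    }

    public static void main(String[] args) {
        System.out.println("suites passed: " + runAll());
    }
}
```

With this layout each shared test exists once, and a subclass contributes only its factory method plus any cleanup check specific to it.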
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r167163798 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Utility class containing common methods for testing + * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}. + */ +public class BarrierBufferTestBase { + + private static final Random RND = new Random(); + + private static int sizeCounter = 1; + + public static BufferOrEvent createBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CheckpointBarrier( + checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel); + } + + public static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { --- End diff -- Instead of using static methods please use inheritance - make `BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend `BarrierBufferTestBase`. 
Especially since the name `*Base` already suggests that. ---
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358128#comment-16358128 ] ASF GitHub Bot commented on FLINK-8547: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5400#discussion_r167163798 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Utility class containing common methods for testing + * {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}. + */ +public class BarrierBufferTestBase { + + private static final Random RND = new Random(); + + private static int sizeCounter = 1; + + public static BufferOrEvent createBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CheckpointBarrier( + checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel); + } + + public static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { --- End diff -- Instead of using static methods please use inheritance - make `BarrierBufferTest` and `CreditBasedBarrierBufferTest` extend `BarrierBufferTestBase`. 
    Especially since the name `*Base` already suggests that.

> Implement CheckpointBarrierHandler not to spill data for exactly-once based
> on credit-based flow control
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8547
>                 URL: https://issues.apache.org/jira/browse/FLINK-8547
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with
> barriers until all inputs have received the barrier for a given checkpoint.
> To avoid back-pressuring the input streams, which may cause distributed
> deadlocks, the {{BarrierBuffer}} has to spill the data to disk files to
> recycle the buffers for blocked channels.
>
> With credit-based flow control, every channel has exclusive buffers, so
> there is no need to spill data to avoid deadlocks. We therefore implement a new
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels,
> for better performance.
>
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled by
> configuration, in order to roll back to the original mode in case of unexpected risks
[jira] [Created] (FLINK-8619) Some thing about Flink SQL distinct, I need help
Lynch Lee created FLINK-8619: Summary: Some thing about Flink SQL distinct, I need help Key: FLINK-8619 URL: https://issues.apache.org/jira/browse/FLINK-8619 Project: Flink Issue Type: Wish Components: Table API & SQL Affects Versions: 1.4.0 Reporter: Lynch Lee Fix For: 1.4.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help
     [ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lynch Lee updated FLINK-8619:
-----------------------------
    Description: 
I ran some tests with DISTINCT on MySQL, shown below:

mysql> CREATE TABLE `rpt_tt` (
    -> `target_id` varchar(50) NOT NULL DEFAULT '',
    -> `target_type` varchar(50) NOT NULL DEFAULT '',
    -> `amt_pay` bigint(20) DEFAULT NULL,
    -> `down_payment` bigint(20) DEFAULT NULL,
    -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.01 sec)

mysql> insert into rpt_tt values("1","5","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("3","5","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("2","6","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> insert into rpt_tt values("3","7","1","1");
Query OK, 1 row affected (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
4 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
3 rows in set (0.00 sec)

mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment;
+-------------+-----------+---------+--------------+
| target_type | target_id | amt_pay | down_payment |
+-------------+-----------+---------+--------------+
| 5           | 1         |       1 |            1 |
| 5           | 3         |       1 |            1 |
| 6           | 2         |       1 |            1 |
| 7           | 3         |       1 |            1 |
+-------------+-----------+---------+--------------+
4 rows in set (0.01 sec)

Now I want to run a similar query with Flink SQL. The code is here:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.god.hala.flink.convertors.RowIntoJson;
import com.god.hala.flink.sources.DataSources;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.UUID;

public class KafkaConn2Topics1 {

    public static void main(String[] args) throws Exception {
        String inputTopic = "input-case01-test02";
        String outputTopic = "output-case01-test02";

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "data-node5:9092");
        props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", ""));

        LocalStreamEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvi
[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5400

    @pnowojski, thanks for the suggestions; I totally agree with them. That abstraction indeed makes the code simpler. I will update the code ASAP.

---
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
    [ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358151#comment-16358151 ]

ASF GitHub Bot commented on FLINK-8547:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/5400

    @pnowojski, thanks for the suggestions; I totally agree with them. That abstraction indeed makes the code simpler. I will update the code ASAP.

> Implement CheckpointBarrierHandler not to spill data for exactly-once based
> on credit-based flow control
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8547
>                 URL: https://issues.apache.org/jira/browse/FLINK-8547
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} blocks inputs with
> barriers until all inputs have received the barrier for a given checkpoint.
> To avoid back-pressuring the input streams, which may cause distributed
> deadlocks, the {{BarrierBuffer}} has to spill the data to disk files to
> recycle the buffers for blocked channels.
>
> With credit-based flow control, every channel has exclusive buffers, so
> there is no need to spill data to avoid deadlocks. We therefore implement a new
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels,
> for better performance.
>
> This new {{CheckpointBarrierHandler}} can also be enabled or disabled by
> configuration, in order to roll back to the original mode in case of unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
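The alignment behaviour described in the issue (block a channel once its barrier arrives, hold that channel's later data, release everything when the last barrier is in) can be illustrated with a toy model. The sketch below is not Flink's `BarrierBuffer` or the handler proposed in the pull request: `BarrierAlignmentSketch`, `Element` and the `"checkpoint-N"` output marker are names made up for the illustration, and it assumes barriers arrive in checkpoint order with no cancellation. Blocked data is kept in an in-memory queue, which mirrors the "only buffering" idea rather than spilling to disk.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class BarrierAlignmentSketch {

    /** One input element: a data record, or a checkpoint barrier when record is null. */
    static final class Element {
        final int channel;
        final String record;
        final long checkpointId;

        private Element(int channel, String record, long checkpointId) {
            this.channel = channel;
            this.record = record;
            this.checkpointId = checkpointId;
        }

        static Element data(int channel, String record) {
            return new Element(channel, record, -1L);
        }

        static Element barrier(int channel, long checkpointId) {
            return new Element(channel, null, checkpointId);
        }

        boolean isBarrier() {
            return record == null;
        }
    }

    private final boolean[] blocked;                             // one flag per input channel
    private int barriersSeen;                                    // barriers received for the current checkpoint
    private final Deque<Element> buffered = new ArrayDeque<>();  // data held back for blocked channels
    private final List<String> output = new ArrayList<>();       // what the operator would see, in order

    public BarrierAlignmentSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    /** Feeds one element into the aligner. */
    public void process(Element element) {
        if (blocked[element.channel]) {
            buffered.add(element);  // channel already delivered its barrier: hold in memory
            return;
        }
        if (!element.isBarrier()) {
            output.add(element.record);
            return;
        }
        blocked[element.channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            // All channels are aligned: the checkpoint can be taken, then inputs are released.
            output.add("checkpoint-" + element.checkpointId);
            releaseBlocked();
        }
    }

    /** Unblocks every channel and replays the held-back elements in arrival order. */
    private void releaseBlocked() {
        Arrays.fill(blocked, false);
        barriersSeen = 0;
        List<Element> pending = new ArrayList<>(buffered);
        buffered.clear();
        for (Element element : pending) {
            process(element);
        }
    }

    public List<String> output() {
        return output;
    }

    /** Two channels; channel 0 delivers its barrier before channel 1 does. */
    public static List<String> demo() {
        BarrierAlignmentSketch aligner = new BarrierAlignmentSketch(2);
        aligner.process(Element.data(0, "a"));
        aligner.process(Element.barrier(0, 1L));
        aligner.process(Element.data(0, "b"));  // held: channel 0 is blocked
        aligner.process(Element.data(1, "c"));  // passes: channel 1 has not sent its barrier yet
        aligner.process(Element.barrier(1, 1L));
        aligner.process(Element.data(1, "d"));
        return aligner.output();
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [a, c, checkpoint-1, b, d]
    }
}
```

Record `b` arrives before `c` but is emitted only after the checkpoint marker, because channel 0 was blocked in between. Where the held-back data lives (disk files versus buffers that stay in memory) is the part the issue proposes to change.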
[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lynch Lee updated FLINK-8619: - Description: I do some test about distinct on mysql below: mysql> CREATE TABLE `rpt_tt` ( -> `target_id` varchar(50) NOT NULL DEFAULT '', -> `target_type` varchar(50) NOT NULL DEFAULT '', -> `amt_pay` bigint(20) DEFAULT NULL, -> `down_payment` bigint(20) DEFAULT NULL, -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; Query OK, 0 rows affected (0.01 sec) mysql> insert into rpt_tt values("1","5","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("3","5","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("2","6","1","1"); Query OK, 1 row affected (0.00 sec) mysql> insert into rpt_tt values("3","7","1","1"); Query OK, 1 row affected (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt; +--+--++---+ |target_type|target_id|amt_pay|down_payment| +--+--++---+ |5 |1 | 1| 1| |6 |2 | 1| 1| |5 |3 | 1| 1| |7 |3 | 1| 1| +--+--++---+ 4 rows in set (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type; +--+--++---+ |target_type|target_id|amt_pay|down_payment| +--+--++---+ |5 |1 | 1| 1| |6 |2 | 1| 1| |7 |3 | 1| 1| +--+--++---+ 3 rows in set (0.00 sec) mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment; +--+--++---+ |target_type|target_id|amt_pay|down_payment| +--+--++---+ |5 |1 | 1| 1| |5 |3 | 1| 1| |6 |2 | 1| 1| |7 |3 | 1| 1| +--+--++---+ 4 rows in set (0.01 sec) But now, I want do some query on flink SQL, code is here: import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import 
com.fasterxml.jackson.databind.node.ObjectNode; import com.god.hala.flink.convertors.RowIntoJson; import com.god.hala.flink.sources.DataSources; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.Properties; import java.util.UUID; public class KafkaConn2Topics1 { public static void main(String[] args) throws Exception { String inputTopic = "input-case01-test02"; String outputTopic = "output-case01-test02"; Properties props = new Properties(); props.setProperty("bootstrap.servers", "data-node5:9092"); props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", "")); LocalStreamEnvironment streamEnv = 
StreamExecutionEnvironment.createLocalEnvironment(); streamEnv.setParallelism(1); streamEnv.setStreamTimeCharacteristic(TimeChar
[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL
    [ https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358160#comment-16358160 ]

Lynch Lee commented on FLINK-6428:
----------------------------------

[~fhueske] I want to use Flink SQL in my product and would appreciate a suggestion from you, thanks.

For this SQL:

SELECT distinct a, b, c FROM t GROUP BY a, b, c

why must we put the fields b and c into the GROUP BY keys when the DISTINCT is on field a?

> Add support DISTINCT in dataStream SQL
> --------------------------------------
>
>                 Key: FLINK-6428
>                 URL: https://issues.apache.org/jira/browse/FLINK-6428
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>
> Add support DISTINCT in dataStream SQL as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy() // KeyBy on all fields
>   .flatMap() // Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
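The "key by all fields, then eliminate duplicates" outline quoted above also bears on the question in this comment: in SQL, `DISTINCT` applies to the whole selected row, not to the first column only. The sketch below shows that in plain Java, with no Flink dependency. `DistinctSketch`, `distinctRows`, `firstRowPerKey` and the sample rows are all made up for the illustration, and the `HashSet` merely plays the role that keyed state would play in a streaming job. The second method shows the other reading, one row per value of the first column, which is a grouping that needs a rule for picking the remaining columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DistinctSketch {

    /** SELECT DISTINCT a, b, c: key on the whole row, emit a row only the first time it is seen. */
    public static List<List<String>> distinctRows(List<List<String>> rows) {
        Set<List<String>> seen = new HashSet<>();  // stands in for per-key state
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : rows) {
            if (seen.add(row)) {  // add() returns false for a row that was already seen
                out.add(row);
            }
        }
        return out;
    }

    /** One row per value of column 0, keeping the first row seen for that value. */
    public static List<List<String>> firstRowPerKey(List<List<String>> rows) {
        Map<String, List<String>> firstByKey = new LinkedHashMap<>();
        for (List<String> row : rows) {
            firstByKey.putIfAbsent(row.get(0), row);
        }
        return new ArrayList<>(firstByKey.values());
    }

    /** Hypothetical sample rows with columns (a, b, c). */
    public static List<List<String>> sample() {
        return Arrays.asList(
                Arrays.asList("1", "x", "p"),
                Arrays.asList("1", "y", "p"),
                Arrays.asList("1", "x", "p"),   // exact duplicate of the first row
                Arrays.asList("2", "x", "q"));
    }

    public static void main(String[] args) {
        System.out.println(distinctRows(sample()));    // three rows: only the exact duplicate is dropped
        System.out.println(firstRowPerKey(sample()));  // two rows: one per value of a
    }
}
```

Rows `(1, x, p)` and `(1, y, p)` share the same `a` but both survive `distinctRows`, because the rows differ as a whole. Getting a single row per `a` requires deciding what to keep from `b` and `c`, which is the job of an aggregate.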
[jira] [Closed] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-8619.
-------------------------------
    Resolution: Won't Fix

Hi [~lynchlee],

Jira is used for tracking bugs or proposing new features. If you have questions like this, please use our mailing lists (http://flink.apache.org/community.html#mailing-lists) and we are happy to help you.

> Some thing about Flink SQL distinct, I need help
> ------------------------------------------------
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
> Issue Type: Wish
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Lynch Lee
> Priority: Major
> Fix For: 1.4.0
>
> I do some test about distinct on mysql below:
>
> mysql> CREATE TABLE `rpt_tt` (
>     -> `target_id` varchar(50) NOT NULL DEFAULT '',
>     -> `target_type` varchar(50) NOT NULL DEFAULT '',
>     -> `amt_pay` bigint(20) DEFAULT NULL,
>     -> `down_payment` bigint(20) DEFAULT NULL,
>     -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
>     -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 3 rows in set (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.01 sec)
>
> But now,
> I want do some query on flink SQL, code is here:
> import com.fasterxml.jackson.databind.DeserializationFeature;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import com.god.hala.flink.convertors.RowIntoJson;
> import com.god.hala.flink.sources.DataSources;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
> import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
> import org.apache.flink.table.api.StreamQueryConfig;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.Types;
> import org
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 Didn't merge yet, because there is an issue with the buildbot environment. ---
[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358173#comment-16358173 ] Timo Walther commented on FLINK-8619: - [~lynchlee] to answer your question: I don't know if {{SELECT DISTINCT(value)}} is actually standard SQL, but what I do know is that it is translated into {{SELECT DISTINCT value}}. What you actually want is {{SELECT DISTINCT value, FIRST_VALUE(col1), FIRST_VALUE(col2)}}. {{FIRST_VALUE}} would be an aggregation function, which is also what the exception asks for. However, it is not supported yet (see FLINK-6465). I think you have to write your own aggregate function for now (or copy the code from the open pull request).
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358174#comment-16358174 ] ASF GitHub Bot commented on FLINK-8308: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/5395 Didn't merge yet, because there is an issue with the buildbot environment. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358175#comment-16358175 ] Fabian Hueske commented on FLINK-8619: -- Hi [~lynchlee], we use JIRA as a bug tracker and not as a question / answer forum like Stack Overflow. Please post such questions to the user mailing list or Stack Overflow.
[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358184#comment-16358184 ] Timo Walther commented on FLINK-8619: - Sorry I meant {{SELECT value, FIRST_VALUE(col1), FIRST_VALUE(col2) GROUP BY value}}.
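The corrected query shape from the comment above, {{SELECT value, FIRST_VALUE(col1), FIRST_VALUE(col2) GROUP BY value}}, can be illustrated without Flink. The following is only a plain-Java sketch of the semantics, not a Flink aggregate function: the class and method names ({{FirstValuePerGroup}}, {{firstPerGroup}}, {{sampleRows}}) are hypothetical, the rows are the four inserts from the issue description, and "first" is assumed to mean first in arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency); all names here are hypothetical.
final class FirstValuePerGroup {

    /** One row of the rpt_tt table from the issue description. */
    static final class Row {
        final String targetId;
        final String targetType;
        final long amtPay;
        final long downPayment;

        Row(String targetId, String targetType, long amtPay, long downPayment) {
            this.targetId = targetId;
            this.targetType = targetType;
            this.amtPay = amtPay;
            this.downPayment = downPayment;
        }

        @Override
        public String toString() {
            return targetType + "," + targetId + "," + amtPay + "," + downPayment;
        }
    }

    /** Groups by target_type and keeps the first row that arrived for each group. */
    static List<Row> firstPerGroup(List<Row> rows) {
        // LinkedHashMap keeps groups in order of first appearance.
        Map<String, Row> firstByType = new LinkedHashMap<>();
        for (Row row : rows) {
            // Later rows of the same group are ignored: this is the FIRST_VALUE choice.
            firstByType.putIfAbsent(row.targetType, row);
        }
        return new ArrayList<>(firstByType.values());
    }

    /** The four rows inserted in the issue description, in insertion order. */
    static List<Row> sampleRows() {
        return Arrays.asList(
                new Row("1", "5", 1, 1),
                new Row("3", "5", 1, 1),
                new Row("2", "6", 1, 1),
                new Row("3", "7", 1, 1));
    }

    public static void main(String[] args) {
        for (Row row : firstPerGroup(sampleRows())) {
            System.out.println(row);
        }
    }
}
```

For these rows the sketch prints 5,1,1,1 then 6,2,1,1 then 7,3,1,1, the same three rows MySQL returned for the {{group by target_type}} query. Which row counts as "first" depends entirely on arrival order, which is why an explicit aggregate has to make that choice.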
[jira] [Created] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
Dawid Wysakowicz created FLINK-8620: --- Summary: Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache Key: FLINK-8620 URL: https://issues.apache.org/jira/browse/FLINK-8620 Project: Flink Issue Type: New Feature Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz We should be able to distribute custom files to taskmanagers. To do that we can store those files in BlobStore and later on access them in TaskManagers through DistributedCache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis
Till Rohrmann created FLINK-8621: Summary: PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis Key: FLINK-8621 URL: https://issues.apache.org/jira/browse/FLINK-8621 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.0 Reporter: Till Rohrmann Fix For: 1.5.0 {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails on Travis: https://travis-ci.org/apache/flink/jobs/339344244 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8623) ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis
Till Rohrmann created FLINK-8623: Summary: ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics unstable on Travis Key: FLINK-8623 URL: https://issues.apache.org/jira/browse/FLINK-8623 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.5.0 Reporter: Till Rohrmann Fix For: 1.5.0 {{ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics}} fails on Travis: https://travis-ci.org/apache/flink/jobs/33932 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
Bhumika Bayani created FLINK-8622: - Summary: flink-mesos: High memory usage of scheduler + job manager. GC never kicks in. Key: FLINK-8622 URL: https://issues.apache.org/jira/browse/FLINK-8622 Project: Flink Issue Type: Bug Affects Versions: 1.3.2, 1.4.0 Reporter: Bhumika Bayani We are deploying a Flink cluster with 1 jobmanager and 6 taskmanagers on Mesos. We have observed that the memory usage of the jobmanager is high. Despite allocating more and more memory to it, it hits the limit within minutes. We started with 1.5 GB RAM and 1 GB heap. Currently we have allocated 4 GB RAM and 3 GB heap to the jobmanager cum scheduler. We also tried allocating 8 GB RAM with the same 3 GB heap; in that case the memory graph was identical. As per the graph below, the scheduler almost always runs at its maximum memory allocation. !flink-mem-usage-graph-for-jira.png! Throughout the run of the scheduler, memory usage never goes down until the process is killed due to OOM, so we infer that garbage collection is never happening. We have tried both Flink 1.4 and 1.3 and see the same issue on both versions. Is there any way we can find out where and how memory is being used? Are there any Flink config options for the jobmanager, or JVM parameters, that can help us restrict the memory usage, force garbage collection, and prevent the crash? Please let us know if there are any resource recommendations for running Flink on Mesos at scale. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
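On the question of where memory goes and whether garbage collection runs at all, one low-tech check is to read the JVM's own counters. The following is a minimal sketch that uses only the standard {{java.lang.management}} API. It reports on the JVM it runs in, so it assumes the code can be executed inside the process under investigation; the class name {{JvmMemoryReport}} is hypothetical and nothing here is Flink-specific.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper: summarizes heap, non-heap and GC activity of the current JVM.
final class JvmMemoryReport {

    static String report() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();

        StringBuilder sb = new StringBuilder();
        // getMax() may be -1 when the limit is undefined.
        sb.append("heap used/committed/max (bytes): ")
                .append(heap.getUsed()).append('/')
                .append(heap.getCommitted()).append('/')
                .append(heap.getMax()).append('\n');
        sb.append("non-heap used/committed (bytes): ")
                .append(nonHeap.getUsed()).append('/')
                .append(nonHeap.getCommitted()).append('\n');

        // A collection count that keeps growing shows that GC does run.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append("collector ").append(gc.getName())
                    .append(": count=").append(gc.getCollectionCount())
                    .append(", timeMs=").append(gc.getCollectionTime()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

If the heap figure stays well below the container limit while the process is still killed, the growth is more likely outside the Java heap, which this report only covers partially through the non-heap line.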
[jira] [Updated] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8621: - Component/s: Tests > PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on > Travis > > > Key: FLINK-8621 > URL: https://issues.apache.org/jira/browse/FLINK-8621 > Project: Flink > Issue Type: Bug > Components: Metrics, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.0 > > > {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails > on Travis: https://travis-ci.org/apache/flink/jobs/339344244 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
[ https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8622: - Component/s: Mesos > flink-mesos: High memory usage of scheduler + job manager. GC never kicks in. > - > > Key: FLINK-8622 > URL: https://issues.apache.org/jira/browse/FLINK-8622 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0, 1.3.2 >Reporter: Bhumika Bayani >Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL
[ https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358201#comment-16358201 ]

Fabian Hueske commented on FLINK-6428:
--------------------------------------

Hi [~lynchlee]

The query {{SELECT DISTINCT a, b, c FROM t GROUP BY a}} is not working because it is ill-defined. What should be the result of the query for the following input?

{code}
a | b | c
---------
1 | 1 | 1
1 | 2 | 2
{code}

Clearly, we can only return a single row, because we group on {{a}} and there is only one distinct value for {{a}}. But which values should be returned for {{b}} and {{c}} in that row? We cannot return all values, so we have to pick one. That's an arbitrary choice and hence a random result.

Apache Calcite (which Flink uses as a SQL parser and optimizer) does not support it and IMO that's correct.

> Add support DISTINCT in dataStream SQL
> --------------------------------------
>
> Key: FLINK-6428
> URL: https://issues.apache.org/jira/browse/FLINK-6428
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
>
> Add support for DISTINCT in DataStream SQL as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy()   // KeyBy on all fields
>   .flatMap() // Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
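The {{keyBy()}} / {{flatMap()}} plan quoted in the issue description can be imitated in plain Java to show the idea: key each record by all selected fields and emit it only the first time that key is seen. This is a sketch under stated assumptions, not Flink code. The names {{DistinctSketch}}, {{distinctBy}} and {{sampleAges}} are hypothetical, and the in-memory {{HashSet}} merely stands in for whatever keyed state a real operator would use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java stand-in for "keyBy on all fields, then a flatMap that drops duplicates".
final class DistinctSketch {

    /** Emits an element only the first time its key is seen, preserving arrival order. */
    static <T, K> List<T> distinctBy(List<T> input, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>(); // stands in for per-key state
        List<T> output = new ArrayList<>();
        for (T element : input) {
            // Set.add returns false when the key was already present.
            if (seenKeys.add(keySelector.apply(element))) {
                output.add(element);
            }
        }
        return output;
    }

    /** The age column of the sample data: (kevin, 28), (sunny, 6), (jack, 6). */
    static List<Integer> sampleAges() {
        return Arrays.asList(28, 6, 6);
    }

    public static void main(String[] args) {
        // SELECT DISTINCT age FROM MyTable
        System.out.println(distinctBy(sampleAges(), Function.identity())); // prints [28, 6]
    }
}
```

Note that the set of seen keys only grows in this sketch; on an unbounded stream that state would need some form of cleanup, which the issue description does not address.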
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8500: Fix Version/s: (was: 1.4.2) > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0 > > Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png > > > The {{deserialize}} method of KeyedDeserializationSchema needs a 'kafka message timestamp' parameter (from ConsumerRecord). In some business scenarios, this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5439 CC @aljoscha @tzulitai ---
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358212#comment-16358212 ] ASF GitHub Bot commented on FLINK-8571: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/5439 [FLINK-8571] [DataStream] [Backport] Introduce utility function that reinterprets a data stream as keyed stream This PR is a backport of #5424 to Flink 1.4. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink key-partitioned-source-1.4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5439.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5439 commit bd1c83f90234a03c4cab5ce98d705fa45daa34f5 Author: Stefan Richter Date: 2018-02-09T10:30:37Z [FLINK-8571] [DataStream] Introduce utility function that reinterprets a data stream as keyed stream (backport from 1.5 branch) > Provide an enhanced KeyedStream implementation to use ForwardPartitioner > > > Key: FLINK-8571 > URL: https://issues.apache.org/jira/browse/FLINK-8571 > Project: Flink > Issue Type: Improvement >Reporter: Nagarjun Guraja >Assignee: Stefan Richter >Priority: Major > > This enhancement would help in modeling problems with pre partitioned input > sources(for e.g. Kafka with Keyed topics). This would help in making the job > graph embarrassingly parallel while leveraging rocksdb state backend and also > the fine grained recovery semantics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/5439 [FLINK-8571] [DataStream] [Backport] Introduce utility function that reinterprets a data stream as keyed stream This PR is a backport of #5424 to Flink 1.4. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink key-partitioned-source-1.4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5439.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5439 commit bd1c83f90234a03c4cab5ce98d705fa45daa34f5 Author: Stefan Richter Date: 2018-02-09T10:30:37Z [FLINK-8571] [DataStream] Introduce utility function that reinterprets a data stream as keyed stream (backport from 1.5 branch) ---
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358214#comment-16358214 ] ASF GitHub Bot commented on FLINK-8571: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5439 CC @aljoscha @tzulitai > Provide an enhanced KeyedStream implementation to use ForwardPartitioner > > > Key: FLINK-8571 > URL: https://issues.apache.org/jira/browse/FLINK-8571 > Project: Flink > Issue Type: Improvement >Reporter: Nagarjun Guraja >Assignee: Stefan Richter >Priority: Major > > This enhancement would help in modeling problems with pre partitioned input > sources(for e.g. Kafka with Keyed topics). This would help in making the job > graph embarrassingly parallel while leveraging rocksdb state backend and also > the fine grained recovery semantics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization
[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358222#comment-16358222 ]

Fabian Hueske commented on FLINK-8601:
--------------------------------------

Thanks for the detailed design document [~sihuazhou]! Now I have a pretty good understanding of the proposal. I think this would be a nice feature, but I'm not sure if it is generic enough to go into the base classes that are used by all functions or whether it would make more sense to expose it only to specific functions like the {{ProcessFunction}}.

[~aljoscha], what do you think about the proposal?

> Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
>
> h3. Background
> A Bloom filter is useful in many situations, for example:
> * 1. Approximate calculation: deduplication (e.g. UV calculation)
> * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
> By using a Bloom filter, we can greatly reduce the number of state queries in a stream join. The queries it filters out are the ones that would eventually fail to find any result, and such queries perform poorly on RocksDB-based state because they traverse {{sst}} files on disk.
> However, based on what Flink currently provides, it is hard to use a Bloom filter for the following reasons:
> * 1. Serialization problem: Bloom filter state can be large (for example: 100M). If it is implemented on top of RocksDB state, the state data needs to be serialized each time it is queried or updated, and the performance will be very poor.
> * 2. Data skew: data in different key groups can be skewed, and the degree of skew cannot be accurately predicted before the program runs. Therefore, it is impossible to determine how many resources the Bloom filter should allocate. One option is to allocate the space needed for the most skewed case, but this can lead to a very serious waste of resources.
> h3. Requirement
> Therefore, I introduce the PartitionedBloomFilter for Flink, which needs to provide at least the following features:
> * 1. Support for changing parallelism.
> * 2. Only serialize when necessary: when performing a checkpoint.
> * 3. Can deal with the data skew problem: users only need to specify a PartitionedBloomFilter with the desired input and fpp, and the system allocates resources dynamically.
> * 4. Does not conflict with other state: users can use KeyedState and OperatorState while using this Bloom filter.
> * 5. Support relaxed TTL (i.e. the data survives at least as long as the specified time).
> Design doc: [design doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
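To make the "desired input, fpp" parameters from the requirements concrete, here is a minimal sketch of an ordinary, non-partitioned Bloom filter in plain Java. It is not the design from the linked document and says nothing about key groups, checkpoints or TTL. The class name {{SimpleBloomFilter}} and its methods are hypothetical; the sizing uses the textbook formulas m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions, and the k bit positions are derived from two base hashes (double hashing), which is an implementation choice made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical, non-partitioned Bloom filter sized from expected insertions (n) and fpp (p).
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(long expectedInsertions, double fpp) {
        double ln2 = Math.log(2);
        // m = -n * ln(p) / (ln 2)^2
        this.numBits = Math.max(1, (int) Math.ceil(-expectedInsertions * Math.log(fpp) / (ln2 * ln2)));
        // k = (m / n) * ln 2
        this.numHashes = Math.max(1, (int) Math.round((double) numBits / expectedInsertions * ln2));
        this.bits = new BitSet(numBits);
    }

    int numBits() { return numBits; }

    int numHashes() { return numHashes; }

    void add(String value) {
        int h1 = value.hashCode();
        int h2 = fnv1a(value);
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(h1, h2, i));
        }
    }

    /** False means definitely absent; true means possibly present. */
    boolean mightContain(String value) {
        int h1 = value.hashCode();
        int h2 = fnv1a(value);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(h1, h2, i))) {
                return false;
            }
        }
        return true;
    }

    // Double hashing: the i-th position is (h1 + i * h2) mod m; int overflow is harmless here.
    private int index(int h1, int h2, int i) {
        return Math.floorMod(h1 + i * h2, numBits);
    }

    // 32-bit FNV-1a over the UTF-8 bytes, used as the second base hash.
    private static int fnv1a(String value) {
        int hash = 0x811C9DC5;
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 16777619;
        }
        return hash;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1000, 0.01);
        filter.add("user-1");
        System.out.println(filter.numBits() + " bits, " + filter.numHashes() + " hashes");
        System.out.println(filter.mightContain("user-1")); // prints true: no false negatives
    }
}
```

Because the bit array is sized up front from n and p, a filter sized for the most skewed key group wastes space everywhere else, which is the resource problem the issue description raises.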
[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5439#discussion_r167196102 --- Diff: flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, KeySelector)}. + */ +public class ReinterpretAsKeyedStreamITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* FLINK-8571 --- End diff -- Can maybe remove this. I don't see a need to explicitly tag the test. ---
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358238#comment-16358238 ]

ASF GitHub Bot commented on FLINK-8571:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5439#discussion_r167196102

--- Diff: flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java ---

Can maybe remove this. I don't see a need to explicitly tag the test.

> Provide an enhanced KeyedStream implementation to use ForwardPartitioner
> ------------------------------------------------------------------------
>
> Key: FLINK-8571
> URL: https://issues.apache.org/jira/browse/FLINK-8571
> Project: Flink
> Issue Type: Improvement
> Reporter: Nagarjun Guraja
> Assignee: Stefan Richter
> Priority: Major
>
> This enhancement would help in modeling problems with pre-partitioned input
> sources (e.g. Kafka with keyed topics). This would help in making the job
> graph embarrassingly parallel while leveraging the RocksDB state backend and
> also the fine-grained recovery semantics.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5439: [FLINK-8571] [DataStream] [Backport] Introduce uti...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5439#discussion_r167197709

--- Diff: flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/ReinterpretAsKeyedStreamITCase.java ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * ITCase for {@link DataStreamUtils#reinterpretAsKeyedStream(DataStream, KeySelector)}.
+ */
+public class ReinterpretAsKeyedStreamITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * FLINK-8571
--- End diff --

@StefanRRichter Yes, this can then probably also be removed from the original PR before merging.

---
[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5439 The changes look good! But as @tzulitai mentioned the missing Scala tests seem strange. ---
[GitHub] flink pull request #5424: [FLINK-8571] [DataStream] Introduce utility functio...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5424#discussion_r167198202

--- Diff: docs/dev/api_concepts.md ---
@@ -896,11 +896,16 @@ result type ```R``` for the final result. E.g. for a histogram, ```V``` is a num
 {% top %}

-## EXPERIMENTAL: Reinterpreting a pre-partitioned data stream as keyed stream
+Experimental features
--- End diff --

Ah, what I meant is in a completely separate section of the doc, because this part of the doc is about general API concepts and not specific to the streaming API.

---
[GitHub] flink issue #5424: [FLINK-8571] [DataStream] Introduce utility function that ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5424 Still looks good, except that I'd like to see the new doc in a completely separate section of the doc. ---
[jira] [Created] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive
Bhumika Bayani created FLINK-8624:
-------------------------------------

Summary: flink-mesos: The flink rest-api sometimes becomes unresponsive
Key: FLINK-8624
URL: https://issues.apache.org/jira/browse/FLINK-8624
Project: Flink
Issue Type: Bug
Affects Versions: 1.3.2
Reporter: Bhumika Bayani

Sometimes flink-mesos-scheduler fails or gets killed, and Marathon brings it up again on some other node. We have observed that the REST API of the newly created Flink instance sometimes becomes unresponsive.

Even if we execute API calls manually with curl, such as
http://:/overview or http://:/config
we do not receive any response.

We submit and execute all our Flink jobs through the REST API only. So if the REST API becomes unresponsive, we cannot run any Flink jobs and no stream processing happens.

We tried enabling Flink debug logs, but we did not observe anything specific that indicates why the REST API is failing or unresponsive.

We see the exceptions below in the logs, but they are not specific to the case when the REST API is hung. We see them in a healthy flink-scheduler too:

{code:java}
Timestamp=2018-02-08 05:43:49,175 LogLevel=INFO ThreadId=[Checkpoint Timer] Class=o.a.f.r.c.CheckpointCoordinator Msg=Triggering checkpoint 10181 @ 1518068629174
Timestamp=2018-02-08 05:43:49,183 LogLevel=DEBUG ThreadId=[nioEventLoopGroup-5-3] Class=o.a.f.r.w.WebRuntimeMonitor Msg=Unhandled exception: {}
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager#753807801]] after [1 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
{code}

While the REST API is unresponsive, the Flink web UI also fails to load or show any information.

Restarting the flink-scheduler sometimes solves this issue.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
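For the FLINK-8624 report above, one way to tell a hung REST endpoint apart from a refused connection is to probe it with explicit connect and read timeouts. What follows is only a minimal sketch using the JDK; the class name `RestProbe`, the `probe` helper, and the 2-second timeout are assumptions made for illustration and are not part of Flink. The URL is taken from the command line because the report does not state the host and port.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

/** Hypothetical helper, not part of Flink: probes an HTTP endpoint with timeouts. */
public class RestProbe {

    /**
     * Returns the HTTP status code, or -1 if the endpoint could not be
     * reached or did not answer within timeoutMillis.
     * Expects an http:// or https:// URL.
     */
    public static int probe(String url, int timeoutMillis) {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(timeoutMillis); // bound the TCP connect
            conn.setReadTimeout(timeoutMillis);    // bound the wait for a response
            conn.setRequestMethod("GET");
            return conn.getResponseCode();
        } catch (IOException e) {
            // Covers refused connections as well as SocketTimeoutException,
            // which is what a server that accepts but never answers produces.
            return -1;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        // Usage: java RestProbe <url> [<url> ...]
        for (String url : args) {
            System.out.println(url + " -> " + probe(url, 2000));
        }
    }
}
```

Running such a probe periodically against the `/overview` and `/config` paths mentioned in the report would at least record when the endpoint stops answering, which can then be lined up with the scheduler restarts.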
[GitHub] flink issue #5411: [FLINK-8556] [Kinesis Connector] Add proxy feature to the...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5411 Thanks for the contribution @pduveau. I'll try to take a look at this soon. ---
[jira] [Commented] (FLINK-8556) Add proxy feature to Kinesis Connector to acces its endpoint
[ https://issues.apache.org/jira/browse/FLINK-8556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358261#comment-16358261 ]

ASF GitHub Bot commented on FLINK-8556:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5411

Thanks for the contribution @pduveau. I'll try to take a look at this soon.

> Add proxy feature to Kinesis Connector to acces its endpoint
> ------------------------------------------------------------
>
> Key: FLINK-8556
> URL: https://issues.apache.org/jira/browse/FLINK-8556
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.4.0
> Reporter: Ph.Duveau
> Priority: Major
> Labels: features
>
> The connector cannot be configured to use a proxy to access the Kinesis
> endpoint. This feature is required on EC2 instances which can access the
> internet only through a proxy. VPC Kinesis endpoints are currently available
> in only a few AWS regions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive
[ https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8624:
------------------------------------
    Fix Version/s: 1.5.0

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --------------------------------------------------------------
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, REST
> Affects Versions: 1.3.2
> Reporter: Bhumika Bayani
> Priority: Major
> Fix For: 1.5.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive
[ https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8624:
------------------------------------
    Component/s: REST, Distributed Coordination

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --------------------------------------------------------------
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, REST
> Affects Versions: 1.3.2
> Reporter: Bhumika Bayani
> Priority: Blocker
> Fix For: 1.5.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
[ https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8622:
------------------------------------
    Fix Version/s: 1.5.0

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -----------------------------------------------------------------------------
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Bhumika Bayani
> Priority: Major
> Fix For: 1.5.0
>
> We are deploying a Flink cluster with 1 jobmanager and 6 taskmanagers on Mesos.
> We have observed that the memory usage of the jobmanager is high. Even though
> we keep allocating more memory to it, it hits the limit within minutes.
> We started with 1.5 GB RAM and 1 GB heap. Currently we have allocated 4 GB
> RAM and 3 GB heap to the jobmanager cum scheduler. We also tried allocating
> 8 GB RAM with the same 3 GB heap. In that case the memory graph was identical.
> As per the graph below, the scheduler almost always runs with maximum memory
> resources.
> !flink-mem-usage-graph-for-jira.png!
>
> Throughout the run of the scheduler, we do not see memory usage going down
> unless it is killed due to OOM. From this we infer that garbage collection
> never happens.
> We have tried both Flink 1.4 and 1.3 and see the same issue on both versions.
>
> Is there any way we can find out where and how memory is being used?
> Are there any Flink config options for the jobmanager, or JVM parameters,
> which can help us restrict the memory usage, force garbage collection, and
> prevent it from crashing?
> Please let us know if there are any resource recommendations from Flink for
> running Flink on Mesos at scale.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
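As a starting point for the question in FLINK-8622 about where memory goes and whether garbage collection runs at all, the JVM's own management beans report heap usage and how often each collector has run. The following is a minimal sketch using only `java.lang.management`; the class name `HeapProbe` and its methods are hypothetical and not part of Flink. It inspects the JVM it runs in, so observing a jobmanager this way would mean reading the same beans over JMX, which is an assumption about the deployment rather than something the report describes.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper, not part of Flink: reports heap usage and GC activity. */
public class HeapProbe {

    /** Bytes of heap currently in use by this JVM. */
    public static long usedHeapBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    /** Total number of collections reported by all collectors of this JVM. */
    public static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 if the collector does not report it
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long mb = 1024L * 1024L;
        // getMax() returns -1 when no maximum is defined; print raw values in that case.
        System.out.printf("heap used=%d MB, committed=%d MB, max=%d MB%n",
                heap.getUsed() / mb, heap.getCommitted() / mb, heap.getMax() / mb);
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: collections=%d, time=%d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
    }
}
```

If the collection counts keep rising while used heap stays close to the maximum, collections are running but reclaiming little, which would point at retained objects rather than an idle collector. The JDK tool `jstat -gcutil <pid>` exposes similar counters for an already running process.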
[jira] [Updated] (FLINK-8624) flink-mesos: The flink rest-api sometimes becomes unresponsive
[ https://issues.apache.org/jira/browse/FLINK-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8624:
------------------------------------
    Priority: Blocker (was: Major)

> flink-mesos: The flink rest-api sometimes becomes unresponsive
> --------------------------------------------------------------
>
> Key: FLINK-8624
> URL: https://issues.apache.org/jira/browse/FLINK-8624
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, REST
> Affects Versions: 1.3.2
> Reporter: Bhumika Bayani
> Priority: Blocker
> Fix For: 1.5.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
[ https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8622:
------------------------------------
    Component/s: ResourceManager, Distributed Coordination

> flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
> -----------------------------------------------------------------------------
>
> Key: FLINK-8622
> URL: https://issues.apache.org/jira/browse/FLINK-8622
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, Mesos, ResourceManager
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Bhumika Bayani
> Priority: Blocker
> Fix For: 1.5.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Reopened] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-8619: - Reopen to remove fix version > Some thing about Flink SQL distinct, I need help > > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Lynch Lee >Priority: Major > Fix For: 1.4.0 > > > I do some test about distinct on mysql below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |5 |3 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > 
+--+--++---+ > |5 |1 | 1| 1| > |5 |3 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > 
import jav
[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
[ https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8622: Priority: Blocker (was: Major) > flink-mesos: High memory usage of scheduler + job manager. GC never kicks in. > - > > Key: FLINK-8622 > URL: https://issues.apache.org/jira/browse/FLINK-8622 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Mesos, ResourceManager >Affects Versions: 1.4.0, 1.3.2 >Reporter: Bhumika Bayani >Priority: Blocker > Fix For: 1.5.0 > > > We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos. > We have observed that the memory usage for 'jobmanager' is high. In spite of > allocating more and more memory resources to it, it hits the limit within > minutes. > We had started with 1.5 GB RAM and 1 GB heap. Currently we have allocated 4 > GB RAM, 3 GB heap to jobmanager cum scheduler. We tried allocating 8GB RAM > and lesser heap (i.e. same, 3GB) too. In that case also, memory graph was > identical. > As per the graph below, the scheduler almost always runs with maximum memory > resources. > !flink-mem-usage-graph-for-jira.png! > > Throughout the run of the scheduler, we do not see memory usage going down > unless it is killed due to OOM. So inferring, garbage collection is never > happening. > We have tried using both flink versions 1.4 and 1.3 but could see same issue > on both versions. > > Is there any way we can find out where and how memory is being used? > Are there any flink config options for jobmanager or jvm parameters which can > help us restrict the memory usage, force garbage collection, and prevent it > from crash? > Please let us know if there any resource recommendations from Flink for > running Flink on mesos at scale. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8619. --- Resolution: Not A Bug > Some thing about Flink SQL distinct, I need help > > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Lynch Lee >Priority: Major > > I do some test about distinct on mysql below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |5 |3 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > 
|5 |3 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.nio.charset.Charset; > 
import java.u
[jira] [Updated] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8619: Fix Version/s: (was: 1.4.0) > Some thing about Flink SQL distinct, I need help > > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Lynch Lee >Priority: Major > > I do some test about distinct on mysql below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |5 |3 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| 
> |5 |3 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.nio.charset.Charset; 
> i
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358286#comment-16358286 ] Xingcan Cui commented on FLINK-8538: Hi [~twalthr], I think we need a converter to transform a standard JSON schema to a {{TypeInformation}}, right? > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358295#comment-16358295 ] Timo Walther commented on FLINK-8538: - [~xccui] yes, I think this is necessary. We also have to think about how to handle JSON specific types. E.g. the JSON standard declares a "Number" type but we have to map it to some Java primitive. It may also declares union types. We have the following options: Option 1: We infer the type using information from the {{TableSchema}} (but this would be Table API specific, formats are intended for all APIs). Option 2: We make this configurable: number as double, number as BigDecimal etc. Option 3: We introduce a new TypeInformation. If we really want to support JSON once and for all, we have to think about how to handle those cases. I just read a discussion on the Beam ML about this: https://lists.apache.org/thread.html/ee6843859f1ddb1d4544c32d255fe88a3bf3aec97d3afc3e3d47c701@%3Cdev.beam.apache.org%3E > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
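To make Option 2 from the comment above concrete, here is a minimal sketch of a configurable number mapping. It is only an illustration under stated assumptions: the class `ConfigurableNumberMapping`, the enum `NumberMapping`, and the method `convert` are hypothetical names and not part of any existing Flink API.

```java
import java.math.BigDecimal;

public class ConfigurableNumberMapping {

    /** Hypothetical configuration switch; not an existing Flink option. */
    public enum NumberMapping { DOUBLE, BIG_DECIMAL }

    /** Converts the text of a JSON number literal according to the configured mapping. */
    public static Object convert(String jsonNumber, NumberMapping mapping) {
        switch (mapping) {
            case DOUBLE:
                // Compact and fast, but may round the value.
                return Double.valueOf(jsonNumber);
            case BIG_DECIMAL:
                // Keeps every digit of the literal.
                return new BigDecimal(jsonNumber);
            default:
                throw new IllegalArgumentException("Unknown mapping: " + mapping);
        }
    }
}
```

Note that a single switch like this applies to every number field of a schema, so it cannot express a different target type per field.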
[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/5405 ping @aljoscha ---
[jira] [Commented] (FLINK-8477) Add api to support for user to skip the first incomplete window data
[ https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358297#comment-16358297 ] ASF GitHub Bot commented on FLINK-8477: --- Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/5405 ping @aljoscha > Add api to support for user to skip the first incomplete window data > > > Key: FLINK-8477 > URL: https://issues.apache.org/jira/browse/FLINK-8477 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0, 1.3.2 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Fix For: 1.4.1 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6160) Retry JobManager/ResourceManager connection in case of timeout
[ https://issues.apache.org/jira/browse/FLINK-6160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358310#comment-16358310 ] ASF GitHub Bot commented on FLINK-6160: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5440 [FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in … ## What is the purpose of the change When timeout comes, retry JobManager/ResourceManager connection in case of timeout ## Brief change log When timeout, invoke ```requestHeartbeat``` in HeartbeatMonitor thread. Not directly invoke ```notifyHeartbeatTimeout``` and close the connection. ## Verifying this change This change is already covered by existing tests, but did minor changes. in the TaskExecutorTest.java, change ```testHeartbeatTimeoutWithResourceManager``` behavior to while timeout, does not invoke ```disconnectTaskManager```. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): ( don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no ) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6160 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5440.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5440 commit 5319abdf503c757baf7afde9913ab2fb6fb61b60 Author: zhangminglei Date: 2018-02-09T11:52:50Z [FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in case of timeout > Retry JobManager/ResourceManager connection in case of timeout > --- > > Key: FLINK-6160 > URL: https://issues.apache.org/jira/browse/FLINK-6160 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Priority: Major > Labels: flip-6 > > In case of a heartbeat timeout, the {{TaskExecutor}} closes the connection to > the remote component. Furthermore, it assumes that the component has actually > failed and, thus, it will only start trying to connect to the component if it > is notified about a new leader address and leader session id. This is > brittle, because the heartbeat could also time out without the component > having crashed. Thus, we should add an automatic retry to the latest known > leader address information in case of a timeout. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
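The retry idea from the issue description (reconnect to the latest known leader address after a heartbeat timeout instead of giving up) could be sketched roughly as follows. This is not the code of the pull request: `LeaderReconnect`, `reconnect`, and the `connect` callback are hypothetical names, and a bounded number of attempts is an assumption made for the example.

```java
import java.util.function.Predicate;

public class LeaderReconnect {

    /**
     * Tries to reconnect to the last known leader address.
     *
     * @param lastKnownLeaderAddress address remembered from the last leader notification
     * @param maxAttempts            upper bound on attempts (an assumption of this sketch)
     * @param connect                hypothetical callback, returns true if the connection succeeded
     * @return the number of attempts used on success, or -1 if every attempt failed
     */
    public static int reconnect(
            String lastKnownLeaderAddress, int maxAttempts, Predicate<String> connect) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (connect.test(lastKnownLeaderAddress)) {
                return attempt;
            }
        }
        // Only now would the component be treated as failed.
        return -1;
    }
}
```

A real implementation would also need a delay between attempts and would have to stop retrying once a new leader address is announced; both are left out here.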
[GitHub] flink pull request #5440: [FLINK-6160] [flip-6] Retry JobManager/ResourceMan...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5440 [FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in … ## What is the purpose of the change When timeout comes, retry JobManager/ResourceManager connection in case of timeout ## Brief change log When timeout, invoke ```requestHeartbeat``` in HeartbeatMonitor thread. Not directly invoke ```notifyHeartbeatTimeout``` and close the connection. ## Verifying this change This change is already covered by existing tests, but did minor changes. in the TaskExecutorTest.java, change ```testHeartbeatTimeoutWithResourceManager``` behavior to while timeout, does not invoke ```disconnectTaskManager```. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): ( don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no ) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6160 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5440.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5440 commit 5319abdf503c757baf7afde9913ab2fb6fb61b60 Author: zhangminglei Date: 2018-02-09T11:52:50Z [FLINK-6160] [flip-6] Retry JobManager/ResourceManager connection in case of timeout ---
[jira] [Commented] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358312#comment-16358312 ] Lynch Lee commented on FLINK-8619: -- Thanks to all. I got it. > Some thing about Flink SQL distinct, I need help > > > Key: FLINK-8619 > URL: https://issues.apache.org/jira/browse/FLINK-8619 > Project: Flink > Issue Type: Wish > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Lynch Lee >Priority: Major > > I do some test about distinct on mysql below: > > > mysql> CREATE TABLE `rpt_tt` ( > -> `target_id` varchar(50) NOT NULL DEFAULT '', > -> `target_type` varchar(50) NOT NULL DEFAULT '', > -> `amt_pay` bigint(20) DEFAULT NULL, > -> `down_payment` bigint(20) DEFAULT NULL, > -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`) > -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > Query OK, 0 rows affected (0.01 sec) > > mysql> insert into rpt_tt values("1","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","5","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("2","6","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> insert into rpt_tt values("3","7","1","1"); > Query OK, 1 row affected (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |5 |3 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type; > +--+--++---+ > |target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 3 rows in set (0.00 sec) > > mysql> select distinct(target_type),target_id,amt_pay,down_payment from > rpt_tt group by target_type,target_id,amt_pay,down_payment; > +--+--++---+ > 
|target_type|target_id|amt_pay|down_payment| > +--+--++---+ > |5 |1 | 1| 1| > |5 |3 | 1| 1| > |6 |2 | 1| 1| > |7 |3 | 1| 1| > +--+--++---+ > 4 rows in set (0.01 sec) > > But now, > I want do some query on flink SQL, code is here: > import com.fasterxml.jackson.databind.DeserializationFeature; > import com.fasterxml.jackson.databind.JsonNode; > import com.fasterxml.jackson.databind.ObjectMapper; > import com.fasterxml.jackson.databind.node.JsonNodeFactory; > import com.fasterxml.jackson.databind.node.ObjectNode; > import com.god.hala.flink.convertors.RowIntoJson; > import com.god.hala.flink.sources.DataSources; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import > org.apache.flink.api.common.serialization.AbstractDeserializationSchema; > import org.apache.flink.api.common.time.Time; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; > import > org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; > import org.apache.flink.table.api.StreamQueryConfig; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > import org.apache.flink.util.Collector; > import 
org.slf4j.Logger; > import org.slf4j.LoggerFactory; > imp
[jira] [Assigned] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE
[ https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-8423: --- Assignee: mingleizhang > OperatorChain#pushToOperator catch block may fail with NPE > -- > > Key: FLINK-8423 > URL: https://issues.apache.org/jira/browse/FLINK-8423 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: mingleizhang >Priority: Critical > > {code} > @Override > protected void pushToOperator(StreamRecord record) { > try { > // we know that the given outputTag matches our OutputTag so > the record > // must be of the type that our operator (and Serializer) > expects. > @SuppressWarnings("unchecked") > StreamRecord castRecord = (StreamRecord) record; > numRecordsIn.inc(); > StreamRecord copy = > castRecord.copy(serializer.copy(castRecord.getValue())); > operator.setKeyContextElement1(copy); > operator.processElement(copy); > } catch (ClassCastException e) { > // Enrich error message > ClassCastException replace = new ClassCastException( > String.format( > "%s. Failed to push OutputTag with id '%s' to > operator. " + > "This can occur when multiple OutputTags with > different types " + > "but identical names are being used.", > e.getMessage(), > outputTag.getId())); > throw new ExceptionInChainedOperatorException(replace); > } catch (Exception e) { > throw new ExceptionInChainedOperatorException(e); > } > } > {code} > If outputTag is null (as is the case when no sideOutput was defined) the > catch block will crash with a NullPointerException. This may happen if > {{operator.processElement}} throws a ClassCastException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE
[ https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358322#comment-16358322 ] mingleizhang commented on FLINK-8423: - I will fix this issue. > OperatorChain#pushToOperator catch block may fail with NPE > -- > > Key: FLINK-8423 > URL: https://issues.apache.org/jira/browse/FLINK-8423 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: mingleizhang >Priority: Critical > > {code} > @Override > protected void pushToOperator(StreamRecord record) { > try { > // we know that the given outputTag matches our OutputTag so > the record > // must be of the type that our operator (and Serializer) > expects. > @SuppressWarnings("unchecked") > StreamRecord castRecord = (StreamRecord) record; > numRecordsIn.inc(); > StreamRecord copy = > castRecord.copy(serializer.copy(castRecord.getValue())); > operator.setKeyContextElement1(copy); > operator.processElement(copy); > } catch (ClassCastException e) { > // Enrich error message > ClassCastException replace = new ClassCastException( > String.format( > "%s. Failed to push OutputTag with id '%s' to > operator. " + > "This can occur when multiple OutputTags with > different types " + > "but identical names are being used.", > e.getMessage(), > outputTag.getId())); > throw new ExceptionInChainedOperatorException(replace); > } catch (Exception e) { > throw new ExceptionInChainedOperatorException(e); > } > } > {code} > If outputTag is null (as is the case when no sideOutput was defined) the > catch block will crash with a NullPointerException. This may happen if > {{operator.processElement}} throws a ClassCastException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
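One possible shape of a fix for the problem described above is to guard the error enrichment against a missing tag. The sketch below is an assumption about how that could look, not the eventual patch: `NullSafeErrorEnrichment` and `Tag` are hypothetical stand-ins (the real code uses Flink's `OutputTag`), and only the message text is taken from the quoted snippet.

```java
import java.util.Objects;

public class NullSafeErrorEnrichment {

    /** Hypothetical stand-in for Flink's OutputTag; only the id matters here. */
    public static final class Tag {
        private final String id;

        public Tag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        public String getId() {
            return id;
        }
    }

    /**
     * Builds the enriched ClassCastException. If no side output was defined
     * (outputTag == null), the original exception is returned unchanged
     * instead of dereferencing the tag.
     */
    public static ClassCastException enrich(ClassCastException e, Tag outputTag) {
        if (outputTag == null) {
            return e;
        }
        return new ClassCastException(String.format(
            "%s. Failed to push OutputTag with id '%s' to operator. "
                + "This can occur when multiple OutputTags with different types "
                + "but identical names are being used.",
            e.getMessage(),
            outputTag.getId()));
    }
}
```

With a guard like this, the catch block can wrap the result in `ExceptionInChainedOperatorException` in both cases, so the original `ClassCastException` is no longer masked by a `NullPointerException`.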
[jira] [Updated] (FLINK-8622) flink-mesos: High memory usage of scheduler + job manager. GC never kicks in.
[ https://issues.apache.org/jira/browse/FLINK-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bhumika Bayani updated FLINK-8622: -- Attachment: flink-mem-usage-graph-for-jira.png > flink-mesos: High memory usage of scheduler + job manager. GC never kicks in. > - > > Key: FLINK-8622 > URL: https://issues.apache.org/jira/browse/FLINK-8622 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Mesos, ResourceManager >Affects Versions: 1.4.0, 1.3.2 >Reporter: Bhumika Bayani >Priority: Blocker > Fix For: 1.5.0 > > Attachments: flink-mem-usage-graph-for-jira.png > > > We are deploying a 1 job manager + 6 taskmanager flink cluster on mesos. > We have observed that the memory usage for 'jobmanager' is high. In spite of > allocating more and more memory resources to it, it hits the limit within > minutes. > We had started with 1.5 GB RAM and 1 GB heap. Currently we have allocated 4 > GB RAM, 3 GB heap to jobmanager cum scheduler. We tried allocating 8GB RAM > and lesser heap (i.e. same, 3GB) too. In that case also, memory graph was > identical. > As per the graph below, the scheduler almost always runs with maximum memory > resources. > !flink-mem-usage-graph-for-jira.png! > > Throughout the run of the scheduler, we do not see memory usage going down > unless it is killed due to OOM. So inferring, garbage collection is never > happening. > We have tried using both flink versions 1.4 and 1.3 but could see same issue > on both versions. > > Is there any way we can find out where and how memory is being used? > Are there any flink config options for jobmanager or jvm parameters which can > help us restrict the memory usage, force garbage collection, and prevent it > from crash? > Please let us know if there any resource recommendations from Flink for > running Flink on mesos at scale. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8625) Move OutputFlasherThread to Netty scheduled executor
Piotr Nowojski created FLINK-8625: - Summary: Move OutputFlasherThread to Netty scheduled executor Key: FLINK-8625 URL: https://issues.apache.org/jira/browse/FLINK-8625 Project: Flink Issue Type: Sub-task Components: Network Reporter: Piotr Nowojski This will allow us to trigger/schedule next flush only if we are not currently busy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5383: [hotfix][kafka-tests] Do not hide original excepti...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5383#discussion_r167224057 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java --- @@ -592,24 +595,21 @@ private Properties createProperties() { return properties; } - private void assertIsCausedBy(Class clazz, Throwable ex) { + private static Optional isCausedBy(Class clazz, Throwable ex) { --- End diff -- Changed to `findThrowable` :) ---
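For readers following the rename: the helper in question searches an exception's cause chain and returns an `Optional` instead of asserting, so the caller can rethrow the original exception when nothing matches. The following is a minimal standalone sketch of that pattern, not the code from the pull request; the class name `CauseChain` is hypothetical and the generic signature is an assumption, since the type parameters are not visible in the quoted diff.

```java
import java.util.Optional;

/** Hypothetical helper (not the PR's code): searches a cause chain by type. */
final class CauseChain {

    private CauseChain() {}

    /**
     * Walks the cause chain of {@code ex} and returns the first throwable
     * that is an instance of {@code type}, or an empty Optional if none is.
     */
    static <T extends Throwable> Optional<T> findThrowable(Throwable ex, Class<T> type) {
        Throwable current = ex;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            // Stop if a throwable reports itself as its own cause.
            Throwable next = current.getCause();
            current = (next == current) ? null : next;
        }
        return Optional.empty();
    }
}
```

A test can then write something like `if (!CauseChain.findThrowable(e, SomeException.class).isPresent()) { throw e; }`, which keeps the original stack trace visible when the expected cause is missing.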
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358372#comment-16358372 ] Fabian Hueske commented on FLINK-8538: -- Regarding the options: * Option 1: This was my initial idea. But Timo is right, we would tie the format to the Table API which is not a good choice, IMO. * Option 2: I don't think this will work. What if a schema contains two NUMBER types? * Option 3: How would this help? We would have a custom number type that nobody can properly handle. I think the best approach to support JSON Schema would be to always return BigDecimal types. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
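One possible motivation for always returning BigDecimal, which the comment itself does not spell out, is that the JSON number grammar places no bound on magnitude or precision, so a fixed-width type can silently lose digits. The plain-Java sketch below illustrates this; it uses no Flink API, and the class and method names are made up for the example.

```java
import java.math.BigDecimal;

/** Illustration only: compares decimal and double parsing of JSON number literals. */
final class JsonNumberDemo {

    private JsonNumberDemo() {}

    /** Parses a JSON number literal and keeps every digit, including trailing zeros. */
    static BigDecimal asDecimal(String literal) {
        return new BigDecimal(literal);
    }

    /** Parses the same literal as a 64-bit binary floating-point value. */
    static double asDouble(String literal) {
        return Double.parseDouble(literal);
    }

    /** Returns true if the double holds exactly the value written in the literal. */
    static boolean survivesAsDouble(String literal) {
        // new BigDecimal(double) exposes the exact binary value stored in the double.
        return new BigDecimal(asDouble(literal)).compareTo(asDecimal(literal)) == 0;
    }
}
```

Under this check, a literal such as `0.5` survives as a double, while `0.1` or a 20-digit integer does not, which is the kind of loss a BigDecimal mapping avoids at the cost of slower arithmetic.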
[GitHub] flink issue #5425: [FLINK-8456] Add Scala API for Connected Streams with Bro...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5425 @aljoscha Updated the PR. Please have a look! ---
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358384#comment-16358384 ] ASF GitHub Bot commented on FLINK-8456: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5425 @aljoscha Updated the PR. Please have a look! > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL
[ https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358399#comment-16358399 ] Lynch Lee commented on FLINK-6428: -- OK. [~fhueske], thanks for your explanation. I understand it clearly now. > Add support DISTINCT in dataStream SQL > -- > > Key: FLINK-6428 > URL: https://issues.apache.org/jira/browse/FLINK-6428 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > Add support for DISTINCT in DataStream SQL as follows: > DATA: > {code} > (name, age) > (kevin, 28), > (sunny, 6), > (jack, 6) > {code} > SQL: > {code} > SELECT DISTINCT age FROM MyTable > {code} > RESULTS: > {code} > 28, 6 > {code} > To DataStream: > {code} > inputDS > .keyBy() // KeyBy on all fields > .flatMap() // Eliminate duplicate data > {code} > [~fhueske] do we need this feature? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
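The keyBy/flatMap outline in the issue amounts to: group records by the selected fields, then emit a value only the first time its key is seen. The sketch below is a plain-Java stand-in for that idea and uses no Flink API; `DistinctSketch`, `Person`, and `distinct` are hypothetical names, and the `seen` set plays the role that per-key state would play in a real streaming job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Plain-Java stand-in for "keyBy, then drop duplicates"; not Flink code. */
final class DistinctSketch {

    private DistinctSketch() {}

    /** Sample record type matching the (name, age) rows in the issue. */
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /**
     * Emits each key the first time it appears, in arrival order.
     * The key selector corresponds to keyBy(); the seen-check to flatMap().
     */
    static <T, K> List<K> distinct(List<T> input, Function<T, K> keySelector) {
        Set<K> seen = new HashSet<>();      // stands in for keyed state
        List<K> out = new ArrayList<>();
        for (T record : input) {
            K key = keySelector.apply(record);
            if (seen.add(key)) {            // add() returns false for duplicates
                out.add(key);
            }
        }
        return out;
    }
}
```

With the three sample rows from the issue and `p -> p.age` as the key selector, the sketch yields 28 and 6, matching the RESULTS block. In a streaming setting the seen-set grows with the number of distinct keys, so state size is the main design concern.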
[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5439 Added the test. ---
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358451#comment-16358451 ] ASF GitHub Bot commented on FLINK-8571: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5439 Added the test. > Provide an enhanced KeyedStream implementation to use ForwardPartitioner > > > Key: FLINK-8571 > URL: https://issues.apache.org/jira/browse/FLINK-8571 > Project: Flink > Issue Type: Improvement >Reporter: Nagarjun Guraja >Assignee: Stefan Richter >Priority: Major > > This enhancement would help in modeling problems with pre-partitioned input > sources (e.g. Kafka with keyed topics). This would help in making the job > graph embarrassingly parallel while leveraging the RocksDB state backend and also > the fine-grained recovery semantics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5438: [FLINK-8617][TableAPI & SQL] Fix code generation bug whil...
Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5438 I've removed the ITCase and added some tests to `org.apache.flink.table.expressions.MapTypeTest`. ---
[jira] [Commented] (FLINK-8617) Fix code generation bug while accessing Map type
[ https://issues.apache.org/jira/browse/FLINK-8617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358470#comment-16358470 ] ASF GitHub Bot commented on FLINK-8617: --- Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5438 I've removed the ITCase and added some tests to `org.apache.flink.table.expressions.MapTypeTest`. > Fix code generation bug while accessing Map type > > > Key: FLINK-8617 > URL: https://issues.apache.org/jira/browse/FLINK-8617 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > > There's a code generation bug in {code}ScalarOperators.generateMapGet{code}. > There are also two more bugs, found in {code}ScalarOperators.generateIsNull{code} > and {code}ScalarOperators.generateIsNotNull{code}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/5441

[FLINK-8607] [table] Add a basic embedded SQL CLI client

## What is the purpose of the change

This PR implements the first part of the implementation plan described in FLIP-24.

```
Goal: Add the basic features to play around with Flink's streaming SQL.
- Add CLI component that reads the configuration files
- "Pre-registered table sources"
- "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI SQL parser
- SHOW TABLES
- DESCRIBE TABLE
- EXPLAIN
- Add streaming append query submission to executor
- Submit jars and run SELECT query using the ClusterClient
- Collect results on heap and serve them on the CLI side (Internal Mode with SELECT)
- SOURCE (for executing a SQL statement stored in a local file)
```

Additionally, this PR also supports retraction queries and the SET operation for setting properties.

The client can be started using `./bin/sql-client.sh embedded`. A table source must be defined in `./conf/sql-client-defaults.yaml` (for example a CSV table source; an example can be found in the test resources directory).

The client supports two modes for viewing results: a `changelog` mode and a `table` mode. They can be selected by setting the `execution.result-mode` property.

The code is still a work in progress. There are a couple of things that can be improved:

- Add more logging instead of swallowing exceptions
- Use Flink's ConfigOptions where applicable (e.g. for better default value handling and validation)
- Maybe make the record result retrieval blocking?
- Tests for the LocalExecutor
- Some basic tests for other classes (maybe for CLI classes as well?)
- More deployment options (support for YARN, FLIP-6?)
- Documentation

## Brief change log

- New module `flink-sql-client`
- New executable script in `flink/bin`
- Minor visibility changes in other modules

## Verifying this change

Manually verified. Further tests will follow.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5441.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5441

commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac
Author: twalthr
Date: 2017-12-07T12:46:31Z

[FLINK-8607] [table] Add a basic embedded SQL CLI client

---
[jira] [Commented] (FLINK-8607) Add a basic embedded SQL CLI client
[ https://issues.apache.org/jira/browse/FLINK-8607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358472#comment-16358472 ] ASF GitHub Bot commented on FLINK-8607: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/5441 [FLINK-8607] [table] Add a basic embedded SQL CLI client ## What is the purpose of the change This PR implements the first part of the implementation plan described in FLIP-24. ``` Goal: Add the basic features to play around with Flink's streaming SQL. - Add CLI component that reads the configuration files - "Pre-registered table sources" - "Job parameters" - Add executor for retrieving pre-flight information and corresponding CLI SQL parser - SHOW TABLES - DESCRIBE TABLE - EXPLAIN - Add streaming append query submission to executor - Submit jars and run SELECT query using the ClusterClient - Collect results on heap and serve them on the CLI side (Internal Mode with SELECT) - SOURCE (for executing a SQL statement stored in a local file) ``` Additionally, this PR also supports retraction queries and the SET operation for setting properties. The client can be started using `./bin/sql-client.sh embedded`. A table source must be defined in `./conf/sql-client-defaults.yaml` (for example a CSV table source, an example can be found in the test resources directory). The client supports two modes for viewing results. A `changelog` mode or `table` mode. They can be selected by setting the `execution.result-mode` property. The code is still work in progress. There are a couple of things that can be improved: - Add more logging instead of swallowing exceptions - Use Flink's ConfigOptions where applicable (e.g. for better default value handling and validation) - Maybe make the record result retrieval blocking? - Tests for the LocalExecutor - Some basic tests for other classes (maybe for CLI classes as well?) - More deployment options (support for YARN, FLIP-6?) 
- Documentation ## Brief change log - New module `flink-sql-client` - New executable script in `flink/bin` - Minor visibility changes in other modules ## Verifying this change Manually verified. Further tests will follow. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-8607 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5441.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5441 commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac Author: twalthr Date: 2017-12-07T12:46:31Z [FLINK-8607] [table] Add a basic embedded SQL CLI client > Add a basic embedded SQL CLI client > --- > > Key: FLINK-8607 > URL: https://issues.apache.org/jira/browse/FLINK-8607 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > This issue describes the Implementation Plan 1 of FLIP-24. > Goal: Add the basic features to play around with Flink's streaming SQL. 
> {code} > - Add CLI component that reads the configuration files > - "Pre-registered table sources" > - "Job parameters" > - Add executor for retrieving pre-flight information and corresponding CLI > SQL parser > - SHOW TABLES > - DESCRIBE TABLE > - EXPLAIN > - Add streaming append query submission to executor > - Submit jars and run SELECT query using the ClusterClient > - Collect results on heap and serve them on the CLI side (Internal Mode with > SELECT) > - SOURCE (for executing a SQL statement stored in a local file) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor
[ https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ken Krugler updated FLINK-8625: --- Summary: Move OutputFlusher thread to Netty scheduled executor (was: Move OutputFlasherThread to Netty scheduled executor) > Move OutputFlusher thread to Netty scheduled executor > - > > Key: FLINK-8625 > URL: https://issues.apache.org/jira/browse/FLINK-8625 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Piotr Nowojski >Priority: Major > > This will allow us to trigger/schedule next flush only if we are not > currently busy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358494#comment-16358494 ] Xingcan Cui commented on FLINK-8538: Thanks for the comments [~twalthr] and [~fhueske]. Actually, I've encountered the problem of parsing the NUMBER type before and I just added an extra field to indicate the numeric types as the JSON schema in my application was only used internally. I think the Option 2 Timo raised referred to a global config, i.e., we always return a specific type, which could be configured to be double or BigDecimal, for the JSON NUMBER type. I'll create a ticket for the JSON schema converter. Thanks! > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5442

[FLINK-7713][flip6] Implement JarUploadHandler

## What is the purpose of the change

*Allow uploading jars through HTTP to enable job submissions from the web ui.*

cc: @tillrohrmann

## Brief change log

- *Allow uploading jars through HTTP.*
- *Implement and register JarUploadHandler.*

## Verifying this change

This change added tests and can be verified as follows:

- *Added integration tests for uploading files.*
- *Started cluster locally and uploaded jars using*

```
curl -v -X POST -H "Expect:" -F "jarfile=@examples/streaming/Kafka010Example.jar" http://127.0.0.1:9065/jars/upload
```

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7713

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5442.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5442

commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao
Date: 2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler

---
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358498#comment-16358498 ] ASF GitHub Bot commented on FLINK-7713: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5442 [FLINK-7713][flip6] Implement JarUploadHandler ## What is the purpose of the change *Allow uploading jars through HTTP to enable job submissions from the web ui.* cc: @tillrohrmann ## Brief change log - *Allow uploading jars through HTTP.* - *Implement and register JarUploadHandler.* ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests for uploading files.* - *Started cluster locally and uploaded jars using* ``` curl -v -X POST -H "Expect:" -F "jarfile=@examples/streaming/Kafka010Example.jar" http://127.0.0.1:9065/jars/upload ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7713 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5442.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5442 commit caf74f63db6047a79393e53e9434eb1cf078ee48 Author: gyao Date: 2018-02-09T14:46:22Z [FLINK-7713][flip6] Implement JarUploadHandler > Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249285 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.FileUpload; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles .jar file uploads. + */ +public class JarUploadHandler extends --- End diff -- Tests missing. ---
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249371 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ResponseBody} for {@link JarUploadHandler}. + */ +public class JarUploadResponseBody implements ResponseBody { --- End diff -- Marshalling test missing. ---
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358500#comment-16358500 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249371 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ResponseBody} for {@link JarUploadHandler}. + */ +public class JarUploadResponseBody implements ResponseBody { --- End diff -- Marshalling test missing. 
> Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358499#comment-16358499 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249285 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.FileUpload; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles .jar file uploads. + */ +public class JarUploadHandler extends --- End diff -- Tests missing. > Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java --- @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static List> tryLoadJarUploadHandler( --- End diff -- Method name is not accurate. There will be more handlers. ---
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358503#comment-16358503 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java --- @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static List> tryLoadJarUploadHandler( --- End diff -- Method name is not accurate. There will be more handlers. > Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java --- @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static List> tryLoadJarUploadHandler( + GatewayRetriever leaderRetriever, + CompletableFuture restAddressFuture, + Time timeout, + java.nio.file.Path uploadDir, + Executor executor) { + + // 1. Check if flink-runtime-web is in the classpath + try { + final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; + Class.forName(classname).asSubclass(WebMonitor.class); + } catch (ClassNotFoundException e) { + // class not found means that there is no flink-runtime-web in the classpath + return Collections.emptyList(); + } + + try { + final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler"; + final Class clazz = Class.forName(classname); + final Constructor constructor = clazz.getConstructor( + CompletableFuture.class, + GatewayRetriever.class, + Time.class, + Map.class, + MessageHeaders.class, + java.nio.file.Path.class, + Executor.class); + + final MessageHeaders jarUploadMessageHeaders = + (MessageHeaders) Class + .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders") + .newInstance(); + + return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance( + restAddressFuture, + leaderRetriever, + timeout, + Collections.emptyMap(), --- End diff -- should use headers defined by `restConfiguration.getResponseHeaders()` ---
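A minimal sketch of the optional-class-loading pattern used in the diff above: look a class up by name and fall back to an empty list when the module that provides it is not on the classpath. The class `OptionalClassLoadingSketch` and the method `tryInstantiate` are hypothetical names for illustration only. It uses plain JDK reflection and does not reproduce Flink's handler types or constructor arguments.

```java
import java.util.Collections;
import java.util.List;

public class OptionalClassLoadingSketch {

    /**
     * Hypothetical helper: returns a single-element list holding a new instance
     * of the named class, or an empty list if the class is not on the classpath.
     */
    public static List<Object> tryInstantiate(String className) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            // The optional module is absent: treated as "nothing to register", not as an error.
            return Collections.emptyList();
        }

        try {
            // Assumes a public no-argument constructor, unlike the handler in the diff.
            return Collections.singletonList(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // The class exists but cannot be constructed: surfaced as a real failure.
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate("java.util.ArrayList").size());    // prints 1
        System.out.println(tryInstantiate("no.such.OptionalModule").size()); // prints 0
    }
}
```

Keeping the "class missing" case separate from the "class present but broken" case avoids silently hiding a misconfigured handler behind an empty result.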
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358504#comment-16358504 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167249950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java --- @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor( } } + @SuppressWarnings({"unchecked", "rawtypes"}) + public static List> tryLoadJarUploadHandler( + GatewayRetriever leaderRetriever, + CompletableFuture restAddressFuture, + Time timeout, + java.nio.file.Path uploadDir, + Executor executor) { + + // 1. Check if flink-runtime-web is in the classpath + try { + final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; + Class.forName(classname).asSubclass(WebMonitor.class); + } catch (ClassNotFoundException e) { + // class not found means that there is no flink-runtime-web in the classpath + return Collections.emptyList(); + } + + try { + final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler"; + final Class clazz = Class.forName(classname); + final Constructor constructor = clazz.getConstructor( + CompletableFuture.class, + GatewayRetriever.class, + Time.class, + Map.class, + MessageHeaders.class, + java.nio.file.Path.class, + Executor.class); + + final MessageHeaders jarUploadMessageHeaders = + (MessageHeaders) Class + .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders") + .newInstance(); + + return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance( + restAddressFuture, + leaderRetriever, + timeout, + Collections.emptyMap(), --- End diff -- should use headers defined by `restConfiguration.getResponseHeaders()` > Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: 
https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5442: [FLINK-7713][flip6] Implement JarUploadHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167250379 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; --- End diff -- The legacy handlers could be moved to a `legacy` package. ---
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358506#comment-16358506 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167250379 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; --- End diff -- The legacy handlers could be moved to a `legacy` package. > Port JarUploadHandler to new REST endpoint > -- > > Key: FLINK-7713 > URL: https://issues.apache.org/jira/browse/FLINK-7713 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JarUploadHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8626) Introduce BackPressureStatsTracker interface
Till Rohrmann created FLINK-8626: Summary: Introduce BackPressureStatsTracker interface Key: FLINK-8626 URL: https://issues.apache.org/jira/browse/FLINK-8626 Project: Flink Issue Type: Improvement Components: REST, Tests Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 In order to better test components like the {{JobMaster}} we should introduce a {{BackPressureStatsTracker}} interface and rename the current {{BackPressureStatsTracker}} class into {{BackPressureStatsTrackerImpl}}. This will simplify testing where we have to set up all these things. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5443 [FLINK-8626] Introduce BackPressureStatsTracker interface ## What is the purpose of the change Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. This will make testing easier when we don't have to set up all the different components. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink introduceBackPressureStatsTrackerInterface Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5443.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5443 commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544 Author: Till Rohrmann Date: 2018-02-09T13:07:31Z [FLINK-8626] Introduce BackPressureStatsTracker interface Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. This will make testing easier when we don't have to set up all the different components. ---
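The refactoring described above can be sketched roughly as follows. Only the names `BackPressureStatsTracker` and `BackPressureStatsTrackerImpl` come from the PR. The method `getBackPressureRatio`, the `String` vertex id, the no-op test double and the `JobMasterLike` consumer are simplified assumptions for illustration and are not Flink's actual signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class InterfaceExtractionSketch {

    /** Extracted interface; the single method is a hypothetical simplification. */
    public interface BackPressureStatsTracker {
        Optional<Double> getBackPressureRatio(String vertexId);
    }

    /** Stand-in for the renamed production class, which needs real state to be set up. */
    public static class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
        private final Map<String, Double> samples = new HashMap<>();

        public void record(String vertexId, double ratio) {
            samples.put(vertexId, ratio);
        }

        @Override
        public Optional<Double> getBackPressureRatio(String vertexId) {
            return Optional.ofNullable(samples.get(vertexId));
        }
    }

    /** Hypothetical test double: never reports statistics and needs no setup. */
    public static class NoOpBackPressureStatsTracker implements BackPressureStatsTracker {
        @Override
        public Optional<Double> getBackPressureRatio(String vertexId) {
            return Optional.empty();
        }
    }

    /** Hypothetical component under test; it depends only on the interface. */
    public static class JobMasterLike {
        private final BackPressureStatsTracker tracker;

        public JobMasterLike(BackPressureStatsTracker tracker) {
            this.tracker = tracker;
        }

        public String describe(String vertexId) {
            return tracker.getBackPressureRatio(vertexId)
                .map(ratio -> "ratio=" + ratio)
                .orElse("no stats");
        }
    }

    public static void main(String[] args) {
        // In a test, the no-op tracker replaces the full implementation.
        System.out.println(new JobMasterLike(new NoOpBackPressureStatsTracker()).describe("v1"));

        BackPressureStatsTrackerImpl impl = new BackPressureStatsTrackerImpl();
        impl.record("v1", 0.5);
        System.out.println(new JobMasterLike(impl).describe("v1"));
    }
}
```

Because the consumer is constructed against the interface, a test can pass in a trivial implementation instead of wiring up the components the real tracker depends on.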
[jira] [Commented] (FLINK-8626) Introduce BackPressureStatsTracker interface
[ https://issues.apache.org/jira/browse/FLINK-8626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358509#comment-16358509 ] ASF GitHub Bot commented on FLINK-8626: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5443 [FLINK-8626] Introduce BackPressureStatsTracker interface ## What is the purpose of the change Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. This will make testing easier when we don't have to set up all the different components. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink introduceBackPressureStatsTrackerInterface Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5443.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5443 commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544 Author: Till Rohrmann Date: 2018-02-09T13:07:31Z [FLINK-8626] Introduce BackPressureStatsTracker interface Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. 
This will make testing easier when we don't have to set up all the different components. > Introduce BackPressureStatsTracker interface > > > Key: FLINK-8626 > URL: https://issues.apache.org/jira/browse/FLINK-8626 > Project: Flink > Issue Type: Improvement > Components: REST, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Fix For: 1.5.0 > > > In order to better test components like the {{JobMaster}} we should introduce > a {{BackPressureStatsTracker}} interface and rename the current > {{BackPressureStatsTracker}} class into {{BackPressureStatsTrackerImpl}}. > This will simplify testing where we have to set up all these things. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5421: [FLINK-8573] [client] Add more information for printing J...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5421 The CI error is not relevant to this PR; the `BackPressureStatsTrackerITCase` test failed. ---
[jira] [Commented] (FLINK-8573) Print JobID for failed jobs
[ https://issues.apache.org/jira/browse/FLINK-8573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358511#comment-16358511 ] ASF GitHub Bot commented on FLINK-8573: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5421 The CI error is not relevant to this PR; the `BackPressureStatsTrackerITCase` test failed. > Print JobID for failed jobs > --- > > Key: FLINK-8573 > URL: https://issues.apache.org/jira/browse/FLINK-8573 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Major > > When a job is successfully run the client prints something along the lines > of "Job with successfully run". If the job fails, however, we only > print the exception but not the JobID. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5444: [FLINK-8546] [flip6] Respect savepoints and restor...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5444 [FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints ## What is the purpose of the change Let the JobMaster respect checkpoints and savepoints. The JobMaster will always try to restore the latest checkpoint if there is one available. Next it will check whether savepoint restore settings have been set. If so, then it will try to restore the savepoint. Only if these settings are not set, the job will be started from scratch. This PR is based on #5443. ## Brief change log - Check in JobMaster if the `CheckpointCoordinator` has been set - If so, then check if there is a checkpoint to recover - If not, then check whether we can restore from a savepoint - If not, then start the job without any recovered state ## Verifying this change - Added `JobMasterTest#testRestoringFromSavepoint` and `JobMasterTest#testCheckpointPrecedesSavepointRecovery`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink respectSavepoints Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5444.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5444 commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544 Author: Till Rohrmann Date: 2018-02-09T13:07:31Z [FLINK-8626] Introduce BackPressureStatsTracker interface Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. This will make testing easier when we don't have to set up all the different components. commit 8380146fb2f85d4e8d9d41b84ba0ad435c242984 Author: Till Rohrmann Date: 2018-02-09T13:18:11Z [hotfix] [tests] Simplify JobMasterTest commit 09d36a47ed78d7fae0cae0229823114bbb6d45be Author: Till Rohrmann Date: 2018-02-01T15:14:53Z [FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints Let the JobMaster respect checkpoints and savepoints. The JobMaster will always try to restore the latest checkpoint if there is one available. Next it will check whether savepoint restore settings have been set. If so, then it will try to restore the savepoint. Only if these settings are not set, the job will be started from scratch. ---
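The restore order listed in the change log above can be sketched as a small decision function. This is an illustration under stated assumptions, not the JobMaster code: `RestoreOrderSketch`, `StateSource` and `chooseStateSource` are hypothetical names, checkpoints and savepoints are reduced to optional strings, and the case where no `CheckpointCoordinator` is set is assumed to mean that no state is recovered.

```java
import java.util.Optional;

public class RestoreOrderSketch {

    /** Hypothetical label for where the job's initial state comes from. */
    public enum StateSource { LATEST_CHECKPOINT, SAVEPOINT, NONE }

    /**
     * Mirrors the order in the change log: a recoverable checkpoint wins over
     * savepoint restore settings; without either, the job starts from scratch.
     */
    public static StateSource chooseStateSource(
            boolean checkpointCoordinatorSet,
            Optional<String> latestCheckpoint,
            Optional<String> savepointPath) {

        if (!checkpointCoordinatorSet) {
            // Assumption: without a coordinator there is nothing to restore.
            return StateSource.NONE;
        }
        if (latestCheckpoint.isPresent()) {
            return StateSource.LATEST_CHECKPOINT;
        }
        if (savepointPath.isPresent()) {
            return StateSource.SAVEPOINT;
        }
        return StateSource.NONE;
    }

    public static void main(String[] args) {
        // Both available: the checkpoint precedes the savepoint.
        System.out.println(chooseStateSource(true, Optional.of("chk-7"), Optional.of("/savepoints/sp-1")));
        // Only savepoint settings present.
        System.out.println(chooseStateSource(true, Optional.empty(), Optional.of("/savepoints/sp-1")));
        // Neither present: start without recovered state.
        System.out.println(chooseStateSource(true, Optional.empty(), Optional.empty()));
    }
}
```

The first two calls in `main` correspond in spirit to the two test names mentioned in the PR (`testCheckpointPrecedesSavepointRecovery` and `testRestoringFromSavepoint`), although the real tests exercise the JobMaster itself.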
[jira] [Commented] (FLINK-8546) Respect savepoint settings and recover from latest checkpoint in Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358518#comment-16358518 ] ASF GitHub Bot commented on FLINK-8546: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5444 [FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints ## What is the purpose of the change Let the JobMaster respect checkpoints and savepoints. The JobMaster will always try to restore the latest checkpoint if there is one available. Next it will check whether savepoint restore settings have been set. If so, then it will try to restore the savepoint. Only if these settings are not set, the job will be started from scratch. This PR is based on #5443. ## Brief change log - Check in JobMaster if the `CheckpointCoordinator` has been set - If so, then check if there is a checkpoint to recover - If not, then check whether we can restore from a savepoint - If not, then start the job without any recovered state ## Verifying this change - Added `JobMasterTest#testRestoringFromSavepoint` and `JobMasterTest#testCheckpointPrecedesSavepointRecovery`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink respectSavepoints Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5444.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5444 commit 8e172aee75e9e0c81608ed7e6796bca1ea7d7544 Author: Till Rohrmann Date: 2018-02-09T13:07:31Z [FLINK-8626] Introduce BackPressureStatsTracker interface Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce a BackPressureStatsTracker interface. This will make testing easier when we don't have to set up all the different components. commit 8380146fb2f85d4e8d9d41b84ba0ad435c242984 Author: Till Rohrmann Date: 2018-02-09T13:18:11Z [hotfix] [tests] Simplify JobMasterTest commit 09d36a47ed78d7fae0cae0229823114bbb6d45be Author: Till Rohrmann Date: 2018-02-01T15:14:53Z [FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints Let the JobMaster respect checkpoints and savepoints. The JobMaster will always try to restore the latest checkpoint if there is one available. Next it will check whether savepoint restore settings have been set. If so, then it will try to restore the savepoint. Only if these settings are not set, the job will be started from scratch. > Respect savepoint settings and recover from latest checkpoint in Flip-6 > --- > > Key: FLINK-8546 > URL: https://issues.apache.org/jira/browse/FLINK-8546 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{JobMaster}} should respect savepoints and recover from the latest > checkpoint if possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5439: [FLINK-8571] [DataStream] [Backport] Introduce utility fu...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5439 LGTM once Travis is green. We should also remove the redundant mention of FLINK-8571 in `ReinterpretAsKeyedStreamITCase` while merging. ---
[jira] [Commented] (FLINK-8571) Provide an enhanced KeyedStream implementation to use ForwardPartitioner
[ https://issues.apache.org/jira/browse/FLINK-8571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358559#comment-16358559 ] ASF GitHub Bot commented on FLINK-8571: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5439 LGTM once Travis is green. We should also remove the redundant mention of FLINK-8571 in `ReinterpretAsKeyedStreamITCase` while merging. > Provide an enhanced KeyedStream implementation to use ForwardPartitioner > > > Key: FLINK-8571 > URL: https://issues.apache.org/jira/browse/FLINK-8571 > Project: Flink > Issue Type: Improvement >Reporter: Nagarjun Guraja >Assignee: Stefan Richter >Priority: Major > > This enhancement would help in modeling problems with pre-partitioned input > sources (e.g. Kafka with keyed topics). This would help in making the job > graph embarrassingly parallel while leveraging the RocksDB state backend and also > the fine-grained recovery semantics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5425: [FLINK-8456] Add Scala API for Connected Streams with Bro...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5425 I think this looks good now. ---
[jira] [Commented] (FLINK-8456) Add Scala API for Connected Streams with Broadcast State.
[ https://issues.apache.org/jira/browse/FLINK-8456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358582#comment-16358582 ] ASF GitHub Bot commented on FLINK-8456: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5425 I think this looks good now. > Add Scala API for Connected Streams with Broadcast State. > - > > Key: FLINK-8456 > URL: https://issues.apache.org/jira/browse/FLINK-8456 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5443: [FLINK-8626] Introduce BackPressureStatsTracker in...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5443#discussion_r167268048 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.backpressure; + +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; + +import java.util.Optional; + +/** + * {@link BackPressureStatsTracker} implementation which returns always no back pressure statistics. --- End diff -- switch "returns" and "always"? ---