[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314415#comment-16314415 ]

Tzu-Li (Gordon) Tai commented on FLINK-8283:
--------------------------------------------

Merged.
1.5.0: 542419ba07b1c0b0ba68b636d14de8f1a00aaae1
1.4.1: 74135c9db11728f2189b6b4ccae90b1d4ccb84c1

> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> ----------------------------------------------------------------
>
>                 Key: FLINK-8283
>                 URL: https://issues.apache.org/jira/browse/FLINK-8283
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.5.0, 1.4.1
>
> For a few days now, Travis builds with the {{connectors}} profile have been
> failing more often with no new output being received within 10 minutes. It
> seems to start with the Travis build for
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
> but may have been introduced earlier. The printed offsets look strange,
> though.
> {code}
> 16:33:12,508 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting
> restore state in the FlinkKafkaConsumer:
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773,
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Consumer subtask 2 will start reading 66 partitions with offsets in restored
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775,
> KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775,
>
[jira] [Resolved] (FLINK-8298) Shutdown MockEnvironment
[ https://issues.apache.org/jira/browse/FLINK-8298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8298.
----------------------------------------
    Resolution: Fixed

> Shutdown MockEnvironment
> ------------------------
>
>                 Key: FLINK-8298
>                 URL: https://issues.apache.org/jira/browse/FLINK-8298
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.4.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>             Fix For: 1.5.0
>
> The IOManager inside MockEnvironment is not shut down properly in tests,
> causing a memory leak.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
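The leak pattern described in the issue can be sketched as a test environment that owns background I/O resources and must be closed deterministically, e.g. via `AutoCloseable` and try-with-resources. All class and method names below are hypothetical stand-ins, not Flink's actual `MockEnvironment`/`IOManager` API:

```java
// Hypothetical sketch: a mock test environment implements AutoCloseable so
// tests can shut down its owned resources deterministically. Illustrative
// names only; this is not Flink's real API.
public class MockEnvironmentSketch implements AutoCloseable {
    private boolean ioManagerRunning = true; // stands in for the owned IOManager

    public boolean isIoManagerRunning() {
        return ioManagerRunning;
    }

    @Override
    public void close() {
        ioManagerRunning = false; // shut down owned resources; nothing leaks
    }

    public static void main(String[] args) {
        // try-with-resources guarantees close() even if the test body throws
        try (MockEnvironmentSketch env = new MockEnvironmentSketch()) {
            // ... exercise the code under test against env ...
        }
    }
}
```

Without the explicit `close()` (or try-with-resources), each test run would leave the stand-in I/O manager "running", which is the kind of accumulation the issue reports.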
[jira] [Resolved] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8283.
----------------------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.1
                   1.5.0
[jira] [Updated] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-8268:
---------------------------------------
    Fix Version/s: 1.5.0

> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -----------------------------------------------------
>
>                 Key: FLINK-8268
>                 URL: https://issues.apache.org/jira/browse/FLINK-8268
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Stephan Ewen
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.5.0
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< FAILURE!
> - in org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
> Time elapsed: 0.068 sec <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>     at java.io.FileOutputStream.writeBytes(Native Method)
>     at java.io.FileOutputStream.write(FileOutputStream.java:326)
>     at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>     at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>     at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>     at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>     at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>     at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}
[jira] [Commented] (FLINK-8298) Shutdown MockEnvironment
[ https://issues.apache.org/jira/browse/FLINK-8298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314414#comment-16314414 ]

Tzu-Li (Gordon) Tai commented on FLINK-8298:
--------------------------------------------

Merged for 1.5.0: 091a37052b7045b3ed28c68bfea109024a5d1871
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314413#comment-16314413 ]

Tzu-Li (Gordon) Tai commented on FLINK-8268:
--------------------------------------------

Merged for 1.5.0: e8d1aa57a8246de0b78e799a02c08f4007fb3a92
[jira] [Resolved] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8268.
----------------------------------------
    Resolution: Fixed
[jira] [Commented] (FLINK-8287) Flink Kafka Producer docs should clearly state what partitioner is used by default
[ https://issues.apache.org/jira/browse/FLINK-8287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314411#comment-16314411 ]

Tzu-Li (Gordon) Tai commented on FLINK-8287:
--------------------------------------------

Merged.
1.5.0: 71974895da966478f2e24fd36c08d7cf386a7050
1.4.1: 9f68e790fc28197f89638cc83d1612f8f7a796a8

> Flink Kafka Producer docs should clearly state what partitioner is used by
> default
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8287
>                 URL: https://issues.apache.org/jira/browse/FLINK-8287
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.5.0, 1.4.1
>
> See the original discussion on the ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html
> It is worth mentioning which partitioning scheme the {{FlinkKafkaProducer}}
> uses by default when writing to Kafka, as users are often surprised by the
> default {{FlinkFixedPartitioner}}.
[jira] [Resolved] (FLINK-8287) Flink Kafka Producer docs should clearly state what partitioner is used by default
[ https://issues.apache.org/jira/browse/FLINK-8287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8287.
----------------------------------------
    Resolution: Fixed
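For context on why the default surprises users: a fixed partitioner assigns each parallel sink subtask to a single Kafka partition instead of round-robining individual records. A self-contained sketch of that subtask-to-partition mapping (illustrative names and logic, not the actual Flink class):

```java
// Illustrative sketch of fixed partitioning: every record produced by a given
// sink subtask goes to one Kafka partition, chosen as subtaskIndex modulo the
// partition count. Hypothetical names; not Flink's FlinkFixedPartitioner code.
public class FixedPartitionerSketch {

    // The partition a given parallel subtask writes all of its records to.
    public static int partitionFor(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    public static void main(String[] args) {
        // With 3 sink subtasks and 2 partitions, subtasks 0 and 2 both write
        // to partition 0, and some partitions may receive no data at all if
        // there are fewer subtasks than partitions.
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + partitionFor(subtask, 2));
        }
    }
}
```

This is exactly the behavior users tend not to expect: data is not spread per-record across partitions, which is why the docs change requested above calls out the default explicitly.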
[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer
[ https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314410#comment-16314410 ]

Tzu-Li (Gordon) Tai commented on FLINK-8260:
--------------------------------------------

Merged.
1.5.0: 8d42197b662efaf58d92a3073d8c319f8a2a793e
1.4.1: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25

> Document API of Kafka 0.11 Producer
> -----------------------------------
>
>                 Key: FLINK-8260
>                 URL: https://issues.apache.org/jira/browse/FLINK-8260
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.5.0, 1.4.1
>
> The API of the Flink Kafka Producer changed for Kafka 0.11; for example,
> there is no {{writeToKafkaWithTimestamps}} method anymore.
> This needs to be added to the [Kafka connector
> documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer],
> i.e., a new tab with a code snippet needs to be added for Kafka 0.11.
[jira] [Resolved] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8116.
----------------------------------------
    Resolution: Fixed

> Stale comments referring to Checkpointed interface
> --------------------------------------------------
>
>                 Key: FLINK-8116
>                 URL: https://issues.apache.org/jira/browse/FLINK-8116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Documentation
>            Reporter: Gabor Gevay
>            Priority: Trivial
>              Labels: starter
>             Fix For: 1.5.0
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the
> old {{Checkpointed}} interface. (The code examples there also need to be
> updated.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and
> possibly in other places as well.
[jira] [Resolved] (FLINK-8260) Document API of Kafka 0.11 Producer
[ https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-8260.
----------------------------------------
    Resolution: Fixed
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314408#comment-16314408 ]

Tzu-Li (Gordon) Tai commented on FLINK-8116:
--------------------------------------------

Merged.
1.5.0: 7b5fdbd6501c33f74700d79aedc9411d160191ba
1.4.1: 1e637c54c2ad1b9a8d9ad6d3f9c8aa55605d7e8e
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314404#comment-16314404 ]

ASF GitHub Bot commented on FLINK-7797:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5140

    Hi @fhueske, thanks for your concrete suggestions! IMO, the refactorings
    in `TimeBoundedStreamJoin` are quite reasonable, while the refactoring of
    `createNegativeWindowSizeJoin()` may not be as significant, since a
    negative window size should be treated as an exception. Anyway, I've
    applied them for better efficiency.

    Thanks,
    Xingcan

> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
>                 Key: FLINK-7797
>                 URL: https://issues.apache.org/jira/browse/FLINK-7797
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>
> Currently, only windowed inner joins are supported for streaming tables.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.
[GitHub] flink issue #5140: [FLINK-7797] [table] Add support for windowed outer joins...
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5140

---
[jira] [Created] (FLINK-8383) flink-mesos build failing: duplicate Jackson relocation in shaded jar
Tzu-Li (Gordon) Tai created FLINK-8383:
------------------------------------------

             Summary: flink-mesos build failing: duplicate Jackson relocation in shaded jar
                 Key: FLINK-8383
                 URL: https://issues.apache.org/jira/browse/FLINK-8383
             Project: Flink
          Issue Type: Bug
          Components: Build System, Mesos
            Reporter: Tzu-Li (Gordon) Tai
            Priority: Critical


Example: https://travis-ci.org/apache/flink/jobs/325604587

The build for {{flink-mesos}} is failing with:

{code}
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.0.0:shade (shade-flink) on project flink-mesos_2.11: Error creating shaded jar: duplicate entry: META-INF/services/org.apache.flink.mesos.shaded.com.fasterxml.jackson.core.JsonFactory -> [Help 1]
{code}

This seems to be caused by
https://github.com/apache/flink/commit/9ae4c5447a2f5aae2b65d5860f822d452a9d5af1.
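As a sketch of the failure mode the summary describes: if the same package is relocated more than once in the effective shade-plugin configuration (e.g. once inherited from a parent POM and once declared in the module), the plugin can end up writing the relocated `META-INF/services` entry twice, producing the "duplicate entry" error above. The snippet below is a hypothetical, minimal illustration of such a duplicated relocation, not the actual flink-mesos POM:

```xml
<!-- Hypothetical illustration only: two overlapping relocations of the same
     Jackson package. The shade plugin rewrites the Jackson service file under
     the shaded pattern once per relocation, so the shaded jar gets a duplicate
     META-INF/services/...jackson.core.JsonFactory entry. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
      </relocation>
      <!-- second, duplicate relocation of the same pattern (e.g. one copy
           inherited from a parent POM, one added in the module) -->
      <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

The usual fix is to keep the relocation in exactly one place in the POM hierarchy.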
[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer
[ https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314387#comment-16314387 ]

ASF GitHub Bot commented on FLINK-8260:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5179
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314389#comment-16314389 ]

ASF GitHub Bot commented on FLINK-8283:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5201
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314388#comment-16314388 ] ASF GitHub Bot commented on FLINK-8268: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5193 > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314386#comment-16314386 ] ASF GitHub Bot commented on FLINK-8116: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5121 > Stale comments referring to Checkpointed interface > -- > > Key: FLINK-8116 > URL: https://issues.apache.org/jira/browse/FLINK-8116 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Gabor Gevay >Priority: Trivial > Labels: starter > Fix For: 1.5.0 > > > Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by > the {{CheckpointedFunction}} interface. > However, in {{SourceFunction}} there are two comments still referring to the > old {{Checkpointed}} interface. (The code examples there also need to be > modified.) > Note that the problem also occurs in {{StreamExecutionEnvironment}}, and > possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5179: [FLINK-8260/8287] [kafka] Bunch of improvements to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5179 ---
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5193 ---
[GitHub] flink pull request #5201: [FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerB...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5201 ---
[GitHub] flink pull request #5121: [FLINK-8116] Stale comments referring to Checkpoin...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5121 ---
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314362#comment-16314362 ] ASF GitHub Bot commented on FLINK-8268: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5193 Merging to `master` .. > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
[GitHub] flink issue #5193: [FLINK-8268][tests] Improve tests stability
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5193 Merging to `master` .. ---
[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer
[ https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314357#comment-16314357 ] ASF GitHub Bot commented on FLINK-8260: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5179 Thanks a lot for the review. Merging this to `master` and `release-1.4` .. > Document API of Kafka 0.11 Producer > --- > > Key: FLINK-8260 > URL: https://issues.apache.org/jira/browse/FLINK-8260 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The API of the Flink Kafka Producer changed for Kafka 0.11, for example there > is no {{writeToKafkaWithTimestamps}} method anymore. > This needs to be added to the [Kafka connector > documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer], > i.e., a new tab with a code snippet needs to be added for Kafka 0.11. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5179: [FLINK-8260/8287] [kafka] Bunch of improvements to Kafka ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5179 Thanks a lot for the review. Merging this to `master` and `release-1.4` .. ---
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314356#comment-16314356 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5201 Thanks for the review @pnowojski. I'm merging the last commit of this PR to `master` and `release-1.4` .. > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. 
[GitHub] flink issue #5201: [FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerBaseTest...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5201 Thanks for the review @pnowojski. I'm merging the last commit of this PR to `master` and `release-1.4` .. ---
[jira] [Created] (FLINK-8382) sheduleRunAsync with a positive schedule delay does not work in JobMaster
Zhu Zhu created FLINK-8382: -- Summary: sheduleRunAsync with a positive schedule delay does not work in JobMaster Key: FLINK-8382 URL: https://issues.apache.org/jira/browse/FLINK-8382 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Zhu Zhu Here's the process of invoking scheduleRunAsync in JM: 1. The method scheduleRunAsync in JobMaster will forward a RunAsync message to FencedAkkaRpcActor. 2. FencedAkkaRpcActor handles it with method handleRunAsync from its parent class AkkaRpcActor: If the schedule delay is positive, handleRunAsync will schedule a raw RunAsync message without wrapping it in a FencedMessage/UnfencedMessage. 3. Later when FencedAkkaRpcActor receives the scheduled raw RunAsync message, it will discard it for not recognizing the message type. Thus scheduleRunAsync with a positive delay does not work. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
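The wrap-then-discard failure described in the report can be modeled with a short, self-contained sketch. The class and method names below are illustrative stand-ins for the behavior described (not Flink's actual AkkaRpcActor internals), and the delayed send is simulated synchronously rather than via a real timer:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the reported bug: the fenced actor only accepts
// messages wrapped in an envelope, but the positive-delay path re-sends the
// raw RunAsync, which the actor then discards as an unrecognized type.
class FencedDispatchSketch {

    static class RunAsync {
        final Runnable task;
        RunAsync(Runnable task) { this.task = task; }
    }

    static class FencedMessage {
        final RunAsync payload;
        FencedMessage(RunAsync payload) { this.payload = payload; }
    }

    final List<String> discarded = new ArrayList<>();

    // Models the fenced actor's receive loop: unknown types are discarded.
    void receive(Object message) {
        if (message instanceof FencedMessage) {
            ((FencedMessage) message).payload.task.run();
        } else {
            discarded.add(message.getClass().getSimpleName());
        }
    }

    // Models handleRunAsync: with no delay the message stays wrapped; with a
    // positive delay the raw RunAsync is (incorrectly) sent unwrapped.
    // (The delay itself is elided; the send happens immediately here.)
    void scheduleRunAsync(Runnable task, long delayMillis) {
        RunAsync msg = new RunAsync(task);
        if (delayMillis <= 0) {
            receive(new FencedMessage(msg));
        } else {
            receive(msg); // bug: should be re-wrapped in a FencedMessage
        }
    }
}
```

With this model, a task scheduled with delay 0 runs, while one scheduled with a positive delay is silently dropped, matching steps 2 and 3 above.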
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314122#comment-16314122 ] Elias Levy commented on FLINK-7935: --- [~Zentol] thoughts? > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the System scope for operator metrics is > {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and instances would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metrics plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user-defined metrics, but you cannot define > variables/scopes when adding a metric group or metric with the user API, so > that in DD we could have a single metric with a tag taking many different values, > rather than hundreds of metrics for just the one value we want to measure > across different event types. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
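The mechanism the ticket relies on — a scope format string whose `<variable>` placeholders are substituted to build the metric's full name — can be sketched in a few lines. This is an illustrative model of the substitution, not Flink's actual `ScopeFormat` implementation:

```java
import java.util.Map;

// Hypothetical sketch: expand a scope format string containing <variable>
// placeholders into a concrete metric scope. When the format omits a
// variable (as in the DataDog-style constant formats quoted above), the
// variable simply does not appear in the name and can instead be carried
// as a reporter tag.
class ScopeFormatSketch {
    static String expand(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }
}
```

For example, expanding `<host>.taskmanager.<tm_id>` with `host=node1, tm_id=tm-7` yields `node1.taskmanager.tm-7`, while the constant format `taskmanager.job.operator` is unchanged and the variables remain available only as tags.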
[jira] [Assigned] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-8329: - Assignee: Shuyi Chen (was: Till Rohrmann) > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Shuyi Chen > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5208: [FLINK-8265] [Mesos] Missing jackson dependency for flink...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5208 Merged, and thanks for fixing this! Could you please close the PR? ---
[jira] [Closed] (FLINK-8265) Missing jackson dependency for flink-mesos
[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8265. --- Resolution: Fixed Fixed on master in 9ae4c5447a2f5aae2b65d5860f822d452a9d5af1 > Missing jackson dependency for flink-mesos > -- > > Key: FLINK-8265 > URL: https://issues.apache.org/jira/browse/FLINK-8265 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The Jackson library that is required by Fenzo is missing from the Flink > distribution jar-file. > This manifests as an exception in certain circumstances when a hard > constraint is configured ("mesos.constraints.hard.hostattribute"). > {code} > NoClassDefFoundError: > org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper > at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35) > at > com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784) > at > com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581) > at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796) > at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos
[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313852#comment-16313852 ] ASF GitHub Bot commented on FLINK-8265: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5208 Merged, and thanks for fixing this! Could you please close the PR? > Missing jackson dependency for flink-mesos > -- > > Key: FLINK-8265 > URL: https://issues.apache.org/jira/browse/FLINK-8265 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The Jackson library that is required by Fenzo is missing from the Flink > distribution jar-file. > This manifests as an exception in certain circumstances when a hard > constraint is configured ("mesos.constraints.hard.hostattribute"). > {code} > NoClassDefFoundError: > org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper > at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35) > at > com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784) > at > com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581) > at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796) > at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8356) JDBCAppendTableSink does not work for Hbase Phoenix Driver
[ https://issues.apache.org/jira/browse/FLINK-8356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Wu closed FLINK-8356. -- Resolution: Not A Bug Release Note: A person from my workplace found that this can be simply fixed by appending ";autocommit=true" in the jdbc url. > JDBCAppendTableSink does not work for Hbase Phoenix Driver > --- > > Key: FLINK-8356 > URL: https://issues.apache.org/jira/browse/FLINK-8356 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Paul Wu > > The following code runs without errors, but the data is not inserted into the > HBase table. However, it does work for MySQL (see the commented out code). > The Phoenix driver is from > https://mvnrepository.com/artifact/org.apache.phoenix/phoenix/4.7.0-HBase-1.1 > String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE > SEGMENTENDTIME, cast (imsi as varchar) imsi, cast(imei as varchar) imei from > ts "; > > Table table = ste.sqlQuery(query); > JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder(); > jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver"); > jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure"); > jdbc.setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA > (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)"); > // JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder(); > //jdbc.setDrivername("com.mysql.jdbc.Driver"); > //jdbc.setDBUrl("jdbc:mysql://localhost/test"); > //jdbc.setUsername("root").setPassword(""); > //jdbc.setQuery("insert INTO GEO_ANALYTICS_STREAMING_DATA > (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)"); > //jdbc.setBatchSize(1); > jdbc.setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, > Types.STRING); > JDBCAppendTableSink sink = jdbc.build(); > table.writeToSink(sink); -- This message was sent by Atlassian JIRA (v6.4.14#64029)
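Per the resolution note above, the fix is a connection-string change rather than a code change: Phoenix buffers upserts until commit, so without autocommit the sink's upserts never flush. A hypothetical sketch, reusing the `jdbc` builder from the quoted snippet:

```java
// Assumption: same builder as in the quoted snippet; only the URL changes.
// Appending ";autocommit=true" makes Phoenix commit each executed batch,
// which is what the reporter's colleague found resolves the issue.
jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver");
jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure;autocommit=true");
```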
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313670#comment-16313670 ] ASF GitHub Bot commented on FLINK-5823: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5248 [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata ## What is the purpose of the change This pull requests puts the State Backends in charge of persisting the metadata. This has the following advantages: - Checkpoints become conceptually independent of File Systems. We can in the future implement state backends purely backed by non-file systems like databases or message queues. - We can simplify or drop the extra code paths implemented for externalized checkpoints and checkpoint metadata persisting in HA cases. - The checkpoint and savepoint configurations go purely through the state backends, making testing much simpler. - Because the configuration go through the state backends only, it is simple to let the state backends pick up a mix of configuration from the application code (in code config) and configuration from the cluster setup. For example, a programmer can pick the state backend, and the cluster can have default limits or checkpoint directories configured. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration. - As a followup, this will allow us to implement more efficient ways of dropping checkpoint state (recursive directory delete) as well as logic to scavenge left-over checkpoint data. ## Altered user-facing Behavior - All checkpoints are always "externalized", meaning that the metadata is always persisted. The notion of externalized checkpoints is dropped. 
- Checkpoints have no "externalization setting", but a **retention policy**, like - `RETAIN_ON_CANCELLATION`: Keep checkpoints when user manually cancels job, similar as the corresponding setting for externalized checkpoints - `RETAIN_ON_FAILURE`: Retain when the job reaches a terminal failure. For compatibility, this is automatically picked when the user calls the now deprecated method to activate externalized checkpoints. - `NEVER_RETAIN_AFTER_TERMINATION`: Conceptually similar to the behavior when no externalized checkpoints were configured. - The `MemoryStateBackend` is viewed as a FileSystem-based State Backend that does not create separate files for state, but just holds state inline with the checkpoint metadata. In the Metadata and Savepoint handling, there is no distinction between the `MemoryStateBackend` and the `FsStateBackend`. - As a special case, the MemoryStateBackend may choose to not durably persist the metadata (when no storage location is configured, by default), in which case it will not be able to support an HA mode (there is an eager check for that). That is merely there to support no-config getting started experiences and simpler in-IDE development setups. ## Followup work To make sure there is no extra persisting of the checkpoint metadata by the HA store (it simply references the regular persisted checkpoint metadata) we need some changes to the `ZooKeeperCompletedCheckpointStore`. Once we start storing shared checkpoint state (incremental checkpoints) and task-owned state (write-ahead sinks) in different locations, we can start optimizing checkpoint directory cleanup, and can start implementing scavengers for left-over state. ## Brief change log - The state backends introduce the concept of a `CheckpointStorage` (storage of bytes) and `CheckpointStorageLocation` (specific location for the bytes of a checkpoint/savepoint). 
That makes the separation of concerns in the state backend clear: `KeyedStateBackend` and `OperatorStatebackend` define how to hold and checkpoint the state, while `CheckpointStorage` defines how to persist bytes (data and metadata). - The `CheckpointStorage` is responsible for storing the checkpoint metadata. There is no implicit assumption that the checkpoint metadata is stored in a file systems any more. - All checkpoint directory / savepoint directory specific config settings are now part of the state backends. The Checkpoint Coordinator simply calls the relevant methods on the state backends to store metadata. - All checkpoints are addressable via a "pointer", which is interpreted by the state backend to find the checkpoint. For File-system based statebackends (all statebackends in Flink currently), this pointer is the file path. ## Verifying this change This change adds and adjusts many existing tests to verify the behavior.
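The three retention policies above amount to a small decision table over how the job terminated. The sketch below encodes that table in a self-contained way; the enum names come from the PR text, but the exact semantics — in particular the assumption that `RETAIN_ON_CANCELLATION` subsumes retention on failure — are an interpretation, not Flink's confirmed behavior:

```java
// Hypothetical decision table for the retention policies described above.
// Assumption: RETAIN_ON_CANCELLATION also retains on failure, so only a
// normal finish discards the checkpoint under that policy.
class RetentionSketch {
    enum Termination { CANCELED, FAILED, FINISHED }
    enum Policy { RETAIN_ON_CANCELLATION, RETAIN_ON_FAILURE, NEVER_RETAIN_AFTER_TERMINATION }

    static boolean retain(Policy policy, Termination how) {
        switch (policy) {
            case RETAIN_ON_CANCELLATION: return how != Termination.FINISHED;
            case RETAIN_ON_FAILURE:      return how == Termination.FAILED;
            default:                     return false; // NEVER_RETAIN_AFTER_TERMINATION
        }
    }
}
```

Under this reading, `RETAIN_ON_FAILURE` matches the compatibility path for the deprecated externalized-checkpoint setting, and `NEVER_RETAIN_AFTER_TERMINATION` reproduces the old behavior when externalized checkpoints were not configured.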
[GitHub] flink pull request #5248: [FLINK-5823] [checkpoints] State backends now also...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5248 [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata ## What is the purpose of the change This pull requests puts the State Backends in charge of persisting the metadata. This has the following advantages: - Checkpoints become conceptually independent of File Systems. We can in the future implement state backends purely backed by non-file systems like databases or message queues. - We can simplify or drop the extra code paths implemented for externalized checkpoints and checkpoint metadata persisting in HA cases. - The checkpoint and savepoint configurations go purely through the state backends, making testing much simpler. - Because the configuration go through the state backends only, it is simple to let the state backends pick up a mix of configuration from the application code (in code config) and configuration from the cluster setup. For example, a programmer can pick the state backend, and the cluster can have default limits or checkpoint directories configured. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration. - As a followup, this will allow us to implement more efficient ways of dropping checkpoint state (recursive directory delete) as well as logic to scavenge left-over checkpoint data. ## Altered user-facing Behavior - All checkpoints are always "externalized", meaning that the metadata is always persisted. The notion of externalized checkpoints is dropped. - Checkpoints have no "externalization setting", but a **retention policy**, like - `RETAIN_ON_CANCELLATION`: Keep checkpoints when user manually cancels job, similar as the corresponding setting for externalized checkpoints - `RETAIN_ON_FAILURE`: Retain when the job reaches a terminal failure. 
For compatibility, this is automatically picked when the user calls the now deprecated method to activate externalized checkpoints. - `NEVER_RETAIN_AFTER_TERMINATION`: Conceptually similar to the behavior when no externalized checkpoints were configured. - The `MemoryStateBackend` is viewed as a FileSystem-based State Backend that does not create separate files for state, but just holds state inline with the checkpoint metadata. In the Metadata and Savepoint handling, there is no distinction between the `MemoryStateBackend` and the `FsStateBackend`. - As a special case, the MemoryStateBackend may choose to not durably persist the metadata (when no storage location is configured, by default), in which case it will not be able to support an HA mode (there is an eager check for that). That is merely there to support no-config getting started experiences and simpler in-IDE development setups. ## Followup work To make sure there is no extra persisting of the checkpoint metadata by the HA store (it simply references the regular persisted checkpoint metadata) we need some changes to the `ZooKeeperCompletedCheckpointStore`. Once we start storing shared checkpoint state (incremental checkpoints) and task-owned state (write-ahead sinks) in different locations, we can start optimizing checkpoint directory cleanup, and can start implementing scavengers for left-over state. ## Brief change log - The state backends introduce the concept of a `CheckpointStorage` (storage of bytes) and `CheckpointStorageLocation` (specific location for the bytes of a checkpoint/savepoint). That makes the separation of concerns in the state backend clear: `KeyedStateBackend` and `OperatorStatebackend` define how to hold and checkpoint the state, while `CheckpointStorage` defines how to persist bytes (data and metadata). - The `CheckpointStorage` is responsible for storing the checkpoint metadata. There is no implicit assumption that the checkpoint metadata is stored in a file systems any more. 
- All checkpoint directory / savepoint directory specific config settings are now part of the state backends. The Checkpoint Coordinator simply calls the relevant methods on the state backends to store metadata. - All checkpoints are addressable via a "pointer", which is interpreted by the state backend to find the checkpoint. For file-system-based state backends (all state backends in Flink currently), this pointer is the file path. ## Verifying this change This change adds and adjusts many existing tests to verify the behavior. Manual verification can happen by just starting a regular Flink cluster, enabling checkpoints, and seeing that metadata files are always persisted. ## Does this pull request potentially affect one of the following parts: -
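The retention policies described above reduce to a small keep-or-delete decision at job termination. A minimal stdlib-only sketch of that decision logic (enum and method names are illustrative, not Flink's actual API; it also assumes `RETAIN_ON_CANCELLATION` retains on failure as well, since it is the stronger setting):

```java
public class RetentionSketch {

    // Hypothetical mirror of the retention modes listed in the PR description.
    enum RetentionPolicy {
        RETAIN_ON_CANCELLATION,
        RETAIN_ON_FAILURE,
        NEVER_RETAIN_AFTER_TERMINATION
    }

    enum TerminalState { CANCELED, FAILED, FINISHED }

    /** Decide whether persisted checkpoint metadata should survive job termination. */
    static boolean retain(RetentionPolicy policy, TerminalState state) {
        switch (policy) {
            case RETAIN_ON_CANCELLATION:
                // stronger setting: keep on manual cancellation and on terminal failure
                return state == TerminalState.CANCELED || state == TerminalState.FAILED;
            case RETAIN_ON_FAILURE:
                // keep only when the job reaches a terminal failure
                return state == TerminalState.FAILED;
            default:
                // NEVER_RETAIN_AFTER_TERMINATION: clean up in every terminal case
                return false;
        }
    }
}
```

The point of the design is that this decision no longer depends on a separate "externalized checkpoints" code path: metadata is always persisted, and only the cleanup decision varies.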
[jira] [Comment Edited] (FLINK-8380) Dynamic BucketingSink paths based on ingested Kafka topics
[ https://issues.apache.org/jira/browse/FLINK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313504#comment-16313504 ] Kyle Hamlin edited comment on FLINK-8380 at 1/5/18 6:53 PM: I agree that is the simplest solution. I thought it might be possible at the very least to get the topic through to the getBucketPath method. The topic is available when extending KeyedDeserializationSchema in the deserialize method, so I thought it might be possible to also get it through to the bucketer. Another thought would be to get the message key through to the getBucketPath method; the topic name could then be set as the message key. was (Author: hamlinkn): I agree that is the simplest solution. I thought it might be possible at the very least to get the topic through to the getBucketPath method. The topic is available when extending KeyedDeserializationSchema in the deserialize method, so I thought it might be possible to also get it through to the bucketer. > Dynamic BucketingSink paths based on ingested Kafka topics > -- > > Key: FLINK-8380 > URL: https://issues.apache.org/jira/browse/FLINK-8380 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kyle Hamlin > > Flink 1.4 released a feature that allows Kafka consumers to dynamically > ingest topics based on a regex pattern. If a user wanted to use Flink as a > simple (no transformations) but dynamic (auto topic discovery & auto output > path generation) data persister they would currently only have half the tools > to do so. I believe it would be a beneficial feature to allow users to not > only define automatic topic discovery but also a way to dynamically > incorporate those topics into a BucketingSink output path. 
For example: > If I had three Kafka topics > {code:java} > select-topic-1 > ignore-topic-1 > select-topic-2 > {code} > And my Kafka consumer's regex only selected two topics > {code:java} > val consumer = new > FlinkKafkaConsumer010[GenericRecord](Pattern.compile("select-.*?"), new > MyDeserializer(), props) > {code} > Then the selected topics would be appended to the beginning of the BucketingSink > output path and any Bucketer's partitions would follow > {code:java} > val sink = new BucketingSink[GenericRecord]("s3://my-bucket/") > sink.setBucketer(new DateTimeBucketer[GenericRecord]("MMdd")) > {code} > The resulting output paths would be > {code:java} > s3://my-bucket/selected-topic1/MMdd/ > s3://my-bucket/selected-topic2/MMdd/ > {code} > As new topics are discovered via the regex pattern (while the app is running) > the set of BucketingSink output paths would grow. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
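The proposed behavior boils down to prefixing the bucket path with the record's source topic before the date partition. A stdlib-only sketch of that path construction (the helper name and signature are hypothetical, not the BucketingSink/Bucketer API; it uses a full `yyyyMMdd` pattern where the JIRA markup shows a truncated one):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class TopicBucketer {

    /** Build base/&lt;topic&gt;/&lt;datePart&gt;/ from a base path and the record's source topic. */
    static String bucketPath(String basePath, String topic, LocalDate date) {
        // normalize so the base path always ends with a single slash
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        String datePart = date.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        return base + topic + "/" + datePart + "/";
    }
}
```

With this shape, newly discovered topics automatically produce new output paths, since the topic is part of the computed path rather than the configured base.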
[jira] [Closed] (FLINK-8374) Unstable Yarn tests due to Akka Shutdown Exception Logging
[ https://issues.apache.org/jira/browse/FLINK-8374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8374. --- > Unstable Yarn tests due to Akka Shutdown Exception Logging > -- > > Key: FLINK-8374 > URL: https://issues.apache.org/jira/browse/FLINK-8374 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.5.0 > > > Akka may log the following in some cases during shutdown: > {{java.util.concurrent.RejectedExecutionException: Worker has already been > shutdown}} > The Yarn tests search the logs for unexpected exceptions and fail when > encountering that exception. We should whitelist it, as it is not a problem, > merely an Akka shutdown artifact. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313653#comment-16313653 ] ASF GitHub Bot commented on FLINK-8162: --- Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r159950881 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -70,35 +71,45 @@ private Date initTimestamp; + private long millisBehindLatest; + /** * Creates a shard consumer. * * @param fetcherRef reference to the owning fetcher * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming +* @param kinesisMetricGroup the metric group to report to */ public ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum) { + SequenceNumber lastSequenceNum, + MetricGroup kinesisMetricGroup) { this(fetcherRef, subscribedShardStateIndex, subscribedShard, lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration())); + KinesisProxy.create(fetcherRef.getConsumerConfiguration()), + kinesisMetricGroup); } /** This constructor is exposed for testing purposes. 
*/ protected ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis) { + KinesisProxyInterface kinesis, + MetricGroup kinesisMetricGroup) { this.fetcherRef = checkNotNull(fetcherRef); this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); this.lastSequenceNum = checkNotNull(lastSequenceNum); + + checkNotNull(kinesisMetricGroup) + .gauge("millisBehindLatest", () -> millisBehindLatest); --- End diff -- Bump > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
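The diff above registers a gauge backed by a mutable field: the lambda captures `this`, so every read of the gauge sees the latest `millisBehindLatest`. The same pattern, reduced to stdlib Java (Flink's `MetricGroup` replaced by a plain `Supplier` registry; class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class GaugeSketch {

    private long millisBehindLatest;  // updated after every GetRecords call
    private final Map<String, Supplier<Long>> gauges = new HashMap<>();

    GaugeSketch() {
        // analogous to metricGroup.gauge("millisBehindLatest", () -> millisBehindLatest):
        // the supplier reads the field lazily, so reporters always see the current value
        gauges.put("millisBehindLatest", () -> millisBehindLatest);
    }

    /** Called by the consumer loop with the value from the GetRecords result. */
    void onGetRecordsResult(long millisBehind) {
        this.millisBehindLatest = millisBehind;
    }

    /** What a metrics reporter would do when scraping the gauge. */
    long report(String name) {
        return gauges.get(name).get();
    }
}
```

This is why the field is registered once in the constructor but still tracks the shard's lag over time.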
[jira] [Resolved] (FLINK-8374) Unstable Yarn tests due to Akka Shutdown Exception Logging
[ https://issues.apache.org/jira/browse/FLINK-8374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8374. - Resolution: Fixed Fixed via c7c72704ab8827245d08850edf3d9a448d18097f > Unstable Yarn tests due to Akka Shutdown Exception Logging > -- > > Key: FLINK-8374 > URL: https://issues.apache.org/jira/browse/FLINK-8374 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.5.0 > > > Akka may log the following in some cases during shutdown: > {{java.util.concurrent.RejectedExecutionException: Worker has already been > shutdown}} > The Yarn tests search the logs for unexpected exceptions and fail when > encountering that exception. We should whitelist it, as it is not a problem, > merely an Akka shutdown artifact. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8346. --- > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0, 1.4.1 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8359) Update copyright date in NOTICE
[ https://issues.apache.org/jira/browse/FLINK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8359. - Resolution: Fixed Fixed via 1eaef6abf4839194a12b19a038a1ec480a037783 > Update copyright date in NOTICE > --- > > Key: FLINK-8359 > URL: https://issues.apache.org/jira/browse/FLINK-8359 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > NOTICE file has copyright year as 2014-2017. This needs to be updated as > 2014-2018. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
Github user casidiablo commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r159950881 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -70,35 +71,45 @@ private Date initTimestamp; + private long millisBehindLatest; + /** * Creates a shard consumer. * * @param fetcherRef reference to the owning fetcher * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming +* @param kinesisMetricGroup the metric group to report to */ public ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum) { + SequenceNumber lastSequenceNum, + MetricGroup kinesisMetricGroup) { this(fetcherRef, subscribedShardStateIndex, subscribedShard, lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration())); + KinesisProxy.create(fetcherRef.getConsumerConfiguration()), + kinesisMetricGroup); } /** This constructor is exposed for testing purposes. */ protected ShardConsumer(KinesisDataFetcher fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis) { + KinesisProxyInterface kinesis, + MetricGroup kinesisMetricGroup) { this.fetcherRef = checkNotNull(fetcherRef); this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); this.lastSequenceNum = checkNotNull(lastSequenceNum); + + checkNotNull(kinesisMetricGroup) + .gauge("millisBehindLatest", () -> millisBehindLatest); --- End diff -- Bump ---
[jira] [Closed] (FLINK-8359) Update copyright date in NOTICE
[ https://issues.apache.org/jira/browse/FLINK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8359. --- > Update copyright date in NOTICE > --- > > Key: FLINK-8359 > URL: https://issues.apache.org/jira/browse/FLINK-8359 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > NOTICE file has copyright year as 2014-2017. This needs to be updated as > 2014-2018. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8346. - Resolution: Fixed Fix Version/s: 1.4.1 Fixed in - 1.4.1 via 4b2178677e9bae00de2ac201ac3642c803c29064 - 1.5.0 via fd13ed09d4dfeea04be3acb7856fe97ac4ae6c32 > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0, 1.4.1 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen reopened FLINK-8346: - Reopen to fix closing message. > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0, 1.4.1 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313628#comment-16313628 ] ASF GitHub Bot commented on FLINK-8346: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5231 > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8346. --- > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8359) Update copyright date in NOTICE
[ https://issues.apache.org/jira/browse/FLINK-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313627#comment-16313627 ] ASF GitHub Bot commented on FLINK-8359: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5238 > Update copyright date in NOTICE > --- > > Key: FLINK-8359 > URL: https://issues.apache.org/jira/browse/FLINK-8359 > Project: Flink > Issue Type: Task > Components: Build System >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > NOTICE file has copyright year as 2014-2017. This needs to be updated as > 2014-2018. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8346) add S3 signature v4 workaround to docs
[ https://issues.apache.org/jira/browse/FLINK-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8346. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed via fd13ed09d4dfeea04be3acb7856fe97ac4ae6c32 > add S3 signature v4 workaround to docs > -- > > Key: FLINK-8346 > URL: https://issues.apache.org/jira/browse/FLINK-8346 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystem >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0 > > > As per > https://lists.apache.org/thread.html/dd59f94d76ae809f83dc36958006974d0a13dc0798856d1d64bb7293@%3Cuser.flink.apache.org%3E, > we should add a hint to enable signature v4 for older Hadoop versions to > work with S3 (for the non-shaded S3 file systems and regions only accepting > v4, e.g. {{eu-central-1}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8373) Inconsistencies in some FileSystem directory functions
[ https://issues.apache.org/jira/browse/FLINK-8373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8373. --- > Inconsistencies in some FileSystem directory functions > -- > > Key: FLINK-8373 > URL: https://issues.apache.org/jira/browse/FLINK-8373 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.5.0 > > > There are some minor differences in the behaviors of some File System > functions, like {{mkdirs()}}. On some filesystems, it tolerates existing > directories or files in place of parent directories. Some return false in an > error case, some throw an exception. > I encountered this while writing tests for the file-based state backends. > We should harmonize the behavior of {{FileSystem.mkdirs()}}. > I suggest to adopt the behavior that is used by HDFS, which seems the most > correct one. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5238: [FLINK-8359][docs] Update copyright date in NOTICE
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5238 ---
[GitHub] flink pull request #5231: [FLINK-8346][docs] add v4 signature workaround for...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5231 ---
[jira] [Resolved] (FLINK-8373) Inconsistencies in some FileSystem directory functions
[ https://issues.apache.org/jira/browse/FLINK-8373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8373. - Resolution: Fixed Fixed via 3d0ed12edab5e1b89db0829230e69fb6ef841b7e > Inconsistencies in some FileSystem directory functions > -- > > Key: FLINK-8373 > URL: https://issues.apache.org/jira/browse/FLINK-8373 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.5.0 > > > There are some minor differences in the behaviors of some File System > functions, like {{mkdirs()}}. On some filesystems, it tolerates existing > directories or files in place of parent directories. Some return false in an > error case, some throw an exception. > I encountered this while writing tests for the file-based state backends. > We should harmonize the behavior of {{FileSystem.mkdirs()}}. > I suggest to adopt the behavior that is used by HDFS, which seems the most > correct one. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
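The HDFS-style contract the issue advocates can be stated in a few lines: creating a directory that already exists succeeds, missing parents are created, and a regular file in the way is an error rather than a `false` return. A stdlib sketch of that contract using `java.nio.file` (this is an illustration of the semantics, not Flink's actual `FileSystem.mkdirs()` implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MkdirsSketch {

    /** HDFS-like mkdirs: true iff the directory exists afterwards; throws if a file is in the way. */
    static boolean mkdirs(Path dir) throws IOException {
        if (Files.isDirectory(dir)) {
            return true;  // already a directory: success, not an error (idempotent)
        }
        if (Files.exists(dir)) {
            // a regular file occupies the path: fail loudly instead of returning false
            throw new IOException(dir + " exists but is not a directory");
        }
        // creates any missing parent directories as well; throws if a parent is a file
        Files.createDirectories(dir);
        return true;
    }
}
```

Harmonizing on one such contract lets callers drop the filesystem-specific special cases for "directory already exists" and "returned false without an exception".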
[GitHub] flink pull request #5178: [hotfix] Fix typo in TestableKinesisDataFetcher
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5178 ---
[GitHub] flink pull request #5204: [hotfix] Fix typo in ReplicatingInputFormat/DualIn...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5204 ---
[GitHub] flink pull request #5237: [hotfix][doc] fix typo in filesystems.md
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5237 ---
[GitHub] flink pull request #5242: [hotfix] Fix typos
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5242 ---
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313622#comment-16313622 ] ASF GitHub Bot commented on FLINK-8175: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159949798 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java --- @@ -28,8 +29,12 @@ import java.net.Socket; /** - * A specialized data sink to be used by DataStreamUtils.collect. + * A specialized data sink to be used by DataStreamUtils.collect(). + * + * This experimental class is relocated from flink-streaming-contrib. Please see package-info.java + * for more information. */ +@PublicEvolving --- End diff -- make sense. Changed > remove flink-streaming-contrib and migrate its classes to > flink-streaming-java/scala > > > Key: FLINK-8175 > URL: https://issues.apache.org/jira/browse/FLINK-8175 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > > I propose removing flink-streaming-contrib from flink-contrib, and migrating > its classes to flink-streaming-java/scala for the following reasons: > - flink-streaming-contrib is so small that it only has 4 classes (3 java and > 1 scala), and they don't need a dedicated jar for Flink to distribute and > maintain it and for users to deal with the overhead of dependency management > - the 4 classes in flink-streaming-contrib are logically more tied to > flink-streaming-java/scala, and thus can be easily migrated > - flink-contrib is already too crowded and noisy. 
It contains lots of sub > modules with different purposes which confuse developers and users, and they > lack a proper project hierarchy > To take a step even forward, I would argue that even flink-contrib should be > removed and all its submodules should be migrated to other top-level modules > for the following reasons: 1) Apache Flink the whole project itself is a > result of contributions from many developers, there's no reason to highlight > some contributions in a dedicated module named 'contrib' 2) flink-contrib > inherently doesn't have a good hierarchy to hold submodules -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5112: [FLINK-8175] [DataStream API java/scala] remove fl...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159949798 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java --- @@ -28,8 +29,12 @@ import java.net.Socket; /** - * A specialized data sink to be used by DataStreamUtils.collect. + * A specialized data sink to be used by DataStreamUtils.collect(). + * + * This experimental class is relocated from flink-streaming-contrib. Please see package-info.java + * for more information. */ +@PublicEvolving --- End diff -- make sense. Changed ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPTY_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313571#comment-16313571 ] sunjincheng commented on FLINK-8331: Fixed in 1.4 1c7a7fa706e9b373bbe395398507378a61759190 > FieldParsers do not correctly set EMPTY_COLUMN error state > - > > Key: FLINK-8331 > URL: https://issues.apache.org/jira/browse/FLINK-8331 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0, 1.4.1 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a > field is empty. > Instead, they try to parse the field value from an empty String which fails, > e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}. > The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} > values. The implementation requires that all {{FieldParser}} correctly return > the EMPTY_COLUMN error state in case of an empty field. > Affected {{FieldParser}}: > - BigDecParser > - BigIntParser > - DoubleParser > - FloatParser > - SqlDateParser > - SqlTimeParser > - SqlTimestampParser -- This message was sent by Atlassian JIRA (v6.4.14#64029)
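The fix amounts to checking for an empty field before delegating to `Double.parseDouble` and friends, so the caller can distinguish "empty column" from "malformed number". A reduced sketch of that control flow (error codes modeled as a nested enum and the class name invented for illustration; Flink's real `FieldParser` API differs):

```java
public class DoubleParserSketch {

    enum ParseError { NONE, EMPTY_COLUMN, NUMERIC_VALUE_FORMAT_ERROR }

    private ParseError error = ParseError.NONE;
    private double value;

    /** Returns false on error; the caller inspects getError() and may map EMPTY_COLUMN to null. */
    boolean parseField(String field) {
        error = ParseError.NONE;
        if (field.isEmpty()) {
            // signal emptiness explicitly instead of letting parseDouble throw
            error = ParseError.EMPTY_COLUMN;
            return false;
        }
        try {
            value = Double.parseDouble(field);
            return true;
        } catch (NumberFormatException e) {
            error = ParseError.NUMERIC_VALUE_FORMAT_ERROR;
            return false;
        }
    }

    ParseError getError() { return error; }

    double getValue() { return value; }
}
```

Without the empty-field check, an input format configured to treat empty fields as null never sees the EMPTY_COLUMN state and instead surfaces a format error, which is the bug described above.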
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313567#comment-16313567 ] ASF GitHub Bot commented on FLINK-8175: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159941955 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java --- @@ -28,8 +29,12 @@ import java.net.Socket; /** - * A specialized data sink to be used by DataStreamUtils.collect. + * A specialized data sink to be used by DataStreamUtils.collect(). + * + * This experimental class is relocated from flink-streaming-contrib. Please see package-info.java + * for more information. */ +@PublicEvolving --- End diff -- Would this better be `@Internal`? I think it is not meant to be instantiated directly, but only via the collect() call... > remove flink-streaming-contrib and migrate its classes to > flink-streaming-java/scala > > > Key: FLINK-8175 > URL: https://issues.apache.org/jira/browse/FLINK-8175 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > > I propose removing flink-streaming-contrib from flink-contrib, and migrating > its classes to flink-streaming-java/scala for the following reasons: > - flink-streaming-contrib is so small that it only has 4 classes (3 java and > 1 scala), and they don't need a dedicated jar for Flink to distribute and > maintain it and for users to deal with the overhead of dependency management > - the 4 classes in flink-streaming-contrib are logically more tied to > flink-streaming-java/scala, and thus can be easily migrated > - flink-contrib is already too crowded and noisy. 
It contains lots of sub > modules with different purposes which confuse developers and users, and they > lack a proper project hierarchy > To take a step even forward, I would argue that even flink-contrib should be > removed and all its submodules should be migrated to other top-level modules > for the following reasons: 1) Apache Flink the whole project itself is a > result of contributions from many developers, there's no reason to highlight > some contributions in a dedicated module named 'contrib' 2) flink-contrib > inherently doesn't have a good hierarchy to hold submodules -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313568#comment-16313568 ] ASF GitHub Bot commented on FLINK-8175: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159942085 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package holds classes that are experimental. + * + * They are NOT battle-tested code and may be changed or removed in future versions. --- End diff -- For checkstyle, you probably need to prepend new paragraphs with ``. 
> remove flink-streaming-contrib and migrate its classes to > flink-streaming-java/scala > > > Key: FLINK-8175 > URL: https://issues.apache.org/jira/browse/FLINK-8175 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > > I propose removing flink-streaming-contrib from flink-contrib, and migrating > its classes to flink-streaming-java/scala for the following reasons: > - flink-streaming-contrib is so small that it only has 4 classes (3 java and > 1 scala), and they don't need a dedicated jar for Flink to distribute and > maintain it and for users to deal with the overhead of dependency management > - the 4 classes in flink-streaming-contrib are logically more tied to > flink-streaming-java/scala, and thus can be easily migrated > - flink-contrib is already too crowded and noisy. It contains lots of sub > modules with different purposes which confuse developers and users, and they > lack a proper project hierarchy > To take a step even forward, I would argue that even flink-contrib should be > removed and all its submodules should be migrated to other top-level modules > for the following reasons: 1) Apache Flink the whole project itself is a > result of contributions from many developers, there's no reason to highlight > some contributions in a dedicated module named 'contrib' 2) flink-contrib > inherently doesn't have a good hierarchy to hold submodules -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5112: [FLINK-8175] [DataStream API java/scala] remove fl...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159942085 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/package-info.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package holds classes that are experimental. + * + * They are NOT battle-tested code and may be changed or removed in future versions. --- End diff -- For checkstyle, you probably need to prepend new paragraphs with ``. ---
[GitHub] flink pull request #5112: [FLINK-8175] [DataStream API java/scala] remove fl...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5112#discussion_r159941955 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java --- @@ -28,8 +29,12 @@ import java.net.Socket; /** - * A specialized data sink to be used by DataStreamUtils.collect. + * A specialized data sink to be used by DataStreamUtils.collect(). + * + * This experimental class is relocated from flink-streaming-contrib. Please see package-info.java + * for more information. */ +@PublicEvolving --- End diff -- Would this better be `@Internal`? I think it is not meant to be instantiated directly, but only via the collect() call... ---
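The collect() mechanism discussed above is essentially a socket handshake: the client side opens a server socket, and the sink running in the job connects back and streams elements over it. The sketch below models that flow with plain JDK sockets. It is an illustration only; the class and method names are invented and do not reflect Flink's actual CollectSink/DataStreamUtils implementation.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the CollectSink / collect() socket pairing (illustrative names). */
public class CollectSketch {

    /** Sink side: connects to the client's port and streams values over the socket. */
    static void runSinkSide(int port, int[] values) {
        new Thread(() -> {
            try (Socket socket = new Socket("localhost", port);
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                for (int v : values) {
                    out.writeInt(v);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }

    /** Client side: accepts the sink's connection and drains the expected number of values. */
    static List<Integer> collect(ServerSocket server, int expected) throws IOException {
        List<Integer> result = new ArrayList<>();
        try (Socket socket = server.accept();
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            for (int i = 0; i < expected; i++) {
                result.add(in.readInt());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // the client opens the port first, then the "sink" connects to it
        try (ServerSocket server = new ServerSocket(0)) {
            runSinkSide(server.getLocalPort(), new int[] {1, 2, 3});
            List<Integer> collected = collect(server, 3);
            if (!collected.equals(Arrays.asList(1, 2, 3))) {
                throw new AssertionError("unexpected: " + collected);
            }
            System.out.println("collected=" + collected);
        }
    }
}
```

Because the iterator is only a client of a single socket, such a class is indeed not meant for direct instantiation, which supports the `@Internal` suggestion above.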
[jira] [Created] (FLINK-8381) Document more flexible schema definition
Timo Walther created FLINK-8381: --- Summary: Document more flexible schema definition Key: FLINK-8381 URL: https://issues.apache.org/jira/browse/FLINK-8381 Project: Flink Issue Type: Improvement Components: Documentation, Table API & SQL Reporter: Timo Walther Assignee: Timo Walther FLINK-8203 implemented a more flexible schema definition for registering DataSet/DataStream as a table. Documentation should be added with examples. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5132 ---
[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313513#comment-16313513 ] ASF GitHub Bot commented on FLINK-8203: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5132 > Make schema definition of DataStream/DataSet to Table conversion more flexible > -- > > Key: FLINK-8203 > URL: https://issues.apache.org/jira/browse/FLINK-8203 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.5.0 > > > When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, > the schema of the table can be defined (by default it is extracted from the > {{TypeInformation}}). > The schema needs to be manually specified to select (project) fields, rename > fields, or define time attributes. Right now, there are several limitations on > how the fields can be defined, which also depend on the type of the > {{DataStream}} / {{DataSet}}. Types with an explicit field order (e.g., > tuples, case classes, Row) require schema definition based on the position of > fields. POJO types, which have no fixed field order, require fields to be > referenced by name. Moreover, there are several restrictions on how time > attributes can be defined, e.g., an event-time attribute must replace an > existing field or be appended, and proctime attributes must be appended. > I think we can make the schema definition more flexible and provide two modes: > 1. Reference input fields by name: All fields in the schema definition are > referenced by name (and possibly renamed using an alias ({{as}})). In this > mode, fields can be reordered and projected out. Moreover, we can define > proctime and event-time attributes at arbitrary positions using arbitrary > names (except those that already exist in the result schema). This mode can be used > for any input type, including POJOs. 
This mode is used if all field > references exist in the input type. > 2. Reference input fields by position: Field references might not refer to > existing fields in the input type. In this mode, fields are simply renamed. > Event-time attributes can replace the field at their position in the input > data (if it is of the correct type) or be appended at the end. Proctime > attributes must be appended at the end. This mode can only be used if the > input type has a defined field order (tuple, case class, Row). > We need to add more tests to check for all combinations of input types and > schema definition modes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
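The mode-selection rule described in the issue (by-name mode applies iff every field reference resolves in the input type; otherwise by-position mode, which requires a defined field order) can be sketched in a few lines. This is an illustration with invented names, not the Table API's internal code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative selection between the two schema-definition modes described above. */
public class SchemaModeSketch {

    enum Mode { BY_NAME, BY_POSITION }

    /**
     * Mode 1 (reference by name) applies iff every referenced field exists in the
     * input type. Otherwise mode 2 (reference by position) is used, which requires
     * the input type to have a defined field order (tuple, case class, Row).
     */
    static Mode selectMode(List<String> inputFields, List<String> referencedFields,
                           boolean inputHasFieldOrder) {
        Set<String> input = new HashSet<>(inputFields);
        if (input.containsAll(referencedFields)) {
            return Mode.BY_NAME;
        }
        if (!inputHasFieldOrder) {
            throw new IllegalArgumentException(
                "Reference by position requires a type with a defined field order.");
        }
        return Mode.BY_POSITION;
    }

    public static void main(String[] args) {
        // All references exist in the input -> by-name mode; works for POJOs too.
        Mode m1 = selectMode(Arrays.asList("a", "b", "c"), Arrays.asList("c", "a"), false);
        // "x" and "y" do not exist in the input -> fields are renamed by position.
        Mode m2 = selectMode(Arrays.asList("a", "b"), Arrays.asList("x", "y"), true);
        System.out.println(m1 + " " + m2); // BY_NAME BY_POSITION
    }
}
```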
[jira] [Resolved] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8203. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: fb29898cd3507b2b94dd8bbf3dbfd2132b643a1d > Make schema definition of DataStream/DataSet to Table conversion more flexible > -- > > Key: FLINK-8203 > URL: https://issues.apache.org/jira/browse/FLINK-8203 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.5.0 > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8380) Dynamic BucketingSink paths based on ingested Kafka topics
[ https://issues.apache.org/jira/browse/FLINK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313504#comment-16313504 ] Kyle Hamlin commented on FLINK-8380: I agree that is the simplest solution. I thought it might be possible at the very least to get the topic through to the getBucketPath method. The topic is available in the deserialize method when extending KeyedDeserializationSchema, so I thought it might be possible to also get it through to the bucketer. > Dynamic BucketingSink paths based on ingested Kafka topics > -- > > Key: FLINK-8380 > URL: https://issues.apache.org/jira/browse/FLINK-8380 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kyle Hamlin > > Flink 1.4 released a feature that allows Kafka consumers to dynamically > ingest topics based on a regex pattern. If a user wanted to use Flink as a > simple (no transformations) but dynamic (auto topic discovery & auto output > path generation) data persister, they would currently have only half the tools > to do so. I believe it would be a beneficial feature to allow users not > only to define automatic topic discovery but also a way to dynamically > incorporate those topics into a BucketingSink output path. 
For example: > If I had three Kafka topics > {code:java} > select-topic-1 > ignore-topic-1 > select-topic-2 > {code} > And my Kafka consumer's regex only selected two topics > {code:java} > val consumer = new > FlinkKafkaConsumer010[GenericRecord](Pattern.compile("select-.*?"), new > MyDeserializer(), props) > {code} > Then the selected topics would be appended to the beginning of the BucketingSink > output path and any Bucketer's partitions would follow > {code:java} > val sink = new BucketingSink[GenericRecord]("s3://my-bucket/") > sink.setBucketer(new DateTimeBucketer[GenericRecord]("MMdd")) > {code} > The resulting output paths would be > {code:java} > s3://my-bucket/selected-topic1/MMdd/ > s3://my-bucket/selected-topic2/MMdd/ > {code} > As new topics are discovered via the regex pattern (while the app is running), > the set of BucketingSink output paths would grow. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8371) Buffers are not recycled in a non-spilled SpillableSubpartition upon release
[ https://issues.apache.org/jira/browse/FLINK-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313498#comment-16313498 ] Nico Kruber commented on FLINK-8371: Working on it. While doing so, I noticed that, similarly, {{SpillableSubpartitionView#nextBuffer}} is also not cleaned up (if populated when the {{release()}} happens). > Buffers are not recycled in a non-spilled SpillableSubpartition upon release > > > Key: FLINK-8371 > URL: https://issues.apache.org/jira/browse/FLINK-8371 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.4.0, 1.3.2 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartition}} only recycles the buffers in its {{buffers}} queue if > there is no view attached to it yet. If there is a view, it delegates this > task to the view, but {{SpillableSubpartitionView}} only instructs the > {{SpilledSubpartitionView}} to clean up in its {{releaseAllResources()}}. > Similarly to the {{PipelinedSubpartition}} implementation, we should always > clean up and recycle the buffers in {{SpillableSubpartition#release()}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
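The invariant the fix restores, namely that every buffer still queued in a subpartition is handed back to its pool on release(), can be modeled in a few lines. This is a toy model with invented classes, not Flink's actual network stack or buffer types.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a buffer pool and a subpartition that must recycle on release. */
public class RecycleSketch {

    static class BufferPool {
        private final Deque<int[]> free = new ArrayDeque<>();
        BufferPool(int count, int size) {
            for (int i = 0; i < count; i++) free.push(new int[size]);
        }
        int[] request() { return free.pop(); }
        void recycle(int[] buffer) { free.push(buffer); }
        int available() { return free.size(); }
    }

    static class Subpartition {
        private final Deque<int[]> buffers = new ArrayDeque<>();
        private final BufferPool pool;
        Subpartition(BufferPool pool) { this.pool = pool; }
        void add(int[] buffer) { buffers.add(buffer); }

        /** The point of the fix: always hand every queued buffer back to the pool. */
        void release() {
            while (!buffers.isEmpty()) {
                pool.recycle(buffers.poll());
            }
        }
    }

    public static void main(String[] args) {
        BufferPool pool = new BufferPool(4, 1024);
        Subpartition part = new Subpartition(pool);
        part.add(pool.request());
        part.add(pool.request());
        System.out.println("before release: " + pool.available()); // 2
        part.release();
        System.out.println("after release: " + pool.available());  // 4
    }
}
```

If release() skipped the recycle loop (the reported bug), the two requested buffers would leak and the pool would stay at 2 available buffers.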
[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313494#comment-16313494 ] ASF GitHub Bot commented on FLINK-8203: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5132 Thanks for the intensive review @fhueske. I will merge this now :) > Make schema definition of DataStream/DataSet to Table conversion more flexible > -- > > Key: FLINK-8203 > URL: https://issues.apache.org/jira/browse/FLINK-8203 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5132: [FLINK-8203] [FLINK-7681] [table] Make schema definition ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5132 Thanks for the intensive review @fhueske. I will merge this now :) ---
[jira] [Commented] (FLINK-8380) Dynamic BucketingSink paths based on ingested Kafka topics
[ https://issues.apache.org/jira/browse/FLINK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313487#comment-16313487 ] Aljoscha Krettek commented on FLINK-8380: - You can already do that by encoding the topic in your records and providing a custom {{Bucketer}} that determines a bucket path based on the topic. I don't think Flink can provide a general solution for this, since the topic somehow has to find its way to the sink while traveling through (potentially) multiple operations. What do you think? > Dynamic BucketingSink paths based on ingested Kafka topics > -- > > Key: FLINK-8380 > URL: https://issues.apache.org/jira/browse/FLINK-8380 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kyle Hamlin > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
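The workaround suggested for this ticket (encode the topic in each record and let a custom Bucketer derive the path from it) can be sketched as follows. The `Bucketer` interface below only mirrors the shape of BucketingSink's real interface; the names and signatures are simplified for illustration and are not Flink's exact API.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Sketch of a topic-aware bucketer; all types here are illustrative stand-ins. */
public class TopicBucketerSketch {

    /** Minimal record wrapper that carries the originating Kafka topic along. */
    static class TopicRecord {
        final String topic;
        final byte[] payload;
        TopicRecord(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Simplified mirror of BucketingSink's Bucketer: base path + element -> bucket path. */
    interface Bucketer<T> {
        Path getBucketPath(Path basePath, T element);
    }

    static class TopicDateBucketer implements Bucketer<TopicRecord> {
        private final SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");

        @Override
        public Path getBucketPath(Path basePath, TopicRecord element) {
            // topic first, then the date partition, as proposed in the ticket
            return basePath.resolve(element.topic).resolve(format.format(new Date()));
        }
    }

    public static void main(String[] args) {
        Bucketer<TopicRecord> bucketer = new TopicDateBucketer();
        Path p = bucketer.getBucketPath(Paths.get("/my-bucket"),
                new TopicRecord("select-topic-1", new byte[0]));
        System.out.println(p); // e.g. /my-bucket/select-topic-1/20180105
    }
}
```

The wrapping step would happen in the deserialization schema, where the topic is already available, which is why no sink-side change is strictly needed.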
[jira] [Commented] (FLINK-8356) JDBCAppendTableSink does not work for Hbase Phoenix Driver
[ https://issues.apache.org/jira/browse/FLINK-8356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313470#comment-16313470 ] ASF GitHub Bot commented on FLINK-8356: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5247 Thanks for opening a PR for this issue @paulzwu. It seems the branches are a bit messed up. Can you update this PR by putting your changes on top of the `master` branch? We'll backport the fix onto `release-1.4`. Thanks, Fabian > JDBCAppendTableSink does not work for Hbase Phoenix Driver > --- > > Key: FLINK-8356 > URL: https://issues.apache.org/jira/browse/FLINK-8356 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Paul Wu > > The following code runs without errors, but the data is not inserted into the > HBase table. However, it does work for MySQL (see the commented out code). > The Phoenix driver is from > https://mvnrepository.com/artifact/org.apache.phoenix/phoenix/4.7.0-HBase-1.1 > String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE > SEGMENTENDTIME, cast (imsi as varchar) imsi, cast(imei as varchar) imei from > ts "; > > Table table = ste.sqlQuery(query); > JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder(); > jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver"); > jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure"); > jdbc.setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA > (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)"); > // JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder(); > //jdbc.setDrivername("com.mysql.jdbc.Driver"); > //jdbc.setDBUrl("jdbc:mysql://localhost/test"); > //jdbc.setUsername("root").setPassword(""); > //jdbc.setQuery("insert INTO GEO_ANALYTICS_STREAMING_DATA > (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)"); > //jdbc.setBatchSize(1); > jdbc.setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, > Types.STRING); > 
JDBCAppendTableSink sink = jdbc.build(); > table.writeToSink(sink); -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5247: [FLINK-8356] Need to add the commit in flush() in JDBCOut...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5247 Thanks for opening a PR for this issue @paulzwu. It seems the branches are a bit messed up. Can you update this PR by putting your changes on top of the `master` branch? We'll backport the fix onto `release-1.4`. Thanks, Fabian ---
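For context on the bug behind this PR: unlike typical MySQL setups, Phoenix connections do not auto-commit by default, so executing a batch alone never makes the upserts durable; the PR title indicates the fix is to commit inside flush(). The sketch below models that principle with stand-in classes rather than real JDBC types, which are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the flush()-must-commit fix; not real JDBC code. */
public class FlushCommitSketch {

    /** Stand-in for a connection with autocommit disabled. */
    static class FakeConnection {
        final List<String> pending = new ArrayList<>();   // batched but uncommitted
        final List<String> committed = new ArrayList<>(); // durable, visible to readers
        void addBatch(String row) { pending.add(row); }
        void executeBatch() { /* rows are sent, but not durable without commit */ }
        void commit() { committed.addAll(pending); pending.clear(); }
    }

    static class OutputFormat {
        final FakeConnection conn = new FakeConnection();
        void writeRecord(String row) { conn.addBatch(row); }

        /** Without the commit() call, a non-autocommit driver silently drops the batch. */
        void flush() {
            conn.executeBatch();
            conn.commit();
        }
    }

    public static void main(String[] args) {
        OutputFormat format = new OutputFormat();
        format.writeRecord("row-1");
        format.writeRecord("row-2");
        System.out.println("committed before flush: " + format.conn.committed.size()); // 0
        format.flush();
        System.out.println("committed after flush: " + format.conn.committed.size());  // 2
    }
}
```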
[jira] [Commented] (FLINK-8356) JDBCAppendTableSink does not work for Hbase Phoenix Driver
[ https://issues.apache.org/jira/browse/FLINK-8356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313457#comment-16313457 ] ASF GitHub Bot commented on FLINK-8356: --- GitHub user paulzwu opened a pull request: https://github.com/apache/flink/pull/5247 [FLINK-8356] Need to add the commit in flush() in JDBCOutputFormat *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. 
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs /
[GitHub] flink pull request #5247: [FLINK-8356] Need to add the commit in flush() in ...
GitHub user paulzwu opened a pull request: https://github.com/apache/flink/pull/5247 [FLINK-8356] Need to add the commit in flush() in JDBCOutputFormat You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/flink master Alternatively you can review and apply these changes as the patch at:
[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible
[ https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313446#comment-16313446 ] ASF GitHub Bot commented on FLINK-8203: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5132#discussion_r159926701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -492,11 +509,16 @@ abstract class StreamTableEnvironment( throw new TableException( "The proctime attribute can only be defined once in a table schema.") } else { -// check that proctime is only appended -if (idx < fieldTypes.length) { - throw new TableException( -"The proctime attribute can only be appended to the table schema and not replace " + - "an existing field. Please move it to the end of the schema.") +// if the fields are referenced by position, +// it is only possible to append the time attribute at the end +if (isRefByPos) { + + // check that proctime is only appended + if (idx < fieldTypes.length) { +throw new TableException( + "The proctime attribute can only be appended to the table schema and not replace " + +"an existing field. Please move it to the end of the schema.") + } } proctime = Some(idx, name) --- End diff -- I keep the current behavior because this allows to have arbitrary table schemas. > Make schema definition of DataStream/DataSet to Table conversion more flexible > -- > > Key: FLINK-8203 > URL: https://issues.apache.org/jira/browse/FLINK-8203 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.5.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5132#discussion_r159926701 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -492,11 +509,16 @@ abstract class StreamTableEnvironment( throw new TableException( "The proctime attribute can only be defined once in a table schema.") } else { -// check that proctime is only appended -if (idx < fieldTypes.length) { - throw new TableException( -"The proctime attribute can only be appended to the table schema and not replace " + - "an existing field. Please move it to the end of the schema.") +// if the fields are referenced by position, +// it is only possible to append the time attribute at the end +if (isRefByPos) { + + // check that proctime is only appended + if (idx < fieldTypes.length) { +throw new TableException( + "The proctime attribute can only be appended to the table schema and not replace " + +"an existing field. Please move it to the end of the schema.") + } } proctime = Some(idx, name) --- End diff -- I keep the current behavior because this allows to have arbitrary table schemas. ---
[jira] [Created] (FLINK-8380) Dynamic BucketingSink paths based on ingested Kafka topics
Kyle Hamlin created FLINK-8380: -- Summary: Dynamic BucketingSink paths based on ingested Kafka topics Key: FLINK-8380 URL: https://issues.apache.org/jira/browse/FLINK-8380 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.4.0 Reporter: Kyle Hamlin Flink 1.4 released a feature that allows Kafka consumers to dynamically ingest topics based on a regex pattern. If a user wanted to use Flink as a simple (no transformations) but dynamic (auto topic discovery & auto output path generation) data persister, they would currently have only half the tools to do so. I believe it would be a beneficial feature to allow users not only to define automatic topic discovery but also a way to dynamically incorporate those topics into a BucketingSink output path. For example: If I had three Kafka topics {code:java} select-topic-1 ignore-topic-1 select-topic-2 {code} And my Kafka consumer's regex only selected two topics {code:java} val consumer = new FlinkKafkaConsumer010[GenericRecord](Pattern.compile("select-.*?"), new MyDeserializer(), props) {code} Then the selected topic names would be appended to the BucketingSink base path, and any Bucketer partitions would follow {code:java} val sink = new BucketingSink[GenericRecord]("s3://my-bucket/") sink.setBucketer(new DateTimeBucketer[GenericRecord]("MMdd")) {code} The resulting output paths would be {code:java} s3://my-bucket/select-topic-1/MMdd/ s3://my-bucket/select-topic-2/MMdd/ {code} As new topics are discovered via the regex pattern (while the app is running), the set of BucketingSink output paths would grow. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
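The proposed layout (base path, then discovered topic, then Bucketer partitions) boils down to a path-composition rule. A sketch under that assumption; `topicBucketPath` is a hypothetical helper, not part of the BucketingSink API:

```java
public class TopicBucketSketch {
    // Compose base path, source topic and the Bucketer's date partition,
    // mirroring the s3://my-bucket/<topic>/<date-partition>/ layout from the ticket.
    static String topicBucketPath(String basePath, String topic, String datePartition) {
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        return base + topic + "/" + datePartition + "/";
    }

    public static void main(String[] args) {
        // A newly discovered topic simply yields a new bucket path.
        System.out.println(topicBucketPath("s3://my-bucket/", "select-topic-1", "20180105"));
        System.out.println(topicBucketPath("s3://my-bucket/", "select-topic-2", "20180105"));
    }
}
```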
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313438#comment-16313438 ] Aljoscha Krettek commented on FLINK-8318: - Thanks, that looks alright. I think that we just have to shade all the dependencies, fortunately, there is already an open PR for another issue that does just that: https://github.com/apache/flink/pull/5243 > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > My flink job is failed after update flink version to 1.4.0. It uses > ElasticSearch connector. > I'm using CDH Hadoop with Flink option "classloader.resolve-order: > parent-first" > The failure log is below. > {noformat} > Using the result of 'hadoop classpath' to augment the Hadoop classpath: > /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//* > 2017-12-26 14:13:21,160 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2017-12-26 14:13:21,161 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - OS current > user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > Hadoop/Kerberos user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 31403 MiBytes > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > (not set) > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.6.5 > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/ > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2017-12-26 14:13:21,449 
INFO > org.apache.flink.runtime.taskmanager.TaskManager - > /home/www/service/flink-1.4.0/conf > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Classpath: > ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:... > > 2017-12-26 14:14:01,393 INFO
[jira] [Resolved] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8139. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: 3c91de518c4ce95d180400ea36a840be10fe6a86 & f88da4d04c328c46b9c94dc76d3e7a3e4e87b2ea > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > Fix For: 1.5.0 > > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correctly. Thus, every type in the Table API needs to have these > methods implemented. > We need to check via reflection whether all fields of a row implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}}, both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
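The reflective check the issue describes (every field type must provide methods that differ from {{Object.equals()}} and {{Object.hashCode()}}) can be sketched as follows; the helper name is illustrative, not the merged Flink code:

```java
public class EqualsHashCodeCheck {
    // A type is safe to use as a key in heap state only if it overrides both
    // equals() and hashCode(), i.e. neither is inherited straight from Object.
    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            return clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class
                && clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e); // unreachable: every class inherits these from Object
        }
    }

    public static void main(String[] args) {
        System.out.println(overridesEqualsAndHashCode(String.class));        // overrides both
        System.out.println(overridesEqualsAndHashCode(StringBuilder.class)); // inherits both
    }
}
```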
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313366#comment-16313366 ] ASF GitHub Bot commented on FLINK-8139: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5065 > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5065 ---
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362 ] Jihyun Cho commented on FLINK-8318: --- Here is my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(
      new FlinkKafkaConsumer010[String]("clova-log-dev", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")
    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))
    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          return Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }
        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      })

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => ""
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> 4.0.0 test.streaming test jar 1.0-SNAPSHOT 1.4.0 org.apache.flink flink-streaming-scala_2.11 ${flink.version} org.apache.flink flink-streaming-contrib_2.11 ${flink.version} org.apache.flink flink-connector-kafka-0.10_2.11 ${flink.version} org.apache.flink flink-connector-elasticsearch5_2.11 ${flink.version} org.json4s json4s-native_2.11 3.5.3 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile org.apache.maven.plugins maven-shade-plugin 2.4.3 test.streaming.Test ${project.artifactId}
[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362 ] Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM: --- Here is my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(
      new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")
    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))
    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          return Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }
        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      })

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => ""
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> 4.0.0 test.streaming test jar 1.0-SNAPSHOT 1.4.0 org.apache.flink flink-streaming-scala_2.11 ${flink.version} org.apache.flink flink-streaming-contrib_2.11 ${flink.version} org.apache.flink flink-connector-kafka-0.10_2.11 ${flink.version} org.apache.flink flink-connector-elasticsearch5_2.11 ${flink.version} org.json4s json4s-native_2.11 3.5.3 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile org.apache.maven.plugins maven-shade-plugin 2.4.3 test.streaming.Test
[jira] [Closed] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-8331. -- Resolution: Fixed Fixed in 1.5.0 9dd3a859b1609d27ccc80a3da86456e533895b7a > FieldParsers do not correctly set EMPT_COLUMN error state > - > > Key: FLINK-8331 > URL: https://issues.apache.org/jira/browse/FLINK-8331 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0, 1.4.1 >Reporter: Fabian Hueske >Assignee: sunjincheng > > Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a > field is empty. > Instead, they try to parse the field value from an empty String which fails, > e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}. > The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} > values. The implementation requires that all {{FieldParser}} correctly return > the EMPTY_COLUMN error state in case of an empty field. > Affected {{FieldParser}}: > - BigDecParser > - BigIntParser > - DoubleParser > - FloatParser > - SqlDateParser > - SqlTimeParser > - SqlTimestampParser -- This message was sent by Atlassian JIRA (v6.4.14#64029)
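The contract the ticket describes can be sketched independently of Flink's {{FieldParser}} hierarchy: an empty field must yield the EMPTY_COLUMN error state instead of falling through to a {{NumberFormatException}}, so that callers like {{RowCsvInputFormat}} can map it to {{null}}. All names below are illustrative stand-ins:

```java
public class EmptyColumnSketch {
    enum ParseError { NONE, EMPTY_COLUMN, NUMERIC_VALUE_ILLEGAL }

    static ParseError lastError = ParseError.NONE;

    // Parse a double field. An empty field must set the EMPTY_COLUMN state
    // up front rather than being fed to Double.parseDouble, which would
    // fail with a NumberFormatException.
    static Double parseDoubleField(String field) {
        if (field.isEmpty()) {
            lastError = ParseError.EMPTY_COLUMN;
            return null;
        }
        try {
            lastError = ParseError.NONE;
            return Double.parseDouble(field);
        } catch (NumberFormatException e) {
            lastError = ParseError.NUMERIC_VALUE_ILLEGAL;
            return null;
        }
    }

    public static void main(String[] args) {
        parseDoubleField("");
        System.out.println(lastError);            // EMPTY_COLUMN, no exception thrown
        System.out.println(parseDoubleField("3.5"));
    }
}
```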
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313356#comment-16313356 ] ASF GitHub Bot commented on FLINK-8331: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5218 > FieldParsers do not correctly set EMPT_COLUMN error state > - > > Key: FLINK-8331 > URL: https://issues.apache.org/jira/browse/FLINK-8331 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.0, 1.4.1 >Reporter: Fabian Hueske >Assignee: sunjincheng -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5218 ---
[jira] [Created] (FLINK-8379) Improve type checking for DataView
Timo Walther created FLINK-8379: --- Summary: Improve type checking for DataView Key: FLINK-8379 URL: https://issues.apache.org/jira/browse/FLINK-8379 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther At the moment, an accumulator with no proper type information is still considered a valid accumulator. {code} public static class CountDistinctAccum { public MapView map; public long count; } {code} I quickly looked into the code, and it seems that a MapView's type information for key and value can be null. We should add a null check at the correct position to inform the user about the missing type information. We should also apply the type check added with FLINK-8139 to the key type of MapView. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
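The suggested null check can be sketched as follows; {{MapViewSpec}} and {{validate}} are hypothetical stand-ins, not the actual Flink classes:

```java
public class DataViewCheckSketch {
    // Stand-in for a MapView whose key/value type information may be missing.
    static class MapViewSpec {
        final String keyTypeInfo;   // null when the user supplied no type information
        final String valueTypeInfo;
        MapViewSpec(String keyTypeInfo, String valueTypeInfo) {
            this.keyTypeInfo = keyTypeInfo;
            this.valueTypeInfo = valueTypeInfo;
        }
    }

    // Fail early with a descriptive message instead of a later NullPointerException
    // deep inside serializer creation.
    static void validate(MapViewSpec view) {
        if (view.keyTypeInfo == null || view.valueTypeInfo == null) {
            throw new IllegalStateException(
                "MapView is missing type information for its key or value. " +
                "Declare the accumulator field with explicit key/value types.");
        }
    }

    public static void main(String[] args) {
        validate(new MapViewSpec("String", "Long")); // passes
        try {
            validate(new MapViewSpec(null, "Long"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```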
[jira] [Created] (FLINK-8378) Serialization and Deserialization of Kafka Avro messages from and to Row with Confluent Schema Registry
Christophe Philemotte created FLINK-8378: Summary: Serialization and Deserialization of Kafka Avro messages from and to Row with Confluent Schema Registry Key: FLINK-8378 URL: https://issues.apache.org/jira/browse/FLINK-8378 Project: Flink Issue Type: New Feature Components: Type Serialization System Affects Versions: 1.3.2, 1.4.0 Reporter: Christophe Philemotte Priority: Minor I need to serialize and deserialize Avro messages from Kafka, whose schemas are stored in the Confluent Schema Registry, from and to Row instances. As far as I know, * KafkaAvroTableSource does not fetch the schema from the registry * there is no TableSink for KafkaAvro * the AvroRowSerializationSchema and AvroRowDeserializationSchema need a known Avro schema * there is no wrapper for the Kafka serdes So, for now it's not possible out of the box. Still, I think it's an interesting use case, especially considering the Table & SQL API. I've found some issues related to my case: * https://issues.apache.org/jira/browse/FLINK-4050 * https://issues.apache.org/jira/browse/FLINK-2597 It seems to me that they have a wider scope, which is why I'm creating this issue. I hope I'm doing this right. I've managed to implement a serializer and deserializer that sit between the Kafka ones and the Flink AvroRow ones. If you think such serdes would be a nice addition, I'm ready to contribute and rework my implementation (notably to work with TypeSerializers if possible). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8377) Document DataViews for user-defined aggregate functions
Timo Walther created FLINK-8377: --- Summary: Document DataViews for user-defined aggregate functions Key: FLINK-8377 URL: https://issues.apache.org/jira/browse/FLINK-8377 Project: Flink Issue Type: Improvement Components: Documentation, Table API & SQL Reporter: Timo Walther The {{DataView}} feature that has been implemented with FLINK-7206 has not been documented on our website. We should add some examples and explain the differences and limitations (where Flink state is used and where regular Java data structures are used). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
[ https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313320#comment-16313320 ] ASF GitHub Bot commented on FLINK-8037: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5205#discussion_r159904126 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -742,7 +742,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this // scaling up. if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) { - nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; + nextFreeTransactionalId += (long) getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; --- End diff -- Good change, although that is a rather theoretical bug. To trigger it there would need to be more than 1_000_000 subtasks and more than 2000 parallel ongoing checkpoints. > Missing cast in integer arithmetic in > TransactionalIdsGenerator#generateIdsToAbort > -- > > Key: FLINK-8037 > URL: https://issues.apache.org/jira/browse/FLINK-8037 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Greg Hogan >Priority: Minor > > {code} > public Set generateIdsToAbort() { > Set idsToAbort = new HashSet<>(); > for (int i = 0; i < safeScaleDownFactor; i++) { > idsToAbort.addAll(generateIdsToUse(i * poolSize * > totalNumberOfSubtasks)); > {code} > The operands are integers, but generateIdsToUse() expects a long parameter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5205: [FLINK-8037] Fix integer multiplication or shift i...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5205#discussion_r159904126 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -742,7 +742,7 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this // scaling up. if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) { - nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; + nextFreeTransactionalId += (long) getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; --- End diff -- Good change, although that is a rather theoretical bug. To trigger it there would need to be more than 1_000_000 subtasks and more than 2000 parallel ongoing checkpoints. ---
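The overflow that the added `(long)` cast prevents is easy to demonstrate with figures of the magnitude mentioned in the comment:

```java
public class OverflowDemo {
    // Without the cast, the multiplication happens in 32-bit int arithmetic
    // and wraps around before the result is widened to long.
    static long multiplyWithoutCast(int subtasks, int poolSize) {
        return subtasks * poolSize;
    }

    // Widening one operand first keeps the whole multiplication in 64 bits.
    static long multiplyWithCast(int subtasks, int poolSize) {
        return (long) subtasks * poolSize;
    }

    public static void main(String[] args) {
        int subtasks = 2_000_000; // theoretical parallelism, per the comment
        int poolSize = 2_000;     // kafkaProducersPoolSize
        System.out.println(multiplyWithoutCast(subtasks, poolSize)); // -294967296 (wrapped)
        System.out.println(multiplyWithCast(subtasks, poolSize));    // 4000000000
    }
}
```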
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313314#comment-16313314 ] Aljoscha Krettek commented on FLINK-8318: - Hmm, could you maybe post the code that produces this second exception? > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho >Priority: Blocker > Fix For: 1.5.0, 1.4.1
[jira] [Updated] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8318: Priority: Blocker (was: Major) > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho >Priority: Blocker > Fix For: 1.5.0, 1.4.1