[jira] [Assigned] (FLINK-7789) Add handler for Async IO operator timeouts
[ https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-7789: -- Assignee: blues zheng > Add handler for Async IO operator timeouts > --- > > Key: FLINK-7789 > URL: https://issues.apache.org/jira/browse/FLINK-7789 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Karthik Deivasigamani >Assignee: blues zheng >Priority: Major > > Currently the Async IO operator does not provide a mechanism to handle timeouts. > When a request times out, an exception is thrown and the job is restarted. It > would be good to accept an AsyncIOTimeoutHandler, which can be implemented by the > user and passed in the constructor. > Here is the discussion from the Apache Flink users mailing list: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
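For context, a minimal sketch of the kind of handler the issue proposes. The interface name and method signature are hypothetical (not the actual Flink API); the sketch only assumes Flink's {{ResultFuture}} callback from the async I/O API:
{code:java}
import java.io.Serializable;
import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

/**
 * Hypothetical user-supplied timeout handler as proposed in the issue;
 * the name and signature are illustrative, not part of the Flink API.
 */
public interface AsyncIOTimeoutHandler<IN, OUT> extends Serializable {

	/** Called instead of failing the job when the request for {@code input} times out. */
	void handleTimeout(IN input, ResultFuture<OUT> resultFuture);
}

/** Example implementation: drop timed-out requests instead of restarting the job. */
class DropOnTimeout<IN, OUT> implements AsyncIOTimeoutHandler<IN, OUT> {
	@Override
	public void handleTimeout(IN input, ResultFuture<OUT> resultFuture) {
		resultFuture.complete(Collections.emptyList()); // emit nothing for this input
	}
}
{code}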
[jira] [Commented] (FLINK-9073) Resume from savepoint end-to-end tests should be extended for different state backends
[ https://issues.apache.org/jira/browse/FLINK-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466809#comment-16466809 ] ASF GitHub Bot commented on FLINK-9073: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5926#discussion_r186608526 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -58,25 +58,97 @@ fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale up) end-to-end test\n" + printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale down) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale up) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, async, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, non-incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 --- End diff -- This makes sense, will address this. > Resume from savepoint end-to-end tests should be extended for different state > backends > -- > >
[jira] [Commented] (FLINK-9073) Resume from savepoint end-to-end tests should be extended for different state backends
[ https://issues.apache.org/jira/browse/FLINK-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466810#comment-16466810 ] ASF GitHub Bot commented on FLINK-9073: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5926 Thanks for the review @StefanRRichter! Will address your comment and merge this. > Resume from savepoint end-to-end tests should be extended for different state > backends > -- > > Key: FLINK-9073 > URL: https://issues.apache.org/jira/browse/FLINK-9073 > Project: Flink > Issue Type: Sub-task >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > The resuming from savepoint end-to-end test script, {{test_resume_savepoint}} > should be extended for the different state backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e tests for d...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5926 Thanks for the review @StefanRRichter! Will address your comment and merge this. ---
[GitHub] flink pull request #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e test...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5926#discussion_r186608526 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -58,25 +58,97 @@ fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale up) end-to-end test\n" + printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" - printf "Running Resuming Savepoint (scale down) end-to-end test\n" + printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" - $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale up) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, async, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (file, sync, scale down) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, non-incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==\n" + printf "Running Resuming Savepoint (rocks, incremental, no parallelism change) end-to-end test\n" + printf "==\n" + STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 --- End diff -- This makes sense, will address this. ---
[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated
[ https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu updated FLINK-9315: Description: Currently the TaskManagerRunner waits synchronously for the rpc service to terminate. If this happens in an rpc-service-related thread (an akka dispatcher thread), it is stuck forever. We should change the behavior to match ClusterEntrypoint: do not wait synchronously, but complete asynchronously with a future. was:Currently shutdown TaskManagerRunner may be stuck. Because > TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated > --- > > Key: FLINK-9315 > URL: https://issues.apache.org/jira/browse/FLINK-9315 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > > Currently the TaskManagerRunner waits synchronously for the rpc service to > terminate. If this happens in an rpc-service-related thread (an akka > dispatcher thread), it is stuck forever. > We should change the behavior to match ClusterEntrypoint: do not wait > synchronously, but complete asynchronously with a future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
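The non-blocking shutdown pattern the description asks for can be sketched as follows; the class and method names are illustrative only and do not match the real TaskManagerRunner code:
{code:java}
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the "do not wait, chain a future" pattern.
class RunnerShutdownSketch {

	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	/** May be called from an rpc (akka dispatcher) thread, so it must not block. */
	CompletableFuture<Void> closeAsync(CompletableFuture<Void> rpcServiceTerminationFuture) {
		// Chain the follow-up work instead of waiting synchronously; the caller
		// decides where (and whether) to block on the returned future.
		rpcServiceTerminationFuture.whenComplete((ignored, failure) -> {
			if (failure != null) {
				terminationFuture.completeExceptionally(failure);
			} else {
				terminationFuture.complete(null);
			}
		});
		return terminationFuture;
	}
}
{code}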
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466795#comment-16466795 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186606104 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- Makes sense. Alright, let's leave this as is then. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466797#comment-16466797 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 Thanks for the update @FredTing. I'll try to take another look at the PR within the next few days. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5958 Thanks for the update @FredTing. I'll try to take another look at the PR within the next few days. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186606104 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- Makes sense. Alright, let's leave this as is then. ---
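The pattern in the diff (a new default method that delegates to the old one, so existing implementations keep compiling) can be sketched minimally as below. {{MetaInfoSketch}} is a stand-in for {{ConsumerRecordMetaInfo}} and is not actual Flink code:
{code:java}
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Sketch of the interface-evolution pattern used in the diff above.
interface SchemaSketch<T> extends Serializable {

	@Deprecated
	T deserialize(byte[] message) throws IOException;

	// New-style method with a default implementation that delegates to the old
	// one, so existing byte[]-based implementations compile and run unchanged.
	default T deserialize(MetaInfoSketch record) throws IOException {
		return deserialize(record.message());
	}

	interface MetaInfoSketch {
		byte[] message();

		long timestamp();
	}
}

// An existing implementation picks up the new method without any change.
class StringSchemaSketch implements SchemaSketch<String> {
	@Override
	public String deserialize(byte[] message) {
		return new String(message, StandardCharsets.UTF_8);
	}
}
{code}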
[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated
[ https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu updated FLINK-9315: Summary: TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated (was: TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated) > TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated > --- > > Key: FLINK-9315 > URL: https://issues.apache.org/jira/browse/FLINK-9315 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > > Currently shutdown TaskManagerRunner may be stuck. Because -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated
[ https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu updated FLINK-9315: Description: Currently shutdown TaskManagerRunner may be stuck. Because (was: Currently shutdown TaskManagerRunner would stuck) > TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated > - > > Key: FLINK-9315 > URL: https://issues.apache.org/jira/browse/FLINK-9315 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > > Currently shutdown TaskManagerRunner may be stuck. Because -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated
[ https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu updated FLINK-9315: Summary: TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated (was: TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated) > TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated > - > > Key: FLINK-9315 > URL: https://issues.apache.org/jira/browse/FLINK-9315 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > > Currently shutdown TaskManagerRunner would stuck -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated
[ https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu updated FLINK-9315: Description: Currently shutdown TaskManagerRunner would stuck > TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated > -- > > Key: FLINK-9315 > URL: https://issues.apache.org/jira/browse/FLINK-9315 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.0 >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > > Currently shutdown TaskManagerRunner would stuck -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9315) TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated
Biao Liu created FLINK-9315: --- Summary: TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated Key: FLINK-9315 URL: https://issues.apache.org/jira/browse/FLINK-9315 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 1.5.0 Reporter: Biao Liu Assignee: Biao Liu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables
[ https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466777#comment-16466777 ] ASF GitHub Bot commented on FLINK-9258: --- Github user yuqi1129 commented on a diff in the pull request: https://github.com/apache/flink/pull/5959#discussion_r186603684 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java --- @@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) { if (variables == null) { // avoid synchronization for common case synchronized (this) { if (variables == null) { - variables = new HashMap<>(); - putVariables(variables); + Map<String, String> tmpVariables = new HashMap<>(); + putVariables(tmpVariables); if (parent != null) { // not true for Job-/TaskManagerMetricGroup - variables.putAll(parent.getAllVariables()); + tmpVariables.putAll(parent.getAllVariables()); } + variables = tmpVariables; --- End diff -- Don't we need a test to verify this change? > ConcurrentModificationException in ComponentMetricGroup.getAllVariables > --- > > Key: FLINK-9258 > URL: https://issues.apache.org/jira/browse/FLINK-9258 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > Seeing this exception at the job startup time. Looks like there is a race > condition when the metrics variables are constructed. > The error is intermittent. > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) > at java.util.HashMap$EntryIterator.next(HashMap.java:1471) > at java.util.HashMap$EntryIterator.next(HashMap.java:1469) > at java.util.HashMap.putMapEntries(HashMap.java:511) > at java.util.HashMap.putAll(HashMap.java:784) > at > org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63) > at > org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63) > at > com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147) > at > com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170) > at > 
com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75) > at > com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at >
[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...
Github user yuqi1129 commented on a diff in the pull request: https://github.com/apache/flink/pull/5959#discussion_r186603684 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java --- @@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) { if (variables == null) { // avoid synchronization for common case synchronized (this) { if (variables == null) { - variables = new HashMap<>(); - putVariables(variables); + Map<String, String> tmpVariables = new HashMap<>(); + putVariables(tmpVariables); if (parent != null) { // not true for Job-/TaskManagerMetricGroup - variables.putAll(parent.getAllVariables()); + tmpVariables.putAll(parent.getAllVariables()); } + variables = tmpVariables; --- End diff -- Don't we need a test to verify this change? ---
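A compact sketch of why the fix works: the map is populated in a local variable, and the shared field is assigned only once it is complete, so a concurrent reader never iterates a map that is still being mutated. For the double-checked locking idiom to be fully correct under the Java memory model, the field should also be volatile (assumed here); this is standalone code, not the actual ComponentMetricGroup:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the safe-publication pattern from the diff above.
class VariablesHolderSketch {

	// volatile is assumed so that the fully built map is safely published.
	private volatile Map<String, String> variables;

	Map<String, String> getAllVariables(Map<String, String> parentVariables) {
		if (variables == null) { // fast path, avoids locking once initialized
			synchronized (this) {
				if (variables == null) { // re-check under the lock
					Map<String, String> tmpVariables = new HashMap<>();
					tmpVariables.put("<host>", "localhost"); // illustrative entry
					tmpVariables.putAll(parentVariables);
					variables = tmpVariables; // publish only when complete
				}
			}
		}
		return variables;
	}
}
{code}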
[jira] [Commented] (FLINK-9194) Finished jobs are not archived to HistoryServer
[ https://issues.apache.org/jira/browse/FLINK-9194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466736#comment-16466736 ] ASF GitHub Bot commented on FLINK-9194: --- Github user yuqi1129 commented on a diff in the pull request: https://github.com/apache/flink/pull/5902#discussion_r186599431 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -162,7 +166,7 @@ public void run() { String json = archive.getJson(); File target; - if (path.equals("/joboverview")) { --- End diff -- Agree > Finished jobs are not archived to HistoryServer > --- > > Key: FLINK-9194 > URL: https://issues.apache.org/jira/browse/FLINK-9194 > Project: Flink > Issue Type: Bug > Components: History Server, JobManager >Affects Versions: 1.5.0 > Environment: Flink 2af481a >Reporter: Gary Yao >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.6.0 > > > In flip6 mode, jobs are not archived to the HistoryServer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5902: [FLINK-9194][history] Add HistoryServer support to...
Github user yuqi1129 commented on a diff in the pull request: https://github.com/apache/flink/pull/5902#discussion_r186599431 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -162,7 +166,7 @@ public void run() { String json = archive.getJson(); File target; - if (path.equals("/joboverview")) { --- End diff -- Agree ---
[jira] [Closed] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8237. Resolution: Fixed Fix Version/s: 1.4.3 1.5.0 Fixed for 1.4.3 with c7d9ad49e9b3b82b934738903c37de2bd77d8ea7 Fixed for 1.5.0 with 7d6c81d4b8b7340c53717e38725dbe1e3199dfd2 Fixed for 1.6.0 with 7a31ffd3a5455d7da0a459b62081ca27466833bb > BucketingSink throws NPE when Writer.duplicate returns null > --- > > Key: FLINK-8237 > URL: https://issues.apache.org/jira/browse/FLINK-8237 > Project: Flink > Issue Type: Bug >Reporter: Gábor Hermann >Assignee: Pavel Shvetsov >Priority: Minor > Fix For: 1.5.0, 1.4.3 > > > Users need to look into Flink code to find the cause. We could catch that > null before even running the job. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
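The eager check the issue asks for ("catch that null before even running the job") could look like the sketch below. It assumes the {{Writer#duplicate()}} contract from the filesystem connector and is not the actual fix that was merged:
{code:java}
import org.apache.flink.streaming.connectors.fs.Writer;

// Sketch of a fail-fast validation for Writer.duplicate().
final class WriterChecksSketch {

	private WriterChecksSketch() {}

	static <T> Writer<T> duplicateOrFail(Writer<T> template) {
		Writer<T> duplicate = template.duplicate();
		if (duplicate == null) {
			// Fail with a descriptive message instead of a later NullPointerException
			// deep inside BucketingSink.openNewPartFile().
			throw new IllegalStateException(
				"Writer.duplicate() returned null for " + template.getClass().getName()
					+ "; duplicate() must return a new, independent Writer instance.");
		}
		return duplicate;
	}
}
{code}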
[jira] [Commented] (FLINK-9281) LogBack not working
[ https://issues.apache.org/jira/browse/FLINK-9281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466438#comment-16466438 ] Tim commented on FLINK-9281: Not really. Here's what I know: a) If I run the Flink job in my IDE, logback (latest version) works fine. b) If I run in a standalone cluster, no output. Actually, it does not look like the -Dlogback.configurationFile JVM arg is even being honored. I'll try doing "{{-verbose:class}}" to see if the LogBack classes are even being loaded. > LogBack not working > --- > > Key: FLINK-9281 > URL: https://issues.apache.org/jira/browse/FLINK-9281 > Project: Flink > Issue Type: Bug > Components: Logging >Affects Versions: 1.4.2 >Reporter: Tim >Priority: Major > > I am trying to get Flink to work with Logback instead of Log4J. However, it > is not working. > My setup follows the advice on this page: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster > * Flink v1.4.2 running a stand-alone cluster. > * Started JobManager as a foreground process (bin/jobmanager.sh > start-foreground cluster). I updated bin/flink-console.sh to reference > logback.xml via -Dlogback.configurationFile=file:/path/to/logfile. > * Removed log4j jars under libs/ (log4j-1.2.xx.jar and > slf4j-log4j12-xxx.jar) > * Added logback jars under libs/ (logback-classic, logback-core, > log4j-over-slf4j.jar) > However, no log file is created. Also, as a dumb test I > referenced a non-existent logback.xml file (changed path to a non-existent > folder) just to see if any errors appear on stdout, but nothing happened. > Thanks > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
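A quick way to answer the binding question above, before resorting to {{-verbose:class}}, is to ask SLF4J which backend it actually bound. With logback on the classpath this prints {{ch.qos.logback.classic.LoggerContext}}; with log4j 1.x still bound it prints a Log4j factory instead:
{code:java}
import org.slf4j.LoggerFactory;

// Diagnostic sketch: prints the SLF4J backend that is actually bound at runtime.
public class WhichLoggerBackend {
	public static void main(String[] args) {
		System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
	}
}
{code}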
[jira] [Assigned] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-8237: Assignee: Pavel Shvetsov > BucketingSink throws NPE when Writer.duplicate returns null > --- > > Key: FLINK-8237 > URL: https://issues.apache.org/jira/browse/FLINK-8237 > Project: Flink > Issue Type: Bug >Reporter: Gábor Hermann >Assignee: Pavel Shvetsov >Priority: Minor > > Users need to look into Flink code to find the cause. We could catch that > null before even running the job. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9312) Perform mutual authentication during SSL handshakes
[ https://issues.apache.org/jira/browse/FLINK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466435#comment-16466435 ] ASF GitHub Bot commented on FLINK-9312: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 @EronWright This might be interesting to you as well > Perform mutual authentication during SSL handshakes > --- > > Key: FLINK-9312 > URL: https://issues.apache.org/jira/browse/FLINK-9312 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.6.0 > > > Currently, the Flink processes support encrypted connections via SSL: > - Data exchange TM - TM > - RPC JM - TM > - Blob Service JM - TM > However, the server side always accepts any client connection, meaning the > connections are not strongly authenticated. > Activating SSL mutual authentication solves that - only processes that have > the same certificate can connect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 @EronWright This might be interesting to you as well ---
[jira] [Commented] (FLINK-9312) Perform mutual authentication during SSL handshakes
[ https://issues.apache.org/jira/browse/FLINK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466431#comment-16466431 ] ASF GitHub Bot commented on FLINK-9312: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5966 [FLINK-9312] [security] Add mutual authentication for RPC and data plane ## What is the purpose of the change Currently, the Flink processes support encrypted connections via SSL: - Data exchange TM - TM - RPC JM - TM - Blob Service JM - TM - (Optionally to ZooKeeper and connectors, this is connector specific and not in scope of this change) However, the server side always accepts any client connection, meaning the connections are not strongly authenticated. Activating SSL mutual authentication strengthens this significantly - only processes that have access to the same certificate can connect. ## Brief change log - Activate mutual auth in akka (via akka config) - Activate mutual auth in Netty for data shuffles via `SSLContext` and `SSLEngine` parameters ## Verifying this change - Adds a test to the `NettyClientServerSslTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink mutual_auth Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5966.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5966 commit 8bceb03d5653c94247b72d6256f4e9e37b036e35 Author: Stephan Ewen Date: 2018-05-07T17:44:33Z [FLINK-9313] [security] Activate mutual authentication for RPC/akka commit 59b017580d30904418e0867ac122a8183dc5db70 Author: Stephan Ewen Date: 2018-05-07T19:28:41Z [FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's data plane > Perform mutual authentication during SSL handshakes > --- > > Key: FLINK-9312 > URL: https://issues.apache.org/jira/browse/FLINK-9312 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.6.0 > > > Currently, the Flink processes support encrypted connections via SSL: > - Data exchange TM - TM > - RPC JM - TM > - Blob Service JM - TM > However, the server side always accepts any client connection, meaning the > connections are not strongly authenticated. > Activating SSL mutual authentication solves that - only processes that have > the same certificate can connect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5966 [FLINK-9312] [security] Add mutual authentication for RPC and data plane ## What is the purpose of the change Currently, the Flink processes support encrypted connections via SSL: - Data exchange TM - TM - RPC JM - TM - Blob Service JM - TM - (Optionally to ZooKeeper and connectors, this is connector specific and not in scope of this change) However, the server side always accepts any client connection, meaning the connections are not strongly authenticated. Activating SSL mutual authentication strengthens this significantly - only processes that have access to the same certificate can connect. ## Brief change log - Activate mutual auth in akka (via akka config) - Activate mutual auth in Netty for data shuffles via `SSLContext` and `SSLEngine` parameters ## Verifying this change - Adds a test to the `NettyClientServerSslTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink mutual_auth Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5966.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5966 commit 8bceb03d5653c94247b72d6256f4e9e37b036e35 Author: Stephan Ewen Date: 2018-05-07T17:44:33Z [FLINK-9313] [security] Activate mutual authentication for RPC/akka commit 59b017580d30904418e0867ac122a8183dc5db70 Author: Stephan Ewen Date: 2018-05-07T19:28:41Z [FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's data plane ---
[jira] [Comment Edited] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466409#comment-16466409 ] Richard Deurwaarder edited comment on FLINK-9311 at 5/7/18 8:06 PM: PubSub works more like RabbitMQ than Kafka, using ack/nack. It has [at-least-once delivery|https://cloud.google.com/pubsub/docs/subscriber], but using a [message id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--] exactly-once can be achieved. Is there an existing connector using message IDs for deduplication? was (Author: xeli): PubSub works more like RabbitMQ than Kafka using ack/nack. It has atleast-once-delivery, but using a [message id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--] exactly-once can be achived. Is there an existing connector using message id's for deduplication? > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > > I would like to start adding some Google Cloud connectors, starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * Easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466409#comment-16466409 ] Richard Deurwaarder commented on FLINK-9311: PubSub works more like RabbitMQ than Kafka, using ack/nack. It has at-least-once delivery, but using a [message id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--] exactly-once can be achieved. Is there an existing connector using message IDs for deduplication? > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Priority: Minor > > I would like to start adding some Google Cloud connectors, starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * Easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
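A sketch of the message-id de-duplication idea mentioned above, using Flink keyed state. {{PubSubMessageSketch}} is a stand-in type (not the Google client class), and a production job would also need to expire old ids, e.g. via state TTL:
{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Stand-in message type for the sketch; not the Google Pub/Sub client class.
class PubSubMessageSketch {
	final String messageId;
	final byte[] payload;

	PubSubMessageSketch(String messageId, byte[] payload) {
		this.messageId = messageId;
		this.payload = payload;
	}

	String getMessageId() {
		return messageId;
	}
}

// Emits each message id at most once; duplicates delivered by Pub/Sub are dropped.
// Apply after keyBy(PubSubMessageSketch::getMessageId) so the state is per id.
class DedupByMessageId extends RichFlatMapFunction<PubSubMessageSketch, PubSubMessageSketch> {

	private transient ValueState<Boolean> seen;

	@Override
	public void open(Configuration parameters) {
		seen = getRuntimeContext().getState(
			new ValueStateDescriptor<>("seen-message-id", Boolean.class));
	}

	@Override
	public void flatMap(PubSubMessageSketch msg, Collector<PubSubMessageSketch> out) throws Exception {
		if (seen.value() == null) { // first time this message id is seen
			seen.update(true);
			out.collect(msg);
		}
	}
}
{code}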
[jira] [Created] (FLINK-9314) Enable SSL mutual authentication for Netty / TaskManagers
Stephan Ewen created FLINK-9314: --- Summary: Enable SSL mutual authentication for Netty / TaskManagers Key: FLINK-9314 URL: https://issues.apache.org/jira/browse/FLINK-9314 Project: Flink Issue Type: Sub-task Components: Security Reporter: Stephan Ewen Assignee: Stephan Ewen Making sure that TaskManagers authenticate both ways (client and server) requires giving access to keystore and truststore on both ends, and enabling the client authentication flag when creating the SSL Engine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
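In JSSE terms, "enabling the client authentication flag" boils down to the server-side setting below. This is a minimal sketch assuming an {{SSLContext}} already initialized with the shared keystore/truststore material; the real change wires this into Flink's Netty pipeline setup:
{code:java}
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Sketch only; not the actual Flink change.
final class MutualAuthSketch {

	private MutualAuthSketch() {}

	static SSLEngine serverEngine(SSLContext sslContext) {
		SSLEngine engine = sslContext.createSSLEngine();
		engine.setUseClientMode(false); // server side of the handshake
		engine.setNeedClientAuth(true); // require, not just request, a client certificate
		return engine;
	}
}
{code}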
[jira] [Created] (FLINK-9313) Enable mutual authentication for RPC (akka)
Stephan Ewen created FLINK-9313: --- Summary: Enable mutual authentication for RPC (akka) Key: FLINK-9313 URL: https://issues.apache.org/jira/browse/FLINK-9313 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Trivial, just needs to add the respective line in the akka configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9312) Perform mutual authentication during SSL handshakes
Stephan Ewen created FLINK-9312: --- Summary: Perform mutual authentication during SSL handshakes Key: FLINK-9312 URL: https://issues.apache.org/jira/browse/FLINK-9312 Project: Flink Issue Type: New Feature Components: Security Reporter: Stephan Ewen Fix For: 1.6.0 Currently, the Flink processes support encrypted connections via SSL: - Data exchange TM - TM - RPC JM - TM - Blob Service JM - TM However, the server side always accepts any client connection, meaning the connections are not strongly authenticated. Activating SSL mutual authentication solves that - only processes that have the same certificate can connect. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466383#comment-16466383 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527205 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -157,15 +156,15 @@ public T set(T record, F fieldValue) { SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) { --- End diff -- accessing fields in a `Row` will fail because `Row` does not extend `Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one when we deal with a `DataStream`. We would then need to add the `RowFieldAccessor` to the `FieldAccessorFactory`. > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466382#comment-16466382 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -197,7 +196,7 @@ public T set(T record, F fieldValue) { checkNotNull(typeInfo, "typeInfo must not be null."); checkNotNull(innerAccessor, "innerAccessor must not be null."); - int arity = ((TupleTypeInfo) typeInfo).getArity(); + int arity = typeInfo.getArity(); --- End diff -- Same as for `SimpleTupleFieldAccessor`. > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466385#comment-16466385 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186528901 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java --- @@ -368,4 +369,23 @@ public void testIllegalBasicType2() { FieldAccessor f = FieldAccessorFactory.getAccessor(tpeInfo, "foo", null); } + + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testRowTypeInfo() { --- End diff -- This test just validates that a `FieldAccessor` is created. At runtime it would fail with a `ClassCastException`. > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466381#comment-16466381 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186477977 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java --- @@ -41,7 +41,7 @@ * is regarded in the reduce function. First index has highest priority and last index has * least priority. */ - public SelectByMinFunction(TupleTypeInfo type, int... fields) { + public SelectByMinFunction(TupleTypeInfoBase type, int... fields) { --- End diff -- The `ReduceFunction` is still typed to `T extends Tuple` such that this will still fail at runtime. The same is true for all other built-in aggregation methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. This cannot be resolved without major changes. I don't think we should add these features, but rather throw meaningful error messages instead of `ClassCastException`. Can you try to override the `isTupleType()` method in `RowTypeInfo` and return `false`? This would prevent `Row` from being used in contexts that are only supported for `Tuple`. > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work
[ https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466384#comment-16466384 ] ASF GitHub Bot commented on FLINK-8255: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186484649 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java --- @@ -230,4 +235,43 @@ public String toString() { } } + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + DataSet<Row> tupleDs = env + .fromCollection(Collections.singleton(new Row(2)), rowTypeInfo); + + tupleDs.maxBy(0); + tupleDs.minBy(0); + } + +/** + * Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + + UnsortedGrouping<Row> groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0); + + groupDs.maxBy(1); + groupDs.minBy(1); --- End diff -- The tests pass because the program is not executed. You would have to call `env.collect()` to run the program and compare the returned result against the expected result. As I pointed out before, this will fail, because the operator will cast the `Row` objects to `Tuple`. > Key expressions on named row types do not work > -- > > Key: FLINK-8255 > URL: https://issues.apache.org/jira/browse/FLINK-8255 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 1.4.0, 1.5.0 >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following program fails with a {{ClassCastException}}. It seems that key > expressions and rows are not tested well. We should add more tests for them. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; > String[] fieldNames = new String[]{"id", "value"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); > env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo) > .keyBy("id").sum("value").print(); > env.execute("Streaming WordCount"); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186528901 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java --- @@ -368,4 +369,23 @@ public void testIllegalBasicType2() { FieldAccessor f = FieldAccessorFactory.getAccessor(tpeInfo, "foo", null); } + + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testRowTypeInfo() { --- End diff -- This test just validates that a `FieldAccessor` is created. At runtime it would fail with a `ClassCastException`. ---
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186484649 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java --- @@ -230,4 +235,43 @@ public String toString() { } } + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + DataSet<Row> tupleDs = env + .fromCollection(Collections.singleton(new Row(2)), rowTypeInfo); + + tupleDs.maxBy(0); + tupleDs.minBy(0); + } + +/** + * Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + + UnsortedGrouping<Row> groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0); + + groupDs.maxBy(1); + groupDs.minBy(1); --- End diff -- The tests pass because the program is not executed. You would have to call `env.collect()` to run the program and compare the returned result against the expected result. As I pointed out before, this will fail, because the operator will cast the `Row` objects to `Tuple`. ---
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186477977
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -41,7 +41,7 @@
 	 * is regarded in the reduce function. First index has highest priority and last index has
 	 * least priority.
 	 */
-	public SelectByMinFunction(TupleTypeInfo<T> type, int... fields) {
+	public SelectByMinFunction(TupleTypeInfoBase<T> type, int... fields) {
--- End diff --
The `ReduceFunction` is still typed to `T extends Tuple` such that this will still fail at runtime. The same is true for all other built-in aggregation methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. This cannot be resolved without major changes. I don't think we should add these features, but rather throw meaningful error messages instead of `ClassCastException`. Can you try to override the `isTupleType()` method in `RowTypeInfo` and return `false`? This would prevent `Row` from being used in contexts that are only supported for `Tuple`.
---
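To make fhueske's suggestion concrete, here is a rough sketch of what the override amounts to. It is written as a stand-alone subclass only so that it compiles in isolation; the real change would go into `RowTypeInfo` itself, and whether other callers of `isTupleType()` tolerate the changed answer would still need to be verified.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Hypothetical sketch of the suggested fix: let the Row type info
// opt out of the Tuple-only code paths.
public class NonTupleRowTypeInfo extends RowTypeInfo {

    public NonTupleRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
        super(types, fieldNames);
    }

    @Override
    public boolean isTupleType() {
        // Row does not extend Tuple, so Tuple-only operators such as
        // maxBy/minBy/sum should not accept this type info.
        return false;
    }
}
```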
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527205
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -157,15 +156,15 @@ public T set(T record, F fieldValue) {
 		SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
--- End diff --
Accessing fields in a `Row` will fail because `Row` does not extend `Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one when we deal with a `DataStream<Row>`. We would then need to add the `RowFieldAccessor` to the `FieldAccessorFactory`.
---
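A minimal sketch of what such a `RowFieldAccessor` could look like, assuming the `FieldAccessor` base class exposes a settable `fieldType` the way its existing subclasses use it; the integration into `FieldAccessorFactory` is left out.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.typeutils.FieldAccessor;
import org.apache.flink.types.Row;

// Sketch only: read and write a Row field by position instead of
// casting the record to Tuple.
public class RowFieldAccessor<F> extends FieldAccessor<Row, F> {

    private static final long serialVersionUID = 1L;

    private final int pos;

    @SuppressWarnings("unchecked")
    public RowFieldAccessor(int pos, RowTypeInfo typeInfo) {
        this.pos = pos;
        this.fieldType = (TypeInformation<F>) typeInfo.getTypeAt(pos);
    }

    @Override
    @SuppressWarnings("unchecked")
    public F get(Row record) {
        return (F) record.getField(pos);
    }

    @Override
    public Row set(Row record, F fieldValue) {
        record.setField(pos, fieldValue);
        return record;
    }
}
```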
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -197,7 +196,7 @@ public T set(T record, F fieldValue) { checkNotNull(typeInfo, "typeInfo must not be null."); checkNotNull(innerAccessor, "innerAccessor must not be null."); - int arity = ((TupleTypeInfo) typeInfo).getArity(); + int arity = typeInfo.getArity(); --- End diff -- Same as for `SimpleTupleFieldAccessor`. ---
[jira] [Commented] (FLINK-9310) Update default cyphersuites
[ https://issues.apache.org/jira/browse/FLINK-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466380#comment-16466380 ] ASF GitHub Bot commented on FLINK-9310: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5965 @EronWright This might be interesting to you. > Update default cyphersuites > --- > > Key: FLINK-9310 > URL: https://issues.apache.org/jira/browse/FLINK-9310 > Project: Flink > Issue Type: Task > Components: Security >Affects Versions: 1.4.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > > The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no > longer recommended. > RFC 7525 [1] recommends to use the following cipher suites only: > * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256 > * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 > * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384 > * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 > [1] https://tools.ietf.org/html/rfc7525 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5965: [FLINK-9310] [security] Update standard cipher suites for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5965 @EronWright This might be interesting to you. ---
[jira] [Commented] (FLINK-9310) Update default cyphersuites
[ https://issues.apache.org/jira/browse/FLINK-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466379#comment-16466379 ] ASF GitHub Bot commented on FLINK-9310: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5965 [FLINK-9310] [security] Update standard cipher suites for secure mode
## What is the purpose of the change
This sets the cipher suites accepted by default to those recommended in IETF RFC 7525: https://tools.ietf.org/html/rfc7525
## Brief change log
Updates the default value of the respective config option to
```
TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```
## Verifying this change
This change is already covered by the existing tests that test SSL setups.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink update_cipher_suits
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5965.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #5965
commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31
Author: Stephan Ewen
Date: 2018-05-07T17:47:00Z
[FLINK-9310] [security] Update standard cipher suites for secure mode
This sets the cipher suites accepted by default to those recommended in IETF RFC 7525: https://tools.ietf.org/html/rfc7525
> Update default cyphersuites
> ---
>
> Key: FLINK-9310
> URL: https://issues.apache.org/jira/browse/FLINK-9310
> Project: Flink
> Issue Type: Task
> Components: Security
> Affects Versions: 1.4.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
>
> The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no
> longer recommended.
> RFC 7525 [1] recommends to use the following cipher suites only:
> * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
> * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
> [1] https://tools.ietf.org/html/rfc7525
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5965: [FLINK-9310] [security] Update standard cipher sui...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5965 [FLINK-9310] [security] Update standard cipher suites for secure mode
## What is the purpose of the change
This sets the cipher suites accepted by default to those recommended in IETF RFC 7525: https://tools.ietf.org/html/rfc7525
## Brief change log
Updates the default value of the respective config option to
```
TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```
## Verifying this change
This change is already covered by the existing tests that test SSL setups.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink update_cipher_suits
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5965.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #5965
commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31
Author: Stephan Ewen
Date: 2018-05-07T17:47:00Z
[FLINK-9310] [security] Update standard cipher suites for secure mode
This sets the cipher suites accepted by default to those recommended in IETF RFC 7525: https://tools.ietf.org/html/rfc7525
---
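For users who want these suites before upgrading, something like the following should work, assuming the option key is `security.ssl.algorithms` (`SecurityOptions.SSL_ALGORITHMS`):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

// Sketch: pin the RFC 7525 cipher suites explicitly instead of relying
// on the (old) default value of the option.
public class SslCipherConfig {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString(SecurityOptions.SSL_ALGORITHMS,
            "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,"
            + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,"
            + "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,"
            + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");
    }
}
```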
[jira] [Closed] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-8690. Resolution: Implemented Fix Version/s: 1.6.0 Implemented for 1.6.0 with 53610c31e88d3c4194990de70fb99d9f935f2e0d > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Fix For: 1.6.0 > > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
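For context, this is the kind of query that FLINK-8690 enables on the streaming side; the table and column names are made up for illustration, and a table "Clicks" with columns (userId, url, rowtime) is assumed to be registered:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch of a DISTINCT aggregate over a group window on a streaming table.
public class DistinctGroupWindowExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // Count distinct users per hourly tumbling window.
        Table result = tEnv.sqlQuery(
            "SELECT COUNT(DISTINCT userId) AS uv, "
            + "TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS wStart "
            + "FROM Clicks "
            + "GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)");
    }
}
```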
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466356#comment-16466356 ] Stephan Ewen commented on FLINK-9311: - The RichParallelSourceFunction is a good place to start. What is the exactly-once model behind PubSub? Is it message ack/commit, like for example in RabbitMQ, or is it sequence-number/offset commits, like in Kafka/Kinesis?
> PubSub connector
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Richard Deurwaarder
> Priority: Minor
>
> I would like to start adding some Google Cloud connectors, starting with a PubSub
> Source. I have a basic implementation ready but I want it to be able to:
> * easily scale up (should I have it extend RichParallelSourceFunction for
> this?)
> * Make it easier to provide the Google Cloud credentials. This would require
> being able to send some JSON string / ServiceAccount to the nodes when
> starting up this source.
> Could this be something that would be useful for others and added to the
> flink connectors repo?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
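To make the `RichParallelSourceFunction` suggestion concrete, a bare-bones skeleton might look as follows; everything PubSub-specific (client, credentials, acking) is a placeholder and depends on the exactly-once questions above:

```java
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Skeleton of a parallel source; each parallel instance runs this loop.
public class PubSubSourceSkeleton extends RichParallelSourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String message = pullNextMessage(); // hypothetical PubSub pull
            // Emissions are synchronized with checkpoints via the lock, so
            // that acks/offsets can later be committed consistently.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(message);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pullNextMessage() {
        throw new UnsupportedOperationException("placeholder for a PubSub pull");
    }
}
```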
[jira] [Commented] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466345#comment-16466345 ] ASF GitHub Bot commented on FLINK-6335: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3764 > Parse DISTINCT over grouped window in stream SQL > > > Key: FLINK-6335 > URL: https://issues.apache.org/jira/browse/FLINK-6335 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. > This jira proposes to support the {{DISTINCT}} keyword on streaming > aggregation using the same technique on the batch side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466348#comment-16466348 ] ASF GitHub Bot commented on FLINK-8690: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5940 > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466346#comment-16466346 ] ASF GitHub Bot commented on FLINK-8237: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5927 > BucketingSink throws NPE when Writer.duplicate returns null > --- > > Key: FLINK-8237 > URL: https://issues.apache.org/jira/browse/FLINK-8237 > Project: Flink > Issue Type: Bug >Reporter: Gábor Hermann >Priority: Minor > > Users need to look into Flink code to find the cause. We could catch that > null before even running the job. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows
[ https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466347#comment-16466347 ] ASF GitHub Bot commented on FLINK-6373: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3765 > Add runtime support for distinct aggregation over grouped windows > - > > Key: FLINK-6373 > URL: https://issues.apache.org/jira/browse/FLINK-6373 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > This is a follow up task for FLINK-6335. FLINK-6335 enables parsing the > distinct aggregations over grouped windows. This jira tracks the effort of > adding runtime support for the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #3764: [FLINK-6335] Parse DISTINCT over grouped windows i...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3764 ---
[GitHub] flink pull request #3765: [FLINK-6373] Add runtime support for distinct aggr...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3765 ---
[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5927 ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5940 ---
[jira] [Created] (FLINK-9311) PubSub connector
Richard Deurwaarder created FLINK-9311: -- Summary: PubSub connector Key: FLINK-9311 URL: https://issues.apache.org/jira/browse/FLINK-9311 Project: Flink Issue Type: New Feature Components: Streaming Connectors Reporter: Richard Deurwaarder
I would like to start adding some Google Cloud connectors, starting with a PubSub Source. I have a basic implementation ready but I want it to be able to:
* easily scale up (should I have it extend RichParallelSourceFunction for this?)
* Make it easier to provide the Google Cloud credentials. This would require being able to send some JSON string / ServiceAccount to the nodes when starting up this source.
Could this be something that would be useful for others and added to the flink connectors repo?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
[ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281 ] Narayanan Arunachalam commented on FLINK-9302: -- Thanks [~srichter]. I ran some tests over the weekend and found that though the error is from S3, it was actually because of the size of the checkpoint. In one of my tests, after the job recovered from a last good checkpoint, the state size continued to grow at ~10G every 15 mins. This was because, too much data was read in to the pipeline with `maxOutOfOrderness` set to 60 secs. The windows won't fire soon enough resulting in large states. One option is to scale the cluster to deal with many open windows. But I realize `AscendingTimestampExtractor` might be enough for my use case and running some tests using this setting. In any case, this particular error is a side effect of the way my windows are setup and no direct evidence of any bug. Thought I will post this comment anyway to keep you all in sync. > Checkpoints continues to fail when using filesystem state backend with > CIRCULAR REFERENCE:java.io.IOException > - > > Key: FLINK-9302 > URL: https://issues.apache.org/jira/browse/FLINK-9302 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Priority: Major > > *state backend: filesystem* > *checkpoint.mode:EXACTLY_ONCE* > +dag:+ > val streams = sEnv > .addSource(makeKafkaSource(config)) > .map(makeEvent) > .keyBy(_.get(EVENT_GROUP_ID)) > .window(EventTimeSessionWindows.withGap(Time.seconds(60))) > .trigger(PurgingTrigger.of(EventTimeTrigger.create())) > .apply(makeEventsList) > .addSink(makeNoOpSink) > * The job runs fine and checkpoints succeed for few hours. > * Later it fails because of the following checkpoint error. > * Once the job is recovered from the last successful checkpoint, it > continues to fail with the same checkpoint error. > * This persists until the job is restarted with no checkpoint state or using > the checkpoint previous to the last good one. > AsynchronousException\{java.lang.Exception: Could not materialize checkpoint > 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: > NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 42 for > operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, > makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed keyed > state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at >
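The two watermarking strategies discussed in the comment, sketched for a hypothetical event type with a `getTimestamp()` accessor; with the bounded extractor, windows stay open for the configured slack, while the ascending extractor lets them fire (and release state) as soon as event time advances:

```java
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkStrategies {

    // Hypothetical event type.
    public static class MyEvent {
        private long timestamp;
        public long getTimestamp() { return timestamp; }
    }

    // What the job used originally: tolerate 60s of out-of-order data.
    public static class Bounded extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {
        private static final long serialVersionUID = 1L;
        public Bounded() { super(Time.seconds(60)); }
        @Override
        public long extractTimestamp(MyEvent event) { return event.getTimestamp(); }
    }

    // The alternative being tested: assumes timestamps are ascending per
    // partition, so windows fire and state is released much sooner.
    public static class Ascending extends AscendingTimestampExtractor<MyEvent> {
        private static final long serialVersionUID = 1L;
        @Override
        public long extractAscendingTimestamp(MyEvent event) { return event.getTimestamp(); }
    }
}
```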
[jira] [Created] (FLINK-9310) Update default cyphersuites
Stephan Ewen created FLINK-9310: --- Summary: Update default cyphersuites Key: FLINK-9310 URL: https://issues.apache.org/jira/browse/FLINK-9310 Project: Flink Issue Type: Task Components: Security Affects Versions: 1.4.2 Reporter: Stephan Ewen Assignee: Stephan Ewen The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no longer recommended. RFC 7525 [1] recommends to use the following cipher suites only: * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256 * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384 * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 [1] https://tools.ietf.org/html/rfc7525 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9265) Upgrade Prometheus version
[ https://issues.apache.org/jira/browse/FLINK-9265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466236#comment-16466236 ] ASF GitHub Bot commented on FLINK-9265: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5936 Yes, before giving +1 to this commit, we need to check that this introduces no new transitive dependency, or need to make sure that dependency is not an issue. > Upgrade Prometheus version > -- > > Key: FLINK-9265 > URL: https://issues.apache.org/jira/browse/FLINK-9265 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > We're using 0.0.26 > Latest release is 2.2.1 > This issue is for upgrading the Prometheus version -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5936 Yes, before giving +1 to this commit, we need to check that this introduces no new transitive dependency, or need to make sure that dependency is not an issue. ---
[jira] [Commented] (FLINK-9281) LogBack not working
[ https://issues.apache.org/jira/browse/FLINK-9281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466229#comment-16466229 ] Stephan Ewen commented on FLINK-9281: - Do you have any insights into why it does not work with newer logback versions? Is the slf4j version in Flink outdated?
> LogBack not working
> ---
>
> Key: FLINK-9281
> URL: https://issues.apache.org/jira/browse/FLINK-9281
> Project: Flink
> Issue Type: Bug
> Components: Logging
> Affects Versions: 1.4.2
> Reporter: Tim
> Priority: Major
>
> I am trying to get Flink to work with Logback instead of Log4J. However, it
> is not working.
> My setup follows the advice on this page:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster
> * Flink v1.4.2 running a stand-alone cluster.
> * Started JobManager as a foreground process (bin/jobmanager.sh
> start-foreground cluster). I updated bin/flink-console.sh to reference
> logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
> * Removed the log4j jars under libs/ (log4j-1.2.xx.jar and
> slf4j-log4j12-xxx.jar)
> * Added the logback jars under libs/ (logback-classic, logback-core,
> log4j-over-slf4j.jar)
> However, no log file is being created. Also, as a dumb test I
> referenced a non-existent logback.xml file (changed the path to a non-existent
> folder) just to see if any errors appear on stdout, but nothing.
> Thanks
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9300) Improve error message when in-memory state is too large
[ https://issues.apache.org/jira/browse/FLINK-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466227#comment-16466227 ] Stephan Ewen commented on FLINK-9300: - This should actually not happen with the FsStateBackend. The message occurs when "in-line" state (i.e. state stored in the state handle, persisted with the metadata) grows too large. The FsStateBackend writes state on checkpoints directly to the file system and communicates only the paths in the state handles. This looks like there is a case where either the configuration is not properly propagated (and the MemoryStateBackend is used after all), or some code path uses the wrong type of state handle. Can you share the stack trace of the original problem?
> Improve error message when in-memory state is too large
> ---
>
> Key: FLINK-9300
> URL: https://issues.apache.org/jira/browse/FLINK-9300
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.4.2
> Reporter: Ken Krugler
> Assignee: vinoyang
> Priority: Minor
>
> Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can
> throw an {{IOException}} via:
> {code:java}
> throw new IOException(
> "Size of the state is larger than the maximum permitted memory-backed state. Size="
> + size + " , maxSize=" + maxSize
> + " . Consider using a different state backend, like the File System State backend.");{code}
> But this will happen even if you're using the File System State backend.
> This came up here:
> [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe]
> We could change the message to be:
> {quote}Please consider increasing the maximum permitted memory size,
> increasing the task manager parallelism, or using a non-memory-based state
> backend such as RocksDB.
> {quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
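One quick way to rule out the mis-propagated-configuration case Stephan mentions is to set the backend explicitly in the job rather than relying on `flink-conf.yaml`; a sketch with a placeholder checkpoint URI:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Set the FsStateBackend programmatically so the MemoryStateBackend
// cannot be silently used through a configuration mixup.
public class ExplicitStateBackend {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}
```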
[jira] [Commented] (FLINK-9301) NotSoMiniClusterIterations job fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466224#comment-16466224 ] Stephan Ewen commented on FLINK-9301: - Okay, so apparently the required memory per TaskManager is now a bit higher than it was. Can we fix that by simply reducing the number of TaskManagers? The test was initially a "manual test", so not executed automatically as part of a build, and it was intended to observe and debug execution of a reasonably complex job locally, to assess if deployment and ramp up happens fast. > NotSoMiniClusterIterations job fails on travis > -- > > Key: FLINK-9301 > URL: https://issues.apache.org/jira/browse/FLINK-9301 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > The high-parallelism-iterations-test fails on travis. After starting ~55 > taskmanagers all memory is used and further memory allocations fail. > I'm currently letting it run another time, if it fails again I will disable > the test temporarily. > https://travis-ci.org/zentol/flink-ci/builds/375189790 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9305) Register flink-s3-fs-hadoop for the s3a:// scheme as well
[ https://issues.apache.org/jira/browse/FLINK-9305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466222#comment-16466222 ] Stephan Ewen commented on FLINK-9305: - I am not 100% sure about this issue. Flink's shaded s3a currently does not work with the bucketing sink and will never work with Hadoop Input and Output formats, because it does not look like a Hadoop FS any more. This would prevent using vanilla s3a in addition. One could argue, though, that in that particular case, users should only use vanilla s3a in the first place. > Register flink-s3-fs-hadoop for the s3a:// scheme as well > - > > Key: FLINK-9305 > URL: https://issues.apache.org/jira/browse/FLINK-9305 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > For enhanced user experience, we should also register our Hadoop S3A-based > shaded S3 file system implementation for the {{s3a://}} file system scheme, > not just {{s3://}}. This way, the user can easily switch from the manual S3 > integration to the shaded one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9308) The method enableCheckpointing with low values like 10 are forming DoS on Kafka Clusters
[ https://issues.apache.org/jira/browse/FLINK-9308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466209#comment-16466209 ] Stephan Ewen commented on FLINK-9308: - The rate at which Kafka can handle offset commits should not really be the biggest concern here. Flink checkpoints do not need to commit to Kafka; this is optional.
> The method enableCheckpointing with low values like 10 are forming DoS on
> Kafka Clusters
>
> Key: FLINK-9308
> URL: https://issues.apache.org/jira/browse/FLINK-9308
> Project: Flink
> Issue Type: Bug
> Reporter: Seweryn Habdank-Wojewodzki
> Assignee: vinoyang
> Priority: Major
>
> Hi,
> The docs about checkpoints in Flink contain such an example:
> {code}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // start a checkpoint every 1000 ms
> env.enableCheckpointing(1000);
> {code}
> Nice. There is one catch. The enableCheckpointing(parameter /* in [ms] */),
> when used with e.g. 1 or 10, will kill the Kafka server by continuous commits of
> offsets.
> Every creative developer, who would like to defend the SW from duplication
> of messages in case of a crash, will decrease this parameter to the minimum. He
> will protect his app, but on the Kafka broker/server side he will cause DoS.
> Can you have a look, to limit the minimum value in case of the Kafka stream
> environment?
> I am not sure if 100 ms as the minimum is enough, but 1000 ms as the minimum would be
> nice.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
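Short of a hard lower limit, a job can already bound its checkpoint frequency itself; a sketch of the kind of pacing the reporter is after:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Even with a very low checkpoint interval, a minimum pause between
// checkpoints bounds how often checkpoints (and any optional Kafka
// offset commits) can actually occur.
public class CheckpointPacing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // target interval: 1s
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    }
}
```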
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466166#comment-16466166 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186480680 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. +*/ + enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME + } + + /** +* @return the key as a byte array (null if no key has been set). +*/ + byte[] getKey(); + + /** +* @return The message, as a byte array (null if the message was empty or deleted). +*/ + byte[] getMessage(); + + /** +* @return The topic the message has originated from (for example the Kafka topic). +*/ + String getTopic(); + + /** +* @return The partition the message has originated from (for example the Kafka partition). +*/ + int getPartition(); + + /** +* @return the offset of the message in the original source (for example the Kafka offset). +*/ + long getOffset(); + + /** +* @return the timestamp of the consumer record --- End diff -- I've added some more comments > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186480680 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. +*/ + enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME + } + + /** +* @return the key as a byte array (null if no key has been set). +*/ + byte[] getKey(); + + /** +* @return The message, as a byte array (null if the message was empty or deleted). +*/ + byte[] getMessage(); + + /** +* @return The topic the message has originated from (for example the Kafka topic). +*/ + String getTopic(); + + /** +* @return The partition the message has originated from (for example the Kafka partition). +*/ + int getPartition(); + + /** +* @return the offset of the message in the original source (for example the Kafka offset). +*/ + long getOffset(); + + /** +* @return the timestamp of the consumer record --- End diff -- I've added some more comments ---
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466157#comment-16466157 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479633 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- I added some more text to the javadoc explaining that implementing the other `deserialize` method has more benefits. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479633 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- I added some more text to the javadoc explaining that implementing the other `deserialize` method has more benefits. ---
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466141#comment-16466141 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479026
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+	/**
+	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+	 */
+	@Deprecated
+	T deserialize(byte[] message) throws IOException;
+
 	/**
 	 * Deserializes the byte message.
 	 *
-	 * @param message The message, as a byte array.
+	 * @param consumerRecordMetaInfo The message, as a {@link ConsumerRecordMetaInfo}.
 	 *
 	 * @return The deserialized message as an object (null if the message cannot be deserialized).
 	 */
-	T deserialize(byte[] message) throws IOException;
+	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfo) throws IOException {
--- End diff --
I agree that it's probably better to make separate `DeserializationSchema` classes for each connector type. But for now I think this is a relatively easy fix without breaking the Flink API for the custom deserializers. There is already some discussion about redesigning the connectors (see issue 5479) with a `common connector framework` in mind. I think that would be a good place to decide what to do with a shared `DeserializationSchema`.
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479026
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+	/**
+	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+	 */
+	@Deprecated
+	T deserialize(byte[] message) throws IOException;
+
 	/**
 	 * Deserializes the byte message.
 	 *
-	 * @param message The message, as a byte array.
+	 * @param consumerRecordMetaInfo The message, as a {@link ConsumerRecordMetaInfo}.
 	 *
 	 * @return The deserialized message as an object (null if the message cannot be deserialized).
 	 */
-	T deserialize(byte[] message) throws IOException;
+	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfo) throws IOException {
--- End diff --
I agree that it's probably better to make separate `DeserializationSchema` classes for each connector type. But for now I think this is a relatively easy fix without breaking the Flink API for the custom deserializers. There is already some discussion about redesigning the connectors (see issue 5479) with a `common connector framework` in mind. I think that would be a good place to decide what to do with a shared `DeserializationSchema`.
---
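For illustration, a user schema written against the interface proposed in this PR might look as follows; note that `ConsumerRecordMetaInfo` is the PR's proposal, not a released Flink API, and the accessor names are taken from the quoted diff:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.ConsumerRecordMetaInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Prefix each message with its topic/partition/offset by overriding
// the proposed default method.
public class TopicTaggingSchema implements DeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(ConsumerRecordMetaInfo record) throws IOException {
        return record.getTopic() + "/" + record.getPartition() + "@" + record.getOffset()
            + ": " + new String(record.getMessage(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```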
[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers
[ https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466138#comment-16466138 ] ASF GitHub Bot commented on FLINK-9253: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5923 Apparently, there's a problem with unknown input channels that surfaced with the newly-merged extra tests: they don't get exclusive buffers (naturally) but the floating buffers are calculated and would need to be updated when the channels' types are known. I'll take a look in the next few days. > Make buffer count per InputGate always #channels*buffersPerChannel + > ExclusiveBuffers > - > > Key: FLINK-9253 > URL: https://issues.apache.org/jira/browse/FLINK-9253 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0 > > > The credit-based flow control path assigns exclusive buffers only to remote > channels (which makes sense since local channels don't use any own buffers). > However, this is a bit intransparent with respect to how much data may be in > buffers since this depends on the actual schedule of the job and not the job > graph. > By adapting the floating buffers to use a maximum of > {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, > we would be channel-type agnostic and keep the old behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5923: [FLINK-9253][network] make the maximum floating buffers c...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5923 Apparently, there's a problem with unknown input channels that surfaced with the newly-merged extra tests: they don't get exclusive buffers (naturally) but the floating buffers are calculated and would need to be updated when the channels' types are known. I'll take a look in the next few days. ---
[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466112#comment-16466112 ] ASF GitHub Bot commented on FLINK-8655: --- GitHub user ctamisier opened a pull request: https://github.com/apache/flink/pull/5964 [FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder ## What is the purpose of the change This PR is an alternative to https://github.com/apache/flink/pull/5538. ## Brief change log A discussion started on https://github.com/apache/flink/pull/5538 that i'm quoting here: > What about using the Configuration that is provided in RichFunction.open(Configuration parameters) for the CassandraSinkBase.open(Configuration configuration) {...} implementation ? > > I saw in old docs that Configuration can be used in the open(...) method but today (1.4+) it might not be a good practice anymore. > > What about adding keyspace attribute in CassandraPojoSink and CassandraSinkBuilder (throwing exception when not using a CassandraPojoSinkBuilder for the moment). > And create a new Configuration() with this keyspace in CassandraPojoSink. > And finally do a cluster.connect(keyspace); > > I've done this here if you can have a look. > I've updated CassandraConnectorITCase with a new test. > I would like to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on flink master branch). > > Can this be a candidate for a PR, I am new to flink so it might break the flink good practice principles... > Let me know! The reply from zentol (Chesnay) (**I will check this point**): > We would have to pass the keyspace via the constructor as the Configuration approach doesn't work for streaming. > > Generally speaking it isn't a problem to set the keyspace when creating the connection. But I would like to know what happens if a POJO comes along that explicitly sets the keyspace; is it ignored, respected or will it cause an exception? ## Verifying this change This change added tests and can be verified as follows: - `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in `CassandraConnectorITCase.java` - Need to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on flink master branch, **I will investigate**) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **don't know** - The runtime per-record code paths (performance sensitive): **don't know** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? 
**JavaDocs** You can merge this pull request into a Git repository by running: $ git pull https://github.com/ctamisier/flink pojo-keyspace Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5964.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5964 commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f Author: Clément TamisierDate: 2018-05-06T15:04:18Z [FLINK-8655] add keyspace in cassandra sink builder > Add a default keyspace to CassandraSink > --- > > Key: FLINK-8655 > URL: https://issues.apache.org/jira/browse/FLINK-8655 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.0 >Reporter: Christopher Hughes >Priority: Minor > Labels: features > Fix For: 1.6.0 > > > Currently, to use the CassandraPojoSink, it is necessary for a user to > provide keyspace information on the desired POJOs using datastax annotations. > This allows various POJOs to be written to multiple keyspaces while sinking > messages, but prevent runtime flexibility. > For many developers, non-production environments may all share a single > Cassandra instance differentiated by keyspace names. I propose adding a > `defaultKeyspace(String keyspace)` to the ClusterBuilder. POJOs lacking a > definitive keyspace would attempt to be loaded to the provided default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
GitHub user ctamisier opened a pull request: https://github.com/apache/flink/pull/5964 [FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder ## What is the purpose of the change This PR is an alternative to https://github.com/apache/flink/pull/5538. ## Brief change log A discussion started on https://github.com/apache/flink/pull/5538 that i'm quoting here: > What about using the Configuration that is provided in RichFunction.open(Configuration parameters) for the CassandraSinkBase.open(Configuration configuration) {...} implementation ? > > I saw in old docs that Configuration can be used in the open(...) method but today (1.4+) it might not be a good practice anymore. > > What about adding keyspace attribute in CassandraPojoSink and CassandraSinkBuilder (throwing exception when not using a CassandraPojoSinkBuilder for the moment). > And create a new Configuration() with this keyspace in CassandraPojoSink. > And finally do a cluster.connect(keyspace); > > I've done this here if you can have a look. > I've updated CassandraConnectorITCase with a new test. > I would like to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on flink master branch). > > Can this be a candidate for a PR, I am new to flink so it might break the flink good practice principles... > Let me know! The reply from zentol (Chesnay) (**I will check this point**): > We would have to pass the keyspace via the constructor as the Configuration approach doesn't work for streaming. > > Generally speaking it isn't a problem to set the keyspace when creating the connection. But I would like to know what happens if a POJO comes along that explicitly sets the keyspace; is it ignored, respected or will it cause an exception? ## Verifying this change This change added tests and can be verified as follows: - `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in `CassandraConnectorITCase.java` - Need to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on flink master branch, **I will investigate**) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **don't know** - The runtime per-record code paths (performance sensitive): **don't know** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? **JavaDocs** You can merge this pull request into a Git repository by running: $ git pull https://github.com/ctamisier/flink pojo-keyspace Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5964.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5964 commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f Author: Clément TamisierDate: 2018-05-06T15:04:18Z [FLINK-8655] add keyspace in cassandra sink builder ---
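At the call site, the proposed feature would read roughly like this; `setDefaultKeyspace` stands in for whatever method name the PR finally settles on (the JIRA proposal calls it `defaultKeyspace`), and the contact point, keyspace, and POJO are placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;

// Sketch: POJOs without an annotated keyspace would be written to the
// default keyspace set on the builder.
public class PojoSinkWithDefaultKeyspace {

    public static void addPojoSink(DataStream<MyPojo> stream) throws Exception {
        CassandraSink.addSink(stream)
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .setDefaultKeyspace("test_keyspace") // hypothetical builder method
            .build();
    }

    // Placeholder POJO without a datastax keyspace annotation.
    public static class MyPojo {
        public int id;
    }
}
```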
[jira] [Commented] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows
[ https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466068#comment-16466068 ] ASF GitHub Bot commented on FLINK-6373: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3765 The features that this PR was going to implement has been resolved by PR #. I will close it. > Add runtime support for distinct aggregation over grouped windows > - > > Key: FLINK-6373 > URL: https://issues.apache.org/jira/browse/FLINK-6373 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > This is a follow up task for FLINK-6335. FLINK-6335 enables parsing the > distinct aggregations over grouped windows. This jira tracks the effort of > adding runtime support for the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #3765: [FLINK-6373] Add runtime support for distinct aggregation...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3765 The features that this PR was going to implement has been resolved by PR #. I will close it. ---
[GitHub] flink issue #3764: [FLINK-6335] Parse DISTINCT over grouped windows in strea...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3764 This PR has been integrated into #5940. I'll close it. ---
[jira] [Commented] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466067#comment-16466067 ] ASF GitHub Bot commented on FLINK-6335: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3764 This PR has been integrated into #5940. I'll close it. > Parse DISTINCT over grouped window in stream SQL > > > Key: FLINK-6335 > URL: https://issues.apache.org/jira/browse/FLINK-6335 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. > This jira proposes to support the {{DISTINCT}} keyword on streaming > aggregation using the same technique on the batch side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466066#comment-16466066 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 merging > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466065#comment-16466065 ] ASF GitHub Bot commented on FLINK-8237: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 merging

> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
> Issue Type: Bug
> Reporter: Gábor Hermann
> Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that null before even running the job.
> {code:java}
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
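As a sketch of the "catch that null before even running the job" idea (the helper name and message text are assumptions; the actual change is the one reviewed in PR #5927):

{code:java}
import org.apache.flink.streaming.connectors.fs.Writer;

// Validate the duplicated writer eagerly so a misbehaving Writer fails with
// a descriptive message instead of an NPE deep inside openNewPartFile().
static <T> Writer<T> duplicateWriter(Writer<T> template) {
	Writer<T> duplicated = template.duplicate();
	if (duplicated == null) {
		throw new IllegalStateException(
			template.getClass().getName() + ".duplicate() returned null; "
				+ "Writer implementations must return a distinct, non-null copy.");
	}
	return duplicated;
}
{code}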
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 merging ---
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 merging ---
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466062#comment-16466062 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186465223

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information such as
+ * the key, topic, partition, offset and timestamp for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for older versions it has the value
+ * `Long.MIN_VALUE` and the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+	/**
+	 * The TimestampType was introduced in Kafka clients 0.10+. This interface is also used for the
+	 * Kafka 0.9 connector, so a local enumeration is needed.
+	 */
+	enum TimestampType {
+		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I'll rename them both.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, this is useful!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186465223

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information such as
+ * the key, topic, partition, offset and timestamp for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for older versions it has the value
+ * `Long.MIN_VALUE` and the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+	/**
+	 * The TimestampType was introduced in Kafka clients 0.10+. This interface is also used for the
+	 * Kafka 0.9 connector, so a local enumeration is needed.
+	 */
+	enum TimestampType {
+		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I'll rename them both.

---
[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466051#comment-16466051 ] ASF GitHub Bot commented on FLINK-8237: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the update @pavel-shvetsov-git. +1 to merge

> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
> Issue Type: Bug
> Reporter: Gábor Hermann
> Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that null before even running the job.
> {code:java}
> java.lang.NullPointerException
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the update @pavel-shvetsov-git. +1 to merge ---
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466048#comment-16466048 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Thanks for the update @walterddr. The PR is good to merge. > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* do not > allow distinct aggregates. > We are proposing to reuse the distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate* to support unbounded distinct aggregation on > DataStream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466047#comment-16466047 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186462871

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
 			useMetrics);
 	}
 
+	private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
+		private static final long serialVersionUID = 2651665280744549935L;
+
+		private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+		public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+			this.consumerRecord = consumerRecord;
+		}
+
+		@Override
+		public byte[] getKey() {
+			return consumerRecord.key();
+		}
+
+		@Override
+		public byte[] getMessage() {
+			return consumerRecord.value();
+		}
+
+		@Override
+		public String getTopic() {
+			return consumerRecord.topic();
+		}
+
+		@Override
+		public int getPartition() {
+			return consumerRecord.partition();
+		}
+
+		@Override
+		public long getOffset() {
+			return consumerRecord.offset();
+		}
+
+		@Override
+		public long getTimestamp() {
+			return Long.MIN_VALUE;
--- End diff --

Yes, it certainly does; it should return the `consumerRecord.timestamp()`. I'll fix it.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, this is useful!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Thanks for the update @walterddr. The PR is good to merge. ---
[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...
Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186462871

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
 			useMetrics);
 	}
 
+	private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
+		private static final long serialVersionUID = 2651665280744549935L;
+
+		private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+		public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+			this.consumerRecord = consumerRecord;
+		}
+
+		@Override
+		public byte[] getKey() {
+			return consumerRecord.key();
+		}
+
+		@Override
+		public byte[] getMessage() {
+			return consumerRecord.value();
+		}
+
+		@Override
+		public String getTopic() {
+			return consumerRecord.topic();
+		}
+
+		@Override
+		public int getPartition() {
+			return consumerRecord.partition();
+		}
+
+		@Override
+		public long getOffset() {
+			return consumerRecord.offset();
+		}
+
+		@Override
+		public long getTimestamp() {
+			return Long.MIN_VALUE;
--- End diff --

Yes, it certainly does; it should return the `consumerRecord.timestamp()`. I'll fix it.

---
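The agreed fix is simply to delegate to the wrapped record; a sketch of the corrected method from the diff above:

{code:java}
@Override
public long getTimestamp() {
	// Kafka 0.10+ records carry a timestamp; expose it instead of the
	// Long.MIN_VALUE placeholder reserved for pre-0.10 clients.
	return consumerRecord.timestamp();
}
{code}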
[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable
[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466037#comment-16466037 ] Ted Yu commented on FLINK-9303: ---
{code}
+			if (!removedPartitions.isEmpty()) {
+				log.info("Removing " + removedPartitions.size() + " partition(s) from consumer.");
+				partitionsToBeRemoved.removeAll(removedPartitions);
{code}
{{removeAll}} can be saved by using iterator removal for partitionsToBeRemoved in the preceding loop.

> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.6.0
>
> Originally reported in the user ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html
> The problem is that the Kafka consumer has no notion of "closed" partitions at the moment, so partitions statically assigned to the Kafka client are never removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread above.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
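A sketch of the iterator-based variant being suggested; shouldBeRemoved(...) stands in for whatever condition the preceding loop already evaluates, and log is the surrounding class's logger:

{code:java}
import java.util.Iterator;
import org.apache.kafka.common.TopicPartition;

int removed = 0;
for (Iterator<TopicPartition> it = partitionsToBeRemoved.iterator(); it.hasNext(); ) {
	TopicPartition partition = it.next();
	if (shouldBeRemoved(partition)) { // stand-in for the existing loop's condition
		it.remove();                  // removes from partitionsToBeRemoved in place
		removed++;
	}
}
if (removed > 0) {
	log.info("Removing " + removed + " partition(s) from consumer.");
}
{code}

This folds the removal into the scan itself, so the separate removedPartitions collection and the extra removeAll pass disappear.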
[jira] [Commented] (FLINK-9064) Add Scaladocs link to documentation
[ https://issues.apache.org/jira/browse/FLINK-9064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466020#comment-16466020 ] ASF GitHub Bot commented on FLINK-9064: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5773 cc @zentol @GJL @tzulitai please review thanks~ > Add Scaladocs link to documentation > --- > > Key: FLINK-9064 > URL: https://issues.apache.org/jira/browse/FLINK-9064 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Matt Hagen >Assignee: vinoyang >Priority: Minor > Original Estimate: 5m > Remaining Estimate: 5m > > Browse to the [Apache Flink > Documentation|https://ci.apache.org/projects/flink/flink-docs-master/] page. > On the sidebar, under the Javadocs link, I recommend that you add a > [Scaladocs|https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/scala/index.html#org.apache.flink.api.scala.package] > link. > Thanks! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5773: [FLINK-9064] Add Scaladocs link to documentation
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5773 cc @zentol @GJL @tzulitai please review thanks~ ---
[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization
[ https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466014#comment-16466014 ] Rong Rong commented on FLINK-7001: --
Thanks [~pgrulich]. This is definitely a great solution for handling high-frequency, long-duration sliding windows. I briefly went over the paper and have a few questions regarding the use cases and compatibility.
* The non-overlapping slice separator + slice manager approach is very elegant for reducing memory buffer usage, and having a sole slice manager handle out-of-order messages by updating the slices in the store is definitely great. My concern is with [~StephanEwen] on this, especially the backward & RocksDB compatibility.
* Another point is the complexity of partial aggregates vs. final aggregates. The paper says little about the "Window manager", and the assumption seems to be that the final aggregate over the partial results has the same time/space complexity as the partial aggregates. Most of the built-in aggregate functions we currently have in Flink satisfy this assumption; however, there are some complex aggregate functions whose "merge" method can be much more expensive than their "accumulate" method. Would we have to consider the trade-off between these two approaches?
* https://issues.apache.org/jira/browse/FLINK-5387 seems to suggest there are trade-offs when using aligned window approaches. We can probably extend the discussion here.
Thanks, Rong

> Improve performance of Sliding Time Window with pane optimization
> -
>
> Key: FLINK-7001
> URL: https://issues.apache.org/jira/browse/FLINK-7001
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
>
> Currently, the implementation of time-based sliding windows treats each window individually and replicates records to each window. For a window of 10 minutes that slides by 1 second, the data is replicated 600-fold (10 minutes / 1 second). We can optimize sliding windows by dividing them into panes (aligned with the slide), so that we can avoid record duplication and leverage the checkpoint.
> I will attach a more detailed design doc to the issue.
> The following issues are similar to this one: FLINK-5387, FLINK-6990

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
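For readers new to the pane idea in this thread: each record is pre-aggregated once into the pane (slice) that contains it, and a window result is produced by merging the panes it covers. Below is a simplified, backend-agnostic sketch with an illustrative sum aggregate; the class and method names are made up and this is not the design doc's API:

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Simplified pane-based pre-aggregation for a sum over sliding windows. */
class PaneSumSketch {
	// Window size = 10 min, slide = 1 s => 600 panes per window, but each
	// record is added to exactly one pane instead of 600 window copies.
	private static final long SLIDE_MS = 1_000L;
	private static final long SIZE_MS = 600_000L;

	private final Map<Long, Long> paneSums = new HashMap<>();

	/** accumulate(): O(1) per record, no record replication. */
	void addRecord(long timestampMs, long value) {
		long paneStart = timestampMs - (timestampMs % SLIDE_MS);
		paneSums.merge(paneStart, value, Long::sum);
	}

	/** merge(): one window result combines its size/slide pane partials. */
	long windowSum(long windowEndMs) {
		long sum = 0L;
		for (long pane = windowEndMs - SIZE_MS; pane < windowEndMs; pane += SLIDE_MS) {
			sum += paneSums.getOrDefault(pane, 0L);
		}
		return sum;
	}
}
{code}

The sketch also makes Rong's trade-off visible: addRecord touches a single pane, but windowSum must merge size/slide partials, which is exactly where aggregates with an expensive "merge" pay more than they save on "accumulate".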
[jira] [Commented] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask
[ https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466011#comment-16466011 ] ASF GitHub Bot commented on FLINK-7897: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5777 cc @zentol @GJL @tzulitai this PR has been open for a long time, please review it, thanks. > Consider using nio.Files for file deletion in TransientBlobCleanupTask > -- > > Key: FLINK-7897 > URL: https://issues.apache.org/jira/browse/FLINK-7897 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > nio.Files#delete() provides a better clue as to why a deletion may fail: > https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) > Depending on the potential exception (FileNotFound), the call to > localFile.exists() may be skipped. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5777: [FLINK-7897] Consider using nio.Files for file deletion i...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5777 cc @zentol @GJL @tzulitai this PR has been open for a long time, please review it, thanks. ---
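A sketch of what the suggested switch could look like; the method name is illustrative, and whether to log or rethrow on failure is left open:

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Files.delete throws a typed exception explaining the failure, so the
// separate localFile.exists() pre-check can be skipped.
static void deleteTransientBlob(Path localFile) {
	try {
		Files.delete(localFile);
	} catch (NoSuchFileException e) {
		// already gone - nothing to clean up
	} catch (IOException e) {
		// carries the concrete cause (e.g. AccessDeniedException), which a
		// plain boolean File.delete() would hide
		throw new UncheckedIOException("Failed to delete " + localFile, e);
	}
}
{code}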
[jira] [Assigned] (FLINK-8999) Ensure the job has an operator with operator state.
[ https://issues.apache.org/jira/browse/FLINK-8999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-8999: --- Assignee: (was: mingleizhang) > Ensure the job has an operator with operator state. > --- > > Key: FLINK-8999 > URL: https://issues.apache.org/jira/browse/FLINK-8999 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex
[ https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466010#comment-16466010 ] ASF GitHub Bot commented on FLINK-7917: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5798 cc @zentol @GJL @tzulitai this PR has been open for a long time, please review, thanks~

> The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex
> -
>
> Key: FLINK-7917
> URL: https://issues.apache.org/jira/browse/FLINK-7917
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Reporter: Ted Yu
> Assignee: vinoyang
> Priority: Minor
>
> Currently in ExecutionJobVertex#getTaskInformationOrBlobKey:
> {code}
> 	}
> 	return taskInformationOrBlobKey;
> {code}
> The return should be placed inside the synchronized block.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5798: [FLINK-7917] The return of taskInformationOrBlobKey shoul...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5798 cc @zentol @GJL @tzulitai this PR has been open for a long time, please review, thanks~ ---
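The requested change, sketched with a simplified lock and a hypothetical createTaskInformationOrBlobKey() helper (the real method in ExecutionJobVertex lazily serializes the TaskInformation and may offload it to the BLOB store):

{code:java}
import java.io.IOException;

import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;

// Before the fix, the return statement sat after the synchronized block, so
// the read of the lazily initialized field was not covered by the lock.
Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
	synchronized (lock) { // simplified lock object, illustrative only
		if (taskInformationOrBlobKey == null) {
			taskInformationOrBlobKey = createTaskInformationOrBlobKey(); // hypothetical helper
		}
		return taskInformationOrBlobKey; // now returned while still holding the lock
	}
}
{code}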