[jira] [Commented] (FLINK-6376) when deploy flink cluster on the yarn, it is lack of hdfs delegation token.
[ https://issues.apache.org/jira/browse/FLINK-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984204#comment-15984204 ] ASF GitHub Bot commented on FLINK-6376: --- GitHub user Rucongzhang opened a pull request: https://github.com/apache/flink/pull/3776 [FLINK-6376]when deploy flink cluster on the yarn, it is lack of hdfs delegation. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added
- [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running: $ git pull https://github.com/Rucongzhang/flink flink-6376 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3776.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3776
commit 256f519cda73571c73914c98b3c9ff4381520907 Author: z00376786, Date: 2017-04-26T03:36:43Z when deploy flink cluster on the yarn, it is lack of hdfs delegation token
> when deploy flink cluster on the yarn, it is lack of hdfs delegation token.
> ---
>
> Key: FLINK-6376
> URL: https://issues.apache.org/jira/browse/FLINK-6376
> Project: Flink
> Issue Type: Bug
> Components: Security, YARN
> Reporter: zhangrucong1982
> Assignee: zhangrucong1982
>
> 1. I use Flink 1.2.0 and deploy the Flink cluster on YARN. The Hadoop version is 2.7.2.
> 2. I run Flink in secure mode with a keytab and principal. The key configuration is: security.kerberos.login.keytab: /home/ketab/test.keytab and security.kerberos.login.principal: test.
> 3. The YARN configuration is the default, with log aggregation enabled ("yarn.log-aggregation-enable: true").
> 4. When the Flink cluster is deployed on YARN, the YARN NodeManager hits the following failure while aggregating logs to HDFS. The root cause is a missing HDFS delegation token.
> java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "SZV1000258954/10.162.181.24"; destination host is: "SZV1000258954":25000;
>     at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:796)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1515)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1447)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>     at com.sun.proxy.$Proxy26.getFileInfo(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:802)
>     at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:201)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
>     at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1919)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1500)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1496)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at ...
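For context, the standard way a YARN application client ships HDFS delegation tokens to its containers looks roughly like the sketch below. This is a hedged illustration of the general Hadoop/YARN mechanism only, not the actual change in PR #3776; the class name, method name, and the "yarn" renewer are assumptions.

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

public class DelegationTokenSketch {

    /**
     * Obtains HDFS delegation tokens and attaches them to a container launch context,
     * so that NodeManager-side work (such as log aggregation) can authenticate to HDFS
     * on the application's behalf.
     */
    public static void attachHdfsTokens(Configuration hadoopConf, ContainerLaunchContext ctx) throws Exception {
        Credentials credentials = new Credentials();

        // Ask the file system for delegation tokens; "yarn" is an illustrative renewer.
        FileSystem fs = FileSystem.get(hadoopConf);
        fs.addDelegationTokens("yarn", credentials);

        // Serialize the credentials into the wire format YARN expects.
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        ctx.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
    }
}
```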
[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984131#comment-15984131 ] ASF GitHub Bot commented on FLINK-5867: --- Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/3773#discussion_r113361061
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at last to RUNNING,
+ */
+public class FailoverRegion {
+
+    private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+    /** The log object used for debugging. */
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+    //
+
+    /** a unique id for debugging */
+    private final AbstractID id = new AbstractID();
+
+    private final ExecutionGraph executionGraph;
+
+    private final List<ExecutionVertex> connectedExecutionVertexes;
+
+    /** The executor that executes the recovery action after all vertices are in a */
+    private final Executor executor;
+
+    /** Current status of the job execution */
+    private volatile JobStatus state = JobStatus.RUNNING;
+
+    public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+        this.executionGraph = checkNotNull(executionGraph);
+        this.executor = checkNotNull(executor);
+        this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+        LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+    }
+
+    public void onExecutionFail(Execution taskExecution, Throwable cause) {
+        // TODO: check if need to failover the preceding region
+        if (!executionGraph.getRestartStrategy().canRestart()) {
+            // delegate the failure to a global fail that will check the restart strategy and not restart
+            executionGraph.failGlobal(cause);
+        }
+        else {
+            cancel(taskExecution.getGlobalModVersion());
+        }
+    }
+
+    private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+        while (true) {
+            JobStatus curStatus = this.state;
+            if (curStatus.equals(JobStatus.CANCELLING)) {
+                if (transitionState(curStatus, JobStatus.CANCELED)) {
+                    reset(globalModVersionOfFailover);
+                    break;
+                }
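The diff pivots on an atomic compare-and-set state transition. The transitionState method is referenced above but not shown, so its shape here is an assumption; this is a generic, self-contained sketch of the retry-loop pattern the review thread is discussing, not Flink's actual class:

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class CasStateMachine {

    enum State { RUNNING, CANCELLING, CANCELED }

    private static final AtomicReferenceFieldUpdater<CasStateMachine, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(CasStateMachine.class, State.class, "state");

    private volatile State state = State.RUNNING;

    /** Atomically moves from an expected state to a new one; false if another thread won the race. */
    private boolean transitionState(State expected, State next) {
        return STATE_UPDATER.compareAndSet(this, expected, next);
    }

    /** The retry loop from the diff: re-read the state until one CAS succeeds or no longer applies. */
    void onAllVerticesTerminal() {
        while (true) {
            State cur = state;
            if (cur == State.CANCELLING) {
                if (transitionState(cur, State.CANCELED)) {
                    break; // this thread owns the transition; run the recovery action here
                }
            } else {
                break; // another thread already moved the state machine on
            }
        }
    }
}
```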
[jira] [Commented] (FLINK-6386) Missing bracket in 'Compiler Limitation' section
[ https://issues.apache.org/jira/browse/FLINK-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984096#comment-15984096 ] ASF GitHub Bot commented on FLINK-6386: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/3775 [FLINK-6386] Missing bracket in 'Compiler Limitation' section Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [x] General - The pull request references the related JIRA issue ("[FLINK-6386] Missing bracket in 'Compiler Limitation' section") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added
- [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-6386 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3775.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3775
commit 1533645864465d4ce174d5e9eef3cbb11dead5e3 Author: Bowen Li, Date: 2017-04-26T02:55:25Z FLINK-6386 Missing bracket in 'Compiler Limitation' section
> Missing bracket in 'Compiler Limitation' section
>
> Key: FLINK-6386
> URL: https://issues.apache.org/jira/browse/FLINK-6386
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.2.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Trivial
> Fix For: 1.2.2
>
> "This means that types such as `Tuple2 declared as..."
> should be
> "This means that types such as `Tuple2 ` or `Collector` declared as..."
[jira] [Created] (FLINK-6386) Missing bracket in 'Compiler Limitation' section
Bowen Li created FLINK-6386: --- Summary: Missing bracket in 'Compiler Limitation' section Key: FLINK-6386 URL: https://issues.apache.org/jira/browse/FLINK-6386 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.2.0 Reporter: Bowen Li Assignee: Bowen Li Priority: Trivial Fix For: 1.2.2 "This means that types such as `Tuple2` or `Collector` declared as..."
[jira] [Commented] (FLINK-4621) Improve decimal literals of SQL API
[ https://issues.apache.org/jira/browse/FLINK-4621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983968#comment-15983968 ] JiJun Tang commented on FLINK-4621: --- I think simple numbers can be converted to Java primitives. For example: -0.5 -> longval=-5, precision=1
> Improve decimal literals of SQL API
> ---
>
> Key: FLINK-4621
> URL: https://issues.apache.org/jira/browse/FLINK-4621
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
>
> Currently, all SQL {{DECIMAL}} types are converted to BigDecimals internally. By default, the SQL parser creates {{DECIMAL}} literals for any number, e.g. {{SELECT 1.0, 12, -0.5 FROM x}}. I think it would be better if these simple numbers were represented as Java primitives instead of objects.
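For illustration, the compact representation the comment hints at (an unscaled long plus a scale, which is presumably what "precision=1" refers to here) already exists in the JDK; a minimal sketch:

```java
import java.math.BigDecimal;

public class CompactDecimalSketch {
    public static void main(String[] args) {
        // -0.5 as an unscaled long (-5) and a scale (1): -5 * 10^-1 = -0.5
        long unscaled = -5L;
        int scale = 1;

        // BigDecimal.valueOf(long, int) uses exactly this representation,
        // so simple literals never need a String parse.
        BigDecimal value = BigDecimal.valueOf(unscaled, scale);
        System.out.println(value);                 // prints -0.5
        System.out.println(value.unscaledValue()); // prints -5
        System.out.println(value.scale());         // prints 1
    }
}
```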
[jira] [Updated] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-5855: Description:
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
synchronized (restoredState.pendingFilesPerCheckpoint) {
restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints(). After discussion, I have filed a related JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-6381
was:
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
synchronized (restoredState.pendingFilesPerCheckpoint) {
restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints().
> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
> Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
> restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints().
> After discussion, I have filed a related JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-6381
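A sketch of the ordering the report asks for: take the monitor before the first call, so the handler and the subsequent clear() observe the map under the same lock. The field name follows the snippet above, but the handler body is elided in the original, so the stand-in below is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

public class LockOrderingSketch {

    private final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();

    /** Illustrative stand-in for the real handler, whose body is not shown in the issue. */
    void handlePendingFilesForPreviousCheckpoints(Map<Long, String> pending) {
        pending.forEach((checkpoint, file) -> System.out.println(checkpoint + " -> " + file));
    }

    void restoreState() {
        // Acquire the lock before the first read of the map, not between the two
        // operations, so the handler and clear() see a consistent view.
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```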
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983948#comment-15983948 ] mingleizhang commented on FLINK-5855: - [~tedyu] Hi, Ted. Thanks for such useful information. Now I know what to do and how to do it next time. Thanks again, I really appreciate it.
> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
> Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
> restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints().
[jira] [Commented] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment
[ https://issues.apache.org/jira/browse/FLINK-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983852#comment-15983852 ] Bowen Li commented on FLINK-6350: - I'll take a look first. Thanks!
> Flink: Windowing does not work with streams from collections and the local execution environment
>
> Key: FLINK-6350
> URL: https://issues.apache.org/jira/browse/FLINK-6350
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.2.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.2.2
>
> When using events via the {{fromCollection}} method of {{StreamExecutionEnvironment}}, window timing is not supported. The time windows close immediately.
> This is unfortunate because mocking events from collections and testing them locally is a powerful way to unit test stream processors.
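A minimal reproduction of the reported behavior might look like the following. This is a sketch against the 1.2-era DataStream API under the assumption that the default processing-time windows are in play: the bounded collection source finishes long before the 5-second window would fire, so results can be lost.

```java
import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CollectionWindowRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A bounded source: all three elements are emitted almost instantly,
        // after which the source finishes and the job shuts down.
        env.fromCollection(Arrays.asList(
                Tuple2.of("a", 1),
                Tuple2.of("a", 2),
                Tuple2.of("b", 3)))
            .keyBy(0)
            .timeWindow(Time.seconds(5)) // processing-time window by default
            .sum(1)
            .print();                    // may print nothing: the job can end before the window fires

        env.execute("collection-window-repro");
    }
}
```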
[jira] [Commented] (FLINK-6367) support custom header settings of allow origin
[ https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983806#comment-15983806 ] ASF GitHub Bot commented on FLINK-6367: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3769 @zentol fix that
> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
> Issue Type: Sub-task
> Components: Webfrontend
> Reporter: shijinkui
> Assignee: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: Enable a custom access control parameter for the allow-origin header; the default is `*`.
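Sketched in the style of Flink's ConfigOption definitions (compare the JobManagerOptions diff further down in this digest); the field name and its placement are assumptions, not necessarily what PR #3769 does:

```java
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

public class WebOptionsSketch {

    /** Value for the Access-Control-Allow-Origin header of the web frontend; "*" allows any origin. */
    public static final ConfigOption<String> ACCESS_CONTROL_ALLOW_ORIGIN =
            key("jobmanager.web.access-control-allow-origin")
                    .defaultValue("*");
}
```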
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983655#comment-15983655 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 @StephanEwen @zentol I shaded okhttp3 and okio from flink-metrics-datadog. I didn't use 'shade-flink' because I found it somehow prevents me from building an uber jar. Let me know if it's ok to shade them this way.
```
$ jar -tf target/flink-metrics-datadog-1.3-SNAPSHOT-jar-with-dependencies.jar
META-INF/
META-INF/MANIFEST.MF
META-INF/DEPENDENCIES
META-INF/LICENSE
META-INF/NOTICE
org/
org/apache/
org/apache/flink/
org/apache/flink/metrics/
org/apache/flink/metrics/datadog/
org/apache/flink/metrics/datadog/DatadogHttpClient.class
org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class
org/apache/flink/metrics/datadog/DatadogHttpReporter.class
org/apache/flink/metrics/datadog/DCounter.class
org/apache/flink/metrics/datadog/DGauge.class
org/apache/flink/metrics/datadog/DMeter.class
org/apache/flink/metrics/datadog/DMetric.class
org/apache/flink/metrics/datadog/DSeries.class
org/apache/flink/metrics/datadog/MetricType.class
META-INF/maven/
META-INF/maven/org.apache.flink/
META-INF/maven/org.apache.flink/flink-metrics-datadog/
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties
META-INF/maven/org.apache.flink/force-shading/
META-INF/maven/org.apache.flink/force-shading/pom.xml
META-INF/maven/org.apache.flink/force-shading/pom.properties
org/apache/flink/shaded/
org/apache/flink/shaded/okhttp3/
org/apache/flink/shaded/okhttp3/Address.class
.
org/apache/flink/shaded/okhttp3/WebSocketListener.class
META-INF/maven/com.squareup.okhttp3/
META-INF/maven/com.squareup.okhttp3/okhttp/
META-INF/maven/com.squareup.okhttp3/okhttp/pom.xml
META-INF/maven/com.squareup.okhttp3/okhttp/pom.properties
org/apache/flink/shaded/okio/
org/apache/flink/shaded/okio/AsyncTimeout$1.class
...
org/apache/flink/shaded/okio/Util.class
META-INF/maven/com.squareup.okio/
META-INF/maven/com.squareup.okio/okio/
META-INF/maven/com.squareup.okio/okio/pom.xml
META-INF/maven/com.squareup.okio/okio/pom.properties
```
> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Critical
> Fix For: 1.3.0
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a lot of other companies do too.
> Flink right now only has a StatsD metrics reporter, and users have to set up the Datadog Agent in order to receive metrics from StatsD and transport them to Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting the Datadog HTTP endpoint.
> I'll take this ticket myself.
[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes
[ https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983516#comment-15983516 ] ASF GitHub Bot commented on FLINK-5869: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3772#discussion_r113272506
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
@@ -72,6 +72,13 @@
 		.defaultValue(16)
 		.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+	/**
+	 * The maximum number of prior execution attempts kept in history.
+	 */
+	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+		key("jobmanager.execution.failover-strategy")
+			.defaultValue("full");
--- End diff --
have we ever considered defining a set of valid values directly in the config option?
> ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes
> ---
>
> Key: FLINK-5869
> URL: https://issues.apache.org/jira/browse/FLINK-5869
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: shuai.xu
> Assignee: shuai.xu
>
> The execution graph doesn't manage the failover of executions. It only cares about the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or SUSPENDED.
> On an execution failure, it notifies the FailoverCoordinator to do the failover.
> It only records the finished job vertices and changes to FINISHED after all vertices have finished.
> It changes to a final failure if the restart strategy fails or an unrecoverable exception occurs.
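One way to get the constraint the review asks about, until or unless ConfigOption grows native support for it, is to funnel the raw string through an enum. This is a sketch only; the enum name and the set of valid values are illustrative assumptions:

```java
import java.util.Locale;

public enum FailoverStrategyChoice {
    FULL, REGION;

    /** Parses a config value such as "full", failing fast on anything outside the valid set. */
    public static FailoverStrategyChoice fromConfigValue(String value) {
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown value for jobmanager.execution.failover-strategy: " + value);
        }
    }
}
```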
[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4
[ https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983512#comment-15983512 ] ASF GitHub Bot commented on FLINK-6302: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3720 The `GemFile` should be updated as well. It would still work as long as the `GemFile.lock` file is present. If it is deleted, however, the bundler would load the dependencies from `GemFile` and create a new `GemFile.lock`.
> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Reporter: Tao Meng
> Assignee: Tao Meng
> Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: expanded from macro 'RSTRING_LEN'
> RSTRING(str)->as.heap.len)
> ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, -1);
> ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.
[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4
[ https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983506#comment-15983506 ] ASF GitHub Bot commented on FLINK-6302: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3720#discussion_r113292688
--- Diff: docs/Gemfile.lock ---
@@ -30,18 +30,19 @@ GEM
 		redcarpet (~> 3.1)
 		safe_yaml (~> 1.0)
 		toml (~> 0.1.0)
-	jekyll-coffeescript (1.0.1)
+	jekyll-coffeescript (1.0.2)
 		coffee-script (~> 2.2)
+		coffee-script-source (~> 1.11.1)
 	jekyll-gist (1.4.0)
-		octokit (~> 4.3.0)
+		octokit (~> 4.2)
--- End diff --
There was never an explicit version downgrade in `jekyll-gist`; it has had the `4.3.0` `octokit` dependency since it was added. Similarly, since we added the `jekyll-gist`/`octokit` dependency, the version of `octokit` was set to `4.3.0`. In conclusion, it doesn't appear to be related to a bug fix. We just always relied on a higher version than `jekyll-gist` did, and it worked out fine.
> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Reporter: Tao Meng
> Assignee: Tao Meng
> Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: expanded from macro 'RSTRING_LEN'
> RSTRING(str)->as.heap.len)
> ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, -1);
> ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.
[GitHub] flink pull request #3757: (refactor) some opportunities to use multi-catch
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3757#discussion_r113290832
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java ---
@@ -121,11 +120,7 @@ public static boolean validateClassLoadable(ClassNotFoundException cnfe, ClassLo
 			String className = cnfe.getMessage();
 			Class.forName(className, false, cl);
 			return true;
-		}
-		catch (ClassNotFoundException e) {
-			return false;
-		}
-		catch (Exception e) {
+		}catch(ClassNotFoundException | Exception e) /*multi-catch refactor*/ {
--- End diff --
you can remove the `ClassNotFoundException` clause since it is included in the `Exception` clause anyway.
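Context for the comment: Java rejects multi-catch alternatives related by subclassing, so `catch (ClassNotFoundException | Exception e)` does not even compile; only unrelated types may share a clause. A small self-contained illustration:

```java
public class MultiCatchRules {
    public static void main(String[] args) {
        try {
            Class.forName("does.not.Exist");
        } catch (ClassNotFoundException | IllegalStateException e) { // OK: neither type subsumes the other
            System.out.println("caught: " + e);
        }
        // catch (ClassNotFoundException | Exception e) { ... }
        // -> compile error along the lines of: "Alternatives in a multi-catch
        //    statement cannot be related by subclassing".
        // Catching the supertype alone, catch (Exception e), would suffice here.
    }
}
```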
[GitHub] flink pull request #3757: (refactor) some opportunities to use multi-catch
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3757#discussion_r113290672
--- Diff: flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---
@@ -125,9 +125,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
 		datumWriter = new SpecificDatumWriter(avroValueType);
 		try {
 			schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-		} catch (InstantiationException e) {
-			throw new RuntimeException(e.getMessage());
-		} catch (IllegalAccessException e) {
+		}catch(InstantiationException | IllegalAccessException e) /*multi-catch refactor*/ {
--- End diff --
In this particular case it makes sense as it reduces clutter.
[jira] [Commented] (FLINK-6384) PythonStreamer does not close python processes
[ https://issues.apache.org/jira/browse/FLINK-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983476#comment-15983476 ] ASF GitHub Bot commented on FLINK-6384: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3774 Looks good, +1 to merge.
> PythonStreamer does not close python processes
> --
>
> Key: FLINK-6384
> URL: https://issues.apache.org/jira/browse/FLINK-6384
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.3.0
>
> The {{PythonStreamer}} opens a new process calling the python binary to check whether the binary is available. This process won't get closed, leading to an excessive number of open python processes when running the {{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra process, because the actual python call with the python code would also fail if there is no python binary available.
[jira] [Reopened] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-6385: -
> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Priority: Critical
> Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt
[jira] [Closed] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6385. --- Resolution: Duplicate
> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Priority: Critical
> Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt
[jira] [Updated] (FLINK-6175) HistoryServerTest.testFullArchiveLifecycle fails
[ https://issues.apache.org/jira/browse/FLINK-6175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6175: Component/s: History Server
> HistoryServerTest.testFullArchiveLifecycle fails
>
> Key: FLINK-6175
> URL: https://issues.apache.org/jira/browse/FLINK-6175
> Project: Flink
> Issue Type: Test
> Components: History Server, Tests, Webfrontend
> Reporter: Ufuk Celebi
> Assignee: Chesnay Schepler
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/213933823/log.txt
> {code}
> testFullArchiveLifecycle(org.apache.flink.runtime.webmonitor.history.HistoryServerTest) Time elapsed: 2.162 sec <<< FAILURE!
> java.lang.AssertionError: /joboverview.json did not contain valid json
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at org.junit.Assert.assertNotNull(Assert.java:712)
> at org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testFullArchiveLifecycle(HistoryServerTest.java:98)
> {code}
> Happened on a branch with unrelated changes [~Zentol].
[jira] [Closed] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6385. --- Resolution: Fixed
> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Priority: Critical
> Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt
[jira] [Closed] (FLINK-6155) Allow to specify endpoint names
[ https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-6155. Resolution: Fixed Fixed via 433a345edccdee29385957841e6513679690a5e9
> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.3.0
>
> In order to make the standalone mode work, we have to be able to assign names to the {{RpcEndpoints}}. In the case of the Akka implementation they would correspond to the actor names. This information is necessary to look the corresponding endpoints up on a remote host, because in standalone mode the names have to be deterministic.
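The point of deterministic names, sketched: if the endpoint id is fixed, a remote process can compute the lookup address instead of discovering it. The path shape follows Akka's actor-path convention, but the concrete host, port, and endpoint names below are illustrative assumptions, not Flink's actual values:

```java
public class EndpointAddressSketch {
    public static void main(String[] args) {
        String host = "jobmanager-host";        // illustrative
        int port = 6123;                        // illustrative
        String endpointId = "resourcemanager";  // deterministic endpoint name

        // With a fixed endpoint id, both sides can derive the same lookup address.
        String actorPath = String.format("akka.tcp://flink@%s:%d/user/%s", host, port, endpointId);
        System.out.println(actorPath);
    }
}
```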
[jira] [Commented] (FLINK-6155) Allow to specify endpoint names
[ https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983436#comment-15983436 ] ASF GitHub Bot commented on FLINK-6155: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3596
> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.3.0
>
> In order to make the standalone mode work, we have to be able to assign names to the {{RpcEndpoints}}. In the case of the Akka implementation they would correspond to the actor names. This information is necessary to look the corresponding endpoints up on a remote host, because in standalone mode the names have to be deterministic.
[jira] [Commented] (FLINK-6155) Allow to specify endpoint names
[ https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983433#comment-15983433 ] ASF GitHub Bot commented on FLINK-6155: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3596 Failing test case has just been fixed. Merging this PR.
> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.3.0
>
> In order to make the standalone mode work, we have to be able to assign names to the {{RpcEndpoints}}. In the case of the Akka implementation they would correspond to the actor names. This information is necessary to look the corresponding endpoints up on a remote host, because in standalone mode the names have to be deterministic.
[jira] [Commented] (FLINK-6384) PythonStreamer does not close python processes
[ https://issues.apache.org/jira/browse/FLINK-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983424#comment-15983424 ] ASF GitHub Bot commented on FLINK-6384: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3774 [FLINK-6384] [py] Remove python binary check via additional process The PythonStreamer used to check for the existence of the python binary by starting a python process. This process was not closed afterwards. This caused the PythonPlanBinderTest to fail locally. I think the check whether a python binary exists is not necessary, since the subsequent python command would fail anyway if there is no binary available on the system. The system failure message is that there is no such file or directory. This error message should be descriptive enough to debug such a problem. cc @zentol
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixPythonStreamer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3774.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3774
commit 746fe59737338c02502443c5c3e88c613d0b92ad Author: Till Rohrmann, Date: 2017-04-25T18:41:58Z [FLINK-6384] [py] Remove python binary check via additional process
The PythonStreamer used to check for the existence of the python binary by starting a python process. This process was not closed afterwards. This caused the PythonPlanBinderTest to fail locally. I think the check whether a python binary exists is not necessary since the subsequent python command would fail anyway if there is no binary available on the system. The system failure message is that there is no such file or directory. This error message should be descriptive enough in order to debug such a problem.
> PythonStreamer does not close python processes
> --
>
> Key: FLINK-6384
> URL: https://issues.apache.org/jira/browse/FLINK-6384
> Project: Flink
> Issue Type: Bug
> Components: Python API
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.3.0
>
> The {{PythonStreamer}} opens a new process calling the python binary to check whether the binary is available. This process won't get closed, leading to an excessive number of open python processes when running the {{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra process, because the actual python call with the python code would also fail if there is no python binary available.
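For reference, the leak pattern the fix removes is a probe process that is spawned but never awaited or destroyed. A hedged sketch of the hygiene involved, not the PR's actual code; the class and method names are illustrative:

```java
public class ProcessProbeSketch {

    /** Returns true if a "python" binary responded; always reaps the child process. */
    static boolean pythonAvailable() {
        try {
            Process probe = new ProcessBuilder("python", "-V").start();
            try {
                return probe.waitFor() == 0; // without waitFor()/destroy(), every check leaks a process
            } finally {
                probe.destroy();             // release pipes and the OS handle in all cases
            }
        } catch (Exception e) {
            return false;                    // e.g. IOException: "No such file or directory"
        }
    }

    public static void main(String[] args) {
        System.out.println("python available: " + pythonAvailable());
    }
}
```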
[jira] [Commented] (FLINK-6330) Improve Docker documentation
[ https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983411#comment-15983411 ] ASF GitHub Bot commented on FLINK-6330: --- Github user patricklucas commented on the issue: https://github.com/apache/flink/pull/3751 It will be available before the next release, though I noticed just yesterday that it seems we build the [official 1.2 docs](https://ci.apache.org/projects/flink/flink-docs-release-1.2/) (erroneously?) from a development branch instead of the actual tagged release. I'm about ready to submit the PR to get the official images created, but it's fine if you want to wait.
> Improve Docker documentation
>
> Key: FLINK-6330
> URL: https://issues.apache.org/jira/browse/FLINK-6330
> Project: Flink
> Issue Type: Bug
> Components: Docker
> Affects Versions: 1.2.0
> Reporter: Patrick Lucas
> Assignee: Patrick Lucas
> Fix For: 1.2.2
>
> The "Docker" page in the docs exists but is blank.
> Add something useful here, including references to the official images that should exist once 1.2.1 is released, and add a brief "Kubernetes" page as well, referencing the [helm chart|https://github.com/docker-flink/examples].
[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs
Github user patricklucas commented on the issue: https://github.com/apache/flink/pull/3751 It will be available before the next release, though I noticed just yesterday that it seems we build the [official 1.2 docs](https://ci.apache.org/projects/flink/flink-docs-release-1.2/) (erroneously?) from a development branch instead of the actual tagged release. I'm about ready to submit the PR to get the official images created, but it's fine if you want to wait. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4
[ https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983397#comment-15983397 ] ASF GitHub Bot commented on FLINK-6302: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3720 I am not familiar with the Ruby / Bundler / etc space, but it looks like `Gemfile` is the original source of truth and `Gemfile.lock` is in some way derived from that. Is it correct to only update `Gemfile.lock` and not `Gemfile`? @uce or @alpinegizmo can you maybe shed some light here? > Documentation build error on ruby 2.4 > - > > Key: FLINK-6302 > URL: https://issues.apache.org/jira/browse/FLINK-6302 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Meng >Assignee: Tao Meng >Priority: Trivial > > {code} > /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: > expanded from macro 'RSTRING_LEN' > RSTRING(str)->as.heap.len) > ~~^~~ > yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum' > rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, > -1); > ^ > 17 warnings and 1 error generated. > make: *** [yajl_ext.o] Error 1 > make failed, exit code 2 > {code} > We should update Gemfile.lock. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3720: [FLINK-6302] Documentation build error on ruby 2.4
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3720 I am not familiar with the Ruby / Bundler / etc space, but it looks like `Gemfile` is the original source of truth and `Gemfile.lock` is in some way derived from that. Is it correct to only update `Gemfile.lock` and not `Gemfile`? @uce or @alpinegizmo can you maybe shed some light here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4
[ https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983391#comment-15983391 ] ASF GitHub Bot commented on FLINK-6302: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3720#discussion_r113273807 --- Diff: docs/Gemfile.lock --- @@ -30,18 +30,19 @@ GEM redcarpet (~> 3.1) safe_yaml (~> 1.0) toml (~> 0.1.0) -jekyll-coffeescript (1.0.1) +jekyll-coffeescript (1.0.2) coffee-script (~> 2.2) + coffee-script-source (~> 1.11.1) jekyll-gist (1.4.0) - octokit (~> 4.3.0) + octokit (~> 4.2) --- End diff -- Does the downgrade fix an issue? If yes, it's okay to do it, I think > Documentation build error on ruby 2.4 > - > > Key: FLINK-6302 > URL: https://issues.apache.org/jira/browse/FLINK-6302 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Meng >Assignee: Tao Meng >Priority: Trivial > > {code} > /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: > expanded from macro 'RSTRING_LEN' > RSTRING(str)->as.heap.len) > ~~^~~ > yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum' > rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, > -1); > ^ > 17 warnings and 1 error generated. > make: *** [yajl_ext.o] Error 1 > make failed, exit code 2 > {code} > We should update Gemfile.lock. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3720: [FLINK-6302] Documentation build error on ruby 2.4
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3720#discussion_r113273807 --- Diff: docs/Gemfile.lock --- @@ -30,18 +30,19 @@ GEM redcarpet (~> 3.1) safe_yaml (~> 1.0) toml (~> 0.1.0) -jekyll-coffeescript (1.0.1) +jekyll-coffeescript (1.0.2) coffee-script (~> 2.2) + coffee-script-source (~> 1.11.1) jekyll-gist (1.4.0) - octokit (~> 4.3.0) + octokit (~> 4.2) --- End diff -- Does the downgrade fix an issue? If yes, it's okay to do it, I think --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6330) Improve Docker documentation
[ https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983383#comment-15983383 ] ASF GitHub Bot commented on FLINK-6330: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3751 Sorry, taking a step back. It seems that https://hub.docker.com/_/flink/ is not yet available. Should we postpone this merge until it is available? > Improve Docker documentation > > > Key: FLINK-6330 > URL: https://issues.apache.org/jira/browse/FLINK-6330 > Project: Flink > Issue Type: Bug > Components: Docker >Affects Versions: 1.2.0 >Reporter: Patrick Lucas >Assignee: Patrick Lucas > Fix For: 1.2.2 > > > The "Docker" page in the docs exists but is blank. > Add something useful here, including references to the official images that > should exist once 1.2.1 is released, and add a brief "Kubernetes" page as > well, referencing the [helm chart|https://github.com/docker-flink/examples]. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3751 Sorry, taking a step back. It seems that https://hub.docker.com/_/flink/ is not yet available. Should we postpone this merge until it is available? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6330) Improve Docker documentation
[ https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983371#comment-15983371 ] ASF GitHub Bot commented on FLINK-6330: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3751 Great, thanks! Merging this... > Improve Docker documentation > > > Key: FLINK-6330 > URL: https://issues.apache.org/jira/browse/FLINK-6330 > Project: Flink > Issue Type: Bug > Components: Docker >Affects Versions: 1.2.0 >Reporter: Patrick Lucas >Assignee: Patrick Lucas > Fix For: 1.2.2 > > > The "Docker" page in the docs exists but is blank. > Add something useful here, including references to the official images that > should exist once 1.2.1 is released, and add a brief "Kubernetes" page as > well, referencing the [helm chart|https://github.com/docker-flink/examples]. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3751 Great, thanks! Merging this... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983370#comment-15983370 ] ASF GitHub Bot commented on FLINK-6013: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3736 I think this is starting to look very good! Given that this introduces new libraries as dependencies (okhttp, okio), should we pro-actively shade those away to avoid dependency conflicts? Admittedly, it would only impact users that explicitly drop in the datadog reporter, but it might still be nice for those users. Given that we build a jar-with-dependencies anyway, the step to shading is small... > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3736 I think this is starting to look very good! Given that this introduces new libraries as dependencies (okhttp, okio), should we pro-actively shade those away to avoid dependency conflicts? Admittedly, it would only impact users that explicitly drop in the datadog reporter, but it might still be nice for those users. Given that we build a jar-with-dependencies anyway, the step to shading is small... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983350#comment-15983350 ] ASF GitHub Bot commented on FLINK-5867: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3773 [FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined regions This is based on #3772 , the relevant commits are the latter four. The majority of the work has been done by @tiemsn , with some rebasing and additions from me. # Pipelined Region Failover As described in [FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures), this pull request implements the detection of pipelined regions in the `ExecutionGraph` and failover within these pipelined regions. ![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink flip-1-pipelined-regions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3773.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3773 commit ef7fd9964c1c74feb4641e57a138c54558b2449c Author: Stephan EwenDate: 2017-03-21T18:13:34Z [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery - Add base class for FailoverStrategy - Add default implementation (restart all tasks) - Add logic to load the failover strategy from the configuration commit c04a8a312098fddce14e392b8d9dbf396b1df3f3 Author: Stephan Ewen Date: 2017-03-29T20:49:54Z [FLINK-6340] [flip-1] Add a termination future to the Execution commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13 Author: shuai.xus Date: 2017-04-18T06:15:29Z [FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure commit 456600d5e37724bbcc7d570f6828e3fef6298483 Author: shuai.xus Date: 2017-04-20T21:56:53Z [FLINK-5867] [flip-1] Add tests for pipelined failover region construction commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1 Author: Stephan Ewen Date: 2017-04-20T22:02:19Z [FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction This method exploits the fact that verties are already in topological order. commit 39402583df8b4c51016c72f968772cbbdd6c92e3 Author: shuai.xus Date: 2017-04-25T07:42:48Z [FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy > The implementation of RestartPipelinedRegionStrategy > > > Key: FLINK-5867 > URL: https://issues.apache.org/jira/browse/FLINK-5867 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > > The RestartPipelinedRegionStrategy's responsibility is the following: > 1. Calculate all FailoverRegions and their relations when initializing. > 2. Listen for the failure of the job and executions, and find corresponding > FailoverRegions to do the failover. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3773: [FLINK-5867] [FLINK-5866] [flip-1] Implement Failo...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3773 [FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined regions This is based on #3772 , the relevant commits are the latter four. The majority of the work has been done by @tiemsn , with some rebasing and additions from me. # Pipelined Region Failover As described in [FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures), this pull request implements the detection of pipelined regions in the `ExecutionGraph` and failover within these pipelined regions. ![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink flip-1-pipelined-regions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3773.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3773 commit ef7fd9964c1c74feb4641e57a138c54558b2449c Author: Stephan EwenDate: 2017-03-21T18:13:34Z [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery - Add base class for FailoverStrategy - Add default implementation (restart all tasks) - Add logic to load the failover strategy from the configuration commit c04a8a312098fddce14e392b8d9dbf396b1df3f3 Author: Stephan Ewen Date: 2017-03-29T20:49:54Z [FLINK-6340] [flip-1] Add a termination future to the Execution commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13 Author: shuai.xus Date: 2017-04-18T06:15:29Z [FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure commit 456600d5e37724bbcc7d570f6828e3fef6298483 Author: shuai.xus Date: 2017-04-20T21:56:53Z [FLINK-5867] [flip-1] Add tests for pipelined failover region construction commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1 Author: Stephan Ewen Date: 2017-04-20T22:02:19Z [FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction This method exploits the fact that verties are already in topological order. commit 39402583df8b4c51016c72f968772cbbdd6c92e3 Author: shuai.xus Date: 2017-04-25T07:42:48Z [FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
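The region construction described above can be pictured as a union-find pass over the graph's edges: vertices joined by pipelined exchanges collapse into one failover region, while blocking exchanges (persistent intermediate results) form region boundaries. The following is a deliberately simplified sketch with integer vertex ids; the actual PR operates on ExecutionVertex objects and, as the commit message notes, exploits topological order for performance:
{code}
import java.util.*;

public class PipelinedRegionSketch {

    enum ExchangeType { PIPELINED, BLOCKING }

    private final Map<Integer, Integer> parent = new HashMap<>();

    private int find(int v) {
        parent.putIfAbsent(v, v);
        int root = parent.get(v);
        if (root != v) {
            root = find(root);
            parent.put(v, root); // path compression
        }
        return root;
    }

    // edges[i] = {producer, consumer}; types[i] is the exchange type of edge i
    public Collection<Set<Integer>> buildRegions(int numVertices, int[][] edges, ExchangeType[] types) {
        parent.clear();
        for (int i = 0; i < edges.length; i++) {
            if (types[i] == ExchangeType.PIPELINED) {
                // producer and consumer must fail over together
                parent.put(find(edges[i][0]), find(edges[i][1]));
            }
        }
        Map<Integer, Set<Integer>> regions = new HashMap<>();
        for (int v = 0; v < numVertices; v++) {
            regions.computeIfAbsent(find(v), k -> new HashSet<>()).add(v);
        }
        return regions.values();
    }
}
{code}
A failed vertex then triggers cancellation and restart of exactly the members of its region, rather than of the whole graph.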
[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes
[ https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983328#comment-15983328 ] ASF GitHub Bot commented on FLINK-5869: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3772 [FLINK-5869] [flip-1] Introduce abstraction for FailoverStrategy This PR has two sets of changes that I could not pull apart into separate pull requests. # (1) Termination Futures Prior to this change, the `ExecutionGraph` decided when cancellation and finishing was complete by tracking how many `ExecutionJobVertex` were in a terminal state. This abstraction is too inflexible to track when subregions of the graph are in a terminal state. To fix that, this change introduces a *termination future* on the `Execution`. By building conjunct futures of the termination futures, any observer can track when any number of vertices are in a terminal state. The `ExecutionGraph` now also uses that model to track when cancellation of all vertices during failover is complete. # (2) Local Failover and FailoverStrategy The `ExecutionGraph` now supports *local failover* and *global failover*. Quoting from the JavaDocs: - **Global failover** aborts the task executions for all vertices and restarts the whole data flow graph from the last completed checkpoint. Global failover is considered the *fallback strategy* that is used when a local failover is unsuccessful, or when an issue is found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug). - **Local failover** is triggered when an individual vertex execution (a task) fails. The local failover is coordinated by the `FailoverStrategy`. A local failover typically attempts to restart as little as possible, but as much as necessary. - Between local and global failover, the global failover always takes precedence, because it is the core mechanism that the `ExecutionGraph` relies on to bring back consistency. To guard that, the `ExecutionGraph` maintains a **global modification version**, which is incremented with every global failover (and other global actions, like job cancellation, or terminal failure). Local failover is always scoped by the modification version that the execution graph had when the failover was triggered. If a new global modification version is reached during local failover (meaning there is a concurrent global failover), the failover strategy has to yield before the global failover. ### Failover Strategies How exactly local failover happens is the concern of a pluggable `FailoverStrategy`. - The default failover strategy simply triggers a global failover. - The pull request introduces a very simple *restart individual* failover strategy that independently restarts tasks that have no connections to other tasks. # Tests This pull request adds new tests for: - The termination future abstraction - The global mod version handling - Proper handling of concurrent local and global failover The pull request rewrites various original tests. This was necessary because the tests were using Mockito very heavily and re-building or whitebox testing specific behavior that was affected by the changes. The changes to the tests introduce simple ways to actually bring up a functional ExecutionGraph and walk it through its state transitions. That way the tests now rely minimally on mocking and actually test the proper ExecutionGraph, rather than a mock which is expected to behave similarly to the proper class. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3772.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3772 commit ef7fd9964c1c74feb4641e57a138c54558b2449c Author: Stephan EwenDate: 2017-03-21T18:13:34Z [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery - Add base class for FailoverStrategy - Add default implementation (restart all tasks) - Add logic to load the failover strategy from the configuration commit c04a8a312098fddce14e392b8d9dbf396b1df3f3 Author: Stephan Ewen Date: 2017-03-29T20:49:54Z [FLINK-6340] [flip-1] Add a termination future to the Execution > ExecutionGraph use FailoverCoordinator to manage the failover of execution > vertexes >
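The termination-future idea maps naturally onto conjunct futures. The following sketch models it with JDK CompletableFutures for illustration only; Flink ships its own Future abstraction in this version, so this is not the PR's actual API:
{code}
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TerminationTrackingSketch {

    static class Execution {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // Called exactly once when the execution reaches a terminal state
        // (FINISHED, CANCELED or FAILED).
        void markTerminal() {
            terminationFuture.complete(null);
        }
    }

    // Conjunct future over any subset of vertices, e.g. one failover region:
    // completes once every member has reached a terminal state.
    static CompletableFuture<Void> allTerminated(List<Execution> subset) {
        return CompletableFuture.allOf(
                subset.stream()
                      .map(e -> e.terminationFuture)
                      .toArray(CompletableFuture[]::new));
    }
}
{code}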
[GitHub] flink pull request #3772: [FLINK-5869] [flip-1] Introduce abstraction for Fa...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3772 [FLINK-5869] [flip-1] Introduce abstraction for FailoverStrategy This PR has two sets of changes that I could not pull apart into separate pull requests. # (1) Termination Futures Prior to this change, the `ExecutionGraph` decided when cancellation and finishing was complete by tracking how many `ExecutionJobVertex` were in a terminal state. This abstraction is too inflexible to track when subregions of the graph are in a terminal state. To fix that, this change introduces a *termination future* on the `Execution`. By building conjunct futures of the termination futures, any observer can track when any number of vertices are in a terminal state. The `ExecutionGraph` now also uses that model to track when cancellation of all vertices during failover is complete. # (2) Local Failover and FailoverStrategy The `ExecutionGraph` now supports *local failover* and *global failover*. Quoting from the JavaDocs: - **Global failover** aborts the task executions for all vertices and restarts the whole data flow graph from the last completed checkpoint. Global failover is considered the *fallback strategy* that is used when a local failover is unsuccessful, or when an issue is found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug). - **Local failover** is triggered when an individual vertex execution (a task) fails. The local failover is coordinated by the `FailoverStrategy`. A local failover typically attempts to restart as little as possible, but as much as necessary. - Between local and global failover, the global failover always takes precedence, because it is the core mechanism that the `ExecutionGraph` relies on to bring back consistency. To guard that, the `ExecutionGraph` maintains a **global modification version**, which is incremented with every global failover (and other global actions, like job cancellation, or terminal failure). Local failover is always scoped by the modification version that the execution graph had when the failover was triggered. If a new global modification version is reached during local failover (meaning there is a concurrent global failover), the failover strategy has to yield before the global failover. ### Failover Strategies How exactly local failover happens is the concern of a pluggable `FailoverStrategy`. - The default failover strategy simply triggers a global failover. - The pull request introduces a very simple *restart individual* failover strategy that independently restarts tasks that have no connections to other tasks. # Tests This pull request adds new tests for: - The termination future abstraction - The global mod version handling - Proper handling of concurrent local and global failover The pull request rewrites various original tests. This was necessary because the tests were using Mockito very heavily and re-building or whitebox testing specific behavior that was affected by the changes. The changes to the tests introduce simple ways to actually bring up a functional ExecutionGraph and walk it through its state transitions. That way the tests now rely minimally on mocking and actually test the proper ExecutionGraph, rather than a mock which is expected to behave similarly to the proper class. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3772.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3772 commit ef7fd9964c1c74feb4641e57a138c54558b2449c Author: Stephan EwenDate: 2017-03-21T18:13:34Z [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery - Add base class for FailoverStrategy - Add default implementation (restart all tasks) - Add logic to load the failover strategy from the configuration commit c04a8a312098fddce14e392b8d9dbf396b1df3f3 Author: Stephan Ewen Date: 2017-03-29T20:49:54Z [FLINK-6340] [flip-1] Add a termination future to the Execution --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
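The precedence rule between local and global failover described in the PR text can be sketched as a simple version guard; the names below are hypothetical, not the PR's actual classes:
{code}
import java.util.concurrent.atomic.AtomicLong;

public class FailoverVersionGuardSketch {

    private final AtomicLong globalModVersion = new AtomicLong();

    // Every global action (global failover, cancellation, terminal failure)
    // bumps the version, superseding all in-flight local failovers.
    public long onGlobalAction() {
        return globalModVersion.incrementAndGet();
    }

    // A local failover is scoped by the version it started under and yields
    // if a concurrent global failover has taken precedence in the meantime.
    public void localFailover(Runnable restartRegion) {
        final long versionAtStart = globalModVersion.get();
        // ... cancel the affected tasks, wait on their termination futures ...
        if (globalModVersion.get() == versionAtStart) {
            restartRegion.run();
        }
        // else: abandon the local restart; the global failover handles recovery
    }
}
{code}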
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983316#comment-15983316 ] ASF GitHub Bot commented on FLINK-6075: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske I have implemented all the suggestions you made. The only things remaining for this are: - decide if keyBy is needed or not for the case when sorting happens only on proc time (I would say it is needed to make it fully equivalent with an actual sorting that would be implemented) - refactor the DataSet sort to use the common classes from SortUtil (can be done in a follow-up request after the merge) - implement the other 2 cases from the JIRA issue based on event time to offer full sort support without retraction (for feature freeze) > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split into 3 separate JIRA issues. However, the design is the > same; only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > - All these SQL clauses are supported only over windows (bounded collections > of data). > - Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit, as the sort > operation would never trigger. If an SQL query is provided without > limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out-of-order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. > The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field. > `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) > ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac,21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb,12)| | | |
> | |12:15:00|(abb,12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option** > Considering that the SQL operators will be associated with window boundaries, > the functionality will be implemented within the logic of the window as > follows. > * Window assigner – selected based on the type of window used in SQL >
[GitHub] flink issue #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3714 @fhueske I have implemented all the suggestions you made. The only things remaining for this are: - decide if keyBy is needed or not for the case when sorting happens only on proc time (I would say it is needed to make it fully equivalent with an actual sorting that would be implemented) - refactor the DataSet sort to use the common classes from SortUtil (can be done in a follow-up request after the merge) - implement the other 2 cases from the JIRA issue based on event time to offer full sort support without retraction (for feature freeze) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
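For the proc-time-only case in the first bullet above, the core mechanics are easy to sketch: elements are buffered as they arrive and emitted in sorted order when the processing-time boundary fires. A plain-Java sketch of that buffer follows; in Flink this logic would sit in a (possibly keyed) ProcessFunction with the buffer held in managed state:
{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ProcTimeSortBuffer<T> {

    private final List<T> buffer = new ArrayList<>();
    private final Comparator<T> order;

    public ProcTimeSortBuffer(Comparator<T> order) {
        this.order = order;
    }

    // Hot path: just collect, no ordering work per element.
    public void onElement(T element) {
        buffer.add(element);
    }

    // Fired by the processing-time timer at the window boundary:
    // sort once, emit, and reset for the next window.
    public List<T> onTimer() {
        buffer.sort(order);
        List<T> sorted = new ArrayList<>(buffer);
        buffer.clear();
        return sorted;
    }
}
{code}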
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983292#comment-15983292 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113259225 --- Diff: flink-dist/src/main/assemblies/opt.xml --- @@ -95,5 +95,12 @@ flink-metrics-statsd-${project.version}.jar 0644 + + + ../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}.jar --- End diff -- Thank you for clarifying! > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113259225 --- Diff: flink-dist/src/main/assemblies/opt.xml --- @@ -95,5 +95,12 @@ flink-metrics-statsd-${project.version}.jar 0644 + + + ../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}.jar --- End diff -- Thank you for clarifying! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle unstable on Travis
Till Rohrmann created FLINK-6385: Summary: HistoryServerTest.testFullArchiveLifecycle unstable on Travis Key: FLINK-6385 URL: https://issues.apache.org/jira/browse/FLINK-6385 Project: Flink Issue Type: Bug Components: Distributed Coordination Reporter: Till Rohrmann Priority: Critical The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1]. [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983282#comment-15983282 ] Ted Yu commented on FLINK-5855: --- IMO, after discussion, if the new fix is in the same area of code, the description of the JIRA can be modified to reflect the outcome of discussion - instead of closing the first JIRA and opening a new one. > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
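Concretely, the suggestion in the issue amounts to widening the synchronized block so that the handler call and the subsequent clear() run under the same lock. A sketch with stand-in types (the real field holds the sink's pending-file bookkeeping):
{code}
import java.util.HashMap;
import java.util.Map;

class BucketingSinkLockSketch {

    final Map<Long, Object> pendingFilesPerCheckpoint = new HashMap<>();

    void restore() {
        // Acquire the lock before the handler so that no other thread can
        // mutate the map between handling it and clearing it.
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    void handlePendingFilesForPreviousCheckpoints(Map<Long, Object> pending) {
        // commit/move the files recorded for earlier checkpoints ...
    }
}
{code}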
[jira] [Created] (FLINK-6384) PythonStreamer does not close python processes
Till Rohrmann created FLINK-6384: Summary: PythonStreamer does not close python processes Key: FLINK-6384 URL: https://issues.apache.org/jira/browse/FLINK-6384 Project: Flink Issue Type: Bug Components: Python API Affects Versions: 1.3.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.3.0 The {{PythonStreamer}} opens a new process calling the python binary to check whether the binary is available. This process won't get closed leading to an excessive number of open python processes when running the {{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra process, because the actual python call with the python code would also fail if there is no python binary available. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983254#comment-15983254 ] ASF GitHub Bot commented on FLINK-6250: --- Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for distinct in the code generator. Please have a look and let me know. I have implemented and tested only for the OverProcTimeRowBounded window, but if you like it I can quickly implement and test the others as well. > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for distinct in the code generator. Please have a look and let me know. I have implemented and tested only for the OverProcTimeRowBounded window, but if you like it I can quickly implement and test the others as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
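One common way to implement DISTINCT for a bounded OVER window (not necessarily what this PR's code generator emits) is to keep an occurrence count per value, forwarding only first occurrences to the underlying accumulator and only last-occurrence retractions out of it:
{code}
import java.util.HashMap;
import java.util.Map;

public class DistinctFilterSketch {

    // value -> number of occurrences currently inside the ROWS window
    private final Map<Long, Integer> counts = new HashMap<>();

    /** True if the underlying SUM/COUNT accumulator should see this value. */
    public boolean accumulate(long value) {
        return counts.merge(value, 1, Integer::sum) == 1; // first occurrence
    }

    /** True if the underlying accumulator should retract this value. */
    public boolean retract(long value) {
        Integer c = counts.computeIfPresent(value, (k, v) -> v - 1);
        if (c != null && c == 0) {
            counts.remove(value);
            return true; // last occurrence left the window
        }
        return false;
    }
}
{code}
In a keyed streaming operator the counts map would live in managed state (e.g. a MapState) rather than on the heap.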
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983252#comment-15983252 ] ASF GitHub Bot commented on FLINK-6250: --- GitHub user stefanobortoli opened a pull request: https://github.com/apache/flink/pull/3771 [FLINK-6250] Distinct procTime with Rows boundaries Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/stefanobortoli/flink FLINK-6250b Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3771.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3771 > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
GitHub user stefanobortoli opened a pull request: https://github.com/apache/flink/pull/3771 [FLINK-6250] Distinct procTime with Rows boundaries Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/stefanobortoli/flink FLINK-6250b Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3771.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3771 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order
[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983234#comment-15983234 ] ASF GitHub Bot commented on FLINK-6368: --- Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3768 I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui > Grouping keys in stream aggregations have wrong order > - > > Key: FLINK-6368 > URL: https://issues.apache.org/jira/browse/FLINK-6368 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui > Fix For: 1.3.0 > > > FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It > seems that the order of grouping keys is sometimes messed up. The following > tests fails: > {code} > @Test > def testEventTimeSlidingGroupWindow(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val tEnv = TableEnvironment.getTableEnvironment(env) > StreamITCase.testResults = mutable.MutableList() > val stream = env > .fromCollection(data) > .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) > .map(t => (t._2, t._6)) > val table = stream.toTable(tEnv, 'int, 'string) > val windowedTable = table > .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) > .groupBy('w, 'string) > .select('string, 'int.count, 'w.start, 'w.end) > val results = windowedTable.toDataStream[Row] > results.addSink(new StreamITCase.StringSink) > env.execute() > } > {code} > Exception: > {code} > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505) > at > 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288) > ... 7 more > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.String > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3768: [FLINK-6368][table] Grouping keys in stream aggregations ...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3768 I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
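The failure mode here is an alignment problem: a Row's physical field order and its RowTypeInfo must agree, and reordering the grouping keys on one side but not the other surfaces exactly as the ClassCastException quoted above (an Integer read where a String was expected). A small illustration, assuming the two-argument RowTypeInfo constructor; the types and names arrays are parallel and must stay in sync:
{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class KeyOrderIllustration {

    public static void main(String[] args) {
        // Position 0 carries the grouping key 'string, position 1 the 'int
        // aggregate input. If a later stage assumes the opposite order, the
        // value at position 1 gets cast to String and fails.
        TypeInformation<?>[] types = {
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO
        };
        String[] names = {"string", "int"}; // must stay aligned with 'types'
        RowTypeInfo rowType = new RowTypeInfo(types, names);
        System.out.println(rowType);
    }
}
{code}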
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983211#comment-15983211 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241221 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; --- End diff -- `GenericRecord` -> `SpecificRecord` > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro contain their Schema in a static field. The > schema should be used to automatically derive field names and types. Hence, > there is no additional information required than the name of the class. > # GenericRecord Mode: In this case the user specifies an Avro Schema. 
The > schema is used to deserialize the data into a GenericRecord which must be > translated into possibly nested {{Row}} based on the schema information. > Again, the Avro Schema is used to automatically derive the field names and > types. This mode is less efficient than the SpecificRecord mode because the > {{GenericRecord}} needs to be converted into {{Row}}. > This feature depends on FLINK-5280, i.e., support for nested data in > {{TableSource}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983217#comment-15983217 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113243664 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private final MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data +*/ + private final Decoder decoder; + + /** +* Record to deserialize byte array to. +*/ + private GenericRecord record; + + /** +* Creates a Avro deserialization schema for the given record. 
+* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowDeserializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new ReflectDatumReader<>(schema); + this.record = new GenericData.Record(schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + // read record + try { + inputStream.setBuffer(message); + this.record = datumReader.read(record, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + + // convert to row + final Object row = convertToRow(schema, record); + return (Row) row; + } + + /** +* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. +* Avro's {@link Utf8} fields are converted into regular Java strings. +*/ + private static Object convertToRow(Schema schema, Object recordObj) { + if (recordObj instanceof GenericRecord) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List types =
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983209#comment-15983209 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113179453 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** +* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. +* +* @param topic Kafka topic to consume. +* @param properties Properties for the Kafka consumer. +* @param record Avro specific record. +*/ + KafkaAvroTableSource( + String topic, + Properties properties, + Class record) { + + super( + topic, + properties, + createDeserializationSchema(record), + createFieldNames(record), + createFieldTypes(record)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class record) { + return new AvroRowDeserializationSchema(record); + } + + /** +* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. +* Replaces generic Utf8 with basic String type information. 
+*/ + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) { + if (schema.getType() == Schema.Type.RECORD) { + final List<Schema.Field> fields = schema.getFields(); + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted; + + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema()); + names[i] = field.name(); + } + return new RowTypeInfo(types, names); + } else if (extracted instanceof GenericTypeInfo) { + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted; + if (genericTypeInfo.getTypeClass() == Utf8.class) { + return
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983216#comment-15983216 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241700 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; --- End diff -- Change all `GenericRecord` to `SpecificRecord` > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro contain their Schema in a static field. The > schema should be used to automatically derive field names and types. Hence, > there is no additional information required than the name of the class. > # GenericRecord Mode: In this case the user specifies an Avro Schema. The > schema is used to deserialize the data into a GenericRecord which must be > translated into possibly nested {{Row}} based on the schema information. > Again, the Avro Schema is used to automatically derive the field names and > types. This mode is less efficient than the SpecificRecord mode because the > {{GenericRecord}} needs to be converted into {{Row}}. > This feature depends on FLINK-5280, i.e., support for nested data in > {{TableSource}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
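As a quick illustration of the two modes described in the issue, here is a minimal sketch of how the two Avro read paths differ, assuming a hypothetical Avro-generated class `User` (not part of this PR):

```
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public final class AvroModesSketch {

    // SpecificRecord mode: the generated class carries its schema in a static
    // field, so the class name alone suffices to derive field names and types.
    static User readSpecific(byte[] bytes) throws IOException {
        SpecificDatumReader<User> reader = new SpecificDatumReader<>(User.class);
        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }

    // GenericRecord mode: the caller supplies the schema explicitly; the result
    // must then be converted field by field into a (possibly nested) Row.
    static GenericRecord readGeneric(byte[] bytes, Schema schema) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }
}
```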
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983206#comment-15983206 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113161806 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** +* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. +* +* @param topic Kafka topic to consume. +* @param properties Properties for the Kafka consumer. +* @param record Avro specific record. +*/ + KafkaAvroTableSource( + String topic, + Properties properties, + Class record) { + + super( + topic, + properties, + createDeserializationSchema(record), + createFieldNames(record), + createFieldTypes(record)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class record) { + return new AvroRowDeserializationSchema(record); + } + + /** +* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. +* Replaces generic Utf8 with basic String type information. 
+*/ + private static TypeInformation convertToRowTypeInformation(TypeInformation extracted, Schema schema) { --- End diff -- Change this to ``` private static TypeInformation convertToRowTypeInformation(TypeInformation extracted, Schema schema) ``` and factor out the recursive logic to a method ``` convertToTypeInformation(TypeInformation extracted, Schema schema) ``` ? > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro
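A minimal sketch of the suggested refactoring, with the recursion factored into a separate helper; the `<?>` generics are reconstructed for readability and are not verbatim from the PR:

```
private static RowTypeInfo convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
    // the top-level schema of a table source is always a RECORD, i.e. a Row
    return (RowTypeInfo) convertToTypeInformation(extracted, schema);
}

private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
    if (schema.getType() == Schema.Type.RECORD) {
        final List<Schema.Field> fields = schema.getFields();
        final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;

        final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
        final String[] names = new String[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            final Schema.Field field = fields.get(i);
            types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
            names[i] = field.name();
        }
        return new RowTypeInfo(types, names);
    } else if (extracted instanceof GenericTypeInfo) {
        final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
        if (genericTypeInfo.getTypeClass() == Utf8.class) {
            // replace Avro's Utf8 with Flink's basic String type information
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    return extracted;
}
```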
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983205#comment-15983205 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113179472 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** +* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. +* +* @param topic Kafka topic to consume. +* @param properties Properties for the Kafka consumer. +* @param record Avro specific record. +*/ + KafkaAvroTableSource( + String topic, + Properties properties, + Class record) { + + super( + topic, + properties, + createDeserializationSchema(record), + createFieldNames(record), + createFieldTypes(record)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class record) { + return new AvroRowDeserializationSchema(record); + } + + /** +* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. +* Replaces generic Utf8 with basic String type information. 
+*/ + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) { + if (schema.getType() == Schema.Type.RECORD) { + final List<Schema.Field> fields = schema.getFields(); + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted; + + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema()); + names[i] = field.name(); + } + return new RowTypeInfo(types, names); + } else if (extracted instanceof GenericTypeInfo) { + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted; + if (genericTypeInfo.getTypeClass() == Utf8.class) { + return
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983215#comment-15983215 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113207375 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** +* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. +* +* @param topic Kafka topic to consume. +* @param properties Properties for the Kafka consumer. +* @param record Avro specific record. +*/ + KafkaAvroTableSource( + String topic, + Properties properties, + Class record) { + + super( + topic, + properties, + createDeserializationSchema(record), + createFieldNames(record), + createFieldTypes(record)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class record) { + return new AvroRowDeserializationSchema(record); + } + + /** +* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. +* Replaces generic Utf8 with basic String type information. 
+*/ + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) { + if (schema.getType() == Schema.Type.RECORD) { + final List<Schema.Field> fields = schema.getFields(); + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted; + + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema()); + names[i] = field.name(); + } + return new RowTypeInfo(types, names); + } else if (extracted instanceof GenericTypeInfo) { + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted; + if (genericTypeInfo.getTypeClass() == Utf8.class) { + return
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983208#comment-15983208 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241294 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private final MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data +*/ + private final Decoder decoder; + + /** +* Record to deserialize byte array to. +*/ + private GenericRecord record; --- End diff -- `GenericRecord` -> `SpecificRecord` > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro contain their Schema in a static field. The > schema should be used to automatically derive field names and types. 
Hence, > there is no additional information required other than the name of the class. > # GenericRecord Mode: In this case the user specifies an Avro Schema. The > schema is used to deserialize the data into a GenericRecord which must be > translated into possibly nested {{Row}} based on the schema information. > Again, the Avro Schema is used to automatically derive the field names and > types. This mode is less efficient than the SpecificRecord mode because the > {{GenericRecord}} needs to be converted into {{Row}}. > This feature depends on FLINK-5280, i.e., support for nested data in > {{TableSource}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
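The deserializer quoted above calls `inputStream.setBuffer(message)` on a `MutableByteArrayInputStream` that is not shown in this excerpt. A minimal sketch of such a helper (reconstructed here, not the PR's verbatim code): a `ByteArrayInputStream` whose buffer can be swapped per message, so the stream and decoder objects can be reused across records:

```
import java.io.ByteArrayInputStream;

public class MutableByteArrayInputStream extends ByteArrayInputStream {

    public MutableByteArrayInputStream() {
        super(new byte[0]);
    }

    /** Replaces the underlying buffer and resets the read position. */
    public void setBuffer(byte[] buf) {
        this.buf = buf;          // protected field of ByteArrayInputStream
        this.pos = 0;
        this.count = buf.length;
    }
}
```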
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983218#comment-15983218 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113244628 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. +*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. 
+* +* @param recordClazz Avro record class used to serialize Flink's row to Avro's record +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. +*/ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { --- End diff -- See comment on UNION in deserializer > Add Kafka TableSource with Avro serialization >
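For orientation, a hypothetical round trip through the two schemas under review, assuming an Avro-generated class `User` with a string and an int field; the class name and field layout are illustrative only:

```
import java.io.IOException;
import org.apache.flink.types.Row;

public class RoundTripSketch {
    public static void main(String[] args) throws IOException {
        AvroRowSerializationSchema ser = new AvroRowSerializationSchema(User.class);
        AvroRowDeserializationSchema deser = new AvroRowDeserializationSchema(User.class);

        // build a Row matching User's hypothetical (name, favoriteNumber) layout
        Row row = new Row(2);
        row.setField(0, "alice");
        row.setField(1, 42);

        byte[] bytes = ser.serialize(row);   // Row -> Avro binary
        Row copy = deser.deserialize(bytes); // Avro binary -> Row
        System.out.println(copy);
    }
}
```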
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983214#comment-15983214 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113243381 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private final MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data +*/ + private final Decoder decoder; + + /** +* Record to deserialize byte array to. +*/ + private GenericRecord record; + + /** +* Creates a Avro deserialization schema for the given record. 
+* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new ReflectDatumReader<>(schema); + this.record = new GenericData.Record(schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + // read record + try { + inputStream.setBuffer(message); + this.record = datumReader.read(record, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + + // convert to row + final Object row = convertToRow(schema, record); + return (Row) row; + } + + /** +* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. +* Avro's {@link Utf8} fields are converted into regular Java strings. +*/ + private static Object convertToRow(Schema schema, Object recordObj) { + if (recordObj instanceof GenericRecord) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types =
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983210#comment-15983210 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113237296 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. +*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. 
+* +* @param recordClazz Avro record class used to serialize Flink's row to Avro's record +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. +*/ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { --- End diff -- This limitation exists because the Table API cannot handle UNION types either, right? Isn't this the same as
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983213#comment-15983213 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113236676 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. +*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. 
+* +* @param recordClazz Avro record class used to serialize Flink's row to Avro's record +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. +*/ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { --- End diff -- are union types always ordered? Could it happen that type `0` is `RECORD` and `1` is `NULL`? > Add Kafka
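One way to make the unwrapping robust to either ordering, as a hedged sketch; the helper name `extractNonNullType` is hypothetical:

```
import java.util.List;
import org.apache.avro.Schema;

final class UnionSketch {
    /** Unwraps a nullable union regardless of whether NULL comes first or second. */
    static Schema extractNonNullType(Schema schema) {
        if (schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        final List<Schema> types = schema.getTypes();
        if (types.size() == 2) {
            if (types.get(0).getType() == Schema.Type.NULL) {
                return types.get(1);
            }
            if (types.get(1).getType() == Schema.Type.NULL) {
                return types.get(0);
            }
        }
        throw new IllegalArgumentException("Only unions of null and one other type are supported.");
    }
}
```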
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983219#comment-15983219 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113244339 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private final MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data +*/ + private final Decoder decoder; + + /** +* Record to deserialize byte array to. +*/ + private GenericRecord record; + + /** +* Creates a Avro deserialization schema for the given record. 
+* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new ReflectDatumReader<>(schema); + this.record = new GenericData.Record(schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + // read record + try { + inputStream.setBuffer(message); + this.record = datumReader.read(record, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + + // convert to row + final Object row = convertToRow(schema, record); + return (Row) row; + } + + /** +* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. +* Avro's {@link Utf8} fields are converted into regular Java strings. +*/ + private static Object convertToRow(Schema schema, Object recordObj) { + if (recordObj instanceof GenericRecord) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { --- End diff -- Not sure if we should
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983212#comment-15983212 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241182 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the byte[] messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema { + + /** +* Schema for deterministic field order. +*/ + private final Schema schema; + + /** +* Reader that deserializes byte array into a record. +*/ + private final DatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private final MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data +*/ + private final Decoder decoder; + + /** +* Record to deserialize byte array to. +*/ + private GenericRecord record; + + /** +* Creates a Avro deserialization schema for the given record. +* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowDeserializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new ReflectDatumReader<>(schema); + this.record = new GenericData.Record(schema); --- End diff -- We can use a specific record here. We have the class for it. 
``` this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); ``` > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro contain their Schema in a static field. The > schema should be used to automatically derive field names and types. Hence, > there is no additional information required other than the name of the class. > # GenericRecord Mode: In this case the user specifies an Avro Schema. The > schema is used
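Combining this with the earlier `GenericRecord` -> `SpecificRecord` remarks, the constructor could end up roughly as follows; a sketch of the suggested direction (assuming the `record` and `datumReader` fields are retyped accordingly), not the committed code:

```
@SuppressWarnings("unchecked")
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
    Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    this.schema = SpecificData.get().getSchema(recordClazz);
    // a specific reader and a pre-allocated SpecificRecord replace the
    // reflective reader and GenericData.Record used in the diff above
    this.datumReader = new SpecificDatumReader<>(schema);
    this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
    this.inputStream = new MutableByteArrayInputStream();
    this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
```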
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113207820 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** +* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. +* +* @param topic Kafka topic to consume. +* @param properties Properties for the Kafka consumer. +* @param record Avro specific record. +*/ + KafkaAvroTableSource( + String topic, + Properties properties, + Class record) { + + super( + topic, + properties, + createDeserializationSchema(record), + createFieldNames(record), + createFieldTypes(record)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class record) { + return new AvroRowDeserializationSchema(record); + } + + /** +* Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. +* Replaces generic Utf8 with basic String type information. 
+*/ + private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) { + if (schema.getType() == Schema.Type.RECORD) { + final List<Schema.Field> fields = schema.getFields(); + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted; + + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema()); + names[i] = field.name(); + } + return new RowTypeInfo(types, names); + } else if (extracted instanceof GenericTypeInfo) { + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted; + if (genericTypeInfo.getTypeClass() == Utf8.class) { + return BasicTypeInfo.STRING_TYPE_INFO; + } + } + return extracted; + } + + private static TypeInformation<?>[] createFieldTypes(Class<? extends SpecificRecordBase> record) { --- End diff -- This method can be removed
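If `createFieldTypes` (and its `createFieldNames` counterpart) are removed, the same information can be read off the converted `RowTypeInfo` instead; a sketch assuming `convertToRowTypeInformation` stays as quoted above and a hypothetical generated class `User`:

```
// derive the row type once from the Avro class, then reuse it
final Schema schema = SpecificData.get().getSchema(User.class);
final RowTypeInfo rowInfo =
    (RowTypeInfo) convertToRowTypeInformation(new AvroTypeInfo<>(User.class), schema);

final String[] fieldNames = rowInfo.getFieldNames();
final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[rowInfo.getArity()];
for (int i = 0; i < rowInfo.getArity(); i++) {
    fieldTypes[i] = rowInfo.getTypeAt(i);
}
```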
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983204#comment-15983204 ] ASF GitHub Bot commented on FLINK-3871: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113236267
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+	/**
+	 * Avro serialization schema.
+	 */
+	private final Schema schema;
+
+	/**
+	 * Writer to serialize Avro record into a byte array.
+	 */
+	private final DatumWriter<GenericRecord> datumWriter;
+
+	/**
+	 * Output stream to serialize records into byte array.
+	 */
+	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+	/**
+	 * Low-level class for serialization of Avro values.
+	 */
+	private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+	/**
+	 * Creates a Avro serialization schema for the given schema.
+	 *
+	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+	 */
+	@SuppressWarnings("unchecked")
+	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new ReflectDatumWriter<>(schema);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public byte[] serialize(Row row) {
+		// convert to record
+		final Object record = convertToRecord(schema, row);
+
+		// write
+		try {
+			arrayOutputStream.reset();
+			datumWriter.write((GenericRecord) record, encoder);
+			encoder.flush();
+			return arrayOutputStream.toByteArray();
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to serialize Row.", e);
+		}
+	}
+
+	/**
+	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+	 * Strings are converted into Avro's {@link Utf8} fields.
+	 */
+	private static Object convertToRecord(Schema schema, Object rowObj) {
+		if (rowObj instanceof Row) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+					schema = types.get(1);
+				}
+				else {
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113161806
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+	/**
+	 * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+	 *
+	 * @param topic Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param record Avro specific record.
+	 */
+	KafkaAvroTableSource(
+		String topic,
+		Properties properties,
+		Class<? extends SpecificRecordBase> record) {
+
+		super(
+			topic,
+			properties,
+			createDeserializationSchema(record),
+			createFieldNames(record),
+			createFieldTypes(record));
+	}
+
+	private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+		return new AvroRowDeserializationSchema(record);
+	}
+
+	/**
+	 * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+	 * Replaces generic Utf8 with basic String type information.
+	 */
+	private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
--- End diff --

Change this to
```
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema)
```
and factor out the recursive logic to a method
```
convertToTypeInformation(TypeInformation<?> extracted, Schema schema)
```
?
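A sketch of the suggested split (the helper name `convertToTypeInformation` comes from the comment above; the body below is an illustrative reshuffle of the quoted code, not the code that was eventually committed):

```java
// Entry point: a top-level Avro schema must be a RECORD, so the result
// can safely be typed as TypeInformation<Row>.
@SuppressWarnings("unchecked")
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
	if (schema.getType() != Schema.Type.RECORD) {
		throw new IllegalArgumentException("Row type information requires a RECORD schema.");
	}
	return (TypeInformation<Row>) convertToTypeInformation(extracted, schema);
}

// Recursive helper: walks nested records and replaces generic Utf8
// type information with Flink's basic String type.
private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
	if (schema.getType() == Schema.Type.RECORD) {
		final List<Schema.Field> fields = schema.getFields();
		final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
		final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
		final String[] names = new String[fields.size()];
		for (int i = 0; i < fields.size(); i++) {
			final Schema.Field field = fields.get(i);
			types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
			names[i] = field.name();
		}
		return new RowTypeInfo(types, names);
	} else if (extracted instanceof GenericTypeInfo
			&& ((GenericTypeInfo<?>) extracted).getTypeClass() == Utf8.class) {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
	return extracted;
}
```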
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241182
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+	/**
+	 * Schema for deterministic field order.
+	 */
+	private final Schema schema;
+
+	/**
+	 * Reader that deserializes byte array into a record.
+	 */
+	private final DatumReader<GenericRecord> datumReader;
+
+	/**
+	 * Input stream to read message from.
+	 */
+	private final MutableByteArrayInputStream inputStream;
+
+	/**
+	 * Avro decoder that decodes binary data
+	 */
+	private final Decoder decoder;
+
+	/**
+	 * Record to deserialize byte array to.
+	 */
+	private GenericRecord record;
+
+	/**
+	 * Creates a Avro deserialization schema for the given record.
+	 *
+	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+	 */
+	@SuppressWarnings("unchecked")
+	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumReader = new ReflectDatumReader<>(schema);
+		this.record = new GenericData.Record(schema);
--- End diff --

We can use a specific record here. We have the class for it.
```
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
```
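Applying this together with the `GenericRecord` -> `SpecificRecord` comments below, the affected fields and constructor might end up roughly as follows (a sketch of the suggested direction only, slotting into the class quoted above; not the committed code):

```java
/** Reader that deserializes byte array into a record. */
private final DatumReader<SpecificRecord> datumReader;

/** Record to deserialize byte array to. */
private SpecificRecord record;

public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
	Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
	this.schema = SpecificData.get().getSchema(recordClazz);
	this.datumReader = new ReflectDatumReader<>(schema);
	// reuse a specific record instance instead of a GenericData.Record
	this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
	this.inputStream = new MutableByteArrayInputStream();
	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
```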
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241221
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+	/**
+	 * Reader that deserializes byte array into a record.
+	 */
+	private final DatumReader<GenericRecord> datumReader;
--- End diff --

`GenericRecord` -> `SpecificRecord`
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241700
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+import org.apache.avro.generic.GenericData;
--- End diff --

Change all `GenericRecord` to `SpecificRecord`
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113244628
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
+	private static Object convertToRecord(Schema schema, Object rowObj) {
+		if (rowObj instanceof Row) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

See comment on UNION in deserializer
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113237296
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

This limitation exists because the Table API cannot handle UNION types either, right? Isn't this the same as having a nullable record field?
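For reference, a `UNION[null, RECORD]` field is exactly how Avro encodes a nullable record; a small `SchemaBuilder` sketch (record names here are made up for illustration):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullableRecordSchemaExample {
	public static void main(String[] args) {
		Schema address = SchemaBuilder.record("Address").fields()
			.requiredString("city")
			.endRecord();

		// An optional record field is represented as UNION[null, Address],
		// which is the shape the check above unwraps.
		Schema user = SchemaBuilder.record("User").fields()
			.requiredString("name")
			.name("address").type().unionOf().nullType().and().type(address).endUnion().nullDefault()
			.endRecord();

		System.out.println(user.getField("address").schema().getType()); // UNION
	}
}
```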
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113179453
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---
+	private static TypeInformation<?>[] createFieldTypes(Class<? extends SpecificRecordBase> record) {
--- End diff --

`record` -> `avroClass`?
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113244339
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+	private static Object convertToRow(Schema schema, Object recordObj) {
+		if (recordObj instanceof GenericRecord) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
--- End diff --

Not sure if we should support `UNION` at all. If you have a UNION[NULL, RECORD] field in Avro, you'd expect it to be represented also as a UNION field in a Table. We change it here to a nullable Record field. Not sure if that's expected. Should
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113241294
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
+	/**
+	 * Record to deserialize byte array to.
+	 */
+	private GenericRecord record;
--- End diff --

`GenericRecord` -> `SpecificRecord`
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113236676 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. +*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. +* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. 
+	 */
+	private static Object convertToRecord(Schema schema, Object rowObj) {
+		if (rowObj instanceof Row) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Are union types always ordered? Could it happen that type `0` is `RECORD` and `1` is `NULL`?
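On the reviewer's question: Avro keeps union branches in the order the schema declares them and does not normalize `["null", "record"]` versus `["record", "null"]`, so a check that hard-codes index `0` as `NULL` would miss the reversed ordering. A small order-insensitive helper, sketched here against Avro's public `Schema` API (the class and method names are hypothetical, not part of the PR under review), could unwrap the union before the `RECORD` comparison:

```java
import java.util.List;

import org.apache.avro.Schema;

/** Hypothetical helper; not part of the PR under review. */
public final class AvroUnionUtil {

	private AvroUnionUtil() {
	}

	/**
	 * Unwraps a nullable union such as ["null", X] or [X, "null"]
	 * regardless of branch order. Returns the schema unchanged if it
	 * is not a two-branch union containing NULL.
	 */
	public static Schema unwrapNullableUnion(Schema schema) {
		if (schema.getType() != Schema.Type.UNION) {
			return schema;
		}
		final List<Schema> types = schema.getTypes();
		if (types.size() == 2) {
			if (types.get(0).getType() == Schema.Type.NULL) {
				return types.get(1);
			}
			if (types.get(1).getType() == Schema.Type.NULL) {
				return types.get(0);
			}
		}
		return schema;
	}
}
```

Both `convertToRow` and `convertToRecord` could then call `unwrapNullableUnion(schema)` and compare the result against `Schema.Type.RECORD` directly, making the conversion independent of branch order.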
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983195#comment-15983195 ]

ASF GitHub Bot commented on FLINK-5892:
---
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3770

[FLINK-5892] Restore state on the operator level

## General

This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state on the operator level. This means that the topology of a job may change with regard to chains when restoring from a 1.3 savepoint, allowing the arbitrary addition, removal, or modification of chains.

The cornerstone for this is a semantic change for savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format. In 1.2 a savepoint contains the states of tasks: if a task consists of multiple operators, the stored TaskState internally contains a list of states, one entry for each operator. In 1.3 a savepoint contains the states of operators only; the notion of tasks is eliminated. If a task consists of multiple operators, we store one TaskState for each operator instead; internally, each contains a list of states of length 1.

## Implementation

In order for this to work, a number of changes had to be made. First and foremost, we required a new `StateAssignmentOperation` that is aware of operators. (74881a2, 8be9c58, 4fa8bbd) Since the SAO uses the `ExecutionGraph` classes to map the restored state, it was necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`. (73427c3) The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received `SubtaskStates`, containing the state of a task, are broken down into SubtaskStates for the individual operators. (f7b8ef9)

## Tests

The majority of this PR is new tests (60% or so). A number of tests were added under flink-tests that exercise the migration path from 1.2 to 1.3. (d1efdb1) These tests first restore a job from a 1.2 savepoint, without changes to the topology, verify that the state was restored correctly, and finally create a new savepoint. They then restore from this migrated 1.3 savepoint, with changes to the topology for varying scenarios, and verify the correct restoration of state again. A new test was also added to the `CheckpointCoordinatorTest` which tests the support for topology changes without executing a job. (8b5430f9) A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two. (b5430f9)

## Other changes

To make it more obvious that we deal with operators and not tasks, a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced where appropriate. (fe74023)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3770

commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol
Date: 2017-04-03T15:39:50Z
[prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol
Date: 2017-04-04T10:53:56Z
[internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol
Date: 2017-04-04T13:02:55Z
[internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol
Date: 2017-04-04T11:33:54Z
[internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol
Date: 2017-04-25T16:07:16Z
[internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw
Date: 2017-04-24T09:47:47Z
[internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol
Date: 2017-04-04T13:01:07Z
[internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol
Date: 2017-04-03T15:40:21Z
[tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol
Date: 2017-04-24T11:58:07Z
[tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw
Date: 2017-04-24T10:13:44Z
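A practical consequence for users of this change: since state is now matched per operator rather than per task, assigning stable uids is what keeps a savepoint restorable when chaining changes between versions of a job. A minimal sketch using Flink's public DataStream API (the pipeline and names are illustrative only):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorUidExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements("a", "b", "c")
				// With operator-level restore, state is mapped by operator
				// identifier. An explicit uid pins that identifier, so
				// breaking or merging chains between savepoints no longer
				// orphans this operator's state.
				.map(new MapFunction<String, String>() {
					@Override
					public String map(String value) {
						return value.toUpperCase();
					}
				})
				.uid("upper-map")
				.print();

		env.execute("operator uid example");
	}
}
```

Without explicit uids, operator identifiers are generated from the topology, so restructuring a chain can change them and leave the stored state unmatched on restore.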
[jira] [Commented] (FLINK-6337) Remove the buffer provider from PartitionRequestServerHandler
[ https://issues.apache.org/jira/browse/FLINK-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983189#comment-15983189 ]

zhijiang commented on FLINK-6337:
-
Yeah, I got your point. Thank you for the check and the advice, my friend!

> Remove the buffer provider from PartitionRequestServerHandler
> -
>
> Key: FLINK-6337
> URL: https://issues.apache.org/jira/browse/FLINK-6337
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> Currently, {{PartitionRequestServerHandler}} creates a {{LocalBufferPool}} when a channel is registered. The {{LocalBufferPool}} is only used to obtain the segment size for creating the read view in {{SpillableSubpartition}}; the buffers in the pool are never actually used, so the pool wastes buffer resources from the global pool.
> We would like to remove the {{LocalBufferPool}} from the {{PartitionRequestServerHandler}}; the {{LocalBufferPool}} in {{ResultPartition}} can provide the segment size for creating the subpartition view instead.