[jira] [Commented] (FLINK-6376) When deploying a Flink cluster on YARN, the HDFS delegation token is missing.

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984204#comment-15984204
 ] 

ASF GitHub Bot commented on FLINK-6376:
---

GitHub user Rucongzhang opened a pull request:

https://github.com/apache/flink/pull/3776

[FLINK-6376] When deploying a Flink cluster on YARN, the HDFS delegation token is missing.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Rucongzhang/flink flink-6376

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3776.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3776


commit 256f519cda73571c73914c98b3c9ff4381520907
Author: z00376786 
Date:   2017-04-26T03:36:43Z

When deploying a Flink cluster on YARN, the HDFS delegation token is missing




> When deploying a Flink cluster on YARN, the HDFS delegation token is missing.
> ---
>
> Key: FLINK-6376
> URL: https://issues.apache.org/jira/browse/FLINK-6376
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Reporter: zhangrucong1982
>Assignee: zhangrucong1982
>
> 1. I use Flink 1.2.0 and deploy the Flink cluster on YARN. The Hadoop version is 2.7.2.
> 2. I run Flink in secure mode with a keytab and principal. The key configuration is: security.kerberos.login.keytab: /home/ketab/test.keytab and security.kerberos.login.principal: test.
> 3. The YARN configuration is the default, with log aggregation enabled ("yarn.log-aggregation-enable: true").
> 4. When the Flink cluster is deployed on YARN, the YARN NodeManager fails as follows while aggregating logs to HDFS. The root cause is the missing HDFS delegation token.
>  java.io.IOException: Failed on local exception: java.io.IOException: 
> org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
> via:[TOKEN, KERBEROS]; Host Details : local host is: 
> "SZV1000258954/10.162.181.24"; destination host is: "SZV1000258954":25000;
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:796)
> at org.apache.hadoop.ipc.Client.call(Client.java:1515)
> at org.apache.hadoop.ipc.Client.call(Client.java:1447)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
> at com.sun.proxy.$Proxy26.getFileInfo(Unknown Source)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:802)
> at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:201)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
> at com.sun.proxy.$Proxy27.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1919)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1500)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1496)
> at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at 
> 
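
The failure above is the NodeManager's log aggregation trying to reach HDFS without a delegation token. The sketch below illustrates the general technique of obtaining HDFS delegation tokens on the client and shipping them with the application's container launch context; it only assumes standard Hadoop 2.x APIs and is not the actual Flink patch (the renewer principal, for instance, is an assumption).

{code}
// Sketch only: fetch HDFS delegation tokens and attach them to the AM container,
// so that services such as log aggregation can authenticate to HDFS later.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

import java.io.IOException;
import java.nio.ByteBuffer;

public final class DelegationTokenSketch {

    public static void attachHdfsTokens(Configuration hadoopConf, ContainerLaunchContext amContainer) throws IOException {
        Credentials credentials = new Credentials();
        FileSystem fs = FileSystem.get(hadoopConf);

        // Assumption: the ResourceManager principal acts as the token renewer.
        String renewer = hadoopConf.get("yarn.resourcemanager.principal", "yarn");
        fs.addDelegationTokens(renewer, credentials);

        // Serialize the tokens and hand them to YARN via the container launch context.
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
    }
}
{code}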


[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984131#comment-15984131
 ] 

ASF GitHub Bot commented on FLINK-5867:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3773#discussion_r113361061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub 
graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at 
last to RUNNING,
+ */
+public class FailoverRegion {
+
+   private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+           AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRegion.class);
+
+   // 

+
+   /** a unique id for debugging */
+   private final AbstractID id = new AbstractID();
+
+   private final ExecutionGraph executionGraph;
+
+   private final List<ExecutionVertex> connectedExecutionVertexes;
+
+   /** The executor that executes the recovery action after all vertices 
are in a */
+   private final Executor executor;
+
+   /** Current status of the job execution */
+   private volatile JobStatus state = JobStatus.RUNNING;
+
+
+   public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executor = checkNotNull(executor);
+   this.connectedExecutionVertexes = 
checkNotNull(connectedExecutions);
+
+   LOG.debug("Created failover region {} with vertices: {}", id, 
connectedExecutions);
+   }
+
+   public void onExecutionFail(Execution taskExecution, Throwable cause) {
+   // TODO: check if need to failover the preceding region
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   executionGraph.failGlobal(cause);
+   }
+   else {
+   cancel(taskExecution.getGlobalModVersion());
+   }
+   }
+
+   private void allVerticesInTerminalState(long 
globalModVersionOfFailover) {
+   while (true) {
+   JobStatus curStatus = this.state;
+   if (curStatus.equals(JobStatus.CANCELLING)) {
+ 

[GitHub] flink pull request #3773: [FLINK-5867] [FLINK-5866] [flip-1] Implement Failo...

2017-04-25 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3773#discussion_r113361061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub 
graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at 
last to RUNNING,
+ */
+public class FailoverRegion {
+
+   private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+           AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRegion.class);
+
+   // 

+
+   /** a unique id for debugging */
+   private final AbstractID id = new AbstractID();
+
+   private final ExecutionGraph executionGraph;
+
+   private final List<ExecutionVertex> connectedExecutionVertexes;
+
+   /** The executor that executes the recovery action after all vertices 
are in a */
+   private final Executor executor;
+
+   /** Current status of the job execution */
+   private volatile JobStatus state = JobStatus.RUNNING;
+
+
+   public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+   this.executionGraph = checkNotNull(executionGraph);
+   this.executor = checkNotNull(executor);
+   this.connectedExecutionVertexes = 
checkNotNull(connectedExecutions);
+
+   LOG.debug("Created failover region {} with vertices: {}", id, 
connectedExecutions);
+   }
+
+   public void onExecutionFail(Execution taskExecution, Throwable cause) {
+   // TODO: check if need to failover the preceding region
+   if (!executionGraph.getRestartStrategy().canRestart()) {
+   // delegate the failure to a global fail that will 
check the restart strategy and not restart
+   executionGraph.failGlobal(cause);
+   }
+   else {
+   cancel(taskExecution.getGlobalModVersion());
+   }
+   }
+
+   private void allVerticesInTerminalState(long 
globalModVersionOfFailover) {
+   while (true) {
+   JobStatus curStatus = this.state;
+   if (curStatus.equals(JobStatus.CANCELLING)) {
+   if (transitionState(curStatus, 
JobStatus.CANCELED)) {
+   reset(globalModVersionOfFailover);
+   break;
+   }
+ 
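
The excerpt stops just before `transitionState`, which is called above but not shown. A minimal sketch of such a CAS-based transition, assuming the `STATE_UPDATER`, `LOG` and `id` fields declared earlier in the diff (illustrative only, not necessarily the code in this PR):

```
// Illustrative only: atomically move the region from one JobStatus to another.
private boolean transitionState(JobStatus currentState, JobStatus newState) {
    if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
        LOG.debug("FailoverRegion {} switched from state {} to {}.", id, currentState, newState);
        return true;
    }
    return false;
}
```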

[jira] [Commented] (FLINK-6386) Missing bracket in 'Compiler Limitation' section

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984096#comment-15984096
 ] 

ASF GitHub Bot commented on FLINK-6386:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/3775

[FLINK-6386] Missing bracket in 'Compiler Limitation' section

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-6386] 
Missing bracket in 'Compiler Limitation' section")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-6386

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3775.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3775


commit 1533645864465d4ce174d5e9eef3cbb11dead5e3
Author: Bowen Li 
Date:   2017-04-26T02:55:25Z

FLINK-6386 Missing bracket in 'Compiler Limitation' section




> Missing bracket in 'Compiler Limitation' section
> 
>
> Key: FLINK-6386
> URL: https://issues.apache.org/jira/browse/FLINK-6386
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Trivial
> Fix For: 1.2.2
>
>
> "This means that types such as `Tuple2 declared as..."
> should be 
> "This means that types such as `Tuple2` or 
> `Collector` declared as..."







[jira] [Created] (FLINK-6386) Missing bracket in 'Compiler Limitation' section

2017-04-25 Thread Bowen Li (JIRA)
Bowen Li created FLINK-6386:
---

 Summary: Missing bracket in 'Compiler Limitation' section
 Key: FLINK-6386
 URL: https://issues.apache.org/jira/browse/FLINK-6386
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.2.0
Reporter: Bowen Li
Assignee: Bowen Li
Priority: Trivial
 Fix For: 1.2.2


"This means that types such as `Tuple2` or `Collector` 
declared as..."





[jira] [Commented] (FLINK-4621) Improve decimal literals of SQL API

2017-04-25 Thread JiJun Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983968#comment-15983968
 ] 

JiJun Tang commented on FLINK-4621:
---

I think simple numbers can be converted to Java primitives. For example: -0.5 -> longval=-5, precision=1.
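
A small illustration of that idea, carrying a simple decimal literal as an unscaled long plus a scale (what the comment calls precision) instead of a BigDecimal object; the class name is made up for the example:

{code}
import java.math.BigDecimal;

// Illustration only: -0.5 can be represented as unscaled value -5 with scale 1.
public class DecimalLiteralSketch {
    public static void main(String[] args) {
        long unscaledValue = -5L;
        int scale = 1;

        BigDecimal asObject = BigDecimal.valueOf(unscaledValue, scale); // -0.5 as an object
        double asPrimitive = unscaledValue / Math.pow(10, scale);       // -0.5 as a primitive

        System.out.println(asObject + " / " + asPrimitive);
    }
}
{code}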

> Improve decimal literals of SQL API
> ---
>
> Key: FLINK-4621
> URL: https://issues.apache.org/jira/browse/FLINK-4621
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Currently, all SQL {{DECIMAL}} types are converted to BigDecimals internally. 
> By default, the SQL parsers creates {{DECIMAL}} literals of any number e.g. 
> {{SELECT 1.0, 12, -0.5 FROM x}}. I think it would be better if these simple 
> numbers would be represented as Java primitives instead of objects.





[jira] [Updated] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-25 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-5855:

Description: 
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}

Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().

After discussion. I would give a related jira from this issue. 
https://issues.apache.org/jira/browse/FLINK-6381

  was:
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}

Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().


> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().
> After discussion. I would give a related jira from this issue. 
> https://issues.apache.org/jira/browse/FLINK-6381
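
A minimal sketch of the fix described above, widening the synchronized block so that both the handling of the pending files and the clear() run under the same lock; this shows the intent rather than quoting the merged patch:

{code}
// Sketch only: take the lock before iterating/handling, not just before clear().
synchronized (restoredState.pendingFilesPerCheckpoint) {
    handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
    restoredState.pendingFilesPerCheckpoint.clear();
}
{code}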





[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-25 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983948#comment-15983948
 ] 

mingleizhang commented on FLINK-5855:
-

[~tedyu] Hi, Ted. Thanks for the useful information. Now I know what to do and how to do it next time. Thanks again, I really appreciate it.

> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[jira] [Commented] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment

2017-04-25 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983852#comment-15983852
 ] 

Bowen Li commented on FLINK-6350:
-

I'll take a look first. Thanks!

> Flink: Windowing does not work with streams from collections and the local 
> execution environment
> 
>
> Key: FLINK-6350
> URL: https://issues.apache.org/jira/browse/FLINK-6350
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.2.2
>
>
> When using events via the {{fromCollection}} method of 
> {{StreamExecutionEnvironment}}, window timing is not supported. The time 
> windows close immediately.  
> This is unfortunate because mocking events from collections and testing them 
> locally is a powerful way to unit test stream processors.
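
A minimal sketch of the reported scenario, assuming the Flink 1.2 DataStream API; the data and window size are placeholders:

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

// Sketch only: a finite collection source feeding a keyed time window in the
// local environment; per the report, the windows fire immediately instead of
// honoring the configured duration.
public class CollectionWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> events = env.fromCollection(
                Arrays.asList(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3)));

        events.keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1)
                .print();

        env.execute("collection-window-test");
    }
}
{code}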





[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-04-25 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
@zentol  fix that




[jira] [Commented] (FLINK-6367) support custom header settings of allow origin

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983806#comment-15983806
 ] 

ASF GitHub Bot commented on FLINK-6367:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
@zentol  fix that


> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: enables a custom value for the Access-Control-Allow-Origin header; the default is `*`.
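
For illustration, a sketch of how such a setting could be applied on a web frontend response (Netty types are used because the web frontend is Netty based; this is not the actual Flink handler code):

{code}
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

// Sketch only: apply the configured allow-origin value to an HTTP response.
public class AllowOriginSketch {
    public static FullHttpResponse withAllowOrigin(String configuredAllowOrigin) {
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        // Value comes from jobmanager.web.access-control-allow-origin, default "*".
        response.headers().set("Access-Control-Allow-Origin", configuredAllowOrigin);
        return response;
    }
}
{code}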





[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983655#comment-15983655
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
@StephanEwen @zentol I shaded okhttp3 and okio from flink-metrics-datadog. 
I didn't use 'shade-flink' because I found it somehow prevents me from building an uber jar. Let me know if it's OK to shade them this way.


```
$ jar -tf 
target/flink-metrics-datadog-1.3-SNAPSHOT-jar-with-dependencies.jar

META-INF/
META-INF/MANIFEST.MF
META-INF/DEPENDENCIES
META-INF/LICENSE
META-INF/NOTICE
org/
org/apache/
org/apache/flink/
org/apache/flink/metrics/
org/apache/flink/metrics/datadog/
org/apache/flink/metrics/datadog/DatadogHttpClient.class

org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class
org/apache/flink/metrics/datadog/DatadogHttpReporter.class
org/apache/flink/metrics/datadog/DCounter.class
org/apache/flink/metrics/datadog/DGauge.class
org/apache/flink/metrics/datadog/DMeter.class
org/apache/flink/metrics/datadog/DMetric.class
org/apache/flink/metrics/datadog/DSeries.class
org/apache/flink/metrics/datadog/MetricType.class
META-INF/maven/
META-INF/maven/org.apache.flink/
META-INF/maven/org.apache.flink/flink-metrics-datadog/
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties
META-INF/maven/org.apache.flink/force-shading/
META-INF/maven/org.apache.flink/force-shading/pom.xml
META-INF/maven/org.apache.flink/force-shading/pom.properties
org/apache/flink/shaded/
org/apache/flink/shaded/okhttp3/
org/apache/flink/shaded/okhttp3/Address.class
.
org/apache/flink/shaded/okhttp3/WebSocketListener.class
META-INF/maven/com.squareup.okhttp3/
META-INF/maven/com.squareup.okhttp3/okhttp/
META-INF/maven/com.squareup.okhttp3/okhttp/pom.xml
META-INF/maven/com.squareup.okhttp3/okhttp/pom.properties
org/apache/flink/shaded/okio/
org/apache/flink/shaded/okio/AsyncTimeout$1.class
...
org/apache/flink/shaded/okio/Util.class
META-INF/maven/com.squareup.okio/
META-INF/maven/com.squareup.okio/okio/
META-INF/maven/com.squareup.okio/okio/pom.xml
META-INF/maven/com.squareup.okio/okio/pom.properties
```


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.
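
For context, a sketch of posting a metric series to the Datadog HTTP API with okhttp3; the endpoint path, payload shape and placeholder API key are assumptions for illustration and not the reporter's actual code:

{code}
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

// Sketch only: send one gauge point to Datadog over HTTP.
public class DatadogPostSketch {
    public static void main(String[] args) throws Exception {
        OkHttpClient client = new OkHttpClient();

        String json = "{\"series\":[{\"metric\":\"flink.example\",\"points\":[[1493078400,1.0]]}]}";
        Request request = new Request.Builder()
                .url("https://app.datadoghq.com/api/v1/series?api_key=YOUR_API_KEY")
                .post(RequestBody.create(MediaType.parse("application/json"), json))
                .build();

        try (Response response = client.newCall(request).execute()) {
            System.out.println("Datadog responded with HTTP " + response.code());
        }
    }
}
{code}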







[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983516#comment-15983516
 ] 

ASF GitHub Bot commented on FLINK-5869:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3772#discussion_r113272506
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,6 +72,13 @@
.defaultValue(16)

.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+   /**
+* The maximum number of prior execution attempts kept in history.
+*/
+   public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+   key("jobmanager.execution.failover-strategy")
+   .defaultValue("full");
--- End diff --

have we ever considered defining a set of valid values directly in the 
config option?
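
A sketch of that suggestion, validating the configured string against a known set when the option is read; the strategy names used here are placeholders for whatever the final implementation accepts:

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch only: reject unknown values for jobmanager.execution.failover-strategy.
public class FailoverStrategyValidation {
    private static final Set<String> VALID_STRATEGIES =
            new HashSet<>(Arrays.asList("full", "individual", "region"));

    public static String validate(String configuredValue) {
        if (!VALID_STRATEGIES.contains(configuredValue)) {
            throw new IllegalArgumentException(
                    "Unknown failover strategy: " + configuredValue + ", expected one of " + VALID_STRATEGIES);
        }
        return configuredValue;
    }
}
{code}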


> ExecutionGraph use FailoverCoordinator to manage the failover of execution 
> vertexes
> ---
>
> Key: FLINK-5869
> URL: https://issues.apache.org/jira/browse/FLINK-5869
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> Execution graph doesn't manage the failover of executions. It only care for 
> the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or 
> SUSPEND. 
> For execution failure, it will notice the FailoverCoordinator to do failover.
> It only record the finished job vertex and changes to FINISHED after all 
> vertexes finished.
> It will change to final fail if restart strategy fail or meet unrecoverable 
> exceptions.







[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983512#comment-15983512
 ] 

ASF GitHub Bot commented on FLINK-6302:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3720
  
The `GemFile` should be updated as well. It would still work as long as the `GemFile.lock` file is present. If it is deleted, however, the bundler would load the dependencies from the `GemFile` and create a new `GemFile.lock`.


> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Meng
>Assignee: Tao Meng
>Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: 
> expanded from macro 'RSTRING_LEN'
>  RSTRING(str)->as.heap.len)
>  ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, 
> -1);
>  ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.







[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983506#comment-15983506
 ] 

ASF GitHub Bot commented on FLINK-6302:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3720#discussion_r113292688
  
--- Diff: docs/Gemfile.lock ---
@@ -30,18 +30,19 @@ GEM
   redcarpet (~> 3.1)
   safe_yaml (~> 1.0)
   toml (~> 0.1.0)
-jekyll-coffeescript (1.0.1)
+jekyll-coffeescript (1.0.2)
   coffee-script (~> 2.2)
+  coffee-script-source (~> 1.11.1)
 jekyll-gist (1.4.0)
-  octokit (~> 4.3.0)
+  octokit (~> 4.2)
--- End diff --

There was never an explicit version downgrade in `jekyll-gist`; it has had the `4.3.0` `octokit` dependency ever since it was added.

Similarly, ever since we added the `jekyll-gist`/`octokit` dependency, the version of `octokit` has been set to `4.3.0`.

In conclusion, this doesn't appear to be related to a bug fix. We just always relied on a higher version than `jekyll-gist` did, and it worked out fine.


> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Meng
>Assignee: Tao Meng
>Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: 
> expanded from macro 'RSTRING_LEN'
>  RSTRING(str)->as.heap.len)
>  ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, 
> -1);
>  ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.







[GitHub] flink pull request #3757: (refactor) some opportunities to use multi-catch

2017-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3757#discussion_r113290832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java 
---
@@ -121,11 +120,7 @@ public static boolean 
validateClassLoadable(ClassNotFoundException cnfe, ClassLo
String className = cnfe.getMessage();
Class.forName(className, false, cl);
return true;
-   }
-   catch (ClassNotFoundException e) {
-   return false;
-   }
-   catch (Exception e) {
+   }catch(ClassNotFoundException | Exception  e) /*multi-catch 
refactor*/ {
--- End diff --

You can remove the `ClassNotFoundException` clause since it is included in the `Exception` clause anyway.
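
As background, `catch (ClassNotFoundException | Exception e)` does not even compile, because alternatives in a multi-catch must not be related by subclassing. The collapsed form suggested here would look roughly as follows (the body is a placeholder, since the rest of the method is not shown in the diff):

```
try {
    String className = cnfe.getMessage();
    Class.forName(className, false, cl);
    return true;
} catch (Exception e) {
    // Exception already covers ClassNotFoundException, so one clause is enough.
    return false; // placeholder handling for this sketch
}
```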




[GitHub] flink pull request #3757: (refactor) some opportunities to use multi-catch

2017-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3757#discussion_r113290672
  
--- Diff: 
flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
 ---
@@ -125,9 +125,7 @@ public void open(int taskNumber, int numTasks) throws 
IOException {
datumWriter = new SpecificDatumWriter(avroValueType);
try {
schema = 
((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-   } catch (InstantiationException e) {
-   throw new RuntimeException(e.getMessage());
-   } catch (IllegalAccessException e) {
+   }catch(InstantiationException | IllegalAccessException  
e) /*multi-catch refactor*/ {
--- End diff --

In this particular case it makes sense as it reduces clutter.




[jira] [Commented] (FLINK-6384) PythonStreamer does not close python processes

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983476#comment-15983476
 ] 

ASF GitHub Bot commented on FLINK-6384:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3774
  
Looks good, +1 to merge.


> PythonStreamer does not close python processes
> --
>
> Key: FLINK-6384
> URL: https://issues.apache.org/jira/browse/FLINK-6384
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.3.0
>
>
> The {{PythonStreamer}} opens a new process calling the python binary to check 
> whether the binary is available. This process won't get closed leading to an 
> excessive number of open python processes when running the 
> {{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra 
> process, because the actual python call with the python code would also fail 
> if there is no python binary available.







[jira] [Reopened] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis

2017-04-25 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-6385:
-

> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt





[jira] [Closed] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis

2017-04-25 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6385.
---
Resolution: Duplicate

> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt





[jira] [Updated] (FLINK-6175) HistoryServerTest.testFullArchiveLifecycle fails

2017-04-25 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6175:

Component/s: History Server

> HistoryServerTest.testFullArchiveLifecycle fails
> 
>
> Key: FLINK-6175
> URL: https://issues.apache.org/jira/browse/FLINK-6175
> Project: Flink
>  Issue Type: Test
>  Components: History Server, Tests, Webfrontend
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/213933823/log.txt
> {code}
> estFullArchiveLifecycle(org.apache.flink.runtime.webmonitor.history.HistoryServerTest)
>   Time elapsed: 2.162 sec  <<< FAILURE!
> java.lang.AssertionError: /joboverview.json did not contain valid json
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertNotNull(Assert.java:712)
>   at 
> org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testFullArchiveLifecycle(HistoryServerTest.java:98)
> {code}
> Happened on a branch with unrelated changes [~Zentol].





[jira] [Closed] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle instable on Travis

2017-04-25 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6385.
---
Resolution: Fixed

> HistoryServerTest.testFullArchiveLifecycle instable on Travis
> -
>
> Key: FLINK-6385
> URL: https://issues.apache.org/jira/browse/FLINK-6385
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt





[jira] [Closed] (FLINK-6155) Allow to specify endpoint names

2017-04-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-6155.

Resolution: Fixed

Fixed via 433a345edccdee29385957841e6513679690a5e9

> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.3.0
>
>
> In order to make the standalone mode work, we have to be able to assign names 
> to the {{RpcEndpoints}}. In the case of the Akka implementation they would 
> correspond to the actor names. This information is necessary to look the 
> corresponding endpoints up on a remote host because in standalone mode the  
> names have to be deterministic.
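
A sketch of the idea with hypothetical names (not the actual RpcEndpoint API): a deterministic endpoint id lets a remote peer derive the endpoint's address instead of having to discover a randomly generated one.

{code}
// Sketch only: deterministic endpoint names make remote lookup possible.
public abstract class NamedEndpointSketch {
    private final String endpointId;

    protected NamedEndpointSketch(String endpointId) {
        // e.g. "jobmanager" or "resourcemanager" instead of a random suffix
        this.endpointId = endpointId;
    }

    public String remoteAddress(String host, int port) {
        // Akka-style actor path assumed purely for illustration.
        return "akka.tcp://flink@" + host + ":" + port + "/user/" + endpointId;
    }
}
{code}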





[GitHub] flink pull request #3596: [FLINK-6155] Introduce an endpoint id for RpcEndpo...

2017-04-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3596




[jira] [Commented] (FLINK-6155) Allow to specify endpoint names

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983436#comment-15983436
 ] 

ASF GitHub Bot commented on FLINK-6155:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3596


> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.3.0
>
>
> In order to make the standalone mode work, we have to be able to assign names 
> to the {{RpcEndpoints}}. In the case of the Akka implementation they would 
> correspond to the actor names. This information is necessary to look the 
> corresponding endpoints up on a remote host because in standalone mode the  
> names have to be deterministic.





[jira] [Commented] (FLINK-6155) Allow to specify endpoint names

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983433#comment-15983433
 ] 

ASF GitHub Bot commented on FLINK-6155:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3596
  
Failing test case has just been fixed. Merging this PR.


> Allow to specify endpoint names
> ---
>
> Key: FLINK-6155
> URL: https://issues.apache.org/jira/browse/FLINK-6155
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.3.0
>
>
> In order to make the standalone mode work, we have to be able to assign names 
> to the {{RpcEndpoints}}. In the case of the Akka implementation they would 
> correspond to the actor names. This information is necessary to look the 
> corresponding endpoints up on a remote host because in standalone mode the  
> names have to be deterministic.







[jira] [Commented] (FLINK-6384) PythonStreamer does not close python processes

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983424#comment-15983424
 ] 

ASF GitHub Bot commented on FLINK-6384:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3774

[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was not closed afterwards. This 
caused
the PythonPlanBinderTest to fail locally.

I think the check whether a python binary exists is not necessary since the
subsequent python command would fail anyway if there is no binary available 
on
the system. The system failure message is that there is no such file or 
directory.
This error message should be descriptive enough in order to debug such a 
problem.

cc @zentol

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixPythonStreamer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3774.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3774


commit 746fe59737338c02502443c5c3e88c613d0b92ad
Author: Till Rohrmann 
Date:   2017-04-25T18:41:58Z

[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was not closed afterwards. This 
caused
the PythonPlanBinderTest to fail locally.

I think the check whether a python binary exists is not necessary since the
subsequent python command would fail anyway if there is no binary available 
on
the system. The system failure message is that there is no such file or 
directory.
This error message should be descriptive enough in order to debug such a 
problem.




> PythonStreamer does not close python processes
> --
>
> Key: FLINK-6384
> URL: https://issues.apache.org/jira/browse/FLINK-6384
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.3.0
>
>
> The {{PythonStreamer}} opens a new process calling the python binary to check 
> whether the binary is available. This process won't get closed leading to an 
> excessive number of open python processes when running the 
> {{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra 
> process, because the actual python call with the python code would also fail 
> if there is no python binary available.
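
For context, if such a check were kept at all, the probe process would need to be awaited and destroyed rather than left running; a sketch of that, as an illustration and not the removed Flink code:

{code}
import java.io.IOException;

// Sketch only: spawn, await and destroy a probe process instead of leaking it.
public final class PythonProbeSketch {
    static void checkPythonAvailable(String pythonBinary) throws IOException, InterruptedException {
        Process probe = new ProcessBuilder(pythonBinary, "-V").start();
        try {
            if (probe.waitFor() != 0) {
                throw new IOException("Probe call of '" + pythonBinary + "' failed");
            }
        } finally {
            probe.destroy();
        }
    }
}
{code}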





[GitHub] flink pull request #3774: [FLINK-6384] [py] Remove python binary check via a...

2017-04-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3774

[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was never closed afterwards, which
caused the PythonPlanBinderTest to fail locally.

I think the check for whether a python binary exists is unnecessary, since the
subsequent python command would fail anyway if no binary is available on the
system. The resulting error message ("no such file or directory") should be
descriptive enough to debug such a problem.

cc @zentol

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixPythonStreamer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3774.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3774


commit 746fe59737338c02502443c5c3e88c613d0b92ad
Author: Till Rohrmann 
Date:   2017-04-25T18:41:58Z

[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was never closed afterwards, which
caused the PythonPlanBinderTest to fail locally.

I think the check for whether a python binary exists is unnecessary, since the
subsequent python command would fail anyway if no binary is available on the
system. The resulting error message ("no such file or directory") should be
descriptive enough to debug such a problem.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6330) Improve Docker documentation

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983411#comment-15983411
 ] 

ASF GitHub Bot commented on FLINK-6330:
---

Github user patricklucas commented on the issue:

https://github.com/apache/flink/pull/3751
  
It will be available before the next release, though I noticed just 
yesterday that it seems we build the [official 1.2 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.2/) 
(erroneously?) from a development branch instead of the actual tagged release.

I'm about ready to submit the PR to get the official images created, but 
it's fine if you want to wait.


> Improve Docker documentation
> 
>
> Key: FLINK-6330
> URL: https://issues.apache.org/jira/browse/FLINK-6330
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
> Fix For: 1.2.2
>
>
> The "Docker" page in the docs exists but is blank.
> Add something useful here, including references to the official images that 
> should exist once 1.2.1 is released, and add a brief "Kubernetes" page as 
> well, referencing the [helm chart|https://github.com/docker-flink/examples]. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs

2017-04-25 Thread patricklucas
Github user patricklucas commented on the issue:

https://github.com/apache/flink/pull/3751
  
It will be available before the next release, though I noticed just 
yesterday that it seems we build the [official 1.2 
docs](https://ci.apache.org/projects/flink/flink-docs-release-1.2/) 
(erroneously?) from a development branch instead of the actual tagged release.

I'm about ready to submit the PR to get the official images created, but 
it's fine if you want to wait.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983397#comment-15983397
 ] 

ASF GitHub Bot commented on FLINK-6302:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3720
  
I am not familiar with the Ruby / Bundler / etc space, but it looks like 
`Gemfile` is the original source of truth and `Gemfile.lock` is in some way 
derived from that.
Is it correct to only update `Gemfile.lock` and not `Gemfile`?

@uce or @alpinegizmo can you maybe shed some light here?


> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Meng
>Assignee: Tao Meng
>Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: 
> expanded from macro 'RSTRING_LEN'
>  RSTRING(str)->as.heap.len)
>  ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, 
> -1);
>  ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3720: [FLINK-6302] Documentation build error on ruby 2.4

2017-04-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3720
  
I am not familiar with the Ruby / Bundler / etc space, but it looks like 
`Gemfile` is the original source of truth and `Gemfile.lock` is in some way 
derived from that.
Is it correct to only update `Gemfile.lock` and not `Gemfile`?

@uce or @alpinegizmo can you maybe shed some light here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6302) Documentation build error on ruby 2.4

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983391#comment-15983391
 ] 

ASF GitHub Bot commented on FLINK-6302:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3720#discussion_r113273807
  
--- Diff: docs/Gemfile.lock ---
@@ -30,18 +30,19 @@ GEM
   redcarpet (~> 3.1)
   safe_yaml (~> 1.0)
   toml (~> 0.1.0)
-jekyll-coffeescript (1.0.1)
+jekyll-coffeescript (1.0.2)
   coffee-script (~> 2.2)
+  coffee-script-source (~> 1.11.1)
 jekyll-gist (1.4.0)
-  octokit (~> 4.3.0)
+  octokit (~> 4.2)
--- End diff --

Does the downgrade fix an issue? If yes, it's okay to do it, I think.


> Documentation build error on ruby 2.4
> -
>
> Key: FLINK-6302
> URL: https://issues.apache.org/jira/browse/FLINK-6302
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Meng
>Assignee: Tao Meng
>Priority: Trivial
>
> {code}
> /usr/local/Cellar/ruby/2.4.1_1/include/ruby-2.4.0/ruby/ruby.h:981:28: note: 
> expanded from macro 'RSTRING_LEN'
>  RSTRING(str)->as.heap.len)
>  ~~^~~
> yajl_ext.c:881:22: error: use of undeclared identifier 'rb_cFixnum'
> rb_define_method(rb_cFixnum, "to_json", rb_yajl_json_ext_fixnum_to_json, 
> -1);
>  ^
> 17 warnings and 1 error generated.
> make: *** [yajl_ext.o] Error 1
> make failed, exit code 2
> {code}
> We should update Gemfile.lock.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3720: [FLINK-6302] Documentation build error on ruby 2.4

2017-04-25 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3720#discussion_r113273807
  
--- Diff: docs/Gemfile.lock ---
@@ -30,18 +30,19 @@ GEM
   redcarpet (~> 3.1)
   safe_yaml (~> 1.0)
   toml (~> 0.1.0)
-jekyll-coffeescript (1.0.1)
+jekyll-coffeescript (1.0.2)
   coffee-script (~> 2.2)
+  coffee-script-source (~> 1.11.1)
 jekyll-gist (1.4.0)
-  octokit (~> 4.3.0)
+  octokit (~> 4.2)
--- End diff --

Does the downgrade fix an issue? If yes, it's okay to do it, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6330) Improve Docker documentation

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983383#comment-15983383
 ] 

ASF GitHub Bot commented on FLINK-6330:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3751
  
Sorry, taking a step back.

It seems that https://hub.docker.com/_/flink/ is not yet available. Should 
we postpone this merge until it is available?


> Improve Docker documentation
> 
>
> Key: FLINK-6330
> URL: https://issues.apache.org/jira/browse/FLINK-6330
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
> Fix For: 1.2.2
>
>
> The "Docker" page in the docs exists but is blank.
> Add something useful here, including references to the official images that 
> should exist once 1.2.1 is released, and add a brief "Kubernetes" page as 
> well, referencing the [helm chart|https://github.com/docker-flink/examples]. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs

2017-04-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3751
  
Sorry, taking a step back.

It seems that https://hub.docker.com/_/flink/ is not yet available. Should 
we postpone this merge until it is available?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6330) Improve Docker documentation

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983371#comment-15983371
 ] 

ASF GitHub Bot commented on FLINK-6330:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3751
  
Great, thanks!

Merging this...


> Improve Docker documentation
> 
>
> Key: FLINK-6330
> URL: https://issues.apache.org/jira/browse/FLINK-6330
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
> Fix For: 1.2.2
>
>
> The "Docker" page in the docs exists but is blank.
> Add something useful here, including references to the official images that 
> should exist once 1.2.1 is released, and add a brief "Kubernetes" page as 
> well, referencing the [helm chart|https://github.com/docker-flink/examples]. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3751: [FLINK-6330] [docs] Add basic Docker, K8s docs

2017-04-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3751
  
Great, thanks!

Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983370#comment-15983370
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3736
  
I think this is starting to look very good!

Given that this introduces new libraries as dependencies (okhttp, okio),
should we proactively shade those away to avoid dependency conflicts?
Admittedly, it would only impact users that explicitly drop in the Datadog
reporter, but it might still be nice for those users. Given that we build a
jar-with-dependencies anyway, the step to shading is small...
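
For context on what "directly contacting the Datadog HTTP endpoint" amounts to, here is a minimal okhttp sketch. The endpoint URL, payload shape and API-key handling are assumptions for illustration, not taken from the PR:

```java
// Hedged sketch: posting one metric series to Datadog's HTTP API with okhttp.
// The endpoint and JSON layout are assumptions; the real reporter's contract may differ.
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class DatadogHttpSketch {

    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");

    public static void main(String[] args) throws Exception {
        OkHttpClient client = new OkHttpClient();
        String apiKey = System.getenv("DD_API_KEY");   // assumption: key supplied via environment
        String payload = "{\"series\":[{\"metric\":\"flink.sketch.counter\","
                + "\"points\":[[" + (System.currentTimeMillis() / 1000) + ",1]]}]}";

        Request request = new Request.Builder()
                .url("https://app.datadoghq.com/api/v1/series?api_key=" + apiKey)
                .post(RequestBody.create(JSON, payload))
                .build();

        // Closing the response releases the underlying connection resources.
        try (Response response = client.newCall(request).execute()) {
            System.out.println("Datadog responded with HTTP " + response.code());
        }
    }
}
```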


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3736
  
I think this is starting to look very good!

Given that this introduces new libraries as dependencies (okhttp, okio),
should we proactively shade those away to avoid dependency conflicts?
Admittedly, it would only impact users that explicitly drop in the Datadog
reporter, but it might still be nice for those users. Given that we build a
jar-with-dependencies anyway, the step to shading is small...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983350#comment-15983350
 ] 

ASF GitHub Bot commented on FLINK-5867:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3773

[FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined 
regions

This is based on #3772; the relevant commits are the latter four.

The majority of the work has been done by @tiemsn, with some rebasing and
additions from me.

# Pipelined Region Failover

As described in 
[FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures),
 this pull request implements the detection of pipelined regions in the 
`ExecutionGraph` and failover within these pipelined regions.


![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png)
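
As a rough illustration of the region detection (names and data structures below are hypothetical, not the actual ExecutionGraph code): because the vertices are visited in topological order, every vertex only has to look at the regions of its already-processed pipelined producers, so the regions can be built in a single pass.

```java
// Hypothetical one-pass region construction over topologically ordered vertices.
// Vertices connected through pipelined edges end up in the same failover region.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PipelinedRegionSketch {

    static class Vertex {
        final String id;
        final List<Vertex> pipelinedProducers = new ArrayList<>();  // upstream via pipelined edges
        Vertex(String id) { this.id = id; }
    }

    static Map<Vertex, Set<Vertex>> buildRegions(List<Vertex> topologicallyOrdered) {
        Map<Vertex, Set<Vertex>> regionOf = new HashMap<>();
        for (Vertex vertex : topologicallyOrdered) {
            Set<Vertex> region = new HashSet<>();
            region.add(vertex);
            // Producers have already been processed thanks to the topological order,
            // so their regions are known and can simply be merged into this one.
            for (Vertex producer : vertex.pipelinedProducers) {
                Set<Vertex> producerRegion = regionOf.get(producer);
                if (producerRegion != null && producerRegion != region) {
                    for (Vertex member : producerRegion) {
                        region.add(member);
                        regionOf.put(member, region);
                    }
                }
            }
            regionOf.put(vertex, region);
        }
        return regionOf;
    }
}
```

A real implementation would additionally optimize the merge step (for example, always folding the smaller region into the larger one); the sketch only shows why the topological order allows a single pass.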


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
flip-1-pipelined-regions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3773


commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen 
Date:   2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to 
ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to 
differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen 
Date:   2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution

commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13
Author: shuai.xus 
Date:   2017-04-18T06:15:29Z

[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the 
ExecutionGraph on task failure

commit 456600d5e37724bbcc7d570f6828e3fef6298483
Author: shuai.xus 
Date:   2017-04-20T21:56:53Z

[FLINK-5867] [flip-1] Add tests for pipelined failover region construction

commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1
Author: Stephan Ewen 
Date:   2017-04-20T22:02:19Z

[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region 
construction

This method exploits the fact that vertices are already in topological order.

commit 39402583df8b4c51016c72f968772cbbdd6c92e3
Author: shuai.xus 
Date:   2017-04-25T07:42:48Z

[FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy




> The implementation of RestartPipelinedRegionStrategy
> 
>
> Key: FLINK-5867
> URL: https://issues.apache.org/jira/browse/FLINK-5867
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> The RestartPipelinedRegionStrategy's responsibility is the following:
> 1. Calculate all FailoverRegions and their relations when initializing.
> 2. Listen for the failure of the job and executions, and find corresponding 
> FailoverRegions to do the failover.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3773: [FLINK-5867] [FLINK-5866] [flip-1] Implement Failo...

2017-04-25 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3773

[FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined 
regions

This is based on #3772; the relevant commits are the latter four.

The majority of the work has been done by @tiemsn, with some rebasing and
additions from me.

# Pipelined Region Failover

As described in 
[FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures),
 this pull request implements the detection of pipelined regions in the 
`ExecutionGraph` and failover within these pipelined regions.


![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
flip-1-pipelined-regions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3773


commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen 
Date:   2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to 
ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to 
differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen 
Date:   2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution

commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13
Author: shuai.xus 
Date:   2017-04-18T06:15:29Z

[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the 
ExecutionGraph on task failure

commit 456600d5e37724bbcc7d570f6828e3fef6298483
Author: shuai.xus 
Date:   2017-04-20T21:56:53Z

[FLINK-5867] [flip-1] Add tests for pipelined failover region construction

commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1
Author: Stephan Ewen 
Date:   2017-04-20T22:02:19Z

[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region 
construction

This method exploits the fact that vertices are already in topological order.

commit 39402583df8b4c51016c72f968772cbbdd6c92e3
Author: shuai.xus 
Date:   2017-04-25T07:42:48Z

[FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983328#comment-15983328
 ] 

ASF GitHub Bot commented on FLINK-5869:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3772

[FLINK-5869] [flip-1] Introduce abstraction for FailoverStrategy

This PR has two sets of changes that I could not pull apart into separate 
pull requests.

# (1) Termination Futures

Prior to this change, the `ExecutionGraph` decided when cancellation and 
finishing was complete by tracking how many `ExecutionJobVertex` were in a 
terminal state.

This abstraction is too inflexible to track when subregions of the graph are in
a terminal state. To fix that, this change introduces a *termination future* on
the `Execution`. By building conjunct futures of the termination futures, any
observer can track when an arbitrary set of vertices has reached a terminal state.

The `ExecutionGraph` now also uses that model to track when cancellation of 
all vertices during failover is complete.
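
A minimal sketch of the idea using plain `CompletableFuture`s (the real Execution and ExecutionGraph classes use Flink's own future abstractions; the names below are illustrative):

```java
// Sketch: tracking when an arbitrary subset of executions has reached a terminal state
// by combining their per-execution termination futures into one conjunct future.
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TerminationFutureSketch {

    static class Execution {
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }

        void markTerminal() {   // called when the task reaches CANCELED / FAILED / FINISHED
            terminationFuture.complete(null);
        }
    }

    /** Completes once every execution in the given subset (e.g. one failover region) is terminal. */
    static CompletableFuture<Void> conjunctTermination(List<Execution> executions) {
        return CompletableFuture.allOf(
                executions.stream()
                        .map(Execution::getTerminationFuture)
                        .toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        Execution a = new Execution();
        Execution b = new Execution();
        CompletableFuture<Void> regionTerminated = conjunctTermination(List.of(a, b));
        regionTerminated.thenRun(() -> System.out.println("region is terminal, restart can proceed"));
        a.markTerminal();
        b.markTerminal();   // only now does the conjunct future complete
    }
}
```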

# (2) Local Failover and FailoverStrategy

The `ExecutionGraph` now supports *local failover* and *global failover*. 
Quoting from the JavaDocs:

  - **Global failover** aborts the task executions for all vertices and restarts
the whole data flow graph from the last completed checkpoint. Global failover is
considered the *fallback strategy* that is used when a local failover is
unsuccessful, or when an issue is found in the state of the ExecutionGraph that
could mark it as inconsistent (caused by a bug).

  - **Local failover** is triggered when an individual vertex execution (a task)
fails. The local failover is coordinated by the `FailoverStrategy`. A local
failover typically attempts to restart as little as possible, but as much as
necessary.

  - Between local and global failover, the global failover always takes
precedence, because it is the core mechanism that the `ExecutionGraph` relies on
to bring back consistency. To guard that, the `ExecutionGraph` maintains a
**global modification version**, which is incremented with every global failover
(and other global actions, like job cancellation or terminal failure). Local
failover is always scoped by the modification version that the execution graph
had when the failover was triggered. If a new global modification version is
reached during a local failover (meaning there is a concurrent global failover),
the failover strategy has to yield to the global failover (see the sketch after
this list).
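
A hedged sketch of that version check (field and method names are illustrative, not the PR's actual code):

```java
// Sketch: a local failover records the global modification version at its start and
// yields if a concurrent global failover has incremented it in the meantime.
import java.util.concurrent.atomic.AtomicLong;

public class GlobalModVersionSketch {

    private final AtomicLong globalModVersion = new AtomicLong(0);

    long onGlobalFailover() {
        // Every global action (global failover, cancellation, ...) bumps the version.
        return globalModVersion.incrementAndGet();
    }

    void onLocalFailover(Runnable restartRegion) {
        final long versionAtStart = globalModVersion.get();
        // ... cancel and await the affected region here ...
        if (globalModVersion.get() != versionAtStart) {
            // A global failover happened concurrently; it takes precedence, so yield.
            return;
        }
        restartRegion.run();
    }
}
```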

### Failover Strategies

How exactly local failover happens is the concern of a pluggable 
`FailoverStrategy`.

  - The default failover strategy simply triggers a global failover.
  - The pull request introduces a very simple *restart individual* failover
strategy that independently restarts tasks which have no connections to other
tasks. (A sketch of the pluggable strategy interface follows below.)
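
Conceptually, the pluggable hook boils down to an interface along these lines; this is a sketch only, and the method names are assumptions rather than the PR's actual signatures:

```java
// Sketch of the pluggable hook: the ExecutionGraph notifies the strategy of an individual
// task failure and the strategy decides how much of the graph to restart.
public interface FailoverStrategySketch {

    /** Called when a single task execution fails; decides the scope of the restart. */
    void onTaskFailure(String failedExecutionId, Throwable cause);

    /**
     * Fallback used by the default strategy: escalate to a full restart, i.e. what the PR
     * exposes as ExecutionGraph.failGlobally(cause).
     */
    default void escalateToGlobalFailover(Throwable cause) {
        // no-op in this sketch; a real strategy would delegate to the ExecutionGraph here
    }
}
```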

# Tests

This pull request adds new tests for

  - The termination future abstraction
  - The global mod version handling
  - Proper handling of concurrent local- and global failover

The pull request rewrites various existing tests. This was necessary because the
tests used Mockito very heavily, re-building or whitebox-testing specific
behavior that was affected by the changes.

The changes to the tests introduce simple ways to actually bring up a functional
ExecutionGraph and walk it through its state transitions. The tests now rely
minimally on mocking and test the proper ExecutionGraph, rather than a mock that
is expected to behave similarly to the proper class.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3772.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3772


commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen 
Date:   2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to 
ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to 
differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen 
Date:   2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution




> ExecutionGraph use FailoverCoordinator to manage the failover of execution 
> vertexes
> 

[GitHub] flink pull request #3772: [FLINK-5869] [flip-1] Introduce abstraction for Fa...

2017-04-25 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3772

[FLINK-5869] [flip-1] Introduce abstraction for FailoverStrategy

This PR has two sets of changes that I could not pull apart into separate 
pull requests.

# (1) Termination Futures

Prior to this change, the `ExecutionGraph` decided when cancellation and 
finishing was complete by tracking how many `ExecutionJobVertex` were in a 
terminal state.

This abstraction is too inflexible to track when subregions of the graph are in
a terminal state. To fix that, this change introduces a *termination future* on
the `Execution`. By building conjunct futures of the termination futures, any
observer can track when an arbitrary set of vertices has reached a terminal state.

The `ExecutionGraph` now also uses that model to track when cancellation of 
all vertices during failover is complete.

# (2) Local Failover and FailoverStrategy

The `ExecutionGraph` now supports *local failover* and *global failover*. 
Quoting from the JavaDocs:

  - **Global failover** aborts the task executions for all vertices and restarts
the whole data flow graph from the last completed checkpoint. Global failover is
considered the *fallback strategy* that is used when a local failover is
unsuccessful, or when an issue is found in the state of the ExecutionGraph that
could mark it as inconsistent (caused by a bug).

  - **Local failover** is triggered when an individual vertex execution (a task)
fails. The local failover is coordinated by the `FailoverStrategy`. A local
failover typically attempts to restart as little as possible, but as much as
necessary.

  - Between local and global failover, the global failover always takes
precedence, because it is the core mechanism that the `ExecutionGraph` relies on
to bring back consistency. To guard that, the `ExecutionGraph` maintains a
**global modification version**, which is incremented with every global failover
(and other global actions, like job cancellation or terminal failure). Local
failover is always scoped by the modification version that the execution graph
had when the failover was triggered. If a new global modification version is
reached during a local failover (meaning there is a concurrent global failover),
the failover strategy has to yield to the global failover.

### Failover Strategies

How exactly local failover happens is the concern of a pluggable 
`FailoverStrategy`.

  - The default failover strategy simply triggers a global failover.
  - The pull request introduces a very simple *restart individual* failover
strategy that independently restarts tasks which have no connections to other
tasks.

# Tests

This pull request adds new tests for

  - The termination future abstraction
  - The global mod version handling
  - Proper handling of concurrent local- and global failover

The pull request rewrites various existing tests. This was necessary because the
tests used Mockito very heavily, re-building or whitebox-testing specific
behavior that was affected by the changes.

The changes to the tests introduce simple ways to actually bring up a functional
ExecutionGraph and walk it through its state transitions. The tests now rely
minimally on mocking and test the proper ExecutionGraph, rather than a mock that
is expected to behave similarly to the proper class.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3772.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3772


commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen 
Date:   2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to 
ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to 
differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen 
Date:   2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983316#comment-15983316
 ] 

ASF GitHub Bot commented on FLINK-6075:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3714
  
@fhueske 

I have implemented all the suggestions you made. The only things remaining are:
- decide whether keyBy is needed for the case when sorting happens only on proc
time (I would say it is needed to make it fully equivalent to an actual sort)
- refactor the DataSet sort to use the common classes from the sort util (can be
done in a follow-up request after the merge)
- implement the other two cases from the JIRA issue, based on event time, to
offer full sort support without retraction (for the feature freeze)


> Support Limit/Top(Sort) for Stream SQL
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>  Labels: features
> Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design below applies to all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query will be provided without 
> limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `)
> ||Rowtime||Proctime||Stream1||Limit 2||Top 2||Sort [ASC]||
> | |10:00:00|(aaa, 11)| | | |
> | |10:05:00|(aab, 7)| | | |
> |10-11|11:00:00| |aab,aaa|aab,aaa|aab,aaa|
> | |11:03:00|(aac,21)| | | |
> |11-12|12:00:00| |aab,aaa|aab,aaa|aab,aaa,aac|
> | |12:10:00|(abb,12)| | | |
> | |12:15:00|(abb,12)| | | |
> |12-13|13:00:00| |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> 

[GitHub] flink issue #3714: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

2017-04-25 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3714
  
@fhueske 

I have implemented all the suggestions you made. The only things remaining are:
- decide whether keyBy is needed for the case when sorting happens only on proc
time (I would say it is needed to make it fully equivalent to an actual sort)
- refactor the DataSet sort to use the common classes from the sort util (can be
done in a follow-up request after the merge)
- implement the other two cases from the JIRA issue, based on event time, to
offer full sort support without retraction (for the feature freeze)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983292#comment-15983292
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113259225
  
--- Diff: flink-dist/src/main/assemblies/opt.xml ---
@@ -95,5 +95,12 @@

flink-metrics-statsd-${project.version}.jar
0644

+
+   
+   
../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}.jar
--- End diff --

Thank you for clarifying!


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113259225
  
--- Diff: flink-dist/src/main/assemblies/opt.xml ---
@@ -95,5 +95,12 @@

flink-metrics-statsd-${project.version}.jar
0644

+
+   
+   
../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}.jar
--- End diff --

Thank you for clarifying!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6385) HistoryServerTest.testFullArchiveLifecycle unstable on Travis

2017-04-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-6385:


 Summary: HistoryServerTest.testFullArchiveLifecycle unstable on Travis
 Key: FLINK-6385
 URL: https://issues.apache.org/jira/browse/FLINK-6385
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Till Rohrmann
Priority: Critical


The {{HistoryServerTest.testFullArchiveLifecycle}} failed on Travis [1].

[1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/225620353/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-04-25 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983282#comment-15983282
 ] 

Ted Yu commented on FLINK-5855:
---

IMO, if after discussion the new fix is in the same area of code, the
description of the JIRA can be modified to reflect the outcome of the
discussion, instead of closing the first JIRA and opening a new one.


> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().
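
A minimal sketch of the locking order the issue asks for, shown on a stand-in map rather than the actual BucketingSink code:

```java
// Sketch: acquire the lock before handlePendingFilesForPreviousCheckpoints(), so the
// handler and the subsequent clear() see the map under the same monitor.
import java.util.HashMap;
import java.util.Map;

public class PendingFilesLockingSketch {

    private final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();

    void restore() {
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    private void handlePendingFilesForPreviousCheckpoints(Map<Long, String> pendingFiles) {
        // stand-in for the real handler; reads the map while the lock is held
    }
}
```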



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6384) PythonStreamer does not close python processes

2017-04-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-6384:


 Summary: PythonStreamer does not close python processes
 Key: FLINK-6384
 URL: https://issues.apache.org/jira/browse/FLINK-6384
 Project: Flink
  Issue Type: Bug
  Components: Python API
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.3.0


The {{PythonStreamer}} opens a new process calling the python binary to check 
whether the binary is available. This process won't get closed leading to an 
excessive number of open python processes when running the 
{{PythonPlanBinderTest}}. I'm not entirely sure whether we need this extra 
process, because the actual python call with the python code would also fail if 
there is no python binary available.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983254#comment-15983254
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for
distinct in the code generator. Please have a look and let me know. I have
implemented and tested it only for the OverProcTimeRowBounded window, but if you
like it I can quickly implement and test the others as well.


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
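
Conceptually, a distinct-aware accumulator only feeds a value to the wrapped aggregate the first time it appears inside the bounded range, and forgets it once its count drops to zero as rows retire. The sketch below illustrates that idea for a distinct SUM; it is not the generated code from the PR.

```java
// Sketch: distinct SUM over a bounded ROWS range, tracking value multiplicities so each
// distinct value contributes to the sum exactly once while it is inside the range.
import java.util.HashMap;
import java.util.Map;

public class DistinctSumSketch {

    private final Map<Long, Integer> counts = new HashMap<>();
    private long sum = 0;

    /** Called when a row enters the "2 PRECEDING .. CURRENT ROW" range. */
    void accumulate(long value) {
        int newCount = counts.merge(value, 1, Integer::sum);
        if (newCount == 1) {
            sum += value;          // first occurrence inside the range
        }
    }

    /** Called when a row leaves the bounded range. */
    void retract(long value) {
        Integer count = counts.get(value);
        if (count == null) {
            return;
        }
        if (count == 1) {
            counts.remove(value);
            sum -= value;          // the last occurrence left the range
        } else {
            counts.put(value, count - 1);
        }
    }

    long getValue() {
        return sum;
    }
}
```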



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-25 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for
distinct in the code generator. Please have a look and let me know. I have
implemented and tested it only for the OverProcTimeRowBounded window, but if you
like it I can quickly implement and test the others as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983252#comment-15983252
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/3771

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6250b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3771






> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-25 Thread stefanobortoli
GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/3771

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6250b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3771






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6368) Grouping keys in stream aggregations have wrong order

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983234#comment-15983234
 ] 

ASF GitHub Bot commented on FLINK-6368:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3768
  
I ran into the same problem today when adding the new test cases for UDAGG. 
Thanks for the fix, @xccui 


> Grouping keys in stream aggregations have wrong order
> -
>
> Key: FLINK-6368
> URL: https://issues.apache.org/jira/browse/FLINK-6368
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
> Fix For: 1.3.0
>
>
> FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It 
> seems that the order of grouping keys is sometimes messed up. The following 
> test fails:
> {code}
>   @Test
>   def testEventTimeSlidingGroupWindow(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> StreamITCase.testResults = mutable.MutableList()
> val stream = env
>   .fromCollection(data)
>   .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>   .map(t => (t._2, t._6))
> val table = stream.toTable(tEnv, 'int, 'string)
> val windowedTable = table
>   .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
>   .groupBy('w, 'string)
>   .select('string, 'int.count, 'w.start, 'w.end)
> val results = windowedTable.toDataStream[Row]
> results.addSink(new StreamITCase.StringSink)
> env.execute()
>   }
> {code}
> Exception:
> {code}
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
>   at 
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>   ... 7 more
> Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to 
> java.lang.String
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3768: [FLINK-6368][table] Grouping keys in stream aggregations ...

2017-04-25 Thread shaoxuan-wang
Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3768
  
I ran into the same problem today when adding the new test cases for UDAGG. 
Thanks for the fix, @xccui 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983211#comment-15983211
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241221
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
--- End diff --

`GenericRecord` -> `SpecificRecord`
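
For readers less familiar with Avro's reader hierarchy: a `SpecificDatumReader` deserializes into the Avro-generated class, while a `GenericDatumReader` yields `GenericRecord`s. A minimal, standalone Avro sketch (a generic helper for illustration, not the PR's code):

```java
// Hedged sketch: reading Avro binary into an instance of an Avro-generated SpecificRecord class.
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

public class AvroSpecificReadSketch {

    /** Deserializes Avro binary into an instance of the given generated record class. */
    static <T extends SpecificRecord> T deserialize(byte[] bytes, Class<T> recordClass) throws IOException {
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(recordClass);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);   // returns the generated class, not a GenericRecord
    }
}
```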


> Add Kafka TableSource with Avro serialization
> -
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was 
> code-generated by Avro depending on a schema. Flink treats these classes as 
> regular POJOs. Hence, they are also natively supported by the Table API and 
> SQL. Classes generated by Avro contain their Schema in a static field. The 
> schema should be used to automatically derive field names and types. Hence, 
> there is no additional information required than the name of the class.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The 
> schema is used to deserialize the data into a GenericRecord which must be 
> translated into possibly nested {{Row}} based on the schema information. 
> Again, the Avro Schema is used to automatically derive the field names and 
> types. This mode is less efficient than the SpecificRecord mode because the 
> {{GenericRecord}} needs to be converted into {{Row}}.
> This feature depends on FLINK-5280, i.e., support for nested data in 
> {{TableSource}}.
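
As a rough illustration of the SpecificRecord mode described above, the snippet below builds an AvroRowDeserializationSchema from a code-generated class and turns Avro bytes into a Flink Row. The class `User` and the wrapper method are hypothetical placeholders, not part of the pull request.

```java
import java.io.IOException;

import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;

public class SpecificRecordModeSketch {

	// "User" stands in for any Avro-generated class, i.e. it implements
	// org.apache.avro.specific.SpecificRecord and embeds its Schema.
	public static Row decode(byte[] avroBytes) throws IOException {
		final AvroRowDeserializationSchema deserializer =
			new AvroRowDeserializationSchema(User.class);
		// Field names and types come from the schema embedded in User;
		// the byte[] message becomes a (possibly nested) Row.
		return deserializer.deserialize(avroBytes);
	}
}
```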



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983217#comment-15983217
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243664
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983209#comment-15983209
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179453
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class<? extends SpecificRecordBase> record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List<Schema.Field> fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983216#comment-15983216
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241700
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
--- End diff --

Change all `GenericRecord` to `SpecificRecord`


> Add Kafka TableSource with Avro serialization
> -
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was 
> code-generated by Avro depending on a schema. Flink treats these classes as 
> regular POJOs. Hence, they are also natively supported by the Table API and 
> SQL. Classes generated by Avro contain their Schema in a static field. The 
> schema should be used to automatically derive field names and types. Hence, 
> there is no additional information required than the name of the class.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The 
> schema is used to deserialize the data into a GenericRecord which must be 
> translated into possibly nested {{Row}} based on the schema information. 
> Again, the Avro Schema is used to automatically derive the field names and 
> types. This mode is less efficient than the SpecificRecord mode because the 
> {{GenericRecord}} needs to be converted into {{Row}}.
> This feature depends on FLINK-5280, i.e., support for nested data in 
> {{TableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983206#comment-15983206
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113161806
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class<? extends SpecificRecordBase> record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
--- End diff --

Change this to 
```
private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema)
```

and factor out the recursive logic to a method 
```
convertToTypeInformation(TypeInformation extracted, Schema schema)
``` 
?
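
A sketch of the suggested split is shown below. The generic parameters are assumptions (the angle brackets of the quoted signatures were lost in the mail archive), and the body reuses the logic already visible in the diff; it is not the merged code. Imports are the ones already present in KafkaAvroTableSource.java.

```java
// Entry point keeps its name and simply delegates; the top-level schema of a
// SpecificRecord is always a RECORD, so the cast to RowTypeInfo is safe.
private static RowTypeInfo convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
	return (RowTypeInfo) convertToTypeInformation(extracted, schema);
}

// Recursive helper, factored out as proposed above.
private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
	if (schema.getType() == Schema.Type.RECORD) {
		final List<Schema.Field> fields = schema.getFields();
		final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;

		final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
		final String[] names = new String[fields.size()];
		for (int i = 0; i < fields.size(); i++) {
			final Schema.Field field = fields.get(i);
			types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
			names[i] = field.name();
		}
		return new RowTypeInfo(types, names);
	} else if (extracted instanceof GenericTypeInfo
			&& ((GenericTypeInfo<?>) extracted).getTypeClass() == Utf8.class) {
		// replace Avro's Utf8 with plain String type information
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
	return extracted;
}
```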



> Add Kafka TableSource with Avro serialization
> -
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was 
> code-generated by Avro depending on a schema. Flink treats these classes as 
> regular POJOs. Hence, they are also natively supported by the Table API and 
> SQL. Classes generated by Avro 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983207#comment-15983207
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207820
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class<? extends SpecificRecordBase> record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List<Schema.Field> fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983205#comment-15983205
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179472
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class<? extends SpecificRecordBase> record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List<Schema.Field> fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983215#comment-15983215
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207375
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class<? extends SpecificRecordBase> record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List<Schema.Field> fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983208#comment-15983208
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241294
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
--- End diff --

`GenericRecord` -> `SpecificRecord`


> Add Kafka TableSource with Avro serialization
> -
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was 
> code-generated by Avro depending on a schema. Flink treats these classes as 
> regular POJOs. Hence, they are also natively supported by the Table API and 
> SQL. Classes generated by Avro contain their Schema in a static field. The 
> schema should be used to automatically derive field names and types. Hence, 
> there is no additional information required than the name of the class.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The 
> schema is used to deserialize the data into a GenericRecord which must be 
> translated into possibly nested {{Row}} based on the schema information. 
> Again, the Avro Schema is used to automatically derive the field names and 
> types. This mode is less efficient than the SpecificRecord mode because the 
> {{GenericRecord}} needs to be converted into {{Row}}.
> This feature depends on FLINK-5280, i.e., support for nested data in 
> {{TableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983218#comment-15983218
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244628
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

See comment on UNION in deserializer


> Add Kafka TableSource with Avro serialization
> 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983214#comment-15983214
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243381
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983210#comment-15983210
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113237296
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

This limitation exists because the Table API cannot handle UNION types 
either, right?
Isn't this the same as 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983213#comment-15983213
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236676
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

are union types always ordered? Could it happen that type `0` is `RECORD` 
and `1` is `NULL`?
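
One way to make the check order-agnostic, sketched under the assumption that only two-branch nullable unions should be unwrapped (the helper class and method names are illustrative, not part of the PR):

```java
import java.util.List;

import org.apache.avro.Schema;

final class NullableUnionCheck {

	// True if the schema is a two-branch union of NULL and RECORD,
	// regardless of which branch comes first.
	static boolean isNullableRecordUnion(Schema schema) {
		if (schema.getType() != Schema.Type.UNION) {
			return false;
		}
		final List<Schema> branches = schema.getTypes();
		if (branches.size() != 2) {
			return false;
		}
		final Schema.Type first = branches.get(0).getType();
		final Schema.Type second = branches.get(1).getType();
		return (first == Schema.Type.NULL && second == Schema.Type.RECORD)
			|| (first == Schema.Type.RECORD && second == Schema.Type.NULL);
	}

	private NullableUnionCheck() {
	}
}
```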


> Add Kafka 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983219#comment-15983219
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244339
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
--- End diff --

Not sure if we should 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983212#comment-15983212
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241182
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates an Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
--- End diff --

We can use a specific record here. We have the class for it.
```
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
```


> Add Kafka TableSource with Avro serialization
> -
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was 
> code-generated by Avro based on a schema. Flink treats these classes as 
> regular POJOs. Hence, they are also natively supported by the Table API and 
> SQL. Classes generated by Avro contain their Schema in a static field. The 
> schema should be used to automatically derive field names and types. Hence, 
> no additional information is required other than the name of the class.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The 
> schema is used 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207820
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
--- End diff --

This method can be removed 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983204#comment-15983204
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236267
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   }
+   else {
+  

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236267
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   }
+   else {
+   throw new RuntimeException("Currently 
we only support schemas of the following form: UNION[null, RECORD]. Given: " + 
schema);
+   }
+   } else if 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113161806
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
--- End diff --

Change this to 
```
private static TypeInformation convertToRowTypeInformation(TypeInformation extracted, Schema schema)
```

and factor out the recursive logic into a method

```
convertToTypeInformation(TypeInformation extracted, Schema schema)
```
?
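
A possible shape of that refactoring, sketched by the editor against the code in the diff above (generics are reconstructed here because the mail archive stripped them; this is a sketch, not the PR's final code, and it reuses the imports already shown in `KafkaAvroTableSource`):

```
// Entry point: builds a RowTypeInfo for a record schema.
private static RowTypeInfo convertToRowTypeInformation(AvroTypeInfo<?> avroTypeInfo, Schema schema) {
    final List<Schema.Field> fields = schema.getFields();
    final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
    final String[] names = new String[fields.size()];
    for (int i = 0; i < fields.size(); i++) {
        final Schema.Field field = fields.get(i);
        types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
        names[i] = field.name();
    }
    return new RowTypeInfo(types, names);
}

// Recursive helper: nested records become RowTypeInfo, Utf8 becomes String,
// everything else is passed through unchanged.
private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
    if (schema.getType() == Schema.Type.RECORD) {
        return convertToRowTypeInformation((AvroTypeInfo<?>) extracted, schema);
    } else if (extracted instanceof GenericTypeInfo
            && ((GenericTypeInfo<?>) extracted).getTypeClass() == Utf8.class) {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
    return extracted;
}
```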





[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241182
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates an Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
--- End diff --

We can use a specific record here. We have the class for it.
```
this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
```
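
Applied to the constructor shown in the diff, the suggestion might look roughly as follows (the `Class<? extends SpecificRecord>` parameter and the `SpecificRecord` field type are reconstructed by the editor because the archive stripped the generics; a sketch, not the PR's final code):

```
// Field type changes from GenericRecord to SpecificRecord, and the reusable
// record instance is created from the concrete generated class.
private SpecificRecord record;

public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
    Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    this.schema = SpecificData.get().getSchema(recordClazz);
    this.datumReader = new ReflectDatumReader<>(schema);
    this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
    this.inputStream = new MutableByteArrayInputStream();
    this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
```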




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241221
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
--- End diff --

`GenericRecord` -> `SpecificRecord`




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243664
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates an Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241700
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
--- End diff --

Change all `GenericRecord` to `SpecificRecord`




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244628
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

See comment on UNION in deserializer
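
For orientation, an illustrative round trip through the two schemas discussed in this thread; `MyAvroRecord` is a hypothetical Avro-generated class and the field layout is invented for the example:

```
// Hypothetical: MyAvroRecord is an Avro-generated SpecificRecord with fields (long id, String name).
// deserialize(...) declares IOException, so this snippet belongs in a method that may throw it.
AvroRowSerializationSchema ser = new AvroRowSerializationSchema(MyAvroRecord.class);
AvroRowDeserializationSchema deser = new AvroRowDeserializationSchema(MyAvroRecord.class);

Row row = new Row(2);
row.setField(0, 42L);
row.setField(1, "hello");

byte[] bytes = ser.serialize(row);   // Row -> record -> Avro binary
Row copy = deser.deserialize(bytes); // Avro binary -> record -> Row
```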



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113237296
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates an Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

This limitation exists because the Table API cannot handle UNION types 
either, right?
Isn't this the same as having a nullable record field?
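
To make the case concrete, a hypothetical schema with exactly this shape, together with the unwrapping check used in both the serializer and the deserializer (illustrative only, not taken from the PR; it relies on the Avro `Schema` API already imported in these files):

```
// Hypothetical nullable nested record: a field declared as UNION[NULL, RECORD].
String json =
    "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
    + "{\"name\":\"inner\",\"type\":[\"null\","
    + "{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
    + "{\"name\":\"id\",\"type\":\"long\"}]}]}]}";

Schema outer = new Schema.Parser().parse(json);
Schema fieldSchema = outer.getField("inner").schema();   // a UNION schema
List<Schema> types = fieldSchema.getTypes();
boolean nullableRecord = types.size() == 2
    && types.get(0).getType() == Schema.Type.NULL
    && types.get(1).getType() == Schema.Type.RECORD;      // true for this schema
```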



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179472
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
+   final AvroTypeInfo avroTypeInfo = new 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113207375
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
+   final AvroTypeInfo avroTypeInfo = new 

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113179453
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+   /**
+* Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+*
+* @param topic  Kafka topic to consume.
+* @param properties Properties for the Kafka consumer.
+* @param record Avro specific record.
+*/
+   KafkaAvroTableSource(
+   String topic,
+   Properties properties,
+   Class record) {
+
+   super(
+   topic,
+   properties,
+   createDeserializationSchema(record),
+   createFieldNames(record),
+   createFieldTypes(record));
+   }
+
+   private static AvroRowDeserializationSchema 
createDeserializationSchema(Class record) {
+   return new AvroRowDeserializationSchema(record);
+   }
+
+   /**
+* Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+* Replaces generic Utf8 with basic String type information.
+*/
+   private static TypeInformation 
convertToRowTypeInformation(TypeInformation extracted, Schema schema) {
+   if (schema.getType() == Schema.Type.RECORD) {
+   final List fields = schema.getFields();
+   final AvroTypeInfo avroTypeInfo = (AvroTypeInfo) 
extracted;
+
+   final TypeInformation[] types = new 
TypeInformation[fields.size()];
+   final String[] names = new String[fields.size()];
+   for (int i = 0; i < fields.size(); i++) {
+   final Schema.Field field = fields.get(i);
+   types[i] = 
convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), 
field.schema());
+   names[i] = field.name();
+   }
+   return new RowTypeInfo(types, names);
+   } else if (extracted instanceof GenericTypeInfo) {
+   final GenericTypeInfo genericTypeInfo = 
(GenericTypeInfo) extracted;
+   if (genericTypeInfo.getTypeClass() == Utf8.class) {
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   }
+   }
+   return extracted;
+   }
+
+   private static  TypeInformation[] 
createFieldTypes(Class record) {
--- End diff --

`record` -> `avroClass`?



[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113244339
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
--- End diff --

Not sure if we should support `UNION` at all. 
If you have a UNION[NULL, RECORD] field in Avro, you'd expect it to also be 
represented as a UNION field in a Table. 
We change it here to a nullable Record field. Not sure if that's expected.

Should 
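
A standalone sketch of the behaviour in question (illustration only, not code
from the PR): a UNION[NULL, RECORD] schema is unwrapped to its RECORD branch,
so the field surfaces as a nullable nested row rather than as a union type of
its own.

    import java.util.Arrays;
    import org.apache.avro.Schema;

    public class NullableUnionSketch {
        // Returns the RECORD branch of a nullable-record union,
        // or the schema itself for any other shape.
        static Schema unwrapNullableRecord(Schema schema) {
            if (schema.getType() == Schema.Type.UNION) {
                for (Schema branch : schema.getTypes()) {
                    if (branch.getType() == Schema.Type.RECORD) {
                        return branch;
                    }
                }
            }
            return schema;
        }

        public static void main(String[] args) {
            Schema record = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Inner\",\"namespace\":\"example\","
                    + "\"fields\":[{\"name\":\"s\",\"type\":\"string\"}]}");
            Schema nullable = Schema.createUnion(
                Arrays.asList(Schema.create(Schema.Type.NULL), record));
            // Prints the Inner record schema; the NULL branch is dropped,
            // i.e. the field ends up as a nullable nested record.
            System.out.println(unwrapNullableRecord(nullable));
        }
    }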

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113241294
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
--- End diff --

`GenericRecord` -> `SpecificRecord`




[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113243381
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113236676
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Are union types always ordered? Could it happen that type `0` is `RECORD` 
and `1` is `NULL`?
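
One way to make that check independent of branch order, in case union branches
are not guaranteed to be ordered (a sketch, not part of the PR):

    import java.util.List;
    import org.apache.avro.Schema;

    final class UnionOrderSketch {
        // Accepts both UNION[NULL, RECORD] and UNION[RECORD, NULL];
        // returns the RECORD branch, or null for any other union shape.
        static Schema recordBranchOfNullableUnion(Schema union) {
            if (union.getType() != Schema.Type.UNION) {
                return null;
            }
            List<Schema> types = union.getTypes();
            if (types.size() != 2) {
                return null;
            }
            Schema first = types.get(0);
            Schema second = types.get(1);
            if (first.getType() == Schema.Type.NULL && second.getType() == Schema.Type.RECORD) {
                return second;
            }
            if (first.getType() == Schema.Type.RECORD && second.getType() == Schema.Type.NULL) {
                return first;
            }
            return null;
        }
    }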



[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

2017-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983195#comment-15983195
 ] 

ASF GitHub Bot commented on FLINK-5892:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3770

[FLINK-5892] Restore state on the operator level

## General
This PR is a collaboration between @guoweiM and myself, enabling Flink to 
restore state on the operator level. This means that the topology of a job may 
change with regard to chains when restoring from a 1.3 savepoint, allowing the 
arbitrary addition, removal, or modification of chains.

The cornerstone for this is a semantic change for savepoints; no structural 
changes have been made to the `SavepointV0/1/2` classes or their serialized 
format:

In 1.2 a savepoint contains the states of tasks. If a task consists of 
multiple operators then the stored TaskState internally contains a list of 
states, one entry for each operator.

In 1.3 a savepoint contains the states of operators only; the notion of 
tasks is eliminated. If a task consists of multiple operators we store one 
TaskState for each operator instead. Internally they each contain a list of 
states with a length of 1.
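
A rough illustration of this flattening with deliberately hypothetical types
(these are not Flink's actual savepoint classes): one task-level state holding
an entry per chained operator becomes one state object per operator.

    import java.util.ArrayList;
    import java.util.List;

    final class OperatorStateFlatteningSketch {

        // 1.2-style: a single task state carrying one entry per chained operator.
        static final class LegacyTaskState {
            final List<byte[]> chainedOperatorStates = new ArrayList<>();
        }

        // 1.3-style: one state object per operator, keyed by a stable operator id.
        static final class PerOperatorState {
            final String operatorId;
            final byte[] state;

            PerOperatorState(String operatorId, byte[] state) {
                this.operatorId = operatorId;
                this.state = state;
            }
        }

        // Splits a task-level state into per-operator entries, mirroring the
        // semantic change described above.
        static List<PerOperatorState> split(LegacyTaskState taskState, List<String> operatorIds) {
            List<PerOperatorState> result = new ArrayList<>();
            for (int i = 0; i < operatorIds.size(); i++) {
                result.add(new PerOperatorState(
                    operatorIds.get(i), taskState.chainedOperatorStates.get(i)));
            }
            return result;
        }
    }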

## Implementation

In order for this to work a number of changes had to be made.

First and foremost we required a new `StateAssignmentOperation` that was 
aware of operators.
(74881a2, 8be9c58, 4fa8bbd)

Since the SAO uses the `ExecutionGraph` classes to map the restored state 
it was necessary to forward the IDs of all contained operators from the 
`StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
(73427c3)

The `PendingCheckpoint` class had to be adjusted to conform to the new 
semantics; received `SubtaskStates`, containing the state of a task, are broken 
down into SubtaskStates for the individual operators.
(f7b8ef9)

## Tests

Most of this PR consists of new tests (60% or so).

A number of tests were added under flink-tests that test the migration path 
from 1.2 to 1.3.
(d1efdb1)

These tests first restore a job from a 1.2 savepoint, without changes to 
the topology, verify that the state was restored correctly and finally create a 
new savepoint. They then restore from this migrated 1.3 savepoint, with changes 
to the topology for varying scenarios, and verify the correct restoration of 
state again.

A new test was also added to the `CheckpointCoordinatorTest` which tests 
the support for topology changes without executing a job.
(8b5430f9)

A number of existing tests had to be tweaked to run with the new changes, 
but these changes all boil down to extending existing mocks by a method or two.
(b5430f9)

## Other changes

To make it more obvious that we deal with operators and not tasks, a new 
`OperatorID` class was introduced, and usages of `JobVertexID` in 
savepoint-related parts were replaced where appropriate.
(fe74023)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3770


commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol 
Date:   2017-04-03T15:39:50Z

[prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol 
Date:   2017-04-04T10:53:56Z

[internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol 
Date:   2017-04-04T13:02:55Z

[internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol 
Date:   2017-04-04T11:33:54Z

[internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol 
Date:   2017-04-25T16:07:16Z

[internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw 
Date:   2017-04-24T09:47:47Z

[internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol 
Date:   2017-04-04T13:01:07Z

[internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol 
Date:   2017-04-03T15:40:21Z

[tests] Add tests for chain modifications

commit 


[jira] [Commented] (FLINK-6337) Remove the buffer provider from PartitionRequestServerHandler

2017-04-25 Thread zhijiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983189#comment-15983189
 ] 

zhijiang commented on FLINK-6337:
-

Yeah, I got your point. Thank you for checking and for the advice, my friend!

> Remove the buffer provider from PartitionRequestServerHandler
> -
>
> Key: FLINK-6337
> URL: https://issues.apache.org/jira/browse/FLINK-6337
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently, {{PartitionRequestServerHandler}} creates a {{LocalBufferPool}} 
> when the channel is registered. That {{LocalBufferPool}} is only used to 
> obtain the segment size for creating the read view in 
> {{SpillableSubpartition}}; its buffers are never actually used, so it wastes 
> buffer resources from the global pool.
> We would like to remove the {{LocalBufferPool}} from the 
> {{PartitionRequestServerHandler}}; the {{LocalBufferPool}} in 
> {{ResultPartition}} can provide the segment size for creating the 
> subpartition view instead.
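
As a sketch of the proposal above (hypothetical types, not Flink's actual
network classes): the request handler obtains the segment size from the
partition's existing pool instead of holding a dedicated LocalBufferPool.

    // Hypothetical types for illustration only.
    interface SegmentSizeProvider {
        int getMemorySegmentSize();
    }

    final class ResultPartitionSketch {
        private final SegmentSizeProvider bufferPool; // the partition's own pool

        ResultPartitionSketch(SegmentSizeProvider bufferPool) {
            this.bufferPool = bufferPool;
        }

        // The request handler can ask the partition for the segment size it
        // needs to create a subpartition read view, so it no longer needs to
        // allocate a LocalBufferPool of its own.
        int segmentSizeForReadView() {
            return bufferPool.getMemorySegmentSize();
        }
    }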




