[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335461#comment-16335461
 ] 

ASF GitHub Bot commented on FLINK-7511:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4587


> Remove dead code after dropping backward compatibility with <=1.2
> -
>
> Key: FLINK-7511
> URL: https://issues.apache.org/jira/browse/FLINK-7511
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4587: [FLINK-7511] [cep] Remove dead code after dropping...

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4587


---


[GitHub] flink issue #4587: [FLINK-7511] [cep] Remove dead code after dropping backwa...

2018-01-22 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4587
  
Thanks for review @pnowojski. Merging.


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335449#comment-16335449
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163161066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
 ---
@@ -132,4 +137,23 @@ public LeaderElectionService 
getJobManagerLeaderElectionService(JobID jobID) {
return new StandaloneLeaderElectionService();
}
}
+
+   @Override
+   public LeaderRetrievalService getRestServerLeaderRetriever() {
--- End diff --

renamed


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
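
For readers following the thread, here is a plain-JDK sketch (not Flink's actual RestClusterClient or LeaderRetriever code; all names are illustrative) of the reconnect idea behind FLINK-8344: keep the most recently announced leader address so that request code can always resolve, and follow, a newly elected leader.

```java
import java.util.concurrent.CompletableFuture;

final class LeaderAddressHolder {

    private volatile CompletableFuture<String> leaderAddress = new CompletableFuture<>();

    /** Called by whatever leader-retrieval mechanism is in use (e.g. ZooKeeper notifications). */
    void notifyNewLeader(String address) {
        CompletableFuture<String> current = leaderAddress;
        if (!current.complete(address)) {
            // a leader was already known; swap in the newly elected one
            leaderAddress = CompletableFuture.completedFuture(address);
        }
    }

    /** Request code resolves the current leader before each attempt and retries on failure. */
    CompletableFuture<String> getLeaderAddress() {
        return leaderAddress;
    }
}
```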


[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335450#comment-16335450
 ] 

ASF GitHub Bot commented on FLINK-7511:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4587
  
Thanks for review @pnowojski. Merging.


> Remove dead code after dropping backward compatibility with <=1.2
> -
>
> Key: FLINK-7511
> URL: https://issues.apache.org/jira/browse/FLINK-7511
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163161066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
 ---
@@ -132,4 +137,23 @@ public LeaderElectionService 
getJobManagerLeaderElectionService(JobID jobID) {
return new StandaloneLeaderElectionService();
}
}
+
+   @Override
+   public LeaderRetrievalService getRestServerLeaderRetriever() {
--- End diff --

renamed


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335445#comment-16335445
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160900
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -61,46 +69,77 @@
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
+
+import akka.actor.AddressFromURIString;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
+import scala.Option;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
  */
 public class RestClusterClient extends ClusterClient {
 
+   private static final long AWAIT_LEADER_TIMEOUT = 10_000;
+
+   private static final int MAX_RETRIES = 20;
+
+   private static final Time RETRY_DELAY = Time.seconds(3);
--- End diff --

Moved to `RestOptions`


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
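
As a side note for readers, "Moved to `RestOptions`" refers to turning the hard-coded constants quoted above into configuration options. A hedged sketch of what that can look like with Flink's `ConfigOptions` builder follows; the keys and default values are illustrative, not the actual `RestOptions` entries.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Illustrative option definitions only; the real keys and defaults live in RestOptions.
public class RestRetryOptionsSketch {

    public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT =
            ConfigOptions.key("rest.await-leader-timeout").defaultValue(10_000L);

    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
            ConfigOptions.key("rest.retry.max-attempts").defaultValue(20);

    public static final ConfigOption<Long> RETRY_DELAY =
            ConfigOptions.key("rest.retry.delay").defaultValue(3_000L);
}
```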


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335447#comment-16335447
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163161003
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -327,13 +376,14 @@ public T getClusterId() {
}
 
private  R 
waitForResource(
-   final SupplierWithException resourceFutureSupplier)
+   final Supplier 
resourceFutureSupplier)
throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
A asynchronouslyCreatedResource;
long attempt = 0;
while (true) {
final CompletableFuture responseFuture = 
resourceFutureSupplier.get();
--- End diff --

Made the method `waitForResource` non-blocking.


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335446#comment-16335446
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160938
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return 
restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }, retryPredicate);
+   }
+
+   private  CompletableFuture retry(
+   CheckedSupplier operation,
+   Predicate retryPredicate) {
+   return FutureUtils.retryWithDelay(
+   CheckedSupplier.unchecked(operation),
+   MAX_RETRIES,
+   RETRY_DELAY,
+   retryPredicate,
+   new 
ScheduledExecutorServiceAdapter(retryExecutorService));
+   }
+
+   private static Predicate isTimeoutException() {
+   return (throwable) ->
+   ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
+   }
+
+   private static Predicate isHttpStatusUnsuccessfulException() 
{
+   return (throwable) -> ExceptionUtils.findThrowable(throwable, 
RestClientException.class)
+   .map(restClientException -> {
+   final int code = 
restClientException.getHttpResponseStatus().code();
+   return code < 200 || code > 299;
+   })
+   .orElse(false);
+   }
+
+   private abstract class RestClusterClientLeaderRetrievalListener 
implements LeaderRetrievalListener {
+   @Override
+   public final void handleError(final Exception exception) {
+   log.error("Exception in LeaderRetrievalListener", 
exception);
+   shutdown();
--- End diff --

Using `LeaderRetriever` now


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
>  

[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335444#comment-16335444
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160849
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -327,13 +376,14 @@ public T getClusterId() {
}
 
private  R 
waitForResource(
-   final SupplierWithException resourceFutureSupplier)
+   final Supplier 
resourceFutureSupplier)
throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
--- End diff --

Removed. Also made the method non-blocking.


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163161003
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -327,13 +376,14 @@ public T getClusterId() {
}
 
private  R 
waitForResource(
-   final SupplierWithException resourceFutureSupplier)
+   final Supplier 
resourceFutureSupplier)
throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
A asynchronouslyCreatedResource;
long attempt = 0;
while (true) {
final CompletableFuture responseFuture = 
resourceFutureSupplier.get();
--- End diff --

Made the method `waitForResource` non-blocking.


---


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160938
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return 
restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }, retryPredicate);
+   }
+
+   private  CompletableFuture retry(
+   CheckedSupplier operation,
+   Predicate retryPredicate) {
+   return FutureUtils.retryWithDelay(
+   CheckedSupplier.unchecked(operation),
+   MAX_RETRIES,
+   RETRY_DELAY,
+   retryPredicate,
+   new 
ScheduledExecutorServiceAdapter(retryExecutorService));
+   }
+
+   private static Predicate isTimeoutException() {
+   return (throwable) ->
+   ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
+   }
+
+   private static Predicate isHttpStatusUnsuccessfulException() 
{
+   return (throwable) -> ExceptionUtils.findThrowable(throwable, 
RestClientException.class)
+   .map(restClientException -> {
+   final int code = 
restClientException.getHttpResponseStatus().code();
+   return code < 200 || code > 299;
+   })
+   .orElse(false);
+   }
+
+   private abstract class RestClusterClientLeaderRetrievalListener 
implements LeaderRetrievalListener {
+   @Override
+   public final void handleError(final Exception exception) {
+   log.error("Exception in LeaderRetrievalListener", 
exception);
+   shutdown();
--- End diff --

Using `LeaderRetriever` now


---
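
For context, the `retry` helper quoted above delegates to `FutureUtils.retryWithDelay`. Below is a minimal, self-contained sketch of that retry-with-delay pattern in plain JDK terms; it illustrates the idea, not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetryWithDelaySketch {

    /** Re-run an async operation until it succeeds, the predicate rejects the failure,
     *  or the remaining attempts are exhausted. */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService scheduler) {

        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, retryPredicate, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            Predicate<Throwable> retryPredicate,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {

        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0 && retryPredicate.test(error)) {
                // schedule the next attempt after the configured delay
                scheduler.schedule(
                        () -> attempt(operation, retriesLeft - 1, delayMillis, retryPredicate, scheduler, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```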


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160802
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
--- End diff --

done


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335443#comment-16335443
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160802
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
--- End diff --

done


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160900
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -61,46 +69,77 @@
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
+
+import akka.actor.AddressFromURIString;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
-import static java.util.Objects.requireNonNull;
+import scala.Option;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
  */
 public class RestClusterClient extends ClusterClient {
 
+   private static final long AWAIT_LEADER_TIMEOUT = 10_000;
+
+   private static final int MAX_RETRIES = 20;
+
+   private static final Time RETRY_DELAY = Time.seconds(3);
--- End diff --

Moved to `RestOptions`


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335441#comment-16335441
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160791
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return 
restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }, retryPredicate);
+   }
+
+   private  CompletableFuture retry(
+   CheckedSupplier operation,
+   Predicate retryPredicate) {
+   return FutureUtils.retryWithDelay(
+   CheckedSupplier.unchecked(operation),
+   MAX_RETRIES,
+   RETRY_DELAY,
+   retryPredicate,
+   new 
ScheduledExecutorServiceAdapter(retryExecutorService));
+   }
+
+   private static Predicate isTimeoutException() {
+   return (throwable) ->
+   ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
--- End diff --

renamed to `isConnectionProblemException`


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160849
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -327,13 +376,14 @@ public T getClusterId() {
}
 
private  R 
waitForResource(
-   final SupplierWithException resourceFutureSupplier)
+   final Supplier 
resourceFutureSupplier)
throws IOException, InterruptedException, 
ExecutionException, TimeoutException {
--- End diff --

Removed. Also made the method non-blocking.


---


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160791
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -376,4 +430,99 @@ public GetClusterStatusResponse getClusterStatus() {
public int getMaxSlots() {
return 0;
}
+
+   
//-
+   // RestClient Helper
+   
//-
+
+   private , U extends 
MessageParameters, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters) 
throws IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, messageParameters, 
EmptyRequestBody.getInstance());
+   }
+
+   private , R 
extends RequestBody, P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders, R request) throws 
IOException, LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), request);
+   }
+
+   private , P extends ResponseBody> CompletableFuture
+   sendRequest(M messageHeaders) throws IOException, 
LeaderNotAvailableException {
+   return sendRequest(messageHeaders, 
EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRequest(M messageHeaders, U messageParameters, R 
request) throws IOException, LeaderNotAvailableException {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture
+   sendRetryableRequest(M messageHeaders, U 
messageParameters, R request, Predicate retryPredicate) {
+   return retry(() -> {
+   final URL restServerBaseUrl = 
restServerLeaderHolder.getLeaderAddress();
+   return 
restClient.sendRequest(restServerBaseUrl.getHost(), 
restServerBaseUrl.getPort(), messageHeaders, messageParameters, request);
+   }, retryPredicate);
+   }
+
+   private  CompletableFuture retry(
+   CheckedSupplier operation,
+   Predicate retryPredicate) {
+   return FutureUtils.retryWithDelay(
+   CheckedSupplier.unchecked(operation),
+   MAX_RETRIES,
+   RETRY_DELAY,
+   retryPredicate,
+   new 
ScheduledExecutorServiceAdapter(retryExecutorService));
+   }
+
+   private static Predicate isTimeoutException() {
+   return (throwable) ->
+   ExceptionUtils.findThrowable(throwable, 
java.net.ConnectException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
java.net.SocketTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
ConnectTimeoutException.class).isPresent() ||
+   ExceptionUtils.findThrowable(throwable, 
IOException.class).isPresent();
--- End diff --

renamed to `isConnectionProblemException`


---


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160703
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -30,49 +28,20 @@
  */
 public final class RestClusterClientConfiguration {
--- End diff --

Now it's needed again.


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335439#comment-16335439
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160703
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 ---
@@ -30,49 +28,20 @@
  */
 public final class RestClusterClientConfiguration {
--- End diff --

Now it's needed again.


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][WIP] Add support for HA to RestCluste...

2018-01-22 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160672
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -144,6 +154,15 @@ public WebMonitorEndpoint(
metricQueryServiceRetriever,
executor,
restConfiguration.getTimeout());
+
+   this.leaderElectionService = 
Preconditions.checkNotNull(leaderElectionService);
+   this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
+   }
+
+   @Override
+   public void start() throws Exception {
+   super.start();
+   leaderElectionService.start(this);
--- End diff --

Added 
```
try {
leaderElectionService.stop();
} catch (Exception e) {
log.warn("Error while stopping leaderElectionService", 
e);
}
```
to `shutdown` hook


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335437#comment-16335437
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5312#discussion_r163160672
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -144,6 +154,15 @@ public WebMonitorEndpoint(
metricQueryServiceRetriever,
executor,
restConfiguration.getTimeout());
+
+   this.leaderElectionService = 
Preconditions.checkNotNull(leaderElectionService);
+   this.fatalErrorHandler = 
Preconditions.checkNotNull(fatalErrorHandler);
+   }
+
+   @Override
+   public void start() throws Exception {
+   super.start();
+   leaderElectionService.start(this);
--- End diff --

Added 
```
try {
leaderElectionService.stop();
} catch (Exception e) {
log.warn("Error while stopping leaderElectionService", 
e);
}
```
to `shutdown` hook


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8458) Add the switch for keeping both the old mode and the new credit-based mode

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335419#comment-16335419
 ] 

ASF GitHub Bot commented on FLINK-8458:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155607
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured to at least 2 for good 
performance: 1 buffer is for receiving in-flight data in the subpartition and 1 
buffer is for parallel serialization.
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra 
network buffers to use for each outgoing/incoming gate (result partition/input 
gate). In credit-based flow control mode, it indicates how many floating 
credits are shared for all the input channels. The floating buffers are 
distributed based on backlog (real-time output buffers in the subpartition) 
feedback. So the floating buffers can help relieve back-pressure caused by 
imbalanced data distribution among subpartitions.
+
--- End diff --

thanks for your polish, alpinegizmo.

I will apply the above fixes.


> Add the switch for keeping both the old mode and the new credit-based mode
> --
>
> Key: FLINK-8458
> URL: https://issues.apache.org/jira/browse/FLINK-8458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> After the whole feature of credit-based flow control is done, we should add a 
> config parameter to switch on/off the new credit-based mode. To do so, we can 
> roll back to the old network mode for any expected risks.
> The parameter is defined as 
> {{taskmanager.network.credit-based-flow-control.enabled}} and the default 
> value is true. This switch may be removed after next release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
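
As a quick illustration of the switch described in this issue, the property can be set like any other Flink configuration value. The snippet below is a sketch using the key from the issue description; the surrounding class is hypothetical.

```java
import org.apache.flink.configuration.Configuration;

public class CreditBasedSwitchExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // roll back to the old network mode by disabling credit-based flow control
        config.setBoolean("taskmanager.network.credit-based-flow-control.enabled", false);
        // the switch defaults to true when the key is not set, per the issue description
        System.out.println(config.getBoolean("taskmanager.network.credit-based-flow-control.enabled", true));
    }
}
```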


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155607
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured to at least 2 for good 
performance: 1 buffer is for receiving in-flight data in the subpartition and 1 
buffer is for parallel serialization.
+
+- `taskmanager.network.memory.floating-buffers-per-gate`: Number of extra 
network buffers to use for each outgoing/incoming gate (result partition/input 
gate). In credit-based flow control mode, it indicates how many floating 
credits are shared for all the input channels. The floating buffers are 
distributed based on backlog (real-time output buffers in the subpartition) 
feedback. So the floating buffers can help relieve back-pressure caused by 
imbalanced data distribution among subpartitions.
+
--- End diff --

thanks for your polish, alpinegizmo.

I will apply the above fixes.


---


[jira] [Commented] (FLINK-8458) Add the switch for keeping both the old mode and the new credit-based mode

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335418#comment-16335418
 ] 

ASF GitHub Bot commented on FLINK-8458:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155437
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured to at least 2 for good 
performance: 1 buffer is for receiving in-flight data in the subpartition and 1 
buffer is for parallel serialization.
+
--- End diff --

It is also used in the current old mode, and in most cases there is no need to 
change the default value there.

Considering the new credit-based mode, a value greater than 2 can be beneficial 
when the bottleneck is slow downstream processing: the greater the value, the 
lower the probability of blocking the upstream and causing back-pressure. But we 
should also consider the total available buffer resources when setting this 
parameter.


> Add the switch for keeping both the old mode and the new credit-based mode
> --
>
> Key: FLINK-8458
> URL: https://issues.apache.org/jira/browse/FLINK-8458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> After the whole feature of credit-based flow control is done, we should add a 
> config parameter to switch on/off the new credit-based mode. To do so, we can 
> roll back to the old network mode for any expected risks.
> The parameter is defined as 
> {{taskmanager.network.credit-based-flow-control.enabled}} and the default 
> value is true. This switch may be removed after next release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5317: [FLINK-8458] Add the switch for keeping both the o...

2018-01-22 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5317#discussion_r163155437
  
--- Diff: docs/ops/config.md ---
@@ -290,6 +290,12 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
 
 - `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
 
+- `taskmanager.network.memory.buffers-per-channel`: Number of network 
buffers to use for each outgoing/incoming channel (subpartition/input channel). 
Especially in credit-based flow control mode, it indicates how many credits are 
exclusive in each input channel. It should be configured to at least 2 for good 
performance: 1 buffer is for receiving in-flight data in the subpartition and 1 
buffer is for parallel serialization.
+
--- End diff --

It is also used in the current old mode, and in most cases there is no need to 
change the default value there.

Considering the new credit-based mode, a value greater than 2 can be beneficial 
when the bottleneck is slow downstream processing: the greater the value, the 
lower the probability of blocking the upstream and causing back-pressure. But we 
should also consider the total available buffer resources when setting this 
parameter.


---
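
To make the sizing discussion above concrete, here is a small back-of-the-envelope sketch. It assumes that an input gate needs its exclusive per-channel buffers plus the shared floating buffers, and that memory segments are the default 32 KB; all numbers are purely illustrative.

```java
public class NetworkBufferEstimate {

    public static void main(String[] args) {
        int inputChannels = 100;           // channels of one input gate (illustrative)
        int buffersPerChannel = 2;         // taskmanager.network.memory.buffers-per-channel
        int floatingBuffersPerGate = 8;    // taskmanager.network.memory.floating-buffers-per-gate
        int segmentSizeKb = 32;            // assuming the default 32 KB memory segment size

        // exclusive buffers for every channel plus the floating buffers shared by the gate
        int buffersForGate = inputChannels * buffersPerChannel + floatingBuffersPerGate;
        System.out.printf("buffers per input gate: %d (~%d KB of network memory)%n",
                buffersForGate, buffersForGate * segmentSizeKb);
    }
}
```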


[jira] [Commented] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager are not consistent

2018-01-22 Thread Chris Thomson (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335362#comment-16335362
 ] 

Chris Thomson commented on FLINK-8445:
--

Problem appears to be present in 1.4.0 as well.

> hostname used in metric names for taskmanager and jobmanager are not 
> consistent
> ---
>
> Key: FLINK-8445
> URL: https://issues.apache.org/jira/browse/FLINK-8445
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.1, 1.4.0
> Environment: I think that this problem is present for configurations with 
> metrics reporting enabled that include '<host>' as part of the scope 
> for the metrics. For example, using the Graphite reporting configuration in 
> flink-conf.yaml below:
> {code:java}
> metrics.scope.jm: flink..jobmanager
> metrics.scope.jm.job: flink..jobmanager.
> metrics.scope.tm: flink..taskmanager
> metrics.scope.tm.job: flink..taskmanager.
> metrics.scope.task: 
> flink..taskmanager...
> metrics.scope.operator: 
> flink..taskmanager...
> metrics.reporters: graphite
> metrics.reporter.graphite.class: 
> org.apache.flink.metrics.graphite.GraphiteReporter
> ...{code}
>Reporter: Chris Thomson
>Priority: Minor
>
> Enabled Flink metrics reporting to Graphite using system scopes that 
> contain '<host>' for both the job manager and task manager.  The resulting 
> metrics reported to Graphite use two different representations for '<host>'.
> For *Task Manager metrics* it uses the *short hostname* (without the DNS 
> domain).  This is a result of logic in 
> org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that 
> tries to extract the short hostname from the fully qualified domain name 
> looked up from InetAddress.getCanonicalHostName().
> For *Job Manager metrics* it uses the *fully qualified domain name* (with the 
> DNS domain). This is a result of there being no logic in 
> org.apache.flink.runtime.jobmanager.JobManagerRunner or 
> org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent 
> normalization of the fully qualified domain name down to the short hostname.
> Ideally the '<host>' placeholders in the system scopes for the job manager 
> and task manager related metrics would be replaced with a consistent value 
> (either the short hostname or the fully qualified domain name).  Even better 
> if there was a configuration option to decide which one should be used for 
> metric name generation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
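
For readers unfamiliar with the normalization described in the report, here is a small standalone sketch of deriving a short hostname from `InetAddress.getCanonicalHostName()`. It illustrates the idea only; it is not the actual `TaskManagerLocation` code.

```java
import java.net.InetAddress;

public class ShortHostNameExample {

    public static void main(String[] args) throws Exception {
        // e.g. "node1.example.com" when reverse DNS is set up
        String fqdn = InetAddress.getLocalHost().getCanonicalHostName();

        int firstDot = fqdn.indexOf('.');
        // keep only the label before the first dot, unless the name looks like an IP address
        String shortName = (firstDot > 0 && !Character.isDigit(fqdn.charAt(0)))
                ? fqdn.substring(0, firstDot)
                : fqdn;

        System.out.println(fqdn + " -> " + shortName);
    }
}
```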


[jira] [Updated] (FLINK-8445) hostname used in metric names for taskmanager and jobmanager are not consistent

2018-01-22 Thread Chris Thomson (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Thomson updated FLINK-8445:
-
Affects Version/s: 1.4.0

> hostname used in metric names for taskmanager and jobmanager are not 
> consistent
> ---
>
> Key: FLINK-8445
> URL: https://issues.apache.org/jira/browse/FLINK-8445
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.1, 1.4.0
> Environment: I think that this problem is present for configurations with 
> metrics reporting enabled that include '<host>' as part of the scope 
> for the metrics. For example, using the Graphite reporting configuration in 
> flink-conf.yaml below:
> {code:java}
> metrics.scope.jm: flink..jobmanager
> metrics.scope.jm.job: flink..jobmanager.
> metrics.scope.tm: flink..taskmanager
> metrics.scope.tm.job: flink..taskmanager.
> metrics.scope.task: 
> flink..taskmanager...
> metrics.scope.operator: 
> flink..taskmanager...
> metrics.reporters: graphite
> metrics.reporter.graphite.class: 
> org.apache.flink.metrics.graphite.GraphiteReporter
> ...{code}
>Reporter: Chris Thomson
>Priority: Minor
>
> Enabled Flink metrics reporting to Graphite using system scopes that 
> contain '<host>' for both the job manager and task manager.  The resulting 
> metrics reported to Graphite use two different representations for '<host>'.
> For *Task Manager metrics* it uses the *short hostname* (without the DNS 
> domain).  This is a result of logic in 
> org.apache.flink.runtime.taskmanager.TaskManagerLocation constructor that 
> tries to extract the short hostname from the fully qualified domain name 
> looked up from InetAddress.getCanonicalHostName().
> For *Job Manager metrics* it uses the *fully qualified domain name* (with the 
> DNS domain). This is a result of there being no logic in 
> org.apache.flink.runtime.jobmanager.JobManagerRunner or 
> org.apache.flink.runtime.rpc.akka.AkkaRpcService to perform equivalent 
> normalization of the fully qualified domain name down to the short hostname.
> Ideally the '<host>' placeholders in the system scopes for the job manager 
> and task manager related metrics would be replaced with a consistent value 
> (either the short hostname or the fully qualified domain name).  Even better 
> if there was a configuration option to decide which one should be used for 
> metric name generation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8431) Allow to specify # GPUs for TaskManager in Mesos

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335239#comment-16335239
 ] 

ASF GitHub Bot commented on FLINK-8431:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5307
  
@tillrohrmann please merge


> Allow to specify # GPUs for TaskManager in Mesos
> 
>
> Key: FLINK-8431
> URL: https://issues.apache.org/jira/browse/FLINK-8431
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Mesos
>Reporter: Dongwon Kim
>Assignee: Dongwon Kim
>Priority: Minor
>
> Mesos provides first-class support for Nvidia GPUs [1], but Flink does not 
> exploit it when scheduling TaskManagers. If Mesos agents are configured to 
> isolate GPUs as shown in [2], TaskManagers that do not specify to use GPUs 
> cannot see GPUs at all.
> We, therefore, need to introduce a new configuration property named 
> "mesos.resourcemanager.tasks.gpus" to allow users to specify # of GPUs for 
> each TaskManager process in Mesos.
> [1] http://mesos.apache.org/documentation/latest/gpu-support/
> [2] http://mesos.apache.org/documentation/latest/gpu-support/#agent-flags



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5307: [FLINK-8431] [mesos] Allow to specify # GPUs for TaskMana...

2018-01-22 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5307
  
@tillrohrmann please merge


---


[jira] [Created] (FLINK-8485) Running Flink inside Intellij no longer works after upgrading from 1.3.2 to 1.4.0

2018-01-22 Thread Xuan Nguyen (JIRA)
Xuan Nguyen created FLINK-8485:
--

 Summary: Running Flink inside Intellij no longer works after 
upgrading from 1.3.2 to 1.4.0
 Key: FLINK-8485
 URL: https://issues.apache.org/jira/browse/FLINK-8485
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.0
Reporter: Xuan Nguyen


I upgraded flink from 1.3.2 to 1.4.0 and my simple test case no longer runs 
within Intellij or any other IDE when I click on the "RUN" button.  I'm using 
JDK 1.8.

My Dependencies are:

 
{code:java}
dependencies {
compile group: 'log4j', name: 'log4j', version: '1.2.17' 
compile 'org.apache.flink:flink-java:1.4.0' 
compile 'org.apache.flink:flink-streaming-java_2.11:1.4.0' 
compile 'org.apache.flink:flink-clients_2.11:1.4.0' 
compile 'org.apache.flink:flink-table_2.11:1.4.0' 
compile 'org.apache.flink:flink-scala_2.11:1.4.0' 
compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.0'
compile 'org.apache.flink:flink-connector-kafka-0.8_2.11:1.4.0'
compile 'org.apache.flink:flink-queryable-state-runtime_2.11:1.4.0'
compile 'org.apache.flink:flink-queryable-state-client-java__2.11:1.4.0' 
testCompile 'junit:junit:+'
}
{code}
 


{{The exception:}}
{code:java}
Exception in thread "main" 
org.apache.flink.runtime.client.JobSubmissionException: Could not retrieve 
BlobServer address. at 
org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:166)
 at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97) at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.ExecutionException: 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1662993273]] after [1 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$". 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at 
org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:160)
 ... 9 more Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1662993273]] after [1 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$". 
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) 
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
 at java.lang.Thread.run(Thread.java:748){code}
 

The debug full logs are located in 
[https://gist.github.com/xuan/e6d4543c478c30d5747428589b03dd03] along with the 
test case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-01-22 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335116#comment-16335116
 ] 

Eron Wright  commented on FLINK-5479:
-

To elaborate on my earlier comment about `max.message.time.difference.ms`, 
let's consider the ideal watermark for the two types of timestamps supported by 
Kafka (as per KIP-32), CreateTime and LogAppendTime.

In LogAppendTime, the timestamp is monotonically increasing with each message, 
and corresponds to the wall clock time of the broker at append time.   The 
per-partition watermark could simply track the message time.   The complication 
is how to advance the watermark when the partition is idle; an in-band 
heartbeat from the broker (informing the client about the progression of its 
wall clock) would be ideal.

In CreateTime, the timestamp is supplied by the producer, but the broker may 
enforce an upper bound ("max difference") on the delta between the message 
timestamp and the broker's current time.  The ideal per-partition watermark 
would be the broker's current time minus the max difference.
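As a rough, consumer-side illustration of the CreateTime case: the closest approximation available today is to bound the out-of-orderness by the same max difference. This is only a sketch against the Flink 1.4 Kafka consumer API ({{kafkaProps}}, {{MyEvent}} and {{MyEventSchema}} are hypothetical placeholders) and it does not solve the idle-partition problem discussed in this issue:

{code:java}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// kafkaProps, MyEvent and MyEventSchema are hypothetical placeholders.
FlinkKafkaConsumer010<MyEvent> consumer =
    new FlinkKafkaConsumer010<>("topic", new MyEventSchema(), kafkaProps);

// Approximates "message timestamp minus max difference", but the watermark still
// stalls for idle partitions, which is exactly the gap described in FLINK-5479.
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent event) {
            return event.getEventTime();
        }
    });
{code}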

 

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8120) Cannot access Web UI from YARN application overview in FLIP-6 mode

2018-01-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8120.
--
Resolution: Fixed

Fixed via 627bcda

> Cannot access Web UI from YARN application overview in FLIP-6 mode
> --
>
> Key: FLINK-8120
> URL: https://issues.apache.org/jira/browse/FLINK-8120
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The Web UI cannot be accessed through YARN's application overview (_Tracking 
> UI_ link). The proxy displays a stacktrace.
> {noformat}
> Caused by:
> org.apache.http.client.ClientProtocolException
>   at 
> org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:888)
>   at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>   at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:242)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.methodAction(WebAppProxyServlet.java:461)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:290)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
>   at 
> org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
>   at 
> com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:178)
>   at 
> com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
>   at 
> com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
>   at 
> com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
>   at 
> com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
>   at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:636)
>   at 
> org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:294)
>   at 
> org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:588)
>   at 
> org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:82)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1353)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
>   at 
> org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
>   at 
> org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
>   at 
> org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
>   at 
> org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
>   at 
> org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
>   at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
>   at 
> org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
>   at 
> org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
>   at 

[jira] [Commented] (FLINK-8365) Relax List type in HeapListState and HeapKeyedStateBackend

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335028#comment-16335028
 ] 

ASF GitHub Bot commented on FLINK-8365:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5326
  
Thanks for the feedback! I updated the PR and squashed all commits


> Relax List type in HeapListState and HeapKeyedStateBackend
> --
>
> Key: FLINK-8365
> URL: https://issues.apache.org/jira/browse/FLINK-8365
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> {{stateTable}} in HeapListState and 
> {{HeapKeyedStateBackend#createListState()}} are both strongly typed to 
> {{ArrayList}} right now.
> As discussed with [~StephanEwen] and [~stefanrichte...@gmail.com] in 
> https://github.com/apache/flink/pull/4963, we may want to relax the type to 
> {{List}}.
> Problems discovered now:
> 1. That may require changing the serializer from {{ArrayListSerializer}} to 
> {{ListSerializer}} in the following code, and we need to discuss the pros and 
> cons.
> {code:java}
> @Override
>   public  InternalListState createListState(
>   TypeSerializer namespaceSerializer,
>   ListStateDescriptor stateDesc) throws Exception {
>   // the list state does some manual mapping, because the state 
> is typed to the generic
>   // 'List' interface, but we want to use an implementation typed 
> to ArrayList
>   // using a more specialized implementation opens up runtime 
> optimizations
>   StateTable stateTable = 
> tryRegisterStateTable(
>   stateDesc.getName(),
>   stateDesc.getType(),
>   namespaceSerializer,
>   new 
> ArrayListSerializer(stateDesc.getElementSerializer()));
>   return new HeapListState<>(stateDesc, stateTable, 
> keySerializer, namespaceSerializer);
>   }
> {code}
> 2. for non-RocksDBStateBackend (AsyncFileStateBackendTest, 
> AsyncMemoryStateBackendTest, FileStateBackendTest, and 
> MemoryStateBackendTest), unit tests testListState and 
> testListStateAddUpdateAndGet fail
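For illustration, the relaxed registration discussed in point 1 might look roughly as follows; this is a sketch derived from the snippet above (with the state table re-typed to {{List}}), not the final implementation:

{code:java}
// Sketch: ListSerializer (from flink-core) replaces the ArrayList-specific serializer,
// so only the element serializer still comes from the state descriptor.
StateTable<K, N, List<V>> stateTable = tryRegisterStateTable(
    stateDesc.getName(),
    stateDesc.getType(),
    namespaceSerializer,
    new ListSerializer<>(stateDesc.getElementSerializer()));

return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
{code}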



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5326: [FLINK-8365] [State Backend] Relax List type in HeapListS...

2018-01-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5326
  
Thanks for the feedback! I updated the PR and squashed all commits


---


[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335021#comment-16335021
 ] 

ASF GitHub Bot commented on FLINK-8267:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5301
  
Makes sense. Updated the PR and also squashed all commits (first time using 
squash!)


> update Kinesis Producer example for setting Region key
> --
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the kinesis producer the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the kinesis stream? The documentation for the example states 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5323: [FLINK-8441] [State Backend] [RocksDB] change RocksDBList...

2018-01-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5323
  
Makes sense. Updated the PR and also squashed all commits


---


[GitHub] flink issue #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Producer ...

2018-01-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5301
  
Makes sense. Updated the PR and also squashed all commits (first time using 
squash!)


---


[jira] [Commented] (FLINK-8441) serialize values and value separator directly to stream in RocksDBListState

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335019#comment-16335019
 ] 

ASF GitHub Bot commented on FLINK-8441:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5323
  
Makes sense. Updated the PR and also squashed all commits


> serialize values and value separator directly to stream in RocksDBListState
> ---
>
> Key: FLINK-8441
> URL: https://issues.apache.org/jira/browse/FLINK-8441
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> In {{RocksDBListState#getPreMergedValue}}, we could probably serialize the 
> values and the value separator directly into {{keySerializationStream}}.
> We tried this once and it didn't work out. Let's try one more time.
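A rough sketch of that direction, assuming the usual stream/output-view pair used by the RocksDB backend ({{DELIMITER}} stands for the single separator byte; names are illustrative and not verified against the current {{RocksDBListState}} code):

{code:java}
// Sketch: serialize each element and the separator straight into the stream
// instead of pre-merging everything into an intermediate byte[] per element.
keySerializationStream.reset();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

boolean first = true;
for (V value : values) {
    if (!first) {
        keySerializationStream.write(DELIMITER);
    }
    first = false;
    valueSerializer.serialize(value, out);
}
byte[] preMergedValue = keySerializationStream.toByteArray();
{code}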



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8463) Remove unnecessary thread blocking in RestClient#submitRequest

2018-01-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8463.

Resolution: Fixed

Fixed via 016118026ca96abb691b236ca7d08db94c93684a

> Remove unnecessary thread blocking in RestClient#submitRequest
> --
>
> Key: FLINK-8463
> URL: https://issues.apache.org/jira/browse/FLINK-8463
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClient}} unnecessarily blocks an IO executor thread when trying to 
> open a connection to a remote destination. This can be improved by 
> registering a {{ChannelFuture}} listener which continues the execution once 
> the connection has been established.
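For context, the non-blocking pattern described here looks roughly like the following Netty sketch (names are illustrative and not taken from the actual {{RestClient}} code):

{code:java}
// Sketch: complete a future from a connect listener instead of blocking on connect().sync().
CompletableFuture<Channel> channelFuture = new CompletableFuture<>();

bootstrap.connect(host, port).addListener((ChannelFutureListener) connectFuture -> {
    if (connectFuture.isSuccess()) {
        channelFuture.complete(connectFuture.channel());
    } else {
        channelFuture.completeExceptionally(connectFuture.cause());
    }
});

// Request submission is then chained onto channelFuture, so no IO executor thread
// is occupied while the connection is being established.
{code}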



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8462) TaskExecutor does not verify RM heartbeat timeouts

2018-01-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8462.

Resolution: Fixed

Fixed via 776af4a882c85926fc0764b702fec717c675e34c

> TaskExecutor does not verify RM heartbeat timeouts
> --
>
> Key: FLINK-8462
> URL: https://issues.apache.org/jira/browse/FLINK-8462
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{TaskExecutor}} neither properly stops RM heartbeats nor checks whether 
> an RM heartbeat timeout is still valid. As a consequence, it can happen that 
> the {{TaskExecutor}} closes the connection to an active {{RM}} due to an 
> outdated heartbeat timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8463) Remove unnecessary thread blocking in RestClient#submitRequest

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334991#comment-16334991
 ] 

ASF GitHub Bot commented on FLINK-8463:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5319


> Remove unnecessary thread blocking in RestClient#submitRequest
> --
>
> Key: FLINK-8463
> URL: https://issues.apache.org/jira/browse/FLINK-8463
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClient}} unnecessarily blocks an IO executor thread when trying to 
> open a connection to a remote destination. This can be improved by 
> registering a {{ChannelFuture}} listener which continues the execution once 
> the connection has been established.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8462) TaskExecutor does not verify RM heartbeat timeouts

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334990#comment-16334990
 ] 

ASF GitHub Bot commented on FLINK-8462:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5318


> TaskExecutor does not verify RM heartbeat timeouts
> --
>
> Key: FLINK-8462
> URL: https://issues.apache.org/jira/browse/FLINK-8462
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{TaskExecutor}} neither properly stops RM heartbeats nor checks whether 
> an RM heartbeat timeout is still valid. As a consequence, it can happen that 
> the {{TaskExecutor}} closes the connection to an active {{RM}} due to an 
> outdated heartbeat timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5319: [FLINK-8463] [rest] Remove blocking of IO executor...

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5319


---


[GitHub] flink pull request #5318: [FLINK-8462] [flip6] Filter invalid heartbeat time...

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5318


---


[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334963#comment-16334963
 ] 

ASF GitHub Bot commented on FLINK-8267:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5301#discussion_r163075196
  
--- Diff: docs/dev/connectors/kinesis.md ---
@@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the 
stream needs access to the Cl
 {% highlight java %}
 Properties producerConfig = new Properties();
 // Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"aws_access_key_id");
 producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+producerConfig.put("Region", "us-east-1");
--- End diff --

no, KPL doesn't have such configs. KPL takes a string like 'Region' and 
tries to find its setter using reflection.
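For reference, a minimal sketch of how the raw KPL property sits alongside the AWS credentials, based on the documentation snippet under review (KPL resolves "Region" to its region setter via reflection):

{code:java}
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Raw KPL property: matched to a KinesisProducerConfiguration setter by reflection.
producerConfig.put("Region", "us-east-1");

FlinkKinesisProducer<String> producer =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
{code}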


> update Kinesis Producer example for setting Region key
> --
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the kinesis producer the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer you can see the effect of not setting the 
> Region key by a log line
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the kinesis stream? The documentation for the example states 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Pr...

2018-01-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5301#discussion_r163075196
  
--- Diff: docs/dev/connectors/kinesis.md ---
@@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the 
stream needs access to the Cl
 {% highlight java %}
 Properties producerConfig = new Properties();
 // Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"aws_access_key_id");
 producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+producerConfig.put("Region", "us-east-1");
--- End diff --

no, KPL doesn't have such configs. KPL takes a string like 'Region' and 
tries to find its setter using reflection.


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334945#comment-16334945
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163061338
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  */
+class Schema extends Descriptor {
+
+  private val tableSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+
+  /**
+* Sets the schema with field names and the types. Required.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): Schema = {
--- End diff --

add a method `def schema(schema: String): Schema` that parses the schema 
string?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334946#comment-16334946
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies an connector for reading data from a connector.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
+this
+  }
+
+  /**
+* Specifies an encoding that defines how to read data from a connector.
+*/
+  def withEncoding(encoding: EncodingDescriptor): 
StreamTableSourceDescriptor = {
+encodingDescriptor = Some(encoding)
--- End diff --

check if the connector requires an encoding?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, 

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334949#comment-16334949
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071117
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class DescriptorsTest extends TableTestBase {
--- End diff --

I would move the tests to a separate class per descriptor. 
If we add a `validate` method to `Descriptor` this needs to be tested as 
well.


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334952#comment-16334952
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies an connector for reading data from a connector.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
--- End diff --

check if an encoding was added that the connector does not need?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very 

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334933#comment-16334933
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162988639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable(
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceTable[_] = {
+
+// check for the legacy external catalog path
+if (externalCatalogTable.isLegacyTableType) {
+  LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+"Please consider updating them to TableSourceFactories.")
+  fromExternalCatalogTableType(externalCatalogTable)
+}
+// use the factory approach
+else {
+  val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+  tableEnv match {
+// check for a batch table source in this batch environment
+case _: BatchTableEnvironment =>
+  source match {
+case bts: BatchTableSource[_] =>
+  new BatchTableSourceTable(
+bts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a batch environment.")
+  }
+// check for a stream table source in this streaming environment
+case _: StreamTableEnvironment =>
+  source match {
+case sts: StreamTableSource[_] =>
+  new StreamTableSourceTable(
+sts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a streaming environment.")
+  }
+case _ => throw new TableException("Unsupported table 
environment.")
+  }
+}
+  }
+
+  // 
--
+  // NOTE: the following line can be removed once we drop support for 
TableType
--- End diff --

line or lines? 
Create a JIRA and link it here as reference?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a 

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334927#comment-16334927
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162964475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -136,12 +136,51 @@ case class CatalogAlreadyExistException(
   def this(catalog: String) = this(catalog, null)
 }
 
+/**
+  * Exception for not finding a 
[[org.apache.flink.table.sources.TableSourceFactory]] for the
+  * given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class NoMatchingTableSourceException(
+properties: Map[String, String],
+cause: Throwable)
+extends RuntimeException(
+  s"Could not find a table source factory in the classpath satisfying 
the " +
+s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+  cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
+  * Exception for finding more than one 
[[org.apache.flink.table.sources.TableSourceFactory]] for
+  * the given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class AmbiguousTableSourceException(
+properties: Map[String, String],
+cause: Throwable)
+extends RuntimeException(
+  s"More than one table source factory in the classpath satisfying the 
" +
+s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 
).mkString("\n")}",
+  cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
 /**
   * Exception for not finding a [[TableSourceConverter]] for a given table 
type.
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead.
   */
+@Deprecated
+@deprecated("Use table factories instead.")
--- End diff --

Give a class name.


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334953#comment-16334953
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012882
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, 
TableSourceFactoryService}
+
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: 
Schema)
--- End diff --

Add `RowtimeDescriptor`. Batch table sources support timestamp extraction 
as well.


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334930#comment-16334930
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162965980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, 
ExecutionEnvironment}
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, 
ConnectorDescriptor}
--- End diff --

remove


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334941#comment-16334941
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163010702
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  def addProperties(properties: NormalizedProperties): Unit
--- End diff --

does this method have to be public or can we hide it?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334942#comment-16334942
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163007886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named different than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = 

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334944#comment-16334944
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
--- End diff --

Should we add a validation method that checks if the descriptor is valid?
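
A rough sketch of what such a hook could look like; the method name and the no-op default are illustrative assumptions, not part of the PR:

```
package org.apache.flink.table.descriptors

abstract class Descriptor {

  /**
    * Internal method for properties conversion.
    */
  def addProperties(properties: NormalizedProperties): Unit

  /**
    * Validates the internal state of the descriptor. Implementations should throw
    * an org.apache.flink.table.api.ValidationException on inconsistencies; the
    * default is a no-op so existing descriptors remain unaffected.
    */
  def validate(): Unit = {}
}
```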


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334939#comment-16334939
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163002001
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named different than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
--- End diff --

We might want to make field mappings independent of the encoding. For 
example field mappings could also be used for JDBC connectors which do not have 
an encoding.
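
A hypothetical sketch of such an encoding-independent descriptor; the class name, the `mapping.*` property keys, and the `putString` call are illustrative assumptions only:

```
package org.apache.flink.table.descriptors

import scala.collection.mutable

class FieldMapping extends Descriptor {

  private val mapping = mutable.LinkedHashMap[String, String]()

  /** Maps a field of the table schema to a field of the external system. */
  def field(tableField: String, externalField: String): FieldMapping = {
    mapping += (tableField -> externalField)
    this
  }

  override def addProperties(properties: NormalizedProperties): Unit = {
    mapping.foreach { case (tableField, externalField) =>
      properties.putString(s"mapping.$tableField", externalField)
    }
  }
}
```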


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>   

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334937#comment-16334937
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r16231
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.statistics
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor extends Descriptor {
+
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var encodingDescriptor: Option[EncodingDescriptor] = None
+  protected var proctimeDescriptor: Option[Proctime] = None
+  protected var rowtimeDescriptor: Option[Rowtime] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
--- End diff --

We might need another descriptor for mapping fields of the encoding (or 
connector) to fields in the table schema. This can be used to rename or select 
fields from the encoding to the table schema. This would be the configuration 
for the `DefinedFieldMapping` interface.
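
Building on that, a `TableSourceDescriptor` could carry the mapping next to the schema, connector, and encoding descriptors and later hand it to a source implementing `DefinedFieldMapping`; the `FieldMapping` type (sketched earlier) and the method name below are hypothetical:

```
package org.apache.flink.table.descriptors

abstract class TableSourceDescriptor extends Descriptor {

  protected var fieldMappingDescriptor: Option[FieldMapping] = None

  /** Attaches a connector/encoding-independent field mapping. */
  def withFieldMapping(mapping: FieldMapping): this.type = {
    fieldMappingDescriptor = Some(mapping)
    this
  }
}
```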


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334951#comment-16334951
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071426
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, 
PackratParsers}
+
+/**
+  * Utilities to convert 
[[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

We need unit tests for the parser.
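
A rough sketch of a round-trip test; the `writeTypeInfo`/`readTypeInfo` names are assumptions about the final public API of `TypeStringUtils`:

```
package org.apache.flink.table.typeutils

import org.apache.flink.table.api.Types
import org.junit.Assert.assertEquals
import org.junit.Test

class TypeStringUtilsTest {

  @Test
  def testAtomicTypeRoundTrip(): Unit = {
    // Serialize each type to its string form and parse it back again.
    Seq(Types.STRING, Types.INT, Types.BOOLEAN, Types.SQL_TIMESTAMP).foreach { t =>
      val serialized = TypeStringUtils.writeTypeInfo(t)
      assertEquals(t, TypeStringUtils.readTypeInfo(serialized))
    }
  }
}
```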


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334932#comment-16334932
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162967648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,28 +18,282 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
 import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import 
org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, 
toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor}
+import org.apache.flink.table.descriptors.DescriptorUtils.{connector, 
metadata}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
   * Defines a table in an [[ExternalCatalog]].
-  *
-  * @param tableTypeTable type, e.g csv, hbase, kafka
-  * @param schema   Schema of the table (column names and 
types)
-  * @param properties   Properties of the table
-  * @param statsStatistics of the table
-  * @param comment  Comment of the table
-  * @param createTime   Create timestamp of the table
-  * @param lastAccessTime   Timestamp of last access of the table
   */
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
--- End diff --

Add descriptions for constructor arguments


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334948#comment-16334948
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var rowtimeName: Option[String] = None
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Declares a field of the schema to be the rowtime attribute. Required.
+*
+* @param fieldName The name of the field that becomes the processing 
time field.
+*/
+  def field(fieldName: String): Rowtime = {
+rowtimeName = Some(fieldName)
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
+watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+this
+  }
+

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334950#comment-16334950
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163067483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, 
PackratParsers}
+
+/**
+  * Utilities to convert 
[[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

Some examples of the supported syntax would be good. 
It would also be good to add these examples to the method docs that accept 
type strings.


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334947#comment-16334947
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163065450
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
+  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to 
the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
--- End diff --

Do all factories need to be added to the same file, or can we have separate files for 
different modules? For instance, a `Kafka011JsonTableFactory` would be in the 
Kafka connectors module. Would a user have to change the service file if the 
Kafka factory should be used, or can we build it in a way that including the 
Kafka connectors JAR is sufficient?
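
For reference, Java's `ServiceLoader` collects service files from every JAR on the classpath, so each connector module can ship its own `META-INF/services/org.apache.flink.table.sources.TableSourceFactory` file and including the Kafka connector JAR would be enough, with no user editing. A minimal sketch of the lookup side (not the PR's actual discovery code; it assumes the non-generic `TableSourceFactory` trait shown in this PR):

```
import java.util.ServiceLoader

import org.apache.flink.table.sources.TableSourceFactory

import scala.collection.JavaConverters._

object TableSourceFactoryDiscovery {

  /** Returns all factories announced via META-INF/services files on the classpath. */
  def allFactories(classLoader: ClassLoader): Seq[TableSourceFactory] =
    ServiceLoader.load(classOf[TableSourceFactory], classLoader).asScala.toSeq
}
```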


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334943#comment-16334943
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162995916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to an other system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  final def addProperties(properties: NormalizedProperties): Unit = {
+properties.putString(connector("type"), tpe)
+val connectorProperties = new NormalizedProperties()
+addConnectorProperties(connectorProperties)
+connectorProperties.getProperties.foreach { case (k, v) =>
--- End diff --

why do we need to go over the properties again? Couldn't we implement 
`addConnectorProperties` to properly add the properties directly into 
`properties`?
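
A sketch of that alternative: the abstract method writes straight into the caller's `NormalizedProperties`, so the temporary object and the copy loop go away (key prefixing is then left to the subclasses, which is an assumption of this sketch):

```
package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.DescriptorUtils.connector

abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {

  final def addProperties(properties: NormalizedProperties): Unit = {
    properties.putString(connector("type"), tpe)
    // subclasses add their properties directly, no intermediate object needed
    addConnectorProperties(properties)
  }

  /** Adds connector-specific properties directly to the given properties. */
  protected def addConnectorProperties(properties: NormalizedProperties): Unit
}
```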


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334940#comment-16334940
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996455
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a 
[[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: 
String): Boolean = {
+val tpe = properties.get("connector.type")
+tpe != null || tpe == connector
--- End diff --

should be `tpe != null && tpe == connector`?
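
As written, the check returns true for any non-null `connector.type`, regardless of its value; the corrected version reads:

```
def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
  val tpe = properties.get("connector.type")
  // only match when the type is present and equals the requested connector
  tpe != null && tpe == connector
}
```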


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334931#comment-16334931
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162990874
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable(
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceTable[_] = {
+
+// check for the legacy external catalog path
+if (externalCatalogTable.isLegacyTableType) {
+  LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+"Please consider updating them to TableSourceFactories.")
+  fromExternalCatalogTableType(externalCatalogTable)
+}
+// use the factory approach
+else {
+  val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+  tableEnv match {
+// check for a batch table source in this batch environment
+case _: BatchTableEnvironment =>
+  source match {
+case bts: BatchTableSource[_] =>
+  new BatchTableSourceTable(
+bts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a batch environment.")
+  }
+// check for a stream table source in this streaming environment
+case _: StreamTableEnvironment =>
+  source match {
+case sts: StreamTableSource[_] =>
+  new StreamTableSourceTable(
+sts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a streaming environment.")
+  }
+case _ => throw new TableException("Unsupported table 
environment.")
+  }
+}
+  }
+
+  // 
--
+  // NOTE: the following line can be removed once we drop support for 
TableType
--- End diff --

I think we can also remove the `org.reflections:reflections` dependency 
once we have removed this.


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table 

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334938#comment-16334938
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163005744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
--- End diff --

Should we add a method that defines the schema with a JSON Schema string? 
We would need a parser, but we would get immediate support for nested schemas. 

Alternatively, we could use the nested schema parser of `TypeStringUtils`, 
but that would not be JSON Schema.




> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334936#comment-16334936
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163007563
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named different than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = 

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies a connector for reading data from an external system.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
--- End diff --

check if an encoding was added that the connector does not need?


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
 }
   }
 
+  /**
+* Creates a table from a descriptor that describes the resulting table 
schema, the source
+* connector, source encoding, and other properties.
+*
+* @param schema schema descriptor describing the table to create
+*/
+  def createTable(schema: Schema): BatchTableSourceDescriptor = {
--- End diff --

I'm not sure about the approach of returning a `TableSourceDescriptor`. I 
think it would be better if the table creation and registration were 
completed within this method, i.e., the table should be completely defined by 
the argument of the method.

For example

```
tEnv.registerTableSource(
  "MyTable",
  TableSource.create(tEnv)
.withSchema(
  Schema()
.field(...)
.field(...))
   .withConnector()
 ...
   .toTableSource()
  )
```

In this design, we would reuse the existing `registerTableSource` method, and 
`TableSource.create` would be a static method that returns a `TableSourceDescriptor`. 
I'm not sure if this is the best approach, but I like that the table is completely 
defined within the method call.
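
A rough sketch of what the proposed entry point could look like (the object name and
the assumption that the descriptor can be created without an upfront schema are
illustrative, not part of the PR):

```
object TableSource {

  /** Returns a descriptor that is completed and converted within a single expression. */
  def create(tableEnv: StreamTableEnvironment): StreamTableSourceDescriptor =
    new StreamTableSourceDescriptor(tableEnv)
}

// usage:
// tableEnv.registerTableSource(
//   "MyTable",
//   TableSource.create(tableEnv)
//     .withSchema(new Schema().field("user", Types.STRING).field("amount", Types.LONG))
//     .withConnector(...)
//     .toTableSource)
```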


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334934#comment-16334934
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a 
[[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: 
String): Boolean = {
+val tpe = properties.get("connector.type")
+tpe != null || tpe == connector
+  }
+
+  def hasEncoding(properties: util.Map[String, String], encoding: String): 
Boolean = {
+val tpe = properties.get("encoding.type")
+tpe != null || tpe == encoding
--- End diff --

should be  `tpe != null && tpe == encoding`?
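
For reference, a minimal sketch of both helpers with the corrected condition (the
`hasConnector` method in the same diff appears to have the analogous `||`/`&&` issue):

```
def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
  // a missing key must not count as a match
  val tpe = properties.get("connector.type")
  tpe != null && tpe == connector
}

def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
  val tpe = properties.get("encoding.type")
  tpe != null && tpe == encoding
}
```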


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162964487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -156,7 +195,10 @@ case class NoMatchedTableSourceConverterException(
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead.
   */
+@Deprecated
+@deprecated("Use table source factories instead.")
--- End diff --

Give a class name.


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334928#comment-16334928
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -125,6 +129,16 @@ abstract class StreamTableEnvironment(
 }
   }
 
+  /**
+* Creates a table from a descriptor that describes the resulting table 
schema, the source
+* connector, the source encoding, and other properties.
+*
+* @param schema schema descriptor describing the table to create
+*/
+  def createTable(schema: Schema): StreamTableSourceDescriptor = {
--- End diff --

See comment on `BatchTableEnvironment`


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162988639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, 
StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a 
[[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance 
which to convert
+* @return converted [[TableSourceTable]] instance from the input 
catalog table
+*/
+  def fromExternalCatalogTable(
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceTable[_] = {
+
+// check for the legacy external catalog path
+if (externalCatalogTable.isLegacyTableType) {
+  LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+"Please consider updating them to TableSourceFactories.")
+  fromExternalCatalogTableType(externalCatalogTable)
+}
+// use the factory approach
+else {
+  val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+  tableEnv match {
+// check for a batch table source in this batch environment
+case _: BatchTableEnvironment =>
+  source match {
+case bts: BatchTableSource[_] =>
+  new BatchTableSourceTable(
+bts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a batch environment.")
+  }
+// check for a stream table source in this streaming environment
+case _: StreamTableEnvironment =>
+  source match {
+case sts: StreamTableSource[_] =>
+  new StreamTableSourceTable(
+sts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a streaming environment.")
+  }
+case _ => throw new TableException("Unsupported table 
environment.")
+  }
+}
+  }
+
+  // 
--
+  // NOTE: the following line can be removed once we drop support for 
TableType
--- End diff --

line or lines? 
Create a JIRA and link it here as reference?


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163007886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the 
JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for 
the JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the 
JSON-encoding.
+* This method can be called multiple times. The call order of this 
method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should 
be named different than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON 
schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): 
JSON = {
+this.fieldMapping = Some(tableToJsonMapping)
+this
+  }
+
+  /**
+* Sets flag whether to fail if a field is missing or not.
+*
+* @param failOnMissingField If set to true, the operation fails if 
there is a missing field.
+*   If set to false, a missing field is set to 
null.
+* @return The builder.
+*/
+  def failOnMissingField(failOnMissingField: Boolean): JSON = {
+this.failOnMissingField = Some(failOnMissingField)
+this
+  }
+
+  /**
+* Internal method for encoding properties conversion.
+*/
+  override protected def addEncodingProperties(properties: 
NormalizedProperties): Unit = {
+ 

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(
+tableEnv: StreamTableEnvironment,
+schema: Schema)
+  extends TableSourceDescriptor {
+
+  schemaDescriptor = Some(schema)
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it.
+*/
+  def toTableSource: TableSource[_] = {
+val source = TableSourceFactoryService.findTableSourceFactory(this)
+source match {
+  case _: StreamTableSource[_] => source
+  case _ => throw new TableException(
+s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+  s"in a streaming environment.")
+}
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and returns it as a table.
+*/
+  def toTable: Table = {
+tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+* Searches for the specified table source, configures it accordingly, 
and registers it as
+* a table under the given name.
+*
+* @param name table name to be registered in the table environment
+*/
+  def register(name: String): Unit = {
+tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+* Specifies a connector for reading data from an external system.
+*/
+  def withConnector(connector: ConnectorDescriptor): 
StreamTableSourceDescriptor = {
+connectorDescriptor = Some(connector)
+this
+  }
+
+  /**
+* Specifies an encoding that defines how to read data from a connector.
+*/
+  def withEncoding(encoding: EncodingDescriptor): 
StreamTableSourceDescriptor = {
+encodingDescriptor = Some(encoding)
--- End diff --

check if the connector requires an encoding?


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162965980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, 
ExecutionEnvironment}
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, 
ConnectorDescriptor}
--- End diff --

remove


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162967648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -18,28 +18,282 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
 import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import 
org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, 
toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor}
+import org.apache.flink.table.descriptors.DescriptorUtils.{connector, 
metadata}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
   * Defines a table in an [[ExternalCatalog]].
-  *
-  * @param tableTypeTable type, e.g csv, hbase, kafka
-  * @param schema   Schema of the table (column names and 
types)
-  * @param properties   Properties of the table
-  * @param statsStatistics of the table
-  * @param comment  Comment of the table
-  * @param createTime   Create timestamp of the table
-  * @param lastAccessTime   Timestamp of last access of the table
   */
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
--- End diff --

Add descriptions for constructor arguments


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162995916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
+
+  /**
+* Internal method for properties conversion.
+*/
+  final def addProperties(properties: NormalizedProperties): Unit = {
+properties.putString(connector("type"), tpe)
+val connectorProperties = new NormalizedProperties()
+addConnectorProperties(connectorProperties)
+connectorProperties.getProperties.foreach { case (k, v) =>
--- End diff --

why do we need to go over the properties again? Couldn't we implement 
`addConnectorProperties` to properly add the properties directly into 
`properties`?
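
A sketch of the suggested shape, assuming subclasses write prefixed keys directly via
the `connector(...)` helper that is already imported above:

```
final def addProperties(properties: NormalizedProperties): Unit = {
  properties.putString(connector("type"), tpe)
  // subclasses add their keys straight into the target properties,
  // so no second pass over an intermediate NormalizedProperties is needed
  addConnectorProperties(properties)
}
```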


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162994182
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to an other system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
--- End diff --

Should a `ConnectorDescriptor` know whether it requires an encoding? For 
example a file descriptor needs an encoding but a JDBC connector doesn't.

This property would then be used to validate the configuration
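
A minimal sketch of such a flag (the method name is an assumption):

```
abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {

  /** Whether this connector needs an encoding descriptor (e.g. file: yes, JDBC: no). */
  def needsEncoding(): Boolean

  // ... existing property conversion methods stay as they are ...
}
```

The table source descriptor (or a `validate` step) could then fail early if an encoding
is missing or superfluous for the chosen connector.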


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var rowtimeName: Option[String] = None
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Declares a field of the schema to be the rowtime attribute. Required.
+*
+* @param fieldName The name of the field that becomes the rowtime attribute.
+*/
+  def field(fieldName: String): Rowtime = {
+rowtimeName = Some(fieldName)
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
+watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+this
+  }
+
+  /**
+* Sets a custom watermark strategy to be used for the rowtime 
attribute.
+*/
+  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
+watermarkStrategy = Some(strategy)
+this

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163005744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
--- End diff --

Should we add a method that defines the schema with a JSON Schema string? 
We would need a parser, but would get immediate support for nested schemas.

Alternatively, we could use the nested schema parser of `TypeStringUtils`, 
but that would not be JSON Schema.
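
A rough sketch of what a flat (non-nested) variant could look like, assuming a Jackson
dependency is available; the method name and the type mapping are illustrative only:

```
def jsonSchema(jsonSchema: String): JSON = {
  val mapper = new com.fasterxml.jackson.databind.ObjectMapper()
  val props = mapper.readTree(jsonSchema).get("properties")
  props.fieldNames().asScala.foreach { name =>
    // map primitive JSON Schema types to Flink type information
    val tpe = props.get(name).get("type").asText() match {
      case "string"  => org.apache.flink.table.api.Types.STRING
      case "integer" => org.apache.flink.table.api.Types.LONG
      case "number"  => org.apache.flink.table.api.Types.DOUBLE
      case "boolean" => org.apache.flink.table.api.Types.BOOLEAN
      case other     => throw new IllegalArgumentException(s"Unsupported type: $other")
    }
    field(name, tpe)
  }
  this
}
```

Nested object types would still need a recursive mapping, which is where a real JSON
Schema parser would pay off.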




---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163067483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, 
PackratParsers}
+
+/**
+  * Utilities to convert 
[[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

Some examples of the supported syntax would be good. 
It would also be good to add these examples to the method docs that accept 
type strings.


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334926#comment-16334926
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162964487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -156,7 +195,10 @@ case class NoMatchedTableSourceConverterException(
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead.
   */
+@Deprecated
+@deprecated("Use table source factories instead.")
--- End diff --

Give a class name.


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334935#comment-16334935
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162994182
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.connector
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(private val tpe: String) extends 
Descriptor {
--- End diff --

Should a `ConnectorDescriptor` know whether it requires an encoding? For 
example a file descriptor needs an encoding but a JDBC connector doesn't.

This property would then be used to validate the configuration


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16334929#comment-16334929
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
 }
   }
 
+  /**
+* Creates a table from a descriptor that describes the resulting table 
schema, the source
+* connector, source encoding, and other properties.
+*
+* @param schema schema descriptor describing the table to create
+*/
+  def createTable(schema: Schema): BatchTableSourceDescriptor = {
--- End diff --

I'm not sure about the approach of returning a `TableSourceDescriptor`. I 
think it would be better if the table creation and registration were 
completed within this method, i.e., the table should be completely defined by 
the argument of the method.

For example

```
tEnv.registerTableSource(
  "MyTable",
  TableSource.create(tEnv)
.withSchema(
  Schema()
.field(...)
.field(...))
   .withConnector()
 ...
   .toTableSource()
  )
```

In this design, we would reuse the existing `registerTableSource` method, and 
`TableSource.create` would be a static method that returns a `TableSourceDescriptor`. 
I'm not sure if this is the best approach, but I like that the table is completely 
defined within the method call.


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163065450
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
+  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to 
the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
--- End diff --

Do all factories need to be added to the same file, or can we have separate files 
for different modules? For instance, a `Kafka011JsonTableFactory` would be in the 
Kafka connectors module. Would a user have to change the service file to use the 
Kafka factory, or can we build it in a way that it is sufficient to include the 
Kafka connectors JAR?
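
With Java's `ServiceLoader`, each module can ship its own copy of the service file and
all copies on the classpath are merged at runtime, so including the connector JAR should
be enough (the package of the factory below is made up for illustration):

```
// flink-connector-kafka-0.11.jar would contain:
//   META-INF/services/org.apache.flink.table.sources.TableSourceFactory
// with one line per factory implementation, e.g.:
//   org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableFactory

import java.util.ServiceLoader
import org.apache.flink.table.sources.TableSourceFactory
import scala.collection.JavaConverters._

object Discovery {
  def allFactories(): List[TableSourceFactory] =
    ServiceLoader.load(classOf[TableSourceFactory]).iterator().asScala.toList
}
```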


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163011304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for 
describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
--- End diff --

Should we add a validation method that checks if the descriptor is valid?
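
A minimal sketch of such a hook (name and default behavior assumed):

```
abstract class Descriptor {

  /**
    * Checks the internal consistency of this descriptor and throws a
    * ValidationException if it is invalid. No-op by default.
    */
  def validate(): Unit = {}

  // ... existing property conversion members stay as they are ...
}
```

The table source descriptors could call `validate()` on every added descriptor before
handing the properties to `TableSourceFactoryService`.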


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071426
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, 
ValidationException}
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, 
PackratParsers}
+
+/**
+  * Utilities to convert 
[[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
--- End diff --

We need unit tests for the parser.
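
For instance, a round-trip test would already cover a lot of ground (the read/write
method names below are assumptions, as they are not visible in this diff):

```
import org.apache.flink.table.api.Types
import org.apache.flink.table.typeutils.TypeStringUtils
import org.junit.Assert.assertEquals
import org.junit.Test

class TypeStringUtilsTest {

  @Test
  def testTypeInformationRoundTrip(): Unit = {
    val types = Seq(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP, Types.DECIMAL)

    types.foreach { t =>
      val serialized = TypeStringUtils.writeTypeInfo(t)          // assumed method name
      assertEquals(t, TypeStringUtils.readTypeInfo(serialized))  // assumed method name
    }
  }
}
```

Composite types (rows, POJOs, arrays) should get the same treatment.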


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163012882
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, 
TableSourceFactoryService}
+
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: 
Schema)
--- End diff --

Add `RowtimeDescriptor`. Batch table sources support timestamp extraction 
as well.


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163061338
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  */
+class Schema extends Descriptor {
+
+  private val tableSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+
+  /**
+* Sets the schema with field names and the types. Required.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): Schema = {
--- End diff --

add a method `def schema(schema: String): Schema` that parses the schema 
string?


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163071117
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class DescriptorsTest extends TableTestBase {
--- End diff --

I would move the tests to a separate class per descriptor. 
If we add a `validate` method to `Descriptor` this needs to be tested as 
well.


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162947784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -125,6 +129,16 @@ abstract class StreamTableEnvironment(
 }
   }
 
+  /**
+* Creates a table from a descriptor that describes the resulting table 
schema, the source
+* connector, the source encoding, and other properties.
+*
+* @param schema schema descriptor describing the table to create
+*/
+  def createTable(schema: Schema): StreamTableSourceDescriptor = {
--- End diff --

See comment on `BatchTableEnvironment`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r163002001
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class JSON extends EncodingDescriptor("json") {
+
+  private val encodingSchema: mutable.LinkedHashMap[String, String] =
+  mutable.LinkedHashMap[String, String]()
+  private var fieldMapping: Option[util.Map[String, String]] = None
+  private var failOnMissingField: Option[Boolean] = None
+
+  /**
+* Sets the JSON schema with field names and the types for the JSON-encoded input.
+* The JSON schema must not contain nested fields.
+*
+* This method overwrites existing fields added with [[field()]].
+*
+* @param schema the table schema
+*/
+  def schema(schema: TableSchema): JSON = {
+this.encodingSchema.clear()
+NormalizedProperties.normalizeTableSchema(schema).foreach {
+  case (n, t) => field(n, t)
+}
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type information for the JSON-encoding.
+* This method can be called multiple times. The call order of this method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type information of the field
+*/
+  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
+field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
+this
+  }
+
+  /**
+* Adds a JSON field with the field name and the type string for the JSON-encoding.
+* This method can be called multiple times. The call order of this method defines
+* also the order of the fields in the JSON-encoding.
+*
+* @param fieldName the field name
+* @param fieldType the type string of the field
+*/
+  def field(fieldName: String, fieldType: String): JSON = {
+if (encodingSchema.contains(fieldName)) {
+  throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+}
+encodingSchema += (fieldName -> fieldType)
+this
+  }
+
+  /**
+* Sets a mapping from schema fields to fields of the JSON schema.
+*
+* A field mapping is required if the fields of produced tables should be named different than
+* the fields of the JSON records.
+* The key of the provided Map refers to the field of the table schema,
+* the value to the field in the JSON schema.
+*
+* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
+* @return The builder.
+*/
+  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
--- End diff --

We might want to make field mappings independent of the encoding. For 
example field mappings could also be used for JDBC connectors which do not have 
an encoding.
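
For illustration, a standalone mapping descriptor could look roughly like the sketch below. The class name, the property keys, and the `addProperties`/`NormalizedProperties` hook are my assumptions, not something this PR defines:

```scala
import scala.collection.mutable

/**
  * Hypothetical descriptor that maps fields of the table schema to fields of the
  * physical source, independent of whether an encoding is involved (e.g. JDBC).
  */
class FieldMapping extends Descriptor {

  private val mapping = mutable.LinkedHashMap[String, String]()

  /** Maps a field of the table schema to a field of the source. */
  def field(tableField: String, sourceField: String): FieldMapping = {
    mapping += (tableField -> sourceField)
    this
  }

  // assumed serialization hook; the real Descriptor contract may differ
  override def addProperties(properties: NormalizedProperties): Unit = {
    mapping.foreach { case (tableField, sourceField) =>
      properties.putString(s"mapping.$tableField", sourceField)
    }
  }
}
```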


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
+val tpe = properties.get("connector.type")
+tpe != null || tpe == connector
+  }
+
+  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
+val tpe = properties.get("encoding.type")
+tpe != null || tpe == encoding
--- End diff --

should be  `tpe != null && tpe == encoding`?
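
For reference, the corrected check as it would appear in `DescriptorUtils`:

```scala
  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
    val tpe = properties.get("encoding.type")
    tpe != null && tpe == encoding
  }
```

or, avoiding the explicit null check, `Option(properties.get("encoding.type")).contains(encoding)`.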


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r16231
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorUtils.statistics
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor extends Descriptor {
+
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var encodingDescriptor: Option[EncodingDescriptor] = None
+  protected var proctimeDescriptor: Option[Proctime] = None
+  protected var rowtimeDescriptor: Option[Rowtime] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
--- End diff --

We might need another descriptor for mapping fields of the encoding (or 
connector) to fields in the table schema. This can be used to rename or select 
fields from the encoding to the table schema. This would be the configuration 
for the `DefinedFieldMapping` interface.
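
As a rough usage sketch of how such a mapping descriptor could slot into the builder chain (the `FieldMapping` descriptor and the `withEncoding`/`withMapping` methods are hypothetical here; the generated table source would then expose the map through the existing `DefinedFieldMapping#getFieldMapping`):

```scala
// hypothetical builder chain: rename JSON fields to the table schema fields
tableEnv
  .createTable(
    new Schema()
      .field("user", Types.STRING)
      .field("ts", Types.SQL_TIMESTAMP))
  .withEncoding(
    new JSON()
      .field("user_name", Types.STRING)
      .field("timestamp", Types.SQL_TIMESTAMP))
  .withMapping(
    new FieldMapping()
      .field("user", "user_name")   // table field -> JSON field
      .field("ts", "timestamp"))
```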


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162996455
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+/**
+  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
+  */
+object DescriptorUtils {
+
+  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
+val tpe = properties.get("connector.type")
+tpe != null || tpe == connector
--- End diff --

should be `tpe != null && tpe == connector`?


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r162990874
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+* Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+*
+* @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+* @return converted [[TableSourceTable]] instance from the input catalog table
+*/
+  def fromExternalCatalogTable(
+  tableEnv: TableEnvironment,
+  externalCatalogTable: ExternalCatalogTable)
+: TableSourceTable[_] = {
+
+// check for the legacy external catalog path
+if (externalCatalogTable.isLegacyTableType) {
+  LOG.warn("External catalog tables based on TableType annotations are 
deprecated. " +
+"Please consider updating them to TableSourceFactories.")
+  fromExternalCatalogTableType(externalCatalogTable)
+}
+// use the factory approach
+else {
+  val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+  tableEnv match {
+// check for a batch table source in this batch environment
+case _: BatchTableEnvironment =>
+  source match {
+case bts: BatchTableSource[_] =>
+  new BatchTableSourceTable(
+bts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a batch environment.")
+  }
+// check for a stream table source in this streaming environment
+case _: StreamTableEnvironment =>
+  source match {
+case sts: StreamTableSource[_] =>
+  new StreamTableSourceTable(
+sts,
+new FlinkStatistic(externalCatalogTable.getTableStats))
+case _ => throw new TableException(
+  s"Found table source '${source.getClass.getCanonicalName}' 
is not applicable " +
+s"in a streaming environment.")
+  }
+case _ => throw new TableException("Unsupported table 
environment.")
+  }
+}
+  }
+
+  // --
+  // NOTE: the following line can be removed once we drop support for TableType
--- End diff --

I think we can also remove the `org.reflections:reflections` dependency 
once we removed this.


---

