[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2018-06-26 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524649#comment-16524649
 ] 

Gary Yao commented on FLINK-4534:
-

Is this really needed? Flink callbacks are generally not invoked concurrently.

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry> entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9649) TaskManagers are not scheduled on Mesos

2018-06-26 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524647#comment-16524647
 ] 

Gary Yao commented on FLINK-9649:
-

[~lishim] In FLIP-6 mode, resources are allocated dynamically: 
[https://cwiki-test.apache.org/confluence/pages/viewpage.action?pageId=65147077]
 

In 1.5 we made flip6 the default mode.

> TaskManagers are not scheduled on Mesos
> ---
>
> Key: FLINK-9649
> URL: https://issues.apache.org/jira/browse/FLINK-9649
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Leonid Ishimnikov
>Priority: Major
>
> Flink correctly registers as a framework, but does not schedule task managers.
> Command:
> {noformat}
> ./bin/mesos-appmaster.sh -Dmesos.master="zk://192.168.0.101:2181/mesos" 
> -Djobmanager.heap.mb=1024 -Djobmanager.rpc.address=$(hostname -i) 
> -Djobmanager.rpc.port=6123 -Djobmanager.web.address=$(hostname -i) 
> -Djobmanager.web.port=8080 -Dmesos.initial-tasks=2 
> -Dmesos.resourcemanager.tasks.mem=4096 -Dtaskmanager.heap.mb=3500 
> -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=10 
> -Dmesos.resourcemanager.tasks.cpus=1 
> -Dmesos.resourcemanager.framework.principal=someuser 
> -Dmesos.resourcemanager.framework.secret=somepassword 
> -Dmesos.resourcemanager.framework.name="Flink-Test"{noformat}
> Log:
> {noformat}
> 2018-06-22 17:39:27,082 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - TaskManagers 
> will be created with 2 task slots
> 2018-06-22 17:39:27,082 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - TaskManagers 
> will be started with container size 4096 MB, JVM heap size 2765 MB, JVM 
> direct memory limit 1331 MB, 1.0 cpus, 0 gpus
> ...
> 2018-06-22 17:39:27,304 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting 
> the SlotManager.
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Registering as new framework.
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> 
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  -  
> Mesos Info:
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Master URL: zk://192.168.0.101:2181/mesos
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  -  
> Framework Info:
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> ID: (none)
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Name: Flink-Test
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Failover Timeout (secs): 10.0
> 2018-06-22 17:39:27,305 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Role: *
> 2018-06-22 17:39:27,306 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Capabilities: (none)
> 2018-06-22 17:39:27,306 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Principal: someuser
> 2018-06-22 17:39:27,306 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Host: 192.168.0.100
> 2018-06-22 17:39:27,306 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> Web UI: (none)
> 2018-06-22 17:39:27,306 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - 
> 
> 2018-06-22 17:39:27,432 INFO  
> org.apache.flink.mesos.scheduler.ConnectionMonitor    - Connecting to 
> Mesos...
> 2018-06-22 17:39:27,434 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  - Mesos 
> resource manager initialized.
> 2018-06-22 17:39:27,444 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher 
> akka.tcp://flink@192.168.0.100:6123/user/dispatcher was granted leadership 
> with fencing token 
> 2018-06-22 17:39:27,444 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering 
> all persisted jobs.
> 2018-06-22 17:39:27,466 INFO  
> org.apache.flink.mesos.scheduler.ConnectionMonitor    - Connected to 
> Mesos as framework ID 7295a8f7-c0a9-41d1-a737-ae71c57b72bf-1141.{noformat}
> There is nothing further in the log after that.



--
This message was sent by Atlassian JIRA
(v

[jira] [Commented] (FLINK-9565) Evaluating scalar UDFs in parallel

2018-06-26 Thread yinhua.dai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524641#comment-16524641
 ] 

yinhua.dai commented on FLINK-9565:
---

[~fhueske] [~twalthr]

So there are two options:

#1. using annotation like @Expensive

 
{code:java}
@Expensive
public class MyScalarFunc extends ScalarFunction {
public Double eval(Double d) {
// expensive logic
}
}{code}
#2. using additional method such as evalParallel, then sample code would be
{code:java}
public class MyScalarFunc extends ScalarFunction {
public Double evalParallel(Double d) {
// expensive logic
}
}
{code}
I personally prefer to option #1, I feel like it makes the code generation 
easier so we always call "eval" method, and we simply check if the scalar 
function is wrapped with @Expensive annotation, in which case we wrapped the 
generated code within a future.

> Evaluating scalar UDFs in parallel
> --
>
> Key: FLINK-9565
> URL: https://issues.apache.org/jira/browse/FLINK-9565
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.2
>Reporter: yinhua.dai
>Priority: Major
>
> As per 
> [https://stackoverflow.com/questions/50790023/does-flink-sql-support-to-run-projections-in-parallel,]
>  scalar UDF in the same SQL is always evaluated sequentially even when those 
> UDF are irrelevant, it may increase latency when the UDF is time consuming 
> function.
> It would be great if Flink SQL can support to run those UDF in parallel to 
> reduce calculation latency.
>  
> cc [~fhueske]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6962) SQL DDL for input and output tables

2018-06-26 Thread lincoln.lee (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524606#comment-16524606
 ] 

lincoln.lee commented on FLINK-6962:


Hi, [~shenyufeng],  We are busy with other things recently and have no time to 
handle this issue. 
Would be great to see your proposal, plz free feel to take over this.

> SQL DDL for input and output tables
> ---
>
> Key: FLINK-6962
> URL: https://issues.apache.org/jira/browse/FLINK-6962
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: lincoln.lee
>Priority: Major
>
> This Jira adds support to allow user define the DDL for source and sink 
> tables, including the waterMark(on source table) and emit SLA (on result 
> table). The detailed design doc will be attached soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-26 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9619:
--
Issue Type: Improvement  (was: Bug)

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-26 Thread Sihua Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9619:
--
Priority: Major  (was: Critical)

> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-26 Thread Indrajit Roychoudhury (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524470#comment-16524470
 ] 

Indrajit Roychoudhury commented on FLINK-9061:
--

initial set of changes based out of 1.4

https://github.com/indrc/flink/commit/96def47c57ae58f9717ab35d41e1a4bf6d152851

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524462#comment-16524462
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Is this  look ok now? ping @sihuazhou @dawidwys 


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Is this  look ok now? ping @sihuazhou @dawidwys 


---


[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-06-26 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524278#comment-16524278
 ] 

Elias Levy commented on FLINK-9662:
---

I took a look at the design doc, but I am not sufficiently familiar with the 
recent resource manager changes to comment on the details.

> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.5.1
>
>
> Disable task manager sharing for different jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9664) FlinkML Quickstart Loading Data section example doesn't work as described

2018-06-26 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-9664:


Assignee: Rong Rong

> FlinkML Quickstart Loading Data section example doesn't work as described
> -
>
> Key: FLINK-9664
> URL: https://issues.apache.org/jira/browse/FLINK-9664
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.5.0
>Reporter: Mano Swerts
>Assignee: Rong Rong
>Priority: Major
>  Labels: documentation-update, machine_learning, ml
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The ML documentation example isn't complete: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/ml/quickstart.html#loading-data]
> The referred section loads data from an astroparticle binary classification 
> dataset to showcase SVM. The dataset uses 0 and 1 as labels, which doesn't 
> produce correct results. The SVM predictor expects -1 and 1 labels to 
> correctly predict the label. The documentation, however, doesn't mention 
> that. The example therefore doesn't work without a clue why.
> The documentation should be updated with an explicit mention to -1 and 1 
> labels and a mapping function that shows the conversion of the labels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9343) Add Async Example with External Rest API call

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9343:
--
Labels: pull-request-available  (was: )

> Add Async Example with External Rest API call
> -
>
> Key: FLINK-9343
> URL: https://issues.apache.org/jira/browse/FLINK-9343
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.4.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>  Labels: pull-request-available
>
> Async I/O is a good way to call External resources such as REST API and 
> enrich the stream with external data.
> Adding example to simulate Async GET api call on an input stream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9343) Add Async Example with External Rest API call

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524024#comment-16524024
 ] 

ASF GitHub Bot commented on FLINK-9343:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/5996
  
@StephanEwen I would appreciate it if you do the review after my changes!


> Add Async Example with External Rest API call
> -
>
> Key: FLINK-9343
> URL: https://issues.apache.org/jira/browse/FLINK-9343
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 1.4.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>  Labels: pull-request-available
>
> Async I/O is a good way to call External resources such as REST API and 
> enrich the stream with external data.
> Adding example to simulate Async GET api call on an input stream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5996: [FLINK-9343] [Example] Add Async Example with External Re...

2018-06-26 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/5996
  
@StephanEwen I would appreciate it if you do the review after my changes!


---


[jira] [Updated] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8654:
--
Labels: pull-request-available  (was: )

> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524020#comment-16524020
 ] 

ASF GitHub Bot commented on FLINK-8654:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6084
  
@zentol PR has been updated! Please review.


> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>  Labels: pull-request-available
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6084: [FLINK-8654][Docs] Extend quickstart docs on how to submi...

2018-06-26 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6084
  
@zentol PR has been updated! Please review.


---


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524000#comment-16524000
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
 throw new ValidationException(
   s"Table factory '${factory.getClass.getCanonicalName}' does not 
support the " +
-  s"property '$k'. Supported properties are: \n" +
-  
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+s"property '$k'. Supported properties are: \n" +
+
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
   }
 }
 
-// create the table source
+// create the table connector
 try {
   factory.create(properties.asJava)
 } catch {
   case t: Throwable =>
 throw new TableException(
-  s"Table source factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
+  s"Table connector factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
--- End diff --

There are more exception messages in this class that need an update.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524009#comment-16524009
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215478
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

We usually intend differently. Take 
`org.apache.flink.table.codegen.CodeGenerator` as an example.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523998#comment-16523998
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198136187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
+*/
+  def tableType() : String
--- End diff --

Rename to `getType()`?


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524016#comment-16524016
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198218238
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

Wrap it into a try-catch similar to 
org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523996#comment-16523996
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135204
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Use plural `connectors`


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523997#comment-16523997
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
+  : T = {
 
 val properties = new DescriptorProperties()
 descriptor.addProperties(properties)
-findAndCreateTableSource(properties.asMap.asScala.toMap, classLoader)
+findAndCreateTableConnector(properties.asMap.asScala.toMap, 
classLoader)
   }
 
-  def findAndCreateTableSource(properties: Map[String, String]): 
TableSource[_] = {
-findAndCreateTableSource(properties, null)
+  def findAndCreateTableConnector(properties: Map[String, String]): T = {
+findAndCreateTableConnector(properties, null)
   }
 
-  def findAndCreateTableSource(
-  properties: Map[String, String],
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(properties: Map[String, String],
--- End diff --

This can fit into one line.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory

[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524013#comment-16524013
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198154676
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-  case _: StreamTableSourceTable[_] => true
-  case _: BatchTableSourceTable[_] => false
+val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
+  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+case _: StreamTableSourceTable[_] => true
--- End diff --

The pattern are incorrect since the method returns an option.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524012#comment-16524012
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198147548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
 CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

Please remove these unrelated changes and open a separate issue for them.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524002#comment-16524002
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198148852
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Please add a comment to this method as it will be public in a Java API 
(similar to TableSourceDescriptor).


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524008#comment-16524008
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198151603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Do we need a `TableSinkDescriptor`? Can't we unify `TableSinkDescriptor` 
and `TableSourceDescriptor` to `TableDescriptor`?


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524015#comment-16524015
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198223665
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

I think even a detached job needs to return a result. Otherwise you cannot 
be sure if the job has been submitted or not. E.g., the cluster might not be 
reachable. In any case, every created thread should be managed by the result 
store. So we should have a similar architecture as for queries. Maybe instead 
of `CollectStreamResult` a `StatusResult`. Maybe we should do the SQL Client 
changes in a separate PR?


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524011#comment-16524011
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215161
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
   "1,1,Hi,1970-01-01 00:00:00.001",
   "2,2,Hello,1970-01-01 00:00:00.002",
   "3,2,Hello world,1970-01-01 00:00:00.002")
-assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+assertEquals(expected.sorted, 
MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+  }
+
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+var env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+var tEnv = TableEnvironment.getTableEnvironment(env)
+MemoryTableSourceSinkUtil.clear
+
+val t = StreamTestData.getSmall3TupleDataStream(env)
+  .assignAscendingTimestamps(x => x._2)
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+tEnv.registerTable("sourceTable", t)
+
+val fieldNames = Array("a", "e", "f", "t")
+val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+  .asInstanceOf[Array[TypeInformation[_]]]
+
+val tableSchema = new TableSchema(
+  Array("a", "e", "f", "t", "rowtime", "proctime"),
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+val returnType = new RowTypeInfo(
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+.asInstanceOf[Array[TypeInformation[_]]],
+  Array("a", "e", "f", "t"))
+tEnv.registerTableSource("targetTable", new 
MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+  tableSchema, returnType, "rowtime", 3))
+tEnv.registerTableSink("targetTable",
+  new 
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, 
fieldTypes))
+
+tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
sourceTable")
+tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --

I think we need more test cases about how we handle the time attributes for 
`both` table types. Maybe not only ITCases but also unit tests. The `configure` 
method is an internal method that should not be called here.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524005#comment-16524005
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
--- End diff --

This can fit into one line.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523999#comment-16523999
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
--- End diff --

The table type must not be part of the supported properties as it is 
defined in a separate method in the `TableConnectorFactory` interface.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524003#comment-16524003
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
--- End diff --

remove comma


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524007#comment-16524007
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198207327
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, 
TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+val props = properties()
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+val props = properties()
+props.put(CONNECTOR_TYPE, "FAIL")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+val props = properties()
+props.put(CONNECTOR_PROPERTY_VERSION, "2")
+// the table source should still be found
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+val props = properties()
+props.put("format.path_new", "/new/path")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+val props = properties()
+props.put("failing", "true")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+val properties = mutable.Map[String, String]()
+properties.put(TableDescriptorValidator.TABLE_TYPE,
+  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+properties.put(CONNECTOR_TYPE, "test")
--- End diff --

I would use strings here for everything (not the variables). This allows 
tests to fail if we refactor one of the properties.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524010#comment-16524010
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198160808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: 
Option[TableSourceTable[T1]],
--- End diff --

Call this `ExternalTable` or `ConnectorTable`?

Maybe we can add more helper methods here like `hasSource()`, `getSource` 
in order to reduce the many pattern matching changes in this PR.

I'm also wondering if we need the separate classes `TableSinkTable` and 
`TableSourceTable` anymore. We could store `TableSink` and `TableSource` 
directly in `UnifiedTable` 


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524001#comment-16524001
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198134945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
--- End diff --

Add the updated comment again.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524014#comment-16524014
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198219005
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

We also need to ship the query config here.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524004#comment-16524004
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198142339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -98,10 +98,12 @@ object TableSourceFactoryService extends Logging {
 plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
 // check if required context is met
-if (plainContext.forall(e => properties.contains(e._1) && 
properties(e._1) == e._2)) {
+if 
(properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.tableType())
 &&
--- End diff --

Consider cases where the type has not been set. Btw 
`properties.get(TableDescriptorValidator.TABLE_TYPE).get` can be simplified to 
`properties(TableDescriptorValidator.TABLE_TYPE)`. It might be useful to enable 
more warnings in your IDE.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524006#comment-16524006
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

Make abstract?


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523995#comment-16523995
 ] 

ASF GitHub Bot commented on FLINK-8866:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135589
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Actually, we could also simplify this and call it `TableFactory`. What do 
you think? We also call `CREATE TABLE` not `CREATE TABLE CONNECTOR` in SQL.


> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215478
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, 
BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+object MemoryTableSourceSinkUtil {
+  var tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def clear = {
+MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(tableSchema: TableSchema,
+returnType: TypeInformation[Row],
--- End diff --

We usually intend differently. Take 
`org.apache.flink.table.codegen.CodeGenerator` as an example.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135204
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
--- End diff --

Use plural `connectors`


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
+  : T = {
 
 val properties = new DescriptorProperties()
 descriptor.addProperties(properties)
-findAndCreateTableSource(properties.asMap.asScala.toMap, classLoader)
+findAndCreateTableConnector(properties.asMap.asScala.toMap, 
classLoader)
   }
 
-  def findAndCreateTableSource(properties: Map[String, String]): 
TableSource[_] = {
-findAndCreateTableSource(properties, null)
+  def findAndCreateTableConnector(properties: Map[String, String]): T = {
+findAndCreateTableConnector(properties, null)
   }
 
-  def findAndCreateTableSource(
-  properties: Map[String, String],
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(properties: Map[String, String],
--- End diff --

This can fit into one line.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198160808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: 
Option[TableSourceTable[T1]],
--- End diff --

Call this `ExternalTable` or `ConnectorTable`?

Maybe we can add more helper methods here like `hasSource()`, `getSource` 
in order to reduce the many pattern matching changes in this PR.

I'm also wondering if we need the separate classes `TableSinkTable` and 
`TableSourceTable` anymore. We could store `TableSink` and `TableSource` 
directly in `UnifiedTable` 


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
 throw new ValidationException(
   s"Table factory '${factory.getClass.getCanonicalName}' does not 
support the " +
-  s"property '$k'. Supported properties are: \n" +
-  
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+s"property '$k'. Supported properties are: \n" +
+
s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
   }
 }
 
-// create the table source
+// create the table connector
 try {
   factory.create(properties.asJava)
 } catch {
   case t: Throwable =>
 throw new TableException(
-  s"Table source factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
+  s"Table connector factory '${factory.getClass.getCanonicalName}' 
caused an exception.",
--- End diff --

There are more exception messages in this class that need an update.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198223665
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
+   // create job graph with dependencies
+   final String jobName = context.getSessionContext().getName() + 
": " + query;
+   final JobGraph jobGraph = envInst.createJobGraph(jobName);
+
+   // create execution
+   new Thread(new ProgramDeployer<>(context, jobName, jobGraph, 
null)).start();
--- End diff --

I think even a detached job needs to return a result. Otherwise you cannot 
be sure if the job has been submitted or not. E.g., the cluster might not be 
reachable. In any case, every created thread should be managed by the result 
store. So we should have a similar architecture as for queries. Maybe instead 
of `CollectStreamResult` a `StatusResult`. Maybe we should do the SQL Client 
changes in a separate PR?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
--- End diff --

remove comma


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198146395
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -141,22 +143,26 @@ object TableSourceFactoryService extends Logging {
 
 // check for supported properties
 plainProperties.foreach { k =>
-  if (!supportedProperties.contains(k)) {
+  if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && 
!supportedProperties.contains(k)) {
--- End diff --

The table type must not be part of the supported properties as it is 
defined in a separate method in the `TableConnectorFactory` interface.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198151603
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Do we need a `TableSinkDescriptor`? Can't we unify `TableSinkDescriptor` 
and `TableSourceDescriptor` to `TableDescriptor`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198148852
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

Please add a comment to this method as it will be public in a Java API 
(similar to TableSourceDescriptor).


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198147548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 ---
@@ -27,6 +28,8 @@ class FileSystem extends ConnectorDescriptor(
 CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) {
 
   private var path: Option[String] = None
+  private var numFiles: Option[Int] = None
--- End diff --

Please remove these unrelated changes and open a separate issue for them.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198154676
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -39,9 +39,12 @@ abstract class PhysicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
-  case _: StreamTableSourceTable[_] => true
-  case _: BatchTableSourceTable[_] => false
+val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) 
match {
+  case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
+case _: StreamTableSourceTable[_] => true
--- End diff --

The pattern are incorrect since the method returns an option.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
 
-  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableSourceFactory[_]])
+  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableConnectorFactory[_]])
 
-  def findAndCreateTableSource(descriptor: TableSourceDescriptor): 
TableSource[_] = {
-findAndCreateTableSource(descriptor, null)
+  def findAndCreateTableConnector(descriptor: TableDescriptor): T = {
+findAndCreateTableConnector(descriptor, null)
   }
 
-  def findAndCreateTableSource(
-  descriptor: TableSourceDescriptor,
-  classLoader: ClassLoader)
-: TableSource[_] = {
+  def findAndCreateTableConnector(descriptor: TableDescriptor, 
classLoader: ClassLoader)
--- End diff --

This can fit into one line.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198137289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

Make abstract?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198215161
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
   "1,1,Hi,1970-01-01 00:00:00.001",
   "2,2,Hello,1970-01-01 00:00:00.002",
   "3,2,Hello world,1970-01-01 00:00:00.002")
-assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+assertEquals(expected.sorted, 
MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+  }
+
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+var env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+var tEnv = TableEnvironment.getTableEnvironment(env)
+MemoryTableSourceSinkUtil.clear
+
+val t = StreamTestData.getSmall3TupleDataStream(env)
+  .assignAscendingTimestamps(x => x._2)
+  .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+tEnv.registerTable("sourceTable", t)
+
+val fieldNames = Array("a", "e", "f", "t")
+val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, 
Types.SQL_TIMESTAMP)
+  .asInstanceOf[Array[TypeInformation[_]]]
+
+val tableSchema = new TableSchema(
+  Array("a", "e", "f", "t", "rowtime", "proctime"),
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+val returnType = new RowTypeInfo(
+  Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+.asInstanceOf[Array[TypeInformation[_]]],
+  Array("a", "e", "f", "t"))
+tEnv.registerTableSource("targetTable", new 
MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+  tableSchema, returnType, "rowtime", 3))
+tEnv.registerTableSink("targetTable",
+  new 
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, 
fieldTypes))
+
+tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM 
sourceTable")
+tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --

I think we need more test cases about how we handle the time attributes for 
`both` table types. Maybe not only ITCases but also unit tests. The `configure` 
method is an internal method that should not be called here.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198142339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -98,10 +98,12 @@ object TableSourceFactoryService extends Logging {
 plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
 // check if required context is met
-if (plainContext.forall(e => properties.contains(e._1) && 
properties(e._1) == e._2)) {
+if 
(properties.get(TableDescriptorValidator.TABLE_TYPE).get.equals(factory.tableType())
 &&
--- End diff --

Consider cases where the type has not been set. Btw 
`properties.get(TableDescriptorValidator.TABLE_TYPE).get` can be simplified to 
`properties(TableDescriptorValidator.TABLE_TYPE)`. It might be useful to enable 
more warnings in your IDE.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198207327
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/connector/TableSinkFactoryServiceTest.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector
+
+import org.apache.flink.table.api.{NoMatchingTableConnectorException, 
TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.TableDescriptorValidator
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSinkFactoryServiceTest {
+  @Test
+  def testValidProperties(): Unit = {
+val props = properties()
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableConnectorException])
+  def testInvalidContext(): Unit = {
+val props = properties()
+props.put(CONNECTOR_TYPE, "FAIL")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+val props = properties()
+props.put(CONNECTOR_PROPERTY_VERSION, "2")
+// the table source should still be found
+
assertTrue(TableSinkFactoryService.findAndCreateTableConnector(props.toMap) != 
null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+val props = properties()
+props.put("format.path_new", "/new/path")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+val props = properties()
+props.put("failing", "true")
+TableSinkFactoryService.findAndCreateTableConnector(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+val properties = mutable.Map[String, String]()
+properties.put(TableDescriptorValidator.TABLE_TYPE,
+  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
+properties.put(CONNECTOR_TYPE, "test")
--- End diff --

I would use strings here for everything (not the variables). This allows 
tests to fail if we refactor one of the properties.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198134945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
--- End diff --

Add the updated comment again.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198219005
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

We also need to ship the query config here.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198218238
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -321,6 +327,18 @@ public void stop(SessionContext session) {
}
}
 
+   private  void executeUpdateInternal(ExecutionContext context, 
String query) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   envInst.getTableEnvironment().sqlUpdate(query);
--- End diff --

Wrap it into a try-catch similar to 
org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198136187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
+  /**
+* Specify the type of the table connector, check
+* [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for 
all values.
+*
+* @return the table connector type,.
+*/
+  def tableType() : String
--- End diff --

Rename to `getType()`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-06-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r198135589
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

Actually, we could also simplify this and call it `TableFactory`. What do 
you think? We also call `CREATE TABLE` not `CREATE TABLE CONNECTOR` in SQL.


---


[jira] [Created] (FLINK-9665) PrometheusReporter does not properly unregister metrics

2018-06-26 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9665:
---

 Summary: PrometheusReporter does not properly unregister metrics
 Key: FLINK-9665
 URL: https://issues.apache.org/jira/browse/FLINK-9665
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.2, 1.5.0, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


The {{PrometheusReporter}} groups metrics with the same logical scope in a 
single {{Collector}} which are periodically polled by Prometheus.

New metrics are added to an existing collector, and a reference count is 
maintained so we can eventually cleanup the {{Collector}} itself.

For removed metrics we decrease the reference count, do not however remove the 
metrics that were added. As a result the collector will continue to expose 
metrics, as long as at least 1 metric exists with the same logical scope.

If the collector is a {{io.prometheus.client.Gauge}} we can use the 
{{#remove()}} method. For histograms we will have to modify our 
{{HistogramSummaryProxy}} class to allow removing individual histograms.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6962) SQL DDL for input and output tables

2018-06-26 Thread YuFeng Shen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523913#comment-16523913
 ] 

YuFeng Shen commented on FLINK-6962:


Any update on this functionality, I would like to do something similar.

> SQL DDL for input and output tables
> ---
>
> Key: FLINK-6962
> URL: https://issues.apache.org/jira/browse/FLINK-6962
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: lincoln.lee
>Priority: Major
>
> This Jira adds support to allow user define the DDL for source and sink 
> tables, including the waterMark(on source table) and emit SLA (on result 
> table). The detailed design doc will be attached soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523902#comment-16523902
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
It appears that the upload or large (64mb is what i tried) mixed multipart 
message currently fails, which caused the scala-shell failures. Currently 
investigating, but i can't reproduce it locally unfortunately...


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6203: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/...

2018-06-26 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6203
  
It appears that the upload or large (64mb is what i tried) mixed multipart 
message currently fails, which caused the scala-shell failures. Currently 
investigating, but i can't reproduce it locally unfortunately...


---


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523838#comment-16523838
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198178787
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
+
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new RuntimeException("lol", e);
--- End diff --

needs a proper exception


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6203: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6203#discussion_r198178787
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be 
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
 
-   log.info("Requesting blob server port.");
-   CompletableFuture portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+   CompletableFuture submissionFuture = 
CompletableFuture.supplyAsync(
+   () -> {
+   log.info("Submitting job graph.");
+
+   List jarFileNames = new ArrayList<>(8);
+   List 
artifactFileNames = new ArrayList<>(8);
+   Collection filesToUpload = new 
ArrayList<>(8);
 
-   CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
-   getDispatcherAddress(),
-   (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   final int blobServerPort = response.port;
-   final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
+   // TODO: need configurable location
+   final java.nio.file.Path jobGraphFile;
try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
+   jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+   try (OutputStream fileOut = 
Files.newOutputStream(jobGraphFile)) {
+   try (ObjectOutputStream 
objectOut = new ObjectOutputStream(fileOut)) {
+   
objectOut.writeObject(jobGraph);
+   }
+   }
+   filesToUpload.add(new 
FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+   } catch (IOException e) {
+   throw new RuntimeException("lol", e);
--- End diff --

needs a proper exception


---


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523794#comment-16523794
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
fixed the error that the access to state increased in `NFAStateAccessTest` 
by add the `isEmpty` judgment before update the state.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
fixed the error that the access to state increased in `NFAStateAccessTest` 
by add the `isEmpty` judgment before update the state.


---


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523771#comment-16523771
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Using the `entries#putAll` in `flushCache` lead to the count in 
`NFAStateAccessTest` increased,  I will check it locally , this travis will 
fail.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523770#comment-16523770
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198156227
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
--- End diff --

End user is supposed to use only this or `forSpecific` method and no other 
one. Therefore it must be public.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Report

[GitHub] flink issue #6205: [FLINK-9642]Reduce the count to deal with state during a ...

2018-06-26 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Using the `entries#putAll` in `flushCache` lead to the count in 
`NFAStateAccessTest` increased,  I will check it locally , this travis will 
fail.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-06-26 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198156227
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
--- End diff --

End user is supposed to use only this or `forSpecific` method and no other 
one. Therefore it must be public.


---


[jira] [Updated] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9337:
--
Labels: pull-request-available  (was: )

> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523767#comment-16523767
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150153
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
--- End diff --

@dawidwys couldn't this be `private`?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>P

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523766#comment-16523766
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150284
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
+   int identityMapCapacity) {
+   return new ConfluentRegistryAvroDeserializationSchema<>(
+   GenericRecord.class,
+   schema,
+   new CachedSchemaCoderProvider(url, 
identityMapCapacity));
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were generated fro

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-06-26 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150284
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
+   int identityMapCapacity) {
+   return new ConfluentRegistryAvroDeserializationSchema<>(
+   GenericRecord.class,
+   schema,
+   new CachedSchemaCoderProvider(url, 
identityMapCapacity));
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were generated from avro
+* schema and looks up writer schema in Confluent Schema Registry.
+*
+* @param tClass class of record to be produced
+* @param urlurl of schema registry to connect
+* @return deserialized record
+*

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-06-26 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150153
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema extends 
RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(Class 
recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(Schema schema, String url,
--- End diff --

@dawidwys couldn't this be `private`?


---


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523754#comment-16523754
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198151182
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

Get it.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198151182
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

Get it.


---


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523738#comment-16523738
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann 
here is my two cents:
By converting the events to `String` and sending them to Kafka there is a 
high risk to produce a `bad events` and we will lose the benifit of Schema 
Registry to avoid this malformed events sent to the topic. 

what you think about `AvroSerializationConfluentSchema` to Flink dist? If 
we move this serialization code in Flink dist we can drop `kafka-avro` 
dependencies from here.

but still if you think using `String` is ok I can update the PR and use 
`String` instead of `AvroSerializationConfluentSchema` 


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>  Labels: pull-request-available
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6083: [FLINK-8983] End-to-end test: Confluent schema registry

2018-06-26 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann 
here is my two cents:
By converting the events to `String` and sending them to Kafka there is a 
high risk to produce a `bad events` and we will lose the benifit of Schema 
Registry to avoid this malformed events sent to the topic. 

what you think about `AvroSerializationConfluentSchema` to Flink dist? If 
we move this serialization code in Flink dist we can drop `kafka-avro` 
dependencies from here.

but still if you think using `String` is ok I can update the PR and use 
`String` instead of `AvroSerializationConfluentSchema` 


---


[jira] [Commented] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523736#comment-16523736
 ] 

ASF GitHub Bot commented on FLINK-9634:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6208
  
I think this is a good fix for the moment. Only had minor comments inline. 
LGTM 👍 


> Deactivate previous location based scheduling if local recovery is disabled
> ---
>
> Key: FLINK-9634
> URL: https://issues.apache.org/jira/browse/FLINK-9634
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> With Flink 1.5.0 we introduced local recovery. In order to make local 
> recovery work we had to change the scheduling to be aware of the previous 
> location of the {{Execution}}. This scheduling strategy is also active if 
> local recovery is deactivated. I suggest to also disable the scheduling 
> strategy if local recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523733#comment-16523733
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146934
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

In fact, what I means is that if you want to throw an exception here, you 
could throw the exception as `throw new RuntimeException("exception message", 
originalException)`, this way the original exception won't be swallowed.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This mess

[GitHub] flink issue #6208: [FLINK-9634] Disable local recovery scheduling if local r...

2018-06-26 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6208
  
I think this is a good fix for the moment. Only had minor comments inline. 
LGTM 👍 


---


[jira] [Commented] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523731#comment-16523731
 ] 

ASF GitHub Bot commented on FLINK-9634:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198146744
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Revert?


> Deactivate previous location based scheduling if local recovery is disabled
> ---
>
> Key: FLINK-9634
> URL: https://issues.apache.org/jira/browse/FLINK-9634
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> With Flink 1.5.0 we introduced local recovery. In order to make local 
> recovery work we had to change the scheduling to be aware of the previous 
> location of the {{Execution}}. This scheduling strategy is also active if 
> local recovery is deactivated. I suggest to also disable the scheduling 
> strategy if local recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146934
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

In fact, what I means is that if you want to throw an exception here, you 
could throw the exception as `throw new RuntimeException("exception message", 
originalException)`, this way the original exception won't be swallowed.


---


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198146744
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Revert?


---


[jira] [Commented] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523728#comment-16523728
 ] 

ASF GitHub Bot commented on FLINK-9634:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198146426
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Classes that implement this interface provide a method to match objects 
to somehow represent slot candidates
+ * against the {@link SlotProfile} that produced the matcher object. A 
matching candidate is transformed into a
+ * desired result. If the matcher does not find a matching candidate, it 
returns null.
+ */
+public interface SchedulingStrategy {
+
+   /**
+* This method takes the candidate slots, extracts slot contexts from 
them, filters them by the profile
+* requirements and potentially by additional requirements, and 
produces a result from a match.
+*
+* @param slotProfile slotProfile for which to find a matching slot
--- End diff --

Indentation looks a bit inconsistent.


> Deactivate previous location based scheduling if local recovery is disabled
> ---
>
> Key: FLINK-9634
> URL: https://issues.apache.org/jira/browse/FLINK-9634
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> With Flink 1.5.0 we introduced local recovery. In order to make local 
> recovery work we had to change the scheduling to be aware of the previous 
> location of the {{Execution}}. This scheduling strategy is also active if 
> local recovery is deactivated. I suggest to also disable the scheduling 
> strategy if local recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198146426
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Classes that implement this interface provide a method to match objects 
to somehow represent slot candidates
+ * against the {@link SlotProfile} that produced the matcher object. A 
matching candidate is transformed into a
+ * desired result. If the matcher does not find a matching candidate, it 
returns null.
+ */
+public interface SchedulingStrategy {
+
+   /**
+* This method takes the candidate slots, extracts slot contexts from 
them, filters them by the profile
+* requirements and potentially by additional requirements, and 
produces a result from a match.
+*
+* @param slotProfile slotProfile for which to find a matching slot
--- End diff --

Indentation looks a bit inconsistent.


---


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523727#comment-16523727
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146056
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Yes.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146056
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Yes.


---


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523721#comment-16523721
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198143091
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

But I don't know how to deal with the exception in a stream api in java8, 
do you have a better way to deal with this situation? thanks.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198143091
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

But I don't know how to deal with the exception in a stream api in java8, 
do you have a better way to deal with this situation? thanks.


---


[jira] [Commented] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523719#comment-16523719
 ] 

ASF GitHub Bot commented on FLINK-9634:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198142541
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * {@link SchedulingStrategy} which tries to match a slot with its 
previous {@link AllocationID}.
+ * If the previous allocation cannot be found, then it returns {@code 
null}. If the slot has not
+ * been scheduled before (no assigned allocation id), it will fall back to
+ * {@link LocationPreferenceSchedulingStrategy}.
+ */
+public class PreviousAllocationSchedulingStrategy extends 
LocationPreferenceSchedulingStrategy {
+
+   private static final PreviousAllocationSchedulingStrategy INSTANCE = 
new PreviousAllocationSchedulingStrategy();
+
+   PreviousAllocationSchedulingStrategy() {}
--- End diff --

can be `private`


> Deactivate previous location based scheduling if local recovery is disabled
> ---
>
> Key: FLINK-9634
> URL: https://issues.apache.org/jira/browse/FLINK-9634
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> With Flink 1.5.0 we introduced local recovery. In order to make local 
> recovery work we had to change the scheduling to be aware of the previous 
> location of the {{Execution}}. This scheduling strategy is also active if 
> local recovery is deactivated. I suggest to also disable the scheduling 
> strategy if local recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9634) Deactivate previous location based scheduling if local recovery is disabled

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523717#comment-16523717
 ] 

ASF GitHub Bot commented on FLINK-9634:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198142470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Default {@link SchedulingStrategy} which tries to match a slot with its 
location preferences.
+ */
+public class LocationPreferenceSchedulingStrategy implements 
SchedulingStrategy {
+
+   private static final LocationPreferenceSchedulingStrategy INSTANCE = 
new LocationPreferenceSchedulingStrategy();
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction 
LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 
+ hostLocalWeigh;
+
+   LocationPreferenceSchedulingStrategy() {}
--- End diff --

can be `private`.


> Deactivate previous location based scheduling if local recovery is disabled
> ---
>
> Key: FLINK-9634
> URL: https://issues.apache.org/jira/browse/FLINK-9634
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> With Flink 1.5.0 we introduced local recovery. In order to make local 
> recovery work we had to change the scheduling to be aware of the previous 
> location of the {{Execution}}. This scheduling strategy is also active if 
> local recovery is deactivated. I suggest to also disable the scheduling 
> strategy if local recovery is not enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523718#comment-16523718
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198142501
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Ok, Is this benefit from the `RocksDBWriteBatchWrapper` when use the 
`putAll`?


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198142541
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * {@link SchedulingStrategy} which tries to match a slot with its 
previous {@link AllocationID}.
+ * If the previous allocation cannot be found, then it returns {@code 
null}. If the slot has not
+ * been scheduled before (no assigned allocation id), it will fall back to
+ * {@link LocationPreferenceSchedulingStrategy}.
+ */
+public class PreviousAllocationSchedulingStrategy extends 
LocationPreferenceSchedulingStrategy {
+
+   private static final PreviousAllocationSchedulingStrategy INSTANCE = 
new PreviousAllocationSchedulingStrategy();
+
+   PreviousAllocationSchedulingStrategy() {}
--- End diff --

can be `private`


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198142501
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Ok, Is this benefit from the `RocksDBWriteBatchWrapper` when use the 
`putAll`?


---


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198142470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Default {@link SchedulingStrategy} which tries to match a slot with its 
location preferences.
+ */
+public class LocationPreferenceSchedulingStrategy implements 
SchedulingStrategy {
+
+   private static final LocationPreferenceSchedulingStrategy INSTANCE = 
new LocationPreferenceSchedulingStrategy();
+
+   /**
+* Calculates the candidate's locality score.
+*/
+   private static final BiFunction 
LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 
+ hostLocalWeigh;
+
+   LocationPreferenceSchedulingStrategy() {}
--- End diff --

can be `private`.


---


  1   2   >