[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1524#discussion_r53014234
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
@@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
 jobManagerPort, webServerPort, slots, taskManagerCount,
 dynamicPropertiesEncodedString)
 
+  //todo should I also set the FS default here
--- End diff --

No. I'll remove this TODO when merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148628#comment-15148628
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184694713
  
Travis has passed. Did you have another look at this @tillrohrmann?


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184694713
  
Travis has passed. Did you have another look at this @tillrohrmann?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3416) [py] .bat files fail when path contains spaces

2016-02-16 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-3416:
---

 Summary: [py] .bat files fail when path contains spaces
 Key: FLINK-3416
 URL: https://issues.apache.org/jira/browse/FLINK-3416
 Project: Flink
  Issue Type: Bug
  Components: Python API
Affects Versions: 1.0.0
Reporter: Chesnay Schepler
Priority: Minor
 Fix For: 1.0.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3115) Update Elasticsearch connector to 2.X

2016-02-16 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148622#comment-15148622
 ] 

Robert Metzger commented on FLINK-3115:
---

[~smarthi], can you give us an update on the progress with this issue?

> Update Elasticsearch connector to 2.X
> -
>
> Key: FLINK-3115
> URL: https://issues.apache.org/jira/browse/FLINK-3115
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 0.10.0, 1.0.0, 0.10.1
>Reporter: Maximilian Michels
>Assignee: Suneel Marthi
> Fix For: 1.0.0
>
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API 
> changed. The code needs to be adapted. Probably it makes sense to have a new 
> class {{ElasticsearchSink2}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3415) TimestampExctractor accepts negative watermarks

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3415:
---

 Summary: TimestampExctractor accepts negative watermarks 
 Key: FLINK-3415
 URL: https://issues.apache.org/jira/browse/FLINK-3415
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Critical
 Fix For: 1.0.0


When the timestamp extractor returns a negative value for a watermark, it is 
accepted, as long as it is larger than the previous negative value, with the 
initial reference value being Long.MIN_VALUE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode

2016-02-16 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-3304.
---
   Resolution: Fixed
Fix Version/s: 1.0.0

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/c658763d

> AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
> --
>
> Key: FLINK-3304
> URL: https://issues.apache.org/jira/browse/FLINK-3304
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.0, 0.10.1
>Reporter: Sebastian Klemke
>Assignee: Klou
> Fix For: 1.0.0
>
>
> Quoting flink cli (schema and names modified):
> "The program finished with the following exception:
> User-defined object org.apache.flink.api.java.io.AvroOutputFormat@5f253dfb 
> (org.apache.flink.api.java.io.AvroOutputFormat) contains non-serializable 
> field userDefinedSchema = 
> {"type":"record","name":"Pojo","namespace":"com.example","fields":[{"name":"id","type":["null","string"],"default":null,"subtype":"objectid"}],"EntityVersion":"0.1.0"}
> 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper.(UserCodeObjectWrapper.java:84)
> 
> org.apache.flink.api.common.operators.GenericDataSinkBase.(GenericDataSinkBase.java:68)
> 
> org.apache.flink.api.java.operators.DataSink.translateToDataFlow(DataSink.java:258)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:64)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:939)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:907)
> 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:57)
> com.example.Tool.main(Tool.java:86)
> Shutting down YARN cluster"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...

2016-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1635


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148613#comment-15148613
 ] 

ASF GitHub Bot commented on FLINK-3304:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1635


> AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
> --
>
> Key: FLINK-3304
> URL: https://issues.apache.org/jira/browse/FLINK-3304
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.0, 0.10.1
>Reporter: Sebastian Klemke
>Assignee: Klou
>
> Quoting flink cli (schema and names modified):
> "The program finished with the following exception:
> User-defined object org.apache.flink.api.java.io.AvroOutputFormat@5f253dfb 
> (org.apache.flink.api.java.io.AvroOutputFormat) contains non-serializable 
> field userDefinedSchema = 
> {"type":"record","name":"Pojo","namespace":"com.example","fields":[{"name":"id","type":["null","string"],"default":null,"subtype":"objectid"}],"EntityVersion":"0.1.0"}
> 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper.(UserCodeObjectWrapper.java:84)
> 
> org.apache.flink.api.common.operators.GenericDataSinkBase.(GenericDataSinkBase.java:68)
> 
> org.apache.flink.api.java.operators.DataSink.translateToDataFlow(DataSink.java:258)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:64)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:939)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:907)
> 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:57)
> com.example.Tool.main(Tool.java:86)
> Shutting down YARN cluster"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148612#comment-15148612
 ] 

ASF GitHub Bot commented on FLINK-3304:
---

Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1635#issuecomment-184688675
  
Thanks a lot @rmetzger !


> AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
> --
>
> Key: FLINK-3304
> URL: https://issues.apache.org/jira/browse/FLINK-3304
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.0, 0.10.1
>Reporter: Sebastian Klemke
>Assignee: Klou
>
> Quoting flink cli (schema and names modified):
> "The program finished with the following exception:
> User-defined object org.apache.flink.api.java.io.AvroOutputFormat@5f253dfb 
> (org.apache.flink.api.java.io.AvroOutputFormat) contains non-serializable 
> field userDefinedSchema = 
> {"type":"record","name":"Pojo","namespace":"com.example","fields":[{"name":"id","type":["null","string"],"default":null,"subtype":"objectid"}],"EntityVersion":"0.1.0"}
> 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper.(UserCodeObjectWrapper.java:84)
> 
> org.apache.flink.api.common.operators.GenericDataSinkBase.(GenericDataSinkBase.java:68)
> 
> org.apache.flink.api.java.operators.DataSink.translateToDataFlow(DataSink.java:258)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:64)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:939)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:907)
> 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:57)
> com.example.Tool.main(Tool.java:86)
> Shutting down YARN cluster"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148609#comment-15148609
 ] 

ASF GitHub Bot commented on FLINK-3304:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1635#issuecomment-184688215
  
Merging ...


> AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
> --
>
> Key: FLINK-3304
> URL: https://issues.apache.org/jira/browse/FLINK-3304
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.0, 0.10.1
>Reporter: Sebastian Klemke
>Assignee: Klou
>
> Quoting flink cli (schema and names modified):
> "The program finished with the following exception:
> User-defined object org.apache.flink.api.java.io.AvroOutputFormat@5f253dfb 
> (org.apache.flink.api.java.io.AvroOutputFormat) contains non-serializable 
> field userDefinedSchema = 
> {"type":"record","name":"Pojo","namespace":"com.example","fields":[{"name":"id","type":["null","string"],"default":null,"subtype":"objectid"}],"EntityVersion":"0.1.0"}
> 
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper.(UserCodeObjectWrapper.java:84)
> 
> org.apache.flink.api.common.operators.GenericDataSinkBase.(GenericDataSinkBase.java:68)
> 
> org.apache.flink.api.java.operators.DataSink.translateToDataFlow(DataSink.java:258)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:64)
> 
> org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:939)
> 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:907)
> 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:57)
> com.example.Tool.main(Tool.java:86)
> Shutting down YARN cluster"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...

2016-02-16 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1635#issuecomment-184688675
  
Thanks a lot @rmetzger !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1635#issuecomment-184688215
  
Merging ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3315] Fix Slot Sharing in Streaming API

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53010068
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels 
instances of operations that
--- End diff --

Parallels typo (also in other variants of this method)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148596#comment-15148596
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1641#discussion_r53010068
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -202,6 +203,29 @@ public String getUid() {
}
 
/**
+* Returns the slot sharing group of this transformation.
+*
+* @see #setSlotSharingGroup(String)
+*/
+   public String getSlotSharingGroup() {
+   return slotSharingGroup;
+   }
+
+   /**
+* Sets the slot sharing group of this transformation. Parallels 
instances of operations that
--- End diff --

Parallels typo (also in other variants of this method)


> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResouceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResouces()}} and replace it with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine grained control over what operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put 
> heavy operations inside their own slot groups.
> This is a bit more low level than the previous API and requires more calls 
> than a simple {{startNewResourceGroup()}} but I think not many people would 
> use this feature and this design makes it very clear what operations end up 
> in the same group.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148586#comment-15148586
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184683583
  
In general the DbStateBackend should not misbehave under the current 
assumptions. What Ufuk means is that the DbStateBackend does not make any 
assumption about the job parallelism if using the App id (we could change the 
job parallelism). This will not hold now but it does not matter as we don't 
support this feature yet.


> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184683583
  
In general the DbStateBackend should not misbehave under the current 
assumptions. What Ufuk means is that the DbStateBackend does not make any 
assumption about the job parallelism if using the App id (we could change the 
job parallelism). This will not hold now but it does not matter as we don't 
support this feature yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148560#comment-15148560
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184677192
  
What would that corner case be?


> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1642#issuecomment-184677192
  
What would that corner case be?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148558#comment-15148558
 ] 

ASF GitHub Bot commented on FLINK-3299:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1642

[FLINK-3299] Remove ApplicationID from Environment

As per discussion in the issue, we decided to remove the the 
`ApplicationID`.

Replaces the app ID in RocksDB and DB backend with job ID. I've talked to 
@gyfora about the DB backend and he mentioned a possible problem with 
savepoints in certain corner cases, but all in all the change should be OK.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 3299-app_id

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1642


commit f88e4d540b71f9a4e683b44a030ad6b267f132cb
Author: Ufuk Celebi 
Date:   2016-02-11T15:04:08Z

[FLINK-3299] Remove ApplicationID from Environment




> Remove ApplicationID from Environment
> -
>
> Key: FLINK-3299
> URL: https://issues.apache.org/jira/browse/FLINK-3299
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> {{ApplicationID}} is used to identify an application across many job 
> submissions (for example after restoring from a savepoint). This is currently 
> exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as 
> part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...

2016-02-16 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1642

[FLINK-3299] Remove ApplicationID from Environment

As per discussion in the issue, we decided to remove the the 
`ApplicationID`.

Replaces the app ID in RocksDB and DB backend with job ID. I've talked to 
@gyfora about the DB backend and he mentioned a possible problem with 
savepoints in certain corner cases, but all in all the change should be OK.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 3299-app_id

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1642


commit f88e4d540b71f9a4e683b44a030ad6b267f132cb
Author: Ufuk Celebi 
Date:   2016-02-11T15:04:08Z

[FLINK-3299] Remove ApplicationID from Environment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148548#comment-15148548
 ] 

ASF GitHub Bot commented on FLINK-3315:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1641

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes slot sharing settings to single method
slotSharingGroup(String) on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default" this can also be explicitly
set using slotSharingGroup("default"). This overrides the inheriting
behaviour.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1641


commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek 
Date:   2016-02-02T12:11:12Z

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes slot sharing settings to single method
slotSharingGroup(String) on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default" this can also be explicitly
set using slotSharingGroup("default"). This overrides the inheriting
behaviour.




> Fix Slot Sharing in Streaming API
> -
>
> Key: FLINK-3315
> URL: https://issues.apache.org/jira/browse/FLINK-3315
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The 
> slot sharing group that operators are put in depends on the order in which 
> operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink() 
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the 
> {{startNewResouceGroup()}} call and all operators that are transformed 
> afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an 
> operator.)
> What I propose is to remove {{startNewResourceGroup()}} and 
> {{isolateResouces()}} and replace it with {{slotSharingGroup(String)}}. By 
> default, operations would be in slot sharing group "default". This allows 
> very fine grained control over what operators end up in which slot sharing 
> group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks") 
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put 
> heavy operations inside their own slot groups.
> This is a bit more low level than the previous API and requires more calls 
> than a simple {{startNewResourceGroup()}} but I think not many people would 
> use this feature and this design makes it very clear what operations end up 
> in the same group.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3315] Fix Slot Sharing in Streaming API

2016-02-16 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1641

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes slot sharing settings to single method
slotSharingGroup(String) on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default" this can also be explicitly
set using slotSharingGroup("default"). This overrides the inheriting
behaviour.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1641


commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek 
Date:   2016-02-02T12:11:12Z

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes slot sharing settings to single method
slotSharingGroup(String) on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default" this can also be explicitly
set using slotSharingGroup("default"). This overrides the inheriting
behaviour.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148547#comment-15148547
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1634#issuecomment-184672115
  
Merged, please close @twalthr 


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3226] Casting support for arithmetic op...

2016-02-16 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1634#issuecomment-184672115
  
Merged, please close @twalthr 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148546#comment-15148546
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1639#issuecomment-184672051
  
Merged, please close @twalthr 


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3226] Translation of explicit casting

2016-02-16 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1639#issuecomment-184672051
  
Merged, please close @twalthr 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Stefano Baghino (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148498#comment-15148498
 ] 

Stefano Baghino commented on FLINK-3412:


I like the approach suggested by [~till.rohrmann], I've made a prototype here: 
[https://github.com/radicalbit/flink/commit/bff5870d5578de9d1aaffc648cacffac79da81a3]

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148495#comment-15148495
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53000913
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

Regarding the `JobSubmitSuccess`: we had it as a follow up to have more 
fine-grained integration with the the client and left it as a duplicate submit 
message for the time being (instead of something like `JobRecovered`).

The other behaviour is back to the previous state now. I hear you that it 
makes sense to integrate the state restore behaviour with the execution graph 
restart.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53000913
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

Regarding the `JobSubmitSuccess`: we had it as a follow up to have more 
fine-grained integration with the the client and left it as a duplicate submit 
message for the time being (instead of something like `JobRecovered`).

The other behaviour is back to the previous state now. I hear you that it 
makes sense to integrate the state restore behaviour with the execution graph 
restart.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148485#comment-15148485
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52999704
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Moved this one up to have correct ACKing behaviour.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52999704
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1079,6 +1079,9 @@ class JobManager(
   executionGraph.registerExecutionListener(gateway)
   executionGraph.registerJobStatusListener(gateway)
 }
+
+// All good. Submission succeeded!
+jobInfo.client ! 
decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Moved this one up to have correct ACKing behaviour.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148480#comment-15148480
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998947
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

Had an offline discussion with Stephan. He agrees with you that the failure 
in this case is too hard. I'll undo that change by ACK'ing the submission 
earlier.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998947
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

Had an offline discussion with Stephan. He agrees with you that the failure 
in this case is too hard. I'll undo that change by ACK'ing the submission 
earlier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148479#comment-15148479
 ] 

ASF GitHub Bot commented on FLINK-3296:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1563#issuecomment-184642511
  
I renamed the method to `writeUsingOutputFormat` and rebased to current 
master.


> DataStream.write*() methods are not flushing properly
> -
>
> Key: FLINK-3296
> URL: https://issues.apache.org/jira/browse/FLINK-3296
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} 
> class, which has a logic for flushing records, even though the underlying 
> stream is never flushed. This is misleading for users as files are not 
> written as they would expect it.
> The code was initial written with FileOutputFormats in mind, but the types 
> were not set correctly. This PR opened the write() method to any output 
> format: https://github.com/apache/flink/pull/706/files



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3296] Remove 'flushing' behavior of the...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1563#issuecomment-184642511
  
I renamed the method to `writeUsingOutputFormat` and rebased to current 
master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148478#comment-15148478
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998567
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

The behaviour right now for a failure while doing a job recovery would 
simply fail the `ExecutionGraph` triggering a restart. A successful job 
recovery would send a `JobSubmitSuccess` to the client. I'm not sure whether 
this is actually correct, since the client already received a 
`JobSubmitMessage` from the `JobManager` while initially submitting the job. 
But I think this will simply be ignored.

Thus, suppressing the restart behaviour in case of a job recovery would 
actually change the behaviour.

If it makes sense and if it is possible to recover from failures while 
recovering a job or restoring a savepoint, it would make sense to not directly 
fail the job without restarting. Maybe one should distinguish that based on the 
actually occurring exception.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998567
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

The behaviour right now for a failure while doing a job recovery would 
simply fail the `ExecutionGraph` triggering a restart. A successful job 
recovery would send a `JobSubmitSuccess` to the client. I'm not sure whether 
this is actually correct, since the client already received a 
`JobSubmitMessage` from the `JobManager` while initially submitting the job. 
But I think this will simply be ignored.

Thus, suppressing the restart behaviour in case of a job recovery would 
actually change the behaviour.

If it makes sense and if it is possible to recover from failures while 
recovering a job or restoring a savepoint, it would make sense to not directly 
fail the job without restarting. Maybe one should distinguish that based on the 
actually occurring exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52998526
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
--- End diff --

I'd just add a try{...}catch{Exception e} block and log that stuff.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148477#comment-15148477
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52998526
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
--- End diff --

I'd just add a try{...}catch{Exception e} block and log that stuff.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148470#comment-15148470
 ] 

Robert Metzger commented on FLINK-3412:
---

+1 I ran into this issue as well.

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148469#comment-15148469
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52997627
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
--- End diff --

I would add null checks for session and cluster


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148468#comment-15148468
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52997588
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

You are right. Close is called when open fails


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going 

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52997627
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
--- End diff --

I would add null checks for session and cluster


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52997588
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

You are right. Close is called when open fails


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148466#comment-15148466
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52997375
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

But then I would not keep the behaviour as it is right now. Instead, we 
should then consider the job submitted before trying to recover any checkpoint 
state and keep the restart behaviour. What do you think?


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52997375
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

But then I would not keep the behaviour as it is right now. Instead, we 
should then consider the job submitted before trying to recover any checkpoint 
state and keep the restart behaviour. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148456#comment-15148456
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995692
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

which is also not true. a flink failure while data is being written to 
cassandra will cause duplicates. you can only say this if writing the data to 
the final table is completely handled by cassandra (for example by writing into 
a temporary table, exporting it to csv and importing into the target table; the 
only way for duplicates is if cassandra fails while importing).


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995692
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

which is also not true. a flink failure while data is being written to 
cassandra will cause duplicates. you can only say this if writing the data to 
the final table is completely handled by cassandra (for example by writing into 
a temporary table, exporting it to csv and importing into the target table; the 
only way for duplicates is if cassandra fails while importing).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995595
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

I mean an at most once system will give you also exactly once processing 
guarantees under the assumption that nothing fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148455#comment-15148455
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995595
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

I mean an at most once system will give you also exactly once processing 
guarantees under the assumption that nothing fails.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148453#comment-15148453
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995525
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

That's what is defined as at least once.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995525
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

That's what is defined as at least once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148450#comment-15148450
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995374
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
--- End diff --

why is close() not called for a failing operation?


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flin

[jira] [Created] (FLINK-3414) Add Scala API for CEP's pattern definition

2016-02-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3414:


 Summary: Add Scala API for CEP's pattern definition
 Key: FLINK-3414
 URL: https://issues.apache.org/jira/browse/FLINK-3414
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.0.0
Reporter: Till Rohrmann
Priority: Minor


Currently, the CEP library only supports a Java API to specify complex event 
patterns. In order to make it a bit less verbose for Scala users, it would be 
nice to also add a Scala API for the CEP library. 

A Scala API would also allow to pass Scala's anonymous functions as filter 
conditions or as a select function, for example, or to use partial functions to 
distinguish between different events.

Furthermore, the Scala API could be designed to feel a bit more like a DSL:

{code}
begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || "middle_2" 
where _.name equals "foobar" -> "end" where x => x.id <= x.volume
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148451#comment-15148451
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995438
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private final String host;
+   private final String keyspace;
+   private final String table;
+
+   private transient Cluster cluster;
+   private transient Session session;
+
+   public CassandraCommitter(String host, String keyspace, String table) {
+   this.host = host;
+   this.keyspace = keyspace;
+   this.table = table;
+   }
+
+   @Override
+   public void open() throws Exception {
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+
+   session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " 
with replication={'class':'SimpleStrategy', 'replication_factor':3};");
+   session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." 
+ table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY 
(sink_id, sub_id));");
+   session.executeAsync("INSERT INTO " + keyspace + "." + table + 
" (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId 
+ ", " + -1 + ");");
+   }
+
+   @Override
+   public void close() throws Exception {
+   session.executeAsync("DELETE FROM " + keyspace + "." + table + 
" where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void commitCheckpoint(long checkpointID) {
+   SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + 
"." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + 
operatorId + "' and sub_id=" + subtaskId + ";");
+   s.setConsistencyLevel(ConsistencyLevel.ALL);
+   session.executeAsync(s);
--- End diff --

good point.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995438
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private final String host;
+   private final String keyspace;
+   private final String table;
+
+   private transient Cluster cluster;
+   private transient Session session;
+
+   public CassandraCommitter(String host, String keyspace, String table) {
+   this.host = host;
+   this.keyspace = keyspace;
+   this.table = table;
+   }
+
+   @Override
+   public void open() throws Exception {
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+
+   session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " 
with replication={'class':'SimpleStrategy', 'replication_factor':3};");
+   session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." 
+ table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY 
(sink_id, sub_id));");
+   session.executeAsync("INSERT INTO " + keyspace + "." + table + 
" (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId 
+ ", " + -1 + ");");
+   }
+
+   @Override
+   public void close() throws Exception {
+   session.executeAsync("DELETE FROM " + keyspace + "." + table + 
" where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void commitCheckpoint(long checkpointID) {
+   SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + 
"." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + 
operatorId + "' and sub_id=" + subtaskId + ";");
+   s.setConsistencyLevel(ConsistencyLevel.ALL);
+   session.executeAsync(s);
--- End diff --

good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995374
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
--- End diff --

why is close() not called for a failing operation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148448#comment-15148448
 ] 

ASF GitHub Bot commented on FLINK-2021:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184624521
  
The `testDetachedPerJobYarnCluster()` YARN test is failing with


```
10:33:26,969 INFO  org.apache.flink.client.CliFrontend  
 - Starting execution of program
10:33:26,969 INFO  org.apache.flink.client.program.Client   
 - Starting program in interactive mode
10:33:26,971 ERROR org.apache.flink.client.CliFrontend  
 - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runDetached(Client.java:277)
at 
org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:774)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
at org.apache.flink.yarn.YarnTestBase$Runner.run(YarnTestBase.java:565)
Caused by: java.lang.RuntimeException: Error parsing arguments 
'[/tmp/junit2827618452146338513/junit4708065236621010716.tmp, 
/tmp/junit2827618452146338513/junit2411517217475739253]' on 
'/tmp/junit2827618452146338513/junit4708065236621010716.tmp'. Unexpected value. 
Please prefix values with -- or -.
at 
org.apache.flink.api.java.utils.ParameterTool.fromArgs(ParameterTool.java:107)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
... 6 more
10:33:26,974 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Shutting down remote daemon.
```


> Rework examples to use ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Stefano Baghino
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148447#comment-15148447
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995123
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

does that mean that close() is not called when an operation fails in open()?


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to fir

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52995123
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

does that mean that close() is not called when an operation fails in open()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2021] Rework examples to use ParameterT...

2016-02-16 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1581#issuecomment-184624521
  
The `testDetachedPerJobYarnCluster()` YARN test is failing with


```
10:33:26,969 INFO  org.apache.flink.client.CliFrontend  
 - Starting execution of program
10:33:26,969 INFO  org.apache.flink.client.program.Client   
 - Starting program in interactive mode
10:33:26,971 ERROR org.apache.flink.client.CliFrontend  
 - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runDetached(Client.java:277)
at 
org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:774)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
at org.apache.flink.yarn.YarnTestBase$Runner.run(YarnTestBase.java:565)
Caused by: java.lang.RuntimeException: Error parsing arguments 
'[/tmp/junit2827618452146338513/junit4708065236621010716.tmp, 
/tmp/junit2827618452146338513/junit2411517217475739253]' on 
'/tmp/junit2827618452146338513/junit4708065236621010716.tmp'. Unexpected value. 
Please prefix values with -- or -.
at 
org.apache.flink.api.java.utils.ParameterTool.fromArgs(ParameterTool.java:107)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
... 6 more
10:33:26,974 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Shutting down remote daemon.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148442#comment-15148442
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994814
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

I would try to summarize the guarantees here. Or maybe add an asterisk to 
the "exactly once", stating that Cassandra has no support for large atomic 
inserts, but as long as cassandra is not failing, Flink failures will not cause 
duplicates.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994814
  
--- Diff: docs/apis/streaming/fault_tolerance.md ---
@@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
 
 
 
+Cassandra sink
+exactly once
--- End diff --

I would try to summarize the guarantees here. Or maybe add an asterisk to 
the "exactly once", stating that Cassandra has no support for large atomic 
inserts, but as long as cassandra is not failing, Flink failures will not cause 
duplicates.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148439#comment-15148439
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994610
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   Stream

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994610
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+   saveHandleInState(checkpointId);
+   taskState.setFunctionState(state);
+   return taskState;
+   }
+
+   @Override
+   pub

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994409
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private final String host;
+   private final String keyspace;
+   private final String table;
+
+   private transient Cluster cluster;
+   private transient Session session;
+
+   public CassandraCommitter(String host, String keyspace, String table) {
+   this.host = host;
+   this.keyspace = keyspace;
+   this.table = table;
+   }
+
+   @Override
+   public void open() throws Exception {
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+
+   session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " 
with replication={'class':'SimpleStrategy', 'replication_factor':3};");
+   session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." 
+ table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY 
(sink_id, sub_id));");
+   session.executeAsync("INSERT INTO " + keyspace + "." + table + 
" (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId 
+ ", " + -1 + ");");
+   }
+
+   @Override
+   public void close() throws Exception {
+   session.executeAsync("DELETE FROM " + keyspace + "." + table + 
" where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void commitCheckpoint(long checkpointID) {
+   SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + 
"." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + 
operatorId + "' and sub_id=" + subtaskId + ";");
+   s.setConsistencyLevel(ConsistencyLevel.ALL);
+   session.executeAsync(s);
--- End diff --

Why do you execute this asynchronously ? Doesn't this mean we'll never 
learn about errors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148437#comment-15148437
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994409
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private final String host;
+   private final String keyspace;
+   private final String table;
+
+   private transient Cluster cluster;
+   private transient Session session;
+
+   public CassandraCommitter(String host, String keyspace, String table) {
+   this.host = host;
+   this.keyspace = keyspace;
+   this.table = table;
+   }
+
+   @Override
+   public void open() throws Exception {
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+
+   session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " 
with replication={'class':'SimpleStrategy', 'replication_factor':3};");
+   session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." 
+ table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY 
(sink_id, sub_id));");
+   session.executeAsync("INSERT INTO " + keyspace + "." + table + 
" (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId 
+ ", " + -1 + ");");
+   }
+
+   @Override
+   public void close() throws Exception {
+   session.executeAsync("DELETE FROM " + keyspace + "." + table + 
" where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void commitCheckpoint(long checkpointID) {
+   SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + 
"." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + 
operatorId + "' and sub_id=" + subtaskId + ";");
+   s.setConsistencyLevel(ConsistencyLevel.ALL);
+   session.executeAsync(s);
--- End diff --

Why do you execute this asynchronously ? Doesn't this mean we'll never 
learn about errors?


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148435#comment-15148435
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994316
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
--- End diff --

I think the entire class should use try {} finally{} blocks to properly 
close connections on failures.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994316
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ *
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
--- End diff --

I think the entire class should use try {} finally{} blocks to properly 
close connections on failures.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994189
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

Can the execute() throw an exception? If so, it leaves session and cluster 
open.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148433#comment-15148433
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994189
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
--- End diff --

Can the execute() throw an exception? If so, it leaves session and cluster 
open.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like

[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148432#comment-15148432
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994037
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
--- End diff --

This will leave the cluster and session open.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52994037
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
--- End diff --

This will leave the cluster and session open.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993965
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
+   }
+   //set values for prepared statement
+   for (IN value : values) {
+   Object[] fields = new Object[value.getArity()];
+   for (int x = 0; x < value.getArity(); x++) {
+   fields[x] = value.getField(x);
+   }
+   //insert values and send to cassandra
+   ResultSetFuture result = 
session.executeAsync(preparedStatement.bind(fields));
+   //add callback to detect errors

[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148430#comment-15148430
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993965
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java
 ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraExactlyOnceSink extends 
GenericExactlyOnceSink {
+   private final String host;
+   private final String createQuery;
+   private final String insertQuery;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+
+   public CassandraExactlyOnceSink(String host, String insertQuery, 
CheckpointCommitter committer) {
+   this(host, null, insertQuery, committer);
+   }
+
+   public CassandraExactlyOnceSink(String host, String createQuery, String 
insertQuery, CheckpointCommitter committer) {
+   super(committer);
+   if (host == null) {
+   throw new IllegalArgumentException("Host argument must 
not be null.");
+   }
+   if (insertQuery == null) {
+   throw new IllegalArgumentException("Insert query 
argument must not be null.");
+   }
+   this.host = host;
+   this.createQuery = createQuery;
+   this.insertQuery = insertQuery;
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   session.close();
+   cluster.close();
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+   cluster = Cluster.builder().addContactPoint(host).build();
+   session = cluster.connect();
+   if (createQuery != null) {
+   session.execute(createQuery);
+   }
+   preparedStatement = session.prepare(insertQuery);
+   }
+
+   @Override
+   protected void sendValue(Iterable values) throws Exception {
+   //verify that no query failed until now
+   if (exception != null) {
+   throw new Exception(exception);
+   }
+   //set values for prepared statement
+   for (IN value : values) {
+   Object[] fields = new Object[value.getArity()];
+   for (int x = 0; x < value.getArity(); x++) {
+   fields[x] = value.getF

[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148424#comment-15148424
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993497
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993497
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+   saveHandleInState(checkpointId);
+   taskState.setFunctionState(state);
+   return taskState;
+   }
+
+   @Override
+

[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148423#comment-15148423
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993461
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
--- End diff --

whoops.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993461
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
--- End diff --

whoops.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148421#comment-15148421
 ] 

Till Rohrmann commented on FLINK-3412:
--

We could also do the conversion a bit more explicit by providing a 
{{Scala/JavaConverter}} which has a {{asScala/asJava}} method which returns the 
respective {{DataStream}}. That's also how the Scala API allows you to convert 
Scala collections from and to Java collections.

Something like

{code}
object JavaConverter {
implicit class Java2ScalaConverter[T](dataStream: 
org.apache.flink.streaming.api.datastream.DataStream[T]) {
def asScala: org.apache.flink.streaming.api.scala.DataStream[T] 
= {
...
}
}
}
{code}

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148419#comment-15148419
 ] 

Aljoscha Krettek commented on FLINK-3412:
-

+1 I also encountered this on at least on occasion

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Stefano Baghino (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148417#comment-15148417
 ] 

Stefano Baghino commented on FLINK-3412:


This one looks easy, I can take care of it immediately. Shall I assign it to me 
[~StephanEwen]?

> Remove implicit conversions JavaStream / ScalaStream
> 
>
> Key: FLINK-3412
> URL: https://issues.apache.org/jira/browse/FLINK-3412
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> I think the implicit conversions between the Java DataStream and the Scala 
> DataStream are dangerous.
> Because conversions exist in both directions, it is possible to write methods 
> that look like calling functions on the JavaStream, but instead convert it to 
> a Scala stream and call a different method.
> I just accidentally implemented an infinite recursion that way (via two 
> hidden implicit conversions).
> Making the conversions explicit (with a {{wrap()}} function like in the batch 
> API, we add minimally more code internally (nothing is different for users), 
> but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3413) Remove implicit Seq to DataStream conversion

2016-02-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148418#comment-15148418
 ] 

Aljoscha Krettek commented on FLINK-3413:
-

+1

> Remove implicit Seq to DataStream conversion
> 
>
> Key: FLINK-3413
> URL: https://issues.apache.org/jira/browse/FLINK-3413
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>
> The implicit conversion from {{Seq}} to Flink DataStream needs to create 
> internally a new execution environment.
> This method is confusing to use. If one uses the Seq in a program that uses a 
> different execution environment, then different streams run on different 
> execution environments.
> The overhead of manually calling {{env.fromElements(seq)}} is quite low.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148416#comment-15148416
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993077
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   Stream

[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52993077
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java
 ---
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import 
org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Generic Sink that emits its input elements into an arbitrary backend. 
This sink is integrated with the checkpointing
+ * mechanism to provide near exactly-once semantics.
+ * 
+ * Incoming records are stored within a {@link 
org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
+ * checkpoint is completed. Should a job fail while the data is being 
committed, no exactly-once guarantee can be made.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class GenericExactlyOnceSink extends 
AbstractStreamOperator implements OneInputStreamOperator {
+   private transient AbstractStateBackend.CheckpointStateOutputView out;
+   private TypeSerializer serializer;
+   protected transient TypeInformation typeInfo;
+   protected final CheckpointCommitter committer;
+   protected final String id;
+
+   private ExactlyOnceState state = new ExactlyOnceState();
+
+   public GenericExactlyOnceSink(CheckpointCommitter committer) {
+   if (committer == null) {
+   throw new IllegalArgumentException("CheckpointCommitter 
argument must not be null.");
+   }
+   this.committer = committer;
+   this.id = UUID.randomUUID().toString();
+   }
+
+   @Override
+   public void open() throws Exception {
+   committer.setOperatorId(id);
+   
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
+   committer.open();
+   }
+
+   public void close() throws Exception {
+   committer.close();
+   }
+
+   /**
+* Saves a handle in the state.
+*
+* @param checkpointId
+* @throws IOException
+*/
+   private void saveHandleInState(final long checkpointId) throws 
IOException {
+   //only add handle if a new OperatorState was created since the 
last snapshot
+   if (out != null) {
+   StateHandle handle = 
out.closeAndGetHandle();
+   state.pendingHandles.put(checkpointId, handle);
+   out = null;
+   }
+   }
+
+   @Override
+   public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
+   StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+   saveHandleInState(checkpointId);
+   taskState.setFunctionState(state);
+   return taskState;
+   }
+
+   @Override
+   pub

[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148415#comment-15148415
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52992939
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
--- End diff --

Installing RabbitMQ?


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52992939
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
--- End diff --

Installing RabbitMQ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148413#comment-15148413
 ] 

ASF GitHub Bot commented on FLINK-3396:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52992451
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

I'm not so sure about that, to be honest. What if the 
`restoreLatestCheckpointedState` fails because of some HDFS/ZooKeeper problems. 
Then you would like to try restarting the job, wouldn't you? The client should 
then be notified once all restarting attempts have been exhausted.


> Job submission Savepoint restore logic flawed
> -
>
> Key: FLINK-3396
> URL: https://issues.apache.org/jira/browse/FLINK-3396
> Project: Flink
>  Issue Type: Bug
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.0.0
>
>
> When savepoint restoring fails, the thrown Exception fails the execution 
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or 
> failure in any case. With savepoint restore failures, the ack message will be 
> skipped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52992451
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1073,57 +1073,73 @@ class JobManager(
   // execute the recovery/writing the jobGraph into the 
SubmittedJobGraphStore asynchronously
   // because it is a blocking operation
   future {
-try {
-  if (isRecovery) {
-executionGraph.restoreLatestCheckpointedState()
-  }
-  else {
-val snapshotSettings = jobGraph.getSnapshotSettings
-if (snapshotSettings != null) {
-  val savepointPath = snapshotSettings.getSavepointPath()
+val restoreStateSuccess =
+  try {
+if (isRecovery) {
+  executionGraph.restoreLatestCheckpointedState()
--- End diff --

I'm not so sure about that, to be honest. What if the 
`restoreLatestCheckpointedState` fails because of some HDFS/ZooKeeper problems. 
Then you would like to try restarting the job, wouldn't you? The client should 
then be notified once all restarting attempts have been exhausted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52991586
  
--- Diff: flink-streaming-java/pom.xml ---
@@ -108,6 +108,7 @@ under the License.
maven-surefire-plugin

false
+   -XX:-UseSplitVerifier
--- End diff --

i agree, trying some things at the moment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148406#comment-15148406
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52991586
  
--- Diff: flink-streaming-java/pom.xml ---
@@ -108,6 +108,7 @@ under the License.
maven-surefire-plugin

false
+   -XX:-UseSplitVerifier
--- End diff --

i agree, trying some things at the moment.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3413) Remove implicit Seq to DataStream conversion

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3413:
---

 Summary: Remove implicit Seq to DataStream conversion
 Key: FLINK-3413
 URL: https://issues.apache.org/jira/browse/FLINK-3413
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.0.0


The implicit conversion from {{Seq}} to Flink DataStream needs to create 
internally a new execution environment.

This method is confusing to use. If one uses the Seq in a program that uses a 
different execution environment, then different streams run on different 
execution environments.

The overhead of manually calling {{env.fromElements(seq)}} is quite low.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148398#comment-15148398
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52991019
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
+Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+
+ Cassandra Sink
+
+Flink's Cassandra sink is called `CassandraExactlyOnceSink`.
+
+The constructor accepts the following arguments:
+
+1. The Host address
+2. query to create a new table to write into (optional)
+3. query to insert data the a table
+4. checkpoint committer
+
+A checkpoint committer stores additional information about completed 
checkpoints
+in some resource. You can use a `CassandraCommitter` to store these in a 
separate
+table in cassandra. Note that this table will NOT be cleaned up by Flink.
+
+The CassandraCommitter constructor accepts the following arguments:
+1. Host address
+2. Keyspace
+3. Table name
+
+The CassandraExactlyOnceSink is implemented as a custom operator
+instead of a sink, and as such is a bit more unwieldy than other sinks.
--- End diff --

Yes delete the last subclause.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52991019
  
--- Diff: docs/apis/streaming/connectors/cassandra.md ---
@@ -0,0 +1,100 @@
+---
+title: "Apache Cassandra Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 1
+sub-nav-title: Cassandra
+---
+
+
+This connector provides a sink that writes data into a 
[Cassandra](https://cassandra.apache.org/) database.
+
+The Flink Cassandra sink integrates with Flink's checkpointing mechanism 
to provide
+exactly-once processing semantics. To achieve that, Flink buffers incoming 
records
+and commits them only when a checkpoint completes.
+
+To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-cassandra{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution [here]({{ 
site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing RabbitMQ
+Follow the instructions from the [Cassandra Getting Started 
page](http://wiki.apache.org/cassandra/GettingStarted).
+
+ Cassandra Sink
+
+Flink's Cassandra sink is called `CassandraExactlyOnceSink`.
+
+The constructor accepts the following arguments:
+
+1. The Host address
+2. query to create a new table to write into (optional)
+3. query to insert data the a table
+4. checkpoint committer
+
+A checkpoint committer stores additional information about completed 
checkpoints
+in some resource. You can use a `CassandraCommitter` to store these in a 
separate
+table in cassandra. Note that this table will NOT be cleaned up by Flink.
+
+The CassandraCommitter constructor accepts the following arguments:
+1. Host address
+2. Keyspace
+3. Table name
+
+The CassandraExactlyOnceSink is implemented as a custom operator
+instead of a sink, and as such is a bit more unwieldy than other sinks.
--- End diff --

Yes delete the last subclause.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148397#comment-15148397
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52990933
  
--- Diff: flink-streaming-java/pom.xml ---
@@ -108,6 +108,7 @@ under the License.
maven-surefire-plugin

false
+   -XX:-UseSplitVerifier
--- End diff --

This seems somehow wrong to me. Would be good to get the tests working 
without changing other modules.


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1640#discussion_r52990933
  
--- Diff: flink-streaming-java/pom.xml ---
@@ -108,6 +108,7 @@ under the License.
maven-surefire-plugin

false
+   -XX:-UseSplitVerifier
--- End diff --

This seems somehow wrong to me. Would be good to get the tests working 
without changing other modules.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector

2016-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148396#comment-15148396
 ] 

ASF GitHub Bot commented on FLINK-3332:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1640#issuecomment-184609151
  
Thanks for your contribution @zentol. Changes look good to me. I only had a 
comment concerning the type serializer and our usage of the term "exactly once".


> Provide an exactly-once Cassandra connector
> ---
>
> Key: FLINK-3332
> URL: https://issues.apache.org/jira/browse/FLINK-3332
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1640#issuecomment-184609151
  
Thanks for your contribution @zentol. Changes look good to me. I only had a 
comment concerning the type serializer and our usage of the term "exactly once".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream

2016-02-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-3412:
---

 Summary: Remove implicit conversions JavaStream / ScalaStream
 Key: FLINK-3412
 URL: https://issues.apache.org/jira/browse/FLINK-3412
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Affects Versions: 0.10.2
Reporter: Stephan Ewen
 Fix For: 1.0.0


I think the implicit conversions between the Java DataStream and the Scala 
DataStream are dangerous.

Because conversions exist in both directions, it is possible to write methods 
that look like calling functions on the JavaStream, but instead convert it to a 
Scala stream and call a different method.
I just accidentally implemented an infinite recursion that way (via two hidden 
implicit conversions).

Making the conversions explicit (with a {{wrap()}} function like in the batch 
API, we add minimally more code internally (nothing is different for users), 
but avoid such accidental errors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


<    1   2   3   >