[GitHub] flink pull request: FLINK-2380: allow to specify the default files...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1524#discussion_r53014234

    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---
    @@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
            jobManagerPort, webServerPort, slots, taskManagerCount,
            dynamicPropertiesEncodedString)
    +      //todo should I also set the FS default here
    --- End diff --

    No. I'll remove this TODO when merging

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148628#comment-15148628 ]

ASF GitHub Bot commented on FLINK-3396:
---------------------------------------

Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184694713

    Travis has passed. Did you have another look at this @tillrohrmann?

> Job submission Savepoint restore logic flawed
> ---------------------------------------------
>
>                 Key: FLINK-3396
>                 URL: https://issues.apache.org/jira/browse/FLINK-3396
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.0.0
>
> When savepoint restoring fails, the thrown Exception fails the execution graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or failure in any case. With savepoint restore failures, the ack message will be skipped.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
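The behaviour the issue asks for is essentially a try/catch around the restore step so that the client receives a failure ack instead of silence. A minimal sketch of that pattern, in which `Client`, `ack`, and `submitJob` are illustrative stand-ins and not Flink's actual job-submission API:

```java
// Sketch of the acknowledgement behaviour FLINK-3396 asks for.
// All names here are illustrative stand-ins, not Flink's actual API.
public class SubmissionAckSketch {

    interface Client {
        void ack(boolean success, String message);
    }

    static void submitJob(Runnable restoreSavepoint, Client client) {
        try {
            restoreSavepoint.run(); // may throw if the savepoint is invalid
            client.ack(true, "submission successful");
        } catch (RuntimeException e) {
            // The flawed behaviour was to fail the execution graph without
            // this branch; the client must hear about the failure too.
            client.ack(false, "savepoint restore failed: " + e.getMessage());
        }
    }
}
```

Either way the client gets exactly one ack, which is the invariant the issue description states.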
[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1633#issuecomment-184694713

    Travis has passed. Did you have another look at this @tillrohrmann?
[jira] [Created] (FLINK-3416) [py] .bat files fail when path contains spaces
Chesnay Schepler created FLINK-3416:
-----------------------------------

             Summary: [py] .bat files fail when path contains spaces
                 Key: FLINK-3416
                 URL: https://issues.apache.org/jira/browse/FLINK-3416
             Project: Flink
          Issue Type: Bug
          Components: Python API
    Affects Versions: 1.0.0
            Reporter: Chesnay Schepler
            Priority: Minor
             Fix For: 1.0.0
[jira] [Commented] (FLINK-3115) Update Elasticsearch connector to 2.X
[ https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148622#comment-15148622 ]

Robert Metzger commented on FLINK-3115:
---------------------------------------

[~smarthi], can you give us an update on the progress with this issue?

> Update Elasticsearch connector to 2.X
> -------------------------------------
>
>                 Key: FLINK-3115
>                 URL: https://issues.apache.org/jira/browse/FLINK-3115
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Suneel Marthi
>             Fix For: 1.0.0
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API changed. The code needs to be adapted. Probably it makes sense to have a new class {{ElasticsearchSink2}}.
[jira] [Created] (FLINK-3415) TimestampExtractor accepts negative watermarks
Stephan Ewen created FLINK-3415:
-------------------------------

             Summary: TimestampExtractor accepts negative watermarks
                 Key: FLINK-3415
                 URL: https://issues.apache.org/jira/browse/FLINK-3415
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.10.2
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
            Priority: Critical
             Fix For: 1.0.0

When the timestamp extractor returns a negative value for a watermark, it is accepted, as long as it is larger than the previous negative value, with the initial reference value being Long.MIN_VALUE.
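The bug is easy to see with the advance check modelled in isolation: because the reference starts at Long.MIN_VALUE, any negative watermark passes the monotonicity test. A minimal model, not Flink's actual extractor code:

```java
// Minimal model of the watermark-advance check described in FLINK-3415.
// This is an illustration, not Flink's actual TimestampExtractor code.
public class WatermarkCheckSketch {
    private long currentWatermark = Long.MIN_VALUE; // initial reference value

    /** Accepts the candidate if it advances the watermark; true if accepted. */
    public boolean advance(long candidate) {
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            return true;
        }
        return false;
    }
}
```

With only the `candidate > currentWatermark` test, `advance(-5000)` is accepted on a fresh instance, which is exactly the behaviour the issue flags as incorrect.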
[jira] [Resolved] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
[ https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger resolved FLINK-3304.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.0.0

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/c658763d

> AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
> --------------------------------------------------------------
>
>                 Key: FLINK-3304
>                 URL: https://issues.apache.org/jira/browse/FLINK-3304
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.0, 0.10.1
>            Reporter: Sebastian Klemke
>            Assignee: Klou
>             Fix For: 1.0.0
>
> Quoting flink cli (schema and names modified):
> "The program finished with the following exception:
> User-defined object org.apache.flink.api.java.io.AvroOutputFormat@5f253dfb (org.apache.flink.api.java.io.AvroOutputFormat) contains non-serializable field userDefinedSchema = {"type":"record","name":"Pojo","namespace":"com.example","fields":[{"name":"id","type":["null","string"],"default":null,"subtype":"objectid"}],"EntityVersion":"0.1.0"}
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper.<init>(UserCodeObjectWrapper.java:84)
> org.apache.flink.api.common.operators.GenericDataSinkBase.<init>(GenericDataSinkBase.java:68)
> org.apache.flink.api.java.operators.DataSink.translateToDataFlow(DataSink.java:258)
> org.apache.flink.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:64)
> org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:939)
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:907)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:57)
> com.example.Tool.main(Tool.java:86)
> Shutting down YARN cluster"
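The error comes from Java serialization of the output format, whose `Schema` field is not `Serializable`. A common remedy, and plausibly what the "Making the Avro Schema serializable" PR does (the actual fix may differ), is to ship the schema as its JSON string and rebuild the object lazily after deserialization. A self-contained sketch using a stand-in class instead of the real `org.apache.avro.Schema`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of the "serialize the schema as its JSON string" pattern.
// FakeSchema stands in for org.apache.avro.Schema so the sketch is
// self-contained; the real fix works against the Avro API.
public class SerializableAvroSchemaSketch implements Serializable {

    /** Stand-in for a non-serializable schema object. */
    static final class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
        String toJson() { return json; }
    }

    private transient FakeSchema schema;  // not serialized directly
    private final String schemaJson;      // serializable representation

    public SerializableAvroSchemaSketch(FakeSchema schema) {
        this.schema = schema;
        this.schemaJson = schema.toJson();
    }

    public FakeSchema getSchema() {
        if (schema == null) {
            // Rebuild from the JSON string after deserialization.
            schema = new FakeSchema(schemaJson);
        }
        return schema;
    }

    /** Java-serialization round trip, wrapping checked exceptions. */
    public static SerializableAvroSchemaSketch roundTrip(SerializableAvroSchemaSketch in) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(in);
            oos.flush();
            ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            return (SerializableAvroSchemaSketch) ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because the schema field is `transient`, the closure cleaner and the user-code wrapper only ever see the string, which serializes without complaint.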
[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1635
[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
[ https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148613#comment-15148613 ]

ASF GitHub Bot commented on FLINK-3304:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1635
[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
[ https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148612#comment-15148612 ]

ASF GitHub Bot commented on FLINK-3304:
---------------------------------------

Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1635#issuecomment-184688675

    Thanks a lot @rmetzger !
[jira] [Commented] (FLINK-3304) AvroOutputFormat.setSchema() doesn't work in yarn-cluster mode
[ https://issues.apache.org/jira/browse/FLINK-3304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148609#comment-15148609 ]

ASF GitHub Bot commented on FLINK-3304:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1635#issuecomment-184688215

    Merging ...
[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1635#issuecomment-184688675

    Thanks a lot @rmetzger !
[GitHub] flink pull request: FLINK-3304: Making the Avro Schema serializabl...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1635#issuecomment-184688215

    Merging ...
[GitHub] flink pull request: [FLINK-3315] Fix Slot Sharing in Streaming API
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1641#discussion_r53010068

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
    @@ -202,6 +203,29 @@ public String getUid() {
        }

        /**
    +    * Returns the slot sharing group of this transformation.
    +    *
    +    * @see #setSlotSharingGroup(String)
    +    */
    +   public String getSlotSharingGroup() {
    +       return slotSharingGroup;
    +   }
    +
    +   /**
    +    * Sets the slot sharing group of this transformation. Parallels instances of operations that
    --- End diff --

    "Parallels" typo (also in other variants of this method)
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148596#comment-15148596 ]

ASF GitHub Bot commented on FLINK-3315:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1641#discussion_r53010068

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
    @@ -202,6 +203,29 @@ public String getUid() {
        }

        /**
    +    * Returns the slot sharing group of this transformation.
    +    *
    +    * @see #setSlotSharingGroup(String)
    +    */
    +   public String getSlotSharingGroup() {
    +       return slotSharingGroup;
    +   }
    +
    +   /**
    +    * Sets the slot sharing group of this transformation. Parallels instances of operations that
    --- End diff --

    "Parallels" typo (also in other variants of this method)

> Fix Slot Sharing in Streaming API
> ---------------------------------
>
>                 Key: FLINK-3315
>                 URL: https://issues.apache.org/jira/browse/FLINK-3315
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Right now, the slot sharing/resource group logic is a bit "nebulous". The slot sharing group that operators are put in depends on the order in which operations are created. For example, in this case:
> {code}
> Source a = env.source()
> Source b = env.source()
> a.map().startNewResourceGroup().sink()
> b.map().sink()
> {code}
> We end up with two resource groups:
> - group 1: source a
> - group 2: map(), sink(), source b, map(), sink()
> The reason is that the slot sharing id is incremented when transforming the {{startNewResouceGroup()}} call and all operators that are transformed afterwards in graph traversal get that new slot sharing id.
> (There is also {{isolateResources()}} which can be used to isolate an operator.)
> What I propose is to remove {{startNewResourceGroup()}} and {{isolateResouces()}} and replace it with {{slotSharingGroup(String)}}. By default, operations would be in slot sharing group "default". This allows very fine grained control over what operators end up in which slot sharing group. For example, I could have this topology:
> {code}
> Source a = env.source().slotSharingGroup("sources")
> Source b = env.source().slotSharingGroup("sources")
> a.map().slotSharingGroup("heavy a").sink().slotSharingGroup("sinks")
> b.map().slotSharingGroup("heavy b").sink().slotSharingGroup("sinks")
> {code}
> Which would isolate the lightweight sources and sinks in a group and put heavy operations inside their own slot groups.
> This is a bit more low level than the previous API and requires more calls than a simple {{startNewResourceGroup()}} but I think not many people would use this feature and this design makes it very clear what operations end up in the same group.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment
[ https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148586#comment-15148586 ]

ASF GitHub Bot commented on FLINK-3299:
---------------------------------------

Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1642#issuecomment-184683583

    In general the DbStateBackend should not misbehave under the current assumptions. What Ufuk means is that the DbStateBackend does not make any assumption about the job parallelism if using the App id (we could change the job parallelism). This will not hold now but it does not matter as we don't support this feature yet.

> Remove ApplicationID from Environment
> -------------------------------------
>
>                 Key: FLINK-3299
>                 URL: https://issues.apache.org/jira/browse/FLINK-3299
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.0.0
>
> {{ApplicationID}} is used to identify an application across many job submissions (for example after restoring from a savepoint). This is currently exposed in the {{Environment}}, which might be unnecessary.
> State backends, which need the ID can generate it themselves and store it as part of their state handle.
> This has to be checked with the DB state backend, which currently uses this.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1642#issuecomment-184683583

    In general the DbStateBackend should not misbehave under the current assumptions. What Ufuk means is that the DbStateBackend does not make any assumption about the job parallelism if using the App id (we could change the job parallelism). This will not hold now but it does not matter as we don't support this feature yet.
[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment
[ https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148560#comment-15148560 ]

ASF GitHub Bot commented on FLINK-3299:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1642#issuecomment-184677192

    What would that corner case be?
[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1642#issuecomment-184677192

    What would that corner case be?
[jira] [Commented] (FLINK-3299) Remove ApplicationID from Environment
[ https://issues.apache.org/jira/browse/FLINK-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148558#comment-15148558 ]

ASF GitHub Bot commented on FLINK-3299:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1642

    [FLINK-3299] Remove ApplicationID from Environment

    As per discussion in the issue, we decided to remove the `ApplicationID`. Replaces the app ID in RocksDB and DB backend with job ID. I've talked to @gyfora about the DB backend and he mentioned a possible problem with savepoints in certain corner cases, but all in all the change should be OK.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3299-app_id

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1642.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1642

commit f88e4d540b71f9a4e683b44a030ad6b267f132cb
Author: Ufuk Celebi
Date: 2016-02-11T15:04:08Z

    [FLINK-3299] Remove ApplicationID from Environment
[GitHub] flink pull request: [FLINK-3299] Remove ApplicationID from Environ...
GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1642

    [FLINK-3299] Remove ApplicationID from Environment

    As per discussion in the issue, we decided to remove the `ApplicationID`. Replaces the app ID in RocksDB and DB backend with job ID. I've talked to @gyfora about the DB backend and he mentioned a possible problem with savepoints in certain corner cases, but all in all the change should be OK.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3299-app_id

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1642.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1642

commit f88e4d540b71f9a4e683b44a030ad6b267f132cb
Author: Ufuk Celebi
Date: 2016-02-11T15:04:08Z

    [FLINK-3299] Remove ApplicationID from Environment
[jira] [Commented] (FLINK-3315) Fix Slot Sharing in Streaming API
[ https://issues.apache.org/jira/browse/FLINK-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148548#comment-15148548 ]

ASF GitHub Bot commented on FLINK-3315:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1641

    [FLINK-3315] Fix Slot Sharing in Streaming API

    This changes slot sharing settings to a single method slotSharingGroup(String) on DataStream. Operations inherit the slot sharing group of the input if all input operations are in the same slot sharing group. The default slot sharing group is "default"; this can also be explicitly set using slotSharingGroup("default"), which overrides the inheriting behaviour.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1641

commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek
Date: 2016-02-02T12:11:12Z

    [FLINK-3315] Fix Slot Sharing in Streaming API
[GitHub] flink pull request: [FLINK-3315] Fix Slot Sharing in Streaming API
GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1641

    [FLINK-3315] Fix Slot Sharing in Streaming API

    This changes slot sharing settings to a single method slotSharingGroup(String) on DataStream. Operations inherit the slot sharing group of the input if all input operations are in the same slot sharing group. The default slot sharing group is "default"; this can also be explicitly set using slotSharingGroup("default"), which overrides the inheriting behaviour.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink slotsharing-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1641.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1641

commit f9bd1d25b26e639318dc737c3f7a1ce75df445fe
Author: Aljoscha Krettek
Date: 2016-02-02T12:11:12Z

    [FLINK-3315] Fix Slot Sharing in Streaming API
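The inheritance rule the PR describes can be stated compactly: an explicit slotSharingGroup(String) call wins; otherwise an operation inherits its inputs' group only when all inputs agree, falling back to "default". A sketch of that decision as a standalone helper (hypothetical, not the PR's actual code):

```java
import java.util.List;

// Hypothetical helper illustrating the group-selection rule from the PR
// description; this is not the actual Flink implementation.
public class SlotGroupRuleSketch {

    public static String resolveGroup(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // explicit slotSharingGroup(String) always wins
        }
        String inherited = null;
        for (String group : inputGroups) {
            if (inherited == null) {
                inherited = group;
            } else if (!inherited.equals(group)) {
                return "default"; // inputs disagree -> fall back to the default group
            }
        }
        return inherited == null ? "default" : inherited; // sources default too
    }
}
```

Under this rule, a map following a source in group "sources" stays in "sources" unless the user says otherwise, which matches the "inherit if all inputs agree" behaviour described above.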
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148547#comment-15148547 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1634#issuecomment-184672115

    Merged, please close @twalthr

> Translate optimized logical Table API plans into physical plans representing DataSet programs
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process. For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all relevant operator information (keys, user-code expression, strategy hints, parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink RelNodes. We start with a straight-forward mapping and later add rules that merge several relational operators into a single Flink operator, e.g., merge a join followed by a filter. Timo implemented some rules for the first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3226] Casting support for arithmetic op...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1634#issuecomment-184672115 Merged, please close @twalthr --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148546#comment-15148546 ] ASF GitHub Bot commented on FLINK-3226: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1639#issuecomment-184672051 Merged, please close @twalthr
[GitHub] flink pull request: [FLINK-3226] Translation of explicit casting
[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
[ https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148498#comment-15148498 ] Stefano Baghino commented on FLINK-3412: I like the approach suggested by [~till.rohrmann], I've made a prototype here: [https://github.com/radicalbit/flink/commit/bff5870d5578de9d1aaffc648cacffac79da81a3] > Remove implicit conversions JavaStream / ScalaStream > > > Key: FLINK-3412 > URL: https://issues.apache.org/jira/browse/FLINK-3412 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.10.2 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > I think the implicit conversions between the Java DataStream and the Scala > DataStream are dangerous. > Because conversions exist in both directions, it is possible to write methods > that look like calling functions on the JavaStream, but instead convert it to > a Scala stream and call a different method. > I just accidentally implemented an infinite recursion that way (via two > hidden implicit conversions). > Making the conversions explicit (with a {{wrap()}} function like in the batch > API), we add minimally more code internally (nothing is different for users), > but avoid such accidental errors.
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148495#comment-15148495 ] ASF GitHub Bot commented on FLINK-3396: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1633#discussion_r53000913 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1073,57 +1073,73 @@ class JobManager( // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously // because it is a blocking operation future { -try { - if (isRecovery) { -executionGraph.restoreLatestCheckpointedState() - } - else { -val snapshotSettings = jobGraph.getSnapshotSettings -if (snapshotSettings != null) { - val savepointPath = snapshotSettings.getSavepointPath() +val restoreStateSuccess = + try { +if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() --- End diff -- Regarding the `JobSubmitSuccess`: we had it as a follow up to have more fine-grained integration with the client and left it as a duplicate submit message for the time being (instead of something like `JobRecovered`). The other behaviour is back to the previous state now. I hear you that it makes sense to integrate the state restore behaviour with the execution graph restart. > Job submission Savepoint restore logic flawed > - > > Key: FLINK-3396 > URL: https://issues.apache.org/jira/browse/FLINK-3396 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.0.0 > > > When savepoint restoring fails, the thrown Exception fails the execution > graph, but the client is not informed about the failure. > The expected behaviour is that the submission should be acked with success or > failure in any case. With savepoint restore failures, the ack message will be > skipped.
[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148485#comment-15148485 ] ASF GitHub Bot commented on FLINK-3396: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1633#discussion_r52999704 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1079,6 +1079,9 @@ class JobManager( executionGraph.registerExecutionListener(gateway) executionGraph.registerJobStatusListener(gateway) } + +// All good. Submission succeeded! +jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) --- End diff -- Moved this one up to have correct ACKing behaviour.
[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...
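The ordering being fixed in the review comments above can be sketched as follows: acknowledge the submission to the client first, then run the (blocking) state restore asynchronously, and fail the execution graph if it throws, so the failure is never silently swallowed. This is a minimal sketch of that control flow only; all names below are hypothetical stand-ins, not Flink's actual JobManager API or Akka messages.

```java
import java.util.concurrent.CompletableFuture;

public class SubmitAckSketch {
    // records the messages "sent" in this sketch, in order
    static final StringBuilder log = new StringBuilder();

    static void ackSubmitSuccess() { log.append("JobSubmitSuccess;"); }

    static void failExecutionGraph(Throwable t) {
        log.append("failed:").append(t.getMessage()).append(";");
    }

    // stand-in for the blocking checkpoint/savepoint state restore
    static void restoreState(boolean shouldFail) {
        if (shouldFail) throw new RuntimeException("restore");
    }

    static void submit(boolean restoreFails) {
        // 1) ack the submission immediately, so the client always gets an answer
        ackSubmitSuccess();
        // 2) restore state asynchronously (it blocks); a failure fails the
        //    execution graph instead of skipping the ack
        CompletableFuture.runAsync(() -> {
            try {
                restoreState(restoreFails);
            } catch (Throwable t) {
                failExecutionGraph(t);
            }
        }).join();
    }

    public static void main(String[] args) {
        submit(true);
        System.out.println(log); // JobSubmitSuccess;failed:restore;
    }
}
```

With the ack sent before the restore, the client is informed in every case; a restore failure then surfaces through the execution graph's failure handling rather than through a missing submission response.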
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148480#comment-15148480 ] ASF GitHub Bot commented on FLINK-3396: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1633#discussion_r52998947 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1073,57 +1073,73 @@ class JobManager( // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously // because it is a blocking operation future { -try { - if (isRecovery) { -executionGraph.restoreLatestCheckpointedState() - } - else { -val snapshotSettings = jobGraph.getSnapshotSettings -if (snapshotSettings != null) { - val savepointPath = snapshotSettings.getSavepointPath() +val restoreStateSuccess = + try { +if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() --- End diff -- Had an offline discussion with Stephan. He agrees with you that the failure in this case is too hard. I'll undo that change by ACK'ing the submission earlier.
[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...
[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly
[ https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148479#comment-15148479 ] ASF GitHub Bot commented on FLINK-3296: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1563#issuecomment-184642511 I renamed the method to `writeUsingOutputFormat` and rebased to current master. > DataStream.write*() methods are not flushing properly > - > > Key: FLINK-3296 > URL: https://issues.apache.org/jira/browse/FLINK-3296 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > > The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} > class, which has logic for flushing records, even though the underlying > stream is never flushed. This is misleading for users as files are not > written as they would expect it. > The code was initially written with FileOutputFormats in mind, but the types > were not set correctly. This PR opened the write() method to any output > format: https://github.com/apache/flink/pull/706/files
[GitHub] flink pull request: [FLINK-3296] Remove 'flushing' behavior of the...
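The FLINK-3296 issue above describes a bug class worth illustrating: record-counting "flush" logic that never flushes the underlying stream leaves data sitting in the buffer until close(). A minimal sketch of the effect, using a plain `BufferedWriter` over a `StringWriter` as a stand-in for the sink's output stream (not Flink's actual sink code):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

public class FlushSketch {
    public static void main(String[] args) throws IOException {
        StringWriter target = new StringWriter();
        // large buffer: small records will not reach the target on their own
        BufferedWriter out = new BufferedWriter(target, 8192);

        out.write("record-1\n");
        // without an explicit flush, nothing has reached the target yet
        System.out.println("before flush: '" + target + "'");

        out.flush(); // what any time- or count-based flush logic must actually call
        System.out.println("after flush: '" + target + "'");
        out.close();
    }
}
```

The point of the fix discussed above is exactly this: bookkeeping about "records flushed" is meaningless unless the underlying stream's `flush()` is invoked.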
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148478#comment-15148478 ] ASF GitHub Bot commented on FLINK-3396: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1633#discussion_r52998567 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1073,57 +1073,73 @@ class JobManager( // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously // because it is a blocking operation future { -try { - if (isRecovery) { -executionGraph.restoreLatestCheckpointedState() - } - else { -val snapshotSettings = jobGraph.getSnapshotSettings -if (snapshotSettings != null) { - val savepointPath = snapshotSettings.getSavepointPath() +val restoreStateSuccess = + try { +if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() --- End diff -- The behaviour right now for a failure while doing a job recovery would simply fail the `ExecutionGraph` triggering a restart. A successful job recovery would send a `JobSubmitSuccess` to the client. I'm not sure whether this is actually correct, since the client already received a `JobSubmitMessage` from the `JobManager` while initially submitting the job. But I think this will simply be ignored. Thus, suppressing the restart behaviour in case of a job recovery would actually change the behaviour. If it makes sense and if it is possible to recover from failures while recovering a job or restoring a savepoint, it would make sense to not directly fail the job without restarting. Maybe one should distinguish that based on the actually occurring exception. 
[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52998526 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. 
+ * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); --- End diff -- I'd just add a try{...}catch{Exception e} block and log that stuff.
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148477#comment-15148477 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52998526 I'd just add a try{...}catch{Exception e} block and log that stuff. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink.
[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
[ https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148470#comment-15148470 ] Robert Metzger commented on FLINK-3412: --- +1 I ran into this issue as well.
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148469#comment-15148469 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52997627

    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java ---
    @@ -0,0 +1,113 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
    +
    +/**
    + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
    + * mechanism to provide near exactly-once semantics.
    + *
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class CassandraExactlyOnceSink<IN extends Tuple> extends GenericExactlyOnceSink<IN> {
    +    private final String host;
    +    private final String createQuery;
    +    private final String insertQuery;
    +
    +    private transient Cluster cluster;
    +    private transient Session session;
    +    private transient PreparedStatement preparedStatement;
    +
    +    private transient Throwable exception = null;
    +
    +    public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) {
    +        this(host, null, insertQuery, committer);
    +    }
    +
    +    public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) {
    +        super(committer);
    +        if (host == null) {
    +            throw new IllegalArgumentException("Host argument must not be null.");
    +        }
    +        if (insertQuery == null) {
    +            throw new IllegalArgumentException("Insert query argument must not be null.");
    +        }
    +        this.host = host;
    +        this.createQuery = createQuery;
    +        this.insertQuery = insertQuery;
    +    }
    +
    +    @Override
    +    public void close() throws Exception {
    +        super.close();
    +        session.close();
    +        cluster.close();

    --- End diff --

    I would add null checks for session and cluster

> Provide an exactly-once Cassandra connector
> -------------------------------------------
>
>                 Key: FLINK-3332
>                 URL: https://issues.apache.org/jira/browse/FLINK-3332
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>            Reporter: Robert Metzger
>            Assignee: Chesnay Schepler
>
> With FLINK-3311, we are adding a Cassandra connector to Flink.
> It would be good to also provide an "exactly-once" C* connector.
> I would like to first discuss how we are going to implement this in Flink.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
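The null-guarded close() that rmetzger asks for can be sketched without the Cassandra driver. Note that `NullSafeSinkSketch` and its `Resource` class are hypothetical stand-ins for the PR's sink and the driver's `Cluster`/`Session`, not code from the pull request:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal stand-in for the sink's lifecycle: close() must tolerate
 * open() never having run (or having failed part-way), so both
 * resource fields are null-checked before closing.
 */
public class NullSafeSinkSketch {

    /** Counts how many resources were actually closed. */
    public static final AtomicInteger closedResources = new AtomicInteger();

    /** Hypothetical stand-in for the driver's Cluster/Session. */
    static class Resource {
        void close() {
            closedResources.incrementAndGet();
        }
    }

    private Resource cluster;  // stays null if open() never ran
    private Resource session;

    public void open() {
        cluster = new Resource();
        session = new Resource();
    }

    public void close() {
        // close() is also invoked when open() failed, so guard both fields
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}
```

Calling close() on a sink that was never opened then becomes a harmless no-op instead of a NullPointerException.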
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148468#comment-15148468 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52997588

    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java ---
    @@ -0,0 +1,113 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
    +
    +/**
    + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
    + * mechanism to provide near exactly-once semantics.
    + *
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class CassandraExactlyOnceSink<IN extends Tuple> extends GenericExactlyOnceSink<IN> {
    +    private final String host;
    +    private final String createQuery;
    +    private final String insertQuery;
    +
    +    private transient Cluster cluster;
    +    private transient Session session;
    +    private transient PreparedStatement preparedStatement;
    +
    +    private transient Throwable exception = null;
    +
    +    public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) {
    +        this(host, null, insertQuery, committer);
    +    }
    +
    +    public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) {
    +        super(committer);
    +        if (host == null) {
    +            throw new IllegalArgumentException("Host argument must not be null.");
    +        }
    +        if (insertQuery == null) {
    +            throw new IllegalArgumentException("Insert query argument must not be null.");
    +        }
    +        this.host = host;
    +        this.createQuery = createQuery;
    +        this.insertQuery = insertQuery;
    +    }
    +
    +    @Override
    +    public void close() throws Exception {
    +        super.close();
    +        session.close();
    +        cluster.close();
    +    }
    +
    +    @Override
    +    public void open() throws Exception {
    +        super.open();
    +        cluster = Cluster.builder().addContactPoint(host).build();
    +        session = cluster.connect();
    +        if (createQuery != null) {
    +            session.execute(createQuery);

    --- End diff --

    You are right. Close is called when open fails
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148466#comment-15148466 ]

ASF GitHub Bot commented on FLINK-3396:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1633#discussion_r52997375

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -1073,57 +1073,73 @@ class JobManager(
           // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
           // because it is a blocking operation
           future {
    -        try {
    -          if (isRecovery) {
    -            executionGraph.restoreLatestCheckpointedState()
    -          }
    -          else {
    -            val snapshotSettings = jobGraph.getSnapshotSettings
    -            if (snapshotSettings != null) {
    -              val savepointPath = snapshotSettings.getSavepointPath()
    +        val restoreStateSuccess =
    +          try {
    +            if (isRecovery) {
    +              executionGraph.restoreLatestCheckpointedState()

    --- End diff --

    But then I would not keep the behaviour as it is right now. Instead, we should then consider the job submitted before trying to recover any checkpoint state and keep the restart behaviour. What do you think?

> Job submission Savepoint restore logic flawed
> ---------------------------------------------
>
>                 Key: FLINK-3396
>                 URL: https://issues.apache.org/jira/browse/FLINK-3396
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.0.0
>
> When savepoint restoring fails, the thrown Exception fails the execution
> graph, but the client is not informed about the failure.
> The expected behaviour is that the submission should be acked with success or
> failure in any case. With savepoint restore failures, the ack message will be
> skipped.
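The expected behaviour described in FLINK-3396 — the client is acked with success or failure in any case — can be sketched as follows. The names `submit`, `restoreState`, and the string acks are illustrative stand-ins for the JobManager's actor messages, not Flink's actual API:

```java
import java.util.function.Consumer;

/**
 * Sketch of the fix under discussion: whatever happens during state
 * restore, the client receives exactly one acknowledgement (success
 * or failure) instead of the ack being silently skipped.
 */
public class SubmissionAckSketch {

    /** Runs the (possibly failing) restore step and always acks the client. */
    public static String submit(Runnable restoreState, Consumer<String> ackToClient) {
        String ack;
        try {
            restoreState.run();  // e.g. restoreLatestCheckpointedState()
            ack = "SUCCESS";
        } catch (RuntimeException e) {
            // previously this path only failed the execution graph
            // and never informed the client
            ack = "FAILURE: " + e.getMessage();
        }
        ackToClient.accept(ack);  // acked in any case
        return ack;
    }
}
```

A failing restore now produces an explicit failure ack rather than leaving the client waiting.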
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148456#comment-15148456 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52995692

    --- Diff: docs/apis/streaming/fault_tolerance.md ---
    @@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
    +Cassandra sink
    +exactly once

    --- End diff --

    Which is also not true. A Flink failure while data is being written to Cassandra will cause duplicates. You can only say this if writing the data to the final table is completely handled by Cassandra (for example, by writing into a temporary table, exporting it to CSV and importing it into the target table; then the only way to get duplicates is if Cassandra fails while importing).
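zentol's failure window can be made concrete with a tiny simulation (all names hypothetical, no Cassandra involved): a crash mid-commit followed by a replay of the same checkpointed buffer writes some rows twice, which is at-least-once rather than exactly-once:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simulates the window zentol describes: if the job fails while the
 * checkpointed buffer is being written to the database, the whole
 * buffer is replayed on recovery, duplicating the rows written before
 * the crash.
 */
public class CommitWindowSketch {

    /** Stand-in for the Cassandra target table. */
    static final List<String> table = new ArrayList<>();

    /** Writes the buffer row by row; a negative crashAfter means no failure.
     *  Returns true only if the whole buffer was written. */
    static boolean commit(List<String> buffer, int crashAfter) {
        int written = 0;
        for (String row : buffer) {
            if (written == crashAfter) {
                return false;  // job fails mid-commit; checkpoint stays uncommitted
            }
            table.add(row);
            written++;
        }
        return true;  // only now would the checkpoint be marked committed
    }

    /** Crash while committing, then replay the same buffer on recovery. */
    static int crashAndReplay() {
        List<String> buffer = Arrays.asList("a", "b", "c");
        commit(buffer, 2);   // crash after writing "a" and "b"
        commit(buffer, -1);  // recovery replays the whole checkpointed buffer
        return table.size(); // 5 rows: "a" and "b" were written twice
    }
}
```

Closing this window requires the de-duplication to happen on the Cassandra side, exactly as the comment says.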
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52995595

    --- Diff: docs/apis/streaming/fault_tolerance.md ---
    @@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
    +Cassandra sink
    +exactly once

    --- End diff --

    I mean an at-most-once system will also give you exactly-once processing guarantees under the assumption that nothing fails.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148453#comment-15148453 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52995525

    --- Diff: docs/apis/streaming/fault_tolerance.md ---
    @@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks:
    +Cassandra sink
    +exactly once

    --- End diff --

    That's what is defined as at least once.
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148450#comment-15148450 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52995374

    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java ---
    @@ -0,0 +1,113 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PreparedStatement;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink;
    +
    +/**
    + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing
    + * mechanism to provide near exactly-once semantics.
    + *
    + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a
    + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made.
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class CassandraExactlyOnceSink<IN extends Tuple> extends GenericExactlyOnceSink<IN> {
    +    private final String host;
    +    private final String createQuery;
    +    private final String insertQuery;
    +
    +    private transient Cluster cluster;
    +    private transient Session session;
    +    private transient PreparedStatement preparedStatement;
    +
    +    private transient Throwable exception = null;
    +
    +    public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) {
    +        this(host, null, insertQuery, committer);
    +    }
    +
    +    public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) {
    +        super(committer);
    +        if (host == null) {
    +            throw new IllegalArgumentException("Host argument must not be null.");
    +        }
    +        if (insertQuery == null) {
    +            throw new IllegalArgumentException("Insert query argument must not be null.");
    +        }
    +        this.host = host;
    +        this.createQuery = createQuery;
    +        this.insertQuery = insertQuery;
    +    }
    +
    +    @Override
    +    public void close() throws Exception {
    +        super.close();
    +        session.close();
    +        cluster.close();
    +    }
    +
    +    @Override
    +    public void open() throws Exception {
    +        super.open();
    +        cluster = Cluster.builder().addContactPoint(host).build();
    +        session = cluster.connect();
    +        if (createQuery != null) {
    +            session.execute(createQuery);
    +        }
    +        preparedStatement = session.prepare(insertQuery);
    +    }
    +
    +    @Override
    +    protected void sendValue(Iterable<IN> values) throws Exception {
    +        //verify that no query failed until now
    +        if (exception != null) {
    +            throw new Exception(exception);

    --- End diff --

    Why is close() not called for a failing operation?
[jira] [Created] (FLINK-3414) Add Scala API for CEP's pattern definition
Till Rohrmann created FLINK-3414:
------------------------------------

             Summary: Add Scala API for CEP's pattern definition
                 Key: FLINK-3414
                 URL: https://issues.apache.org/jira/browse/FLINK-3414
             Project: Flink
          Issue Type: Improvement
          Components: CEP
    Affects Versions: 1.0.0
            Reporter: Till Rohrmann
            Priority: Minor

Currently, the CEP library only supports a Java API to specify complex event patterns. In order to make it a bit less verbose for Scala users, it would be nice to also add a Scala API for the CEP library. A Scala API would also allow passing Scala's anonymous functions as filter conditions or as a select function, for example, or using partial functions to distinguish between different events.

Furthermore, the Scala API could be designed to feel a bit more like a DSL:

{code}
begin "start" where _.id >= 42 ->
  "middle_1" as classOf[Subclass] || "middle_2" where _.name equals "foobar" ->
  "end" where x => x.id <= x.volume
{code}
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148451#comment-15148451 ]

ASF GitHub Bot commented on FLINK-3332:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1640#discussion_r52995438

    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
    @@ -0,0 +1,77 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ConsistencyLevel;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SimpleStatement;
    +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
    +
    +/**
    + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
    + * database.
    + *
    + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
    + */
    +public class CassandraCommitter extends CheckpointCommitter {
    +    private final String host;
    +    private final String keyspace;
    +    private final String table;
    +
    +    private transient Cluster cluster;
    +    private transient Session session;
    +
    +    public CassandraCommitter(String host, String keyspace, String table) {
    +        this.host = host;
    +        this.keyspace = keyspace;
    +        this.table = table;
    +    }
    +
    +    @Override
    +    public void open() throws Exception {
    +        cluster = Cluster.builder().addContactPoint(host).build();
    +        session = cluster.connect();
    +
    +        session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " with replication={'class':'SimpleStrategy', 'replication_factor':3};");
    +        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));");
    +        session.executeAsync("INSERT INTO " + keyspace + "." + table + " (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId + ", " + -1 + ");");
    +    }
    +
    +    @Override
    +    public void close() throws Exception {
    +        session.executeAsync("DELETE FROM " + keyspace + "." + table + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
    +        session.close();
    +        cluster.close();
    +    }
    +
    +    @Override
    +    public void commitCheckpoint(long checkpointID) {
    +        SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + "." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";");
    +        s.setConsistencyLevel(ConsistencyLevel.ALL);
    +        session.executeAsync(s);

    --- End diff --

    Good point.
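The review remark zentol agrees with is elided here, but the fire-and-forget `executeAsync` in `commitCheckpoint` is one plausible target: the committer never learns whether the UPDATE was actually applied. A self-contained sketch of the blocking alternative, with hypothetical names and `CompletableFuture` standing in for the driver's `ResultSetFuture`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Sketch (assumed reading of the review remark, not PR code): an async
 * commit returns before the write is confirmed, so a lost write goes
 * unnoticed. Blocking on the future surfaces the failure to the committer.
 */
public class SyncCommitSketch {

    /** Stand-in for session.executeAsync(): completes or fails in the background. */
    static CompletableFuture<Void> executeAsync(boolean fails) {
        return fails
            ? CompletableFuture.failedFuture(new RuntimeException("commit write lost"))
            : CompletableFuture.completedFuture(null);
    }

    /** Blocking variant: the caller learns whether the UPDATE succeeded. */
    public static boolean commitCheckpoint(boolean fails) {
        try {
            executeAsync(fails).get();  // wait for the store's acknowledgement
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;  // surfaced failure: caller can retry or fail the task
        }
    }
}
```

With the fire-and-forget version, both calls would appear to succeed; blocking makes the lost commit visible.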
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52995438 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private final String host; + private final String keyspace; + private final String table; + + private transient Cluster cluster; + private transient Session session; + + public CassandraCommitter(String host, String keyspace, String table) { + this.host = host; + this.keyspace = keyspace; + this.table = table; + } + + @Override + public void open() throws Exception { + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + + session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " with replication={'class':'SimpleStrategy', 'replication_factor':3};"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));"); + session.executeAsync("INSERT INTO " + keyspace + "." + table + " (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId + ", " + -1 + ");"); + } + + @Override + public void close() throws Exception { + session.executeAsync("DELETE FROM " + keyspace + "." + table + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + session.close(); + cluster.close(); + } + + @Override + public void commitCheckpoint(long checkpointID) { + SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + "." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + s.setConsistencyLevel(ConsistencyLevel.ALL); + session.executeAsync(s); --- End diff -- good point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52995374 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. 
+ * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param &lt;IN&gt; Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink<IN extends Tuple> extends GenericExactlyOnceSink<IN> { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); + } + preparedStatement = session.prepare(insertQuery); + } + + @Override + protected void sendValue(Iterable<IN> values) throws Exception { + //verify that no query failed until now + if (exception != null) { + throw new Exception(exception); --- End diff -- why is close() not called for a failing operation?
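The question above — why close() is not called for a failing operation — points at a general resource-acquisition problem: if the runtime does not invoke close() after a failed open(), partially acquired resources (e.g. a connected Cluster without a Session) leak. A minimal, self-contained sketch of the usual guard (illustrative names, not Flink's operator lifecycle):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Callable;

/** Acquires resources in order; on failure, releases the ones already acquired. */
public class GuardedOpen {
    private final Deque<AutoCloseable> opened = new ArrayDeque<>();

    @SafeVarargs
    public final void open(Callable<AutoCloseable>... steps) throws Exception {
        try {
            for (Callable<AutoCloseable> step : steps) {
                opened.push(step.call()); // e.g. build the Cluster, then connect the Session
            }
        } catch (Exception e) {
            close(); // clean up ourselves: the caller may never invoke close() after a failed open()
            throw e;
        }
    }

    public void close() {
        while (!opened.isEmpty()) {
            try { opened.pop().close(); } catch (Exception ignored) { }
        }
    }
}
```

With this shape, a failure in `session.execute(createQuery)` would still release the already-built cluster.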
[jira] [Commented] (FLINK-2021) Rework examples to use ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148448#comment-15148448 ] ASF GitHub Bot commented on FLINK-2021: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1581#issuecomment-184624521 The `testDetachedPerJobYarnCluster()` YARN test is failing with ``` 10:33:26,969 INFO org.apache.flink.client.CliFrontend - Starting execution of program 10:33:26,969 INFO org.apache.flink.client.program.Client - Starting program in interactive mode 10:33:26,971 ERROR org.apache.flink.client.CliFrontend - Error while running the command. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runDetached(Client.java:277) at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:774) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112) at org.apache.flink.yarn.YarnTestBase$Runner.run(YarnTestBase.java:565) Caused by: java.lang.RuntimeException: Error parsing arguments '[/tmp/junit2827618452146338513/junit4708065236621010716.tmp, /tmp/junit2827618452146338513/junit2411517217475739253]' on '/tmp/junit2827618452146338513/junit4708065236621010716.tmp'. Unexpected value. Please prefix values with -- or -. 
at org.apache.flink.api.java.utils.ParameterTool.fromArgs(ParameterTool.java:107) at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) ... 6 more 10:33:26,974 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon. ``` > Rework examples to use ParameterTool > > > Key: FLINK-2021 > URL: https://issues.apache.org/jira/browse/FLINK-2021 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Stefano Baghino >Priority: Minor > Labels: starter > > In FLINK-1525, we introduced the {{ParameterTool}}. > We should port the examples to use the tool. > The examples could look like this (we should maybe discuss it first on the > mailing lists): > {code} > public static void main(String[] args) throws Exception { > ParameterTool pt = ParameterTool.fromArgs(args); > boolean fileOutput = pt.getNumberOfParameters() == 2; > String textPath = null; > String outputPath = null; > if(fileOutput) { > textPath = pt.getRequired("input"); > outputPath = pt.getRequired("output"); > } > // set up the execution environment > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setUserConfig(pt); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
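The root cause of the test failure is that `ParameterTool.fromArgs` accepts only keyword arguments: every value must follow a `--key` or `-key` token, so the test's bare positional paths are rejected. A simplified stand-in for that parsing rule (not Flink's implementation) behaves like this:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for ParameterTool.fromArgs: values must be prefixed by --key or -key. */
public class ArgsParser {
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> map = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            if (arg.startsWith("--")) {
                arg = arg.substring(2);
            } else if (arg.startsWith("-")) {
                arg = arg.substring(1);
            } else {
                // This is the branch the failing YARN test hits: a bare positional value.
                throw new RuntimeException("Error parsing arguments: unexpected value '"
                        + args[i] + "'. Please prefix values with -- or -.");
            }
            if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                map.put(arg, args[i + 1]);
                i += 2;
            } else {
                map.put(arg, "true"); // flag without a value
                i += 1;
            }
        }
        return map;
    }
}
```

So the fix on the test side is to pass `--input /path --output /path` instead of the two bare paths.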
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148447#comment-15148447 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52995123 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. 
This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param &lt;IN&gt; Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink<IN extends Tuple> extends GenericExactlyOnceSink<IN> { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); --- End diff -- does that mean that close() is not called when an operation fails in open()?
> Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148442#comment-15148442 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994814 --- Diff: docs/apis/streaming/fault_tolerance.md --- @@ -176,6 +176,11 @@ state updates) of Flink coupled with bundled sinks: +Cassandra sink +exactly once --- End diff -- I would try to summarize the guarantees here. Or maybe add an asterisk to the "exactly once", stating that Cassandra has no support for large atomic inserts, but as long as cassandra is not failing, Flink failures will not cause duplicates. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
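The guarantee summary Robert asks for rests on Cassandra's write semantics: an INSERT is an upsert on the primary key, so re-running an already-committed batch after a Flink failure overwrites rows rather than duplicating them; only a crash in the middle of a commit leaves the "near exactly-once" window. A toy model of that idempotence (a plain map standing in for a table, not the driver API):

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: a table keyed by primary key behaves like a map, so re-inserting a row is idempotent. */
public class UpsertTable {
    private final Map<String, String> rows = new HashMap<>();

    public void insert(String primaryKey, String value) {
        rows.put(primaryKey, value); // INSERT == upsert: same key, no duplicate row
    }

    public int rowCount() {
        return rows.size();
    }
}
```

This is why replayed writes are harmless as long as the insert query is deterministic in its primary key; non-keyed or counter updates would not enjoy the same property.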
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148439#comment-15148439 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994610 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +/** + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly-once guarantee can be made. 
+ * + * @param &lt;IN&gt; Type of the elements emitted by this sink + */ +public abstract class GenericExactlyOnceSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> { + private transient AbstractStateBackend.CheckpointStateOutputView out; + private TypeSerializer<IN> serializer; + protected transient TypeInformation<IN> typeInfo; + protected final CheckpointCommitter committer; + protected final String id; + + private ExactlyOnceState state = new ExactlyOnceState(); + + public GenericExactlyOnceSink(CheckpointCommitter committer) { + if (committer == null) { + throw new IllegalArgumentException("CheckpointCommitter argument must not be null."); + } + this.committer = committer; + this.id = UUID.randomUUID().toString(); + } + + @Override + public void open() throws Exception { + committer.setOperatorId(id); + committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask()); + committer.open(); + } + + public void close() throws Exception { + committer.close(); + } + + /** +* Saves a handle in the state. +* +* @param checkpointId +* @throws IOException +*/ + private void saveHandleInState(final long checkpointId) throws IOException { + //only add handle if a new OperatorState was created since the last snapshot + if (out != null) { + StateHandle<DataInputView> handle = out.closeAndGetHandle(); + state.pendingHandles.put(checkpointId, handle); + out = null; + } + } + + @Override + public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception { + Stream
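Stripped of Flink's operator API, the diff implements a write-ahead pattern: stage records per in-flight checkpoint, emit them to the external system only once that checkpoint completes, and remember the highest committed checkpoint id so replayed completion notifications after a recovery are skipped. A self-contained sketch under those assumptions (illustrative names, not the actual GenericExactlyOnceSink):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Simplified write-ahead sink: records are staged per checkpoint and emitted only on completion. */
public class WriteAheadSink<T> {
    private final SortedMap<Long, List<T>> pending = new TreeMap<>();
    private final Consumer<List<T>> backend;   // stands in for sendValue()
    private long lastCommittedCheckpoint = -1; // stands in for the committer's checkpoint table

    public WriteAheadSink(Consumer<List<T>> backend) {
        this.backend = backend;
    }

    public void processElement(long currentCheckpointId, T element) {
        pending.computeIfAbsent(currentCheckpointId, k -> new ArrayList<>()).add(element);
    }

    public void notifyCheckpointComplete(long checkpointId) {
        // Emit every buffered checkpoint up to and including the completed one,
        // skipping ids already committed (e.g. replayed notifications after recovery).
        for (long id : new ArrayList<>(pending.headMap(checkpointId + 1).keySet())) {
            if (id > lastCommittedCheckpoint) {
                backend.accept(pending.get(id));
                lastCommittedCheckpoint = id;
            }
            pending.remove(id);
        }
    }
}
```

The "near" in "near exactly-once" lives in `backend.accept`: if the process dies between emitting a batch and recording the commit, the batch is replayed on restart.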
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994409 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private final String host; + private final String keyspace; + private final String table; + + private transient Cluster cluster; + private transient Session session; + + public CassandraCommitter(String host, String keyspace, String table) { + this.host = host; + this.keyspace = keyspace; + this.table = table; + } + + @Override + public void open() throws Exception { + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + + session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " with replication={'class':'SimpleStrategy', 'replication_factor':3};"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));"); + session.executeAsync("INSERT INTO " + keyspace + "." + table + " (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId + ", " + -1 + ");"); + } + + @Override + public void close() throws Exception { + session.executeAsync("DELETE FROM " + keyspace + "." + table + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + session.close(); + cluster.close(); + } + + @Override + public void commitCheckpoint(long checkpointID) { + SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + "." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + s.setConsistencyLevel(ConsistencyLevel.ALL); + session.executeAsync(s); --- End diff -- Why do you execute this asynchronously ? Doesn't this mean we'll never learn about errors? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
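The review point above is that `executeAsync()` discards failures unless the returned future is checked. One fix is to block on the future inside `commitCheckpoint`, so a failed `UPDATE` surfaces as an exception. A sketch of that shape, with `CompletableFuture` standing in for the driver's `ResultSetFuture` (class and method names here are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Blocking on the future turns the async write into a synchronous commit,
// so a failed UPDATE fails the checkpoint commit instead of being dropped.
class CommitExample {
    static void commitCheckpoint(CompletableFuture<Void> executeResult) throws Exception {
        try {
            executeResult.get(); // waits for the write; rethrows on failure
        } catch (ExecutionException e) {
            throw new Exception("Committing checkpoint to Cassandra failed", e.getCause());
        }
    }
}
```

The trade-off is latency on the commit path, but a checkpoint commit is exactly the place where a lost error would silently break the exactly-once guarantee.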
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148437#comment-15148437 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994409 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private final String host; + private final String keyspace; + private final String table; + + private transient Cluster cluster; + private transient Session session; + + public CassandraCommitter(String host, String keyspace, String table) { + this.host = host; + this.keyspace = keyspace; + this.table = table; + } + + @Override + public void open() throws Exception { + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + + session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " with replication={'class':'SimpleStrategy', 'replication_factor':3};"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));"); + session.executeAsync("INSERT INTO " + keyspace + "." + table + " (sink_id, sub_id, checkpoint_id) values ('" + operatorId + "', " + subtaskId + ", " + -1 + ");"); + } + + @Override + public void close() throws Exception { + session.executeAsync("DELETE FROM " + keyspace + "." + table + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + session.close(); + cluster.close(); + } + + @Override + public void commitCheckpoint(long checkpointID) { + SimpleStatement s = new SimpleStatement("UPDATE " + keyspace + "." + table + " set checkpoint_id=" + checkpointID + " where sink_id='" + operatorId + "' and sub_id=" + subtaskId + ";"); + s.setConsistencyLevel(ConsistencyLevel.ALL); + session.executeAsync(s); --- End diff -- Why do you execute this asynchronously ? Doesn't this mean we'll never learn about errors? 
> Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148435#comment-15148435 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994316 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { --- End diff -- I think the entire class should use try {} finally{} blocks to properly close connections on failures. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994316 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. + * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { --- End diff -- I think the entire class should use try {} finally{} blocks to properly close connections on failures. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
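The try/finally structure the reviewer asks for ensures both resources are released even when the work, or the first close, throws. A minimal sketch of that shape, with `Resource` as an illustrative stand-in for the driver's `Cluster` and `Session`:

```java
// Close in reverse acquisition order; each close is attempted even if the
// work itself, or the previous close, failed.
class CleanupExample {
    interface Resource extends AutoCloseable {
        void close(); // narrowed to unchecked for the sketch
    }

    static void useResources(Resource cluster, Resource session, Runnable work) {
        try {
            work.run();
        } finally {
            try {
                session.close();
            } finally {
                cluster.close();
            }
        }
    }
}
```

Nested finally blocks (or try-with-resources, which generates the same shape) guarantee `cluster.close()` runs even when `session.close()` throws.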
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994189 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. 
+ * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); --- End diff -- Can the execute() throw an exception? If so, it leaves session and cluster open. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
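The concern raised above is that a throwing `execute(createQuery)` leaves the freshly opened session and cluster dangling. A guarded `open()` closes what it already acquired before rethrowing. A sketch under assumed names (`GuardedOpen`, `Conn` are illustrative, not driver types):

```java
import java.util.function.Supplier;

// If any setup step after the connection throws, the already-acquired
// resource is closed before the exception propagates.
class GuardedOpen {
    interface Conn extends AutoCloseable {
        void close(); // narrowed to unchecked for the sketch
    }

    static Conn open(Supplier<Conn> connect, Runnable setup) {
        Conn conn = connect.get();
        try {
            setup.run();  // e.g. session.execute(createQuery)
            return conn;
        } catch (RuntimeException e) {
            conn.close(); // don't leak the connection on failure
            throw e;
        }
    }
}
```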
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148433#comment-15148433 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994189 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. 
This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); --- End diff -- Can the execute() throw an exception? If so, it leaves session and cluster open. 
> Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148432#comment-15148432 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994037 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. 
This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); + } + preparedStatement = session.prepare(insertQuery); + } + + @Override + protected void sendValue(Iterable values) throws Exception { + //verify that no query failed until now + if (exception != null) { + throw new Exception(exception); --- End diff -- This will leave the cluster and session 
open. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink >
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52994037 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. 
+ * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); + } + preparedStatement = session.prepare(insertQuery); + } + + @Override + protected void sendValue(Iterable values) throws Exception { + //verify that no query failed until now + if (exception != null) { + throw new Exception(exception); --- End diff -- This will leave the cluster and session open. 
[GitHub] flink pull request: [FLINK-3332] Add Exactly-Once Cassandra connec...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52993965 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. 
+ * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); + } + preparedStatement = session.prepare(insertQuery); + } + + @Override + protected void sendValue(Iterable values) throws Exception { + //verify that no query failed until now + if (exception != null) { + throw new Exception(exception); + } + //set values for prepared statement + for (IN value : values) { + Object[] fields = new Object[value.getArity()]; + for (int x = 0; x < 
value.getArity(); x++) { + fields[x] = value.getField(x); + } + //insert values and send to cassandra + ResultSetFuture result = session.executeAsync(preparedStatement.bind(fields)); + //add callback to detect errors
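The callback pattern the quoted `sendValue` is building toward records an async failure in a field and rethrows it on the next call, so the error surfaces on the operator thread rather than vanishing in the driver's callback executor. A sketch of that capture-and-rethrow shape, with `CompletableFuture` standing in for `ResultSetFuture` and all names illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Async write failures are remembered and rethrown on the next send() call,
// where the operator can fail cleanly (and close() still runs).
class AsyncErrorCapture {
    private final AtomicReference<Throwable> exception = new AtomicReference<>();

    void send(CompletableFuture<Void> write) throws Exception {
        Throwable t = exception.get();
        if (t != null) {
            throw new Exception("A previous write to Cassandra failed", t);
        }
        // callback runs when the async write finishes; keep the first failure
        write.whenComplete((ok, err) -> {
            if (err != null) {
                exception.compareAndSet(null, err);
            }
        });
    }
}
```

Note the rethrow is deferred to the *next* call, which is exactly why the review thread insists the surrounding open/close logic must tolerate an exception here without leaking the cluster and session.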
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148430#comment-15148430 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52993965 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraExactlyOnceSink.java --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericExactlyOnceSink; + +/** + * Sink that emits its input elements into a Cassandra database. 
This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly once guarantee can be made. + * + * @param Type of the elements emitted by this sink + */ +public class CassandraExactlyOnceSink extends GenericExactlyOnceSink { + private final String host; + private final String createQuery; + private final String insertQuery; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + + public CassandraExactlyOnceSink(String host, String insertQuery, CheckpointCommitter committer) { + this(host, null, insertQuery, committer); + } + + public CassandraExactlyOnceSink(String host, String createQuery, String insertQuery, CheckpointCommitter committer) { + super(committer); + if (host == null) { + throw new IllegalArgumentException("Host argument must not be null."); + } + if (insertQuery == null) { + throw new IllegalArgumentException("Insert query argument must not be null."); + } + this.host = host; + this.createQuery = createQuery; + this.insertQuery = insertQuery; + } + + @Override + public void close() throws Exception { + super.close(); + session.close(); + cluster.close(); + } + + @Override + public void open() throws Exception { + super.open(); + cluster = Cluster.builder().addContactPoint(host).build(); + session = cluster.connect(); + if (createQuery != null) { + session.execute(createQuery); + } + preparedStatement = session.prepare(insertQuery); + } + + @Override + protected void sendValue(Iterable values) throws Exception { + //verify that no query failed until now + if (exception != null) { + throw new Exception(exception); + } + //set values for prepared statement + for (IN 
value : values) { + Object[] fields = new Object[value.getArity()]; + for (int x = 0; x < value.getArity(); x++) { + fields[x] = value.getF
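The error-handling scheme in the diff above — async writes register a callback that records the first failure, and the next synchronous `sendValue` call rethrows it — can be illustrated without a Cassandra dependency. The following is a hedged sketch using `CompletableFuture` in place of the DataStax driver's `ResultSetFuture`; the class name `AsyncWriter` and its methods are hypothetical, not part of the connector.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an async store client (not the DataStax driver).
public class AsyncWriter {
    // First failure seen by any callback; checked before the next write,
    // mirroring the `exception` field in the CassandraExactlyOnceSink diff.
    private final AtomicReference<Throwable> exception = new AtomicReference<>();

    public void send(String value) throws Exception {
        Throwable t = exception.get();
        if (t != null) {
            // Surface an earlier async failure on the next synchronous call,
            // like the check at the top of sendValue() in the diff.
            throw new Exception(t);
        }
        writeAsync(value).whenComplete((ok, err) -> {
            if (err != null) {
                exception.compareAndSet(null, err);
            }
        });
    }

    // Simulated async write; fails for a sentinel value.
    private CompletableFuture<Void> writeAsync(String value) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        if ("boom".equals(value)) {
            f.completeExceptionally(new RuntimeException("write failed"));
        } else {
            f.complete(null);
        }
        return f;
    }

    public static void main(String[] args) throws Exception {
        AsyncWriter w = new AsyncWriter();
        w.send("a");     // succeeds
        w.send("boom");  // callback records the failure
        try {
            w.send("b"); // the recorded failure surfaces here
            System.out.println("no failure surfaced");
        } catch (Exception e) {
            System.out.println("failure surfaced: " + e.getCause().getMessage());
        }
    }
}
```

The point of the pattern is that asynchronous failures never get lost: they fail a later record's send call, which in the sink causes the checkpoint (and thus the commit) to fail rather than silently dropping data.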
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148424#comment-15148424 ] ASF GitHub Bot commented on FLINK-3332: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52993497 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +/** + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly-once guarantee can be made. 
+ * + * @param Type of the elements emitted by this sink + */ +public abstract class GenericExactlyOnceSink extends AbstractStreamOperator implements OneInputStreamOperator { + private transient AbstractStateBackend.CheckpointStateOutputView out; + private TypeSerializer serializer; + protected transient TypeInformation typeInfo; + protected final CheckpointCommitter committer; + protected final String id; + + private ExactlyOnceState state = new ExactlyOnceState(); + + public GenericExactlyOnceSink(CheckpointCommitter committer) { + if (committer == null) { + throw new IllegalArgumentException("CheckpointCommitter argument must not be null."); + } + this.committer = committer; + this.id = UUID.randomUUID().toString(); + } + + @Override + public void open() throws Exception { + committer.setOperatorId(id); + committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask()); + committer.open(); + } + + public void close() throws Exception { + committer.close(); + } + + /** +* Saves a handle in the state. +* +* @param checkpointId +* @throws IOException +*/ + private void saveHandleInState(final long checkpointId) throws IOException { + //only add handle if a new OperatorState was created since the last snapshot + if (out != null) { + StateHandle handle = out.closeAndGetHandle(); + state.pendingHandles.put(checkpointId, handle); + out = null; + } + } + + @Override + public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception { +
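The buffer-until-checkpoint idea described in the Javadoc above (records are held back and only committed once the checkpoint covering them completes) can be sketched independently of Flink's state backend and operator APIs. This is a hedged, Flink-free illustration of the pattern only — the class `BufferingSink` is hypothetical, and plain in-memory collections stand in for the `AbstractStateBackend` handles kept in `pendingHandles`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of the buffer-until-checkpoint pattern used by
// GenericExactlyOnceSink: records reach the external system only after
// the checkpoint that covers them is known to have completed.
public class BufferingSink<T> {
    private List<T> buffer = new ArrayList<>();
    // Sealed buffers keyed by checkpoint id, like state.pendingHandles in the diff.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    private final List<T> committed = new ArrayList<>();

    public void processElement(T value) {
        buffer.add(value);
    }

    // Called when a checkpoint barrier passes: seal the current buffer.
    public void snapshot(long checkpointId) {
        if (!buffer.isEmpty()) {
            pending.put(checkpointId, buffer);
            buffer = new ArrayList<>();
        }
    }

    // Called when the checkpoint is globally complete: commit everything
    // up to and including that checkpoint id.
    public void notifyCheckpointComplete(long checkpointId) {
        for (List<T> sealed : pending.headMap(checkpointId, true).values()) {
            committed.addAll(sealed);
        }
        pending.headMap(checkpointId, true).clear();
    }

    public List<T> committed() {
        return committed;
    }

    public static void main(String[] args) {
        BufferingSink<String> sink = new BufferingSink<>();
        sink.processElement("a");
        sink.processElement("b");
        sink.snapshot(1);
        sink.processElement("c");
        System.out.println(sink.committed()); // nothing committed yet
        sink.notifyCheckpointComplete(1);
        System.out.println(sink.committed()); // a and b committed; c still buffered
    }
}
```

This also shows why the guarantee is only "near exactly-once": if the process fails between completing the checkpoint and finishing the commit, the same sealed buffer may be committed again on recovery.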
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148423#comment-15148423 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52993461 --- Diff: docs/apis/streaming/connectors/cassandra.md --- @@ -0,0 +1,100 @@ +--- +title: "Apache Cassandra Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 1 +sub-nav-title: Cassandra +--- + + +This connector provides a sink that writes data into a [Cassandra](https://cassandra.apache.org/) database. + +The Flink Cassandra sink integrates with Flink's checkpointing mechanism to provide +exactly-once processing semantics. To achieve that, Flink buffers incoming records +and commits them only when a checkpoint completes. + +To use this connector, add the following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-cassandra{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing RabbitMQ --- End diff -- whoops. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
[ https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148421#comment-15148421 ] Till Rohrmann commented on FLINK-3412: -- We could also do the conversion a bit more explicit by providing a {{Scala/JavaConverter}} which has a {{asScala/asJava}} method which returns the respective {{DataStream}}. That's also how the Scala API allows you to convert Scala collections from and to Java collections. Something like {code} object JavaConverter { implicit class Java2ScalaConverter[T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T]) { def asScala: org.apache.flink.streaming.api.scala.DataStream[T] = { ... } } } {code} > Remove implicit conversions JavaStream / ScalaStream > > > Key: FLINK-3412 > URL: https://issues.apache.org/jira/browse/FLINK-3412 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.10.2 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > I think the implicit conversions between the Java DataStream and the Scala > DataStream are dangerous. > Because conversions exist in both directions, it is possible to write methods > that look like calling functions on the JavaStream, but instead convert it to > a Scala stream and call a different method. > I just accidentally implemented an infinite recursion that way (via two > hidden implicit conversions). > Making the conversions explicit (with a {{wrap()}} function like in the batch > API, we add minimally more code internally (nothing is different for users), > but avoid such accidental errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
[ https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148419#comment-15148419 ] Aljoscha Krettek commented on FLINK-3412: - +1 I also encountered this on at least one occasion > Remove implicit conversions JavaStream / ScalaStream > > > Key: FLINK-3412 > URL: https://issues.apache.org/jira/browse/FLINK-3412 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.10.2 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > I think the implicit conversions between the Java DataStream and the Scala > DataStream are dangerous. > Because conversions exist in both directions, it is possible to write methods > that look like calling functions on the JavaStream, but instead convert it to > a Scala stream and call a different method. > I just accidentally implemented an infinite recursion that way (via two > hidden implicit conversions). > Making the conversions explicit (with a {{wrap()}} function like in the batch > API, we add minimally more code internally (nothing is different for users), > but avoid such accidental errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
[ https://issues.apache.org/jira/browse/FLINK-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148417#comment-15148417 ] Stefano Baghino commented on FLINK-3412: This one looks easy, I can take care of it immediately. Shall I assign it to me [~StephanEwen]? > Remove implicit conversions JavaStream / ScalaStream > > > Key: FLINK-3412 > URL: https://issues.apache.org/jira/browse/FLINK-3412 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.10.2 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > I think the implicit conversions between the Java DataStream and the Scala > DataStream are dangerous. > Because conversions exist in both directions, it is possible to write methods > that look like calling functions on the JavaStream, but instead convert it to > a Scala stream and call a different method. > I just accidentally implemented an infinite recursion that way (via two > hidden implicit conversions). > Making the conversions explicit (with a {{wrap()}} function like in the batch > API, we add minimally more code internally (nothing is different for users), > but avoid such accidental errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3413) Remove implicit Seq to DataStream conversion
[ https://issues.apache.org/jira/browse/FLINK-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148418#comment-15148418 ] Aljoscha Krettek commented on FLINK-3413: - +1 > Remove implicit Seq to DataStream conversion > > > Key: FLINK-3413 > URL: https://issues.apache.org/jira/browse/FLINK-3413 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.0.0 > > > The implicit conversion from {{Seq}} to Flink DataStream needs to create > internally a new execution environment. > This method is confusing to use. If one uses the Seq in a program that uses a > different execution environment, then different streams run on different > execution environments. > The overhead of manually calling {{env.fromElements(seq)}} is quite low. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148416#comment-15148416 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52993077 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericExactlyOnceSink.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.io.disk.InputViewIterator; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +/** + * Generic Sink that emits its input elements into an arbitrary backend. This sink is integrated with the checkpointing + * mechanism to provide near exactly-once semantics. + * + * Incoming records are stored within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only committed if a + * checkpoint is completed. Should a job fail while the data is being committed, no exactly-once guarantee can be made. 
+ * + * @param Type of the elements emitted by this sink + */ +public abstract class GenericExactlyOnceSink extends AbstractStreamOperator implements OneInputStreamOperator { + private transient AbstractStateBackend.CheckpointStateOutputView out; + private TypeSerializer serializer; + protected transient TypeInformation typeInfo; + protected final CheckpointCommitter committer; + protected final String id; + + private ExactlyOnceState state = new ExactlyOnceState(); + + public GenericExactlyOnceSink(CheckpointCommitter committer) { + if (committer == null) { + throw new IllegalArgumentException("CheckpointCommitter argument must not be null."); + } + this.committer = committer; + this.id = UUID.randomUUID().toString(); + } + + @Override + public void open() throws Exception { + committer.setOperatorId(id); + committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask()); + committer.open(); + } + + public void close() throws Exception { + committer.close(); + } + + /** +* Saves a handle in the state. +* +* @param checkpointId +* @throws IOException +*/ + private void saveHandleInState(final long checkpointId) throws IOException { + //only add handle if a new OperatorState was created since the last snapshot + if (out != null) { + StateHandle handle = out.closeAndGetHandle(); + state.pendingHandles.put(checkpointId, handle); + out = null; + } + } + + @Override + public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception { + Stream
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148415#comment-15148415 ] ASF GitHub Bot commented on FLINK-3332: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52992939 --- Diff: docs/apis/streaming/connectors/cassandra.md --- @@ -0,0 +1,100 @@ +--- +title: "Apache Cassandra Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 1 +sub-nav-title: Cassandra +--- + + +This connector provides a sink that writes data into a [Cassandra](https://cassandra.apache.org/) database. + +The Flink Cassandra sink integrates with Flink's checkpointing mechanism to provide +exactly-once processing semantics. To achieve that, Flink buffers incoming records +and commits them only when a checkpoint completes. + +To use this connector, add the following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-cassandra{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing RabbitMQ --- End diff -- Installing RabbitMQ? > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3396) Job submission Savepoint restore logic flawed
[ https://issues.apache.org/jira/browse/FLINK-3396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148413#comment-15148413 ] ASF GitHub Bot commented on FLINK-3396: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1633#discussion_r52992451 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1073,57 +1073,73 @@ class JobManager( // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously // because it is a blocking operation future { -try { - if (isRecovery) { -executionGraph.restoreLatestCheckpointedState() - } - else { -val snapshotSettings = jobGraph.getSnapshotSettings -if (snapshotSettings != null) { - val savepointPath = snapshotSettings.getSavepointPath() +val restoreStateSuccess = + try { +if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() --- End diff -- I'm not so sure about that, to be honest. What if the `restoreLatestCheckpointedState` fails because of some HDFS/ZooKeeper problems. Then you would like to try restarting the job, wouldn't you? The client should then be notified once all restarting attempts have been exhausted. > Job submission Savepoint restore logic flawed > - > > Key: FLINK-3396 > URL: https://issues.apache.org/jira/browse/FLINK-3396 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.0.0 > > > When savepoint restoring fails, the thrown Exception fails the execution > graph, but the client is not informed about the failure. > The expected behaviour is that the submission should be acked with success or > failure in any case. With savepoint restore failures, the ack message will be > skipped. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
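The ack-in-any-case pattern debated in the diff above can be sketched independently of Flink: the restore call is wrapped so that the submission is acknowledged with success or failure in every case, never skipped. This is a minimal stand-alone sketch; `restoreState` and `submit` are hypothetical stand-ins for the JobManager logic, not Flink's actual API:

```scala
// Minimal sketch of the pattern from the reviewed diff: wrap the restore in a
// try that yields a Boolean, then acknowledge the client in both branches.
// All names here are illustrative stand-ins, not Flink classes.
object SubmissionSketch {
  // Simulates executionGraph.restoreLatestCheckpointedState(), which may throw
  // (e.g. on transient HDFS/ZooKeeper problems, as Till points out above).
  def restoreState(failing: Boolean): Unit =
    if (failing) throw new RuntimeException("HDFS/ZooKeeper unavailable")

  def submit(failing: Boolean): String = {
    val restoreStateSuccess =
      try { restoreState(failing); true }
      catch { case _: Exception => false }
    // The submission is acked with success or failure in any case,
    // which is the expected behaviour stated in FLINK-3396.
    if (restoreStateSuccess) "SubmissionSuccess" else "SubmissionFailure"
  }

  def main(args: Array[String]): Unit = {
    println(submit(failing = false))
    println(submit(failing = true))
  }
}
```

Till's objection above is orthogonal to this shape: whether the failure ack is sent immediately or only after restart attempts are exhausted only changes *when* the failure branch fires, not the guarantee that one of the two branches always does.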
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148406#comment-15148406 ] ASF GitHub Bot commented on FLINK-3332: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52991586 --- Diff: flink-streaming-java/pom.xml --- @@ -108,6 +108,7 @@ under the License. maven-surefire-plugin false + -XX:-UseSplitVerifier --- End diff -- i agree, trying some things at the moment. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3413) Remove implicit Seq to DataStream conversion
Stephan Ewen created FLINK-3413: --- Summary: Remove implicit Seq to DataStream conversion Key: FLINK-3413 URL: https://issues.apache.org/jira/browse/FLINK-3413 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.0.0 The implicit conversion from {{Seq}} to Flink DataStream needs to create internally a new execution environment. This method is confusing to use. If one uses the Seq in a program that uses a different execution environment, then different streams run on different execution environments. The overhead of manually calling {{env.fromElements(seq)}} is quite low. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
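The confusion FLINK-3413 describes can be reproduced in miniature without Flink: an implicit conversion that internally creates a fresh execution environment puts the converted stream on a different environment than the rest of the program. `Env` and `DStream` below are toy stand-ins for illustration only, not Flink types:

```scala
import scala.language.implicitConversions

// Toy stand-ins: Env mimics an execution environment, DStream a data stream
// bound to one. Not Flink API -- only the shape of the problem is reproduced.
class Env
class DStream[T](val env: Env, val data: Seq[T])

object ImplicitPitfall {
  // Mirrors the conversion being removed: every use creates a NEW environment.
  implicit def seqToStream[T](s: Seq[T]): DStream[T] = new DStream(new Env, s)

  def main(args: Array[String]): Unit = {
    val env = new Env
    val a = new DStream(env, Seq(1, 2, 3)) // explicit: bound to our environment
    val b: DStream[Int] = Seq(4, 5, 6)     // implicit: silently gets a fresh one
    // The two streams do NOT share an environment -- the pitfall from the issue.
    println(a.env eq b.env)
  }
}
```

The explicit alternative the issue recommends is a one-liner at the call site (in Flink, `env.fromElements(...)` on the environment the program already uses), which is why the issue calls the overhead of removing the implicit "quite low".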
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148398#comment-15148398 ] ASF GitHub Bot commented on FLINK-3332: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52991019 --- Diff: docs/apis/streaming/connectors/cassandra.md --- @@ -0,0 +1,100 @@ +--- +title: "Apache Cassandra Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 1 +sub-nav-title: Cassandra +--- + + +This connector provides a sink that writes data into a [Cassandra](https://cassandra.apache.org/) database. + +The Flink Cassandra sink integrates with Flink's checkpointing mechanism to provide +exactly-once processing semantics. To achieve that, Flink buffers incoming records +and commits them only when a checkpoint completes. + +To use this connector, add the following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-cassandra{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing RabbitMQ +Follow the instructions from the [Cassandra Getting Started page](http://wiki.apache.org/cassandra/GettingStarted). + + Cassandra Sink + +Flink's Cassandra sink is called `CassandraExactlyOnceSink`. + +The constructor accepts the following arguments: + +1. The Host address +2. query to create a new table to write into (optional) +3. query to insert data the a table +4. checkpoint committer + +A checkpoint committer stores additional information about completed checkpoints +in some resource. You can use a `CassandraCommitter` to store these in a separate +table in cassandra. Note that this table will NOT be cleaned up by Flink. + +The CassandraCommitter constructor accepts the following arguments: +1. Host address +2. Keyspace +3. Table name + +The CassandraExactlyOnceSink is implemented as a custom operator +instead of a sink, and as such is a bit more unwieldy than other sinks. --- End diff -- Yes delete the last subclause. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
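Putting the argument lists from the draft docs under review together, the wiring could look roughly like the sketch below. The class names (`CassandraExactlyOnceSink`, `CassandraCommitter`) come from the reviewed docs, but the exact constructor signatures are assumptions inferred from the enumerated argument lists; since the API is still being reviewed in this PR, treat this as illustrative only, not as the final connector API:

```scala
// Hypothetical wiring of the sink described in the draft docs above.
// Constructor signatures are ASSUMED from the docs' argument lists (1..4);
// the real API may differ once the PR is merged.

// Stores completed-checkpoint metadata in a separate Cassandra table
// (which, per the docs, Flink will NOT clean up).
val committer = new CassandraCommitter(
  "127.0.0.1",        // 1. host address
  "example_keyspace", // 2. keyspace
  "checkpoints"       // 3. table name for checkpoint metadata
)

val sink = new CassandraExactlyOnceSink[(String, Int)](
  "127.0.0.1",                                               // 1. host address
  "CREATE TABLE IF NOT EXISTS example.counts " +
    "(word text PRIMARY KEY, count int);",                   // 2. create-table query (optional)
  "INSERT INTO example.counts (word, count) VALUES (?, ?);", // 3. insert query
  committer                                                  // 4. checkpoint committer
)
```

Because the docs note the sink is implemented as a custom operator rather than a regular sink function, attaching it to a stream is expected to be slightly more involved than a plain `addSink(...)` call.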
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148397#comment-15148397 ] ASF GitHub Bot commented on FLINK-3332: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1640#discussion_r52990933 --- Diff: flink-streaming-java/pom.xml --- @@ -108,6 +108,7 @@ under the License. maven-surefire-plugin false + -XX:-UseSplitVerifier --- End diff -- This seems somehow wrong to me. Would be good to get the tests working without changing other modules. > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3332) Provide an exactly-once Cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148396#comment-15148396 ] ASF GitHub Bot commented on FLINK-3332: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1640#issuecomment-184609151 Thanks for your contribution @zentol. Changes look good to me. I only had a comment concerning the type serializer and our usage of the term "exactly once". > Provide an exactly-once Cassandra connector > --- > > Key: FLINK-3332 > URL: https://issues.apache.org/jira/browse/FLINK-3332 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > With FLINK-3311, we are adding a Cassandra connector to Flink. > It would be good to also provide an "exactly-once" C* connector. > I would like to first discuss how we are going to implement this in Flink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3412) Remove implicit conversions JavaStream / ScalaStream
Stephan Ewen created FLINK-3412: --- Summary: Remove implicit conversions JavaStream / ScalaStream Key: FLINK-3412 URL: https://issues.apache.org/jira/browse/FLINK-3412 Project: Flink Issue Type: Bug Components: Scala API Affects Versions: 0.10.2 Reporter: Stephan Ewen Fix For: 1.0.0 I think the implicit conversions between the Java DataStream and the Scala DataStream are dangerous. Because conversions exist in both directions, it is possible to write methods that look like calling functions on the JavaStream, but instead convert it to a Scala stream and call a different method. I just accidentally implemented an infinite recursion that way (via two hidden implicit conversions). Making the conversions explicit (with a {{wrap()}} function like in the batch API), we add minimally more code internally (nothing is different for users), but avoid such accidental errors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
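The hazard described in FLINK-3412 can be shown in miniature without Flink: with implicit conversions in both directions, a call that looks like a method on one stream type can silently bounce to the other type, which is how a hidden two-step recursion arises. The explicit `wrap()` alternative makes every crossing visible at the call site. `JStream`/`SStream` and `wrap` are toy stand-ins, not the actual Flink classes:

```scala
// Toy stand-ins for the Java and Scala DataStream types from the issue.
// With implicits in BOTH directions, a misnamed delegate can convert
// back and forth forever; the explicit wrap() below avoids that.
class JStream[T](val data: Seq[T])           // stands in for the Java DataStream
class SStream[T](val javaStream: JStream[T]) // stands in for the Scala DataStream

object ExplicitWrap {
  // The explicit alternative proposed in the issue, like the batch API's
  // wrap() function: the conversion point is visible, so no hidden chain
  // of implicit conversions can recurse by accident.
  def wrap[T](js: JStream[T]): SStream[T] = new SStream(js)

  def main(args: Array[String]): Unit = {
    val js = new JStream(Seq(1, 2, 3))
    val ss = wrap(js) // conversion is explicit at the call site
    println(ss.javaStream.data.sum)
  }
}
```

As the issue notes, this adds minimally more code internally and changes nothing for users, since user programs never performed the conversion themselves.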