[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235479#comment-16235479 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4910


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either via the completed-checkpoint notification or when retrying the commit 
> after restoring from a failure). This means that the job will go into an 
> infinite recovery loop, because the commit keeps failing on every attempt.
> In some cases it is better to ignore those failures and keep on 
> processing, and this should be the default. We can provide an option that 
> makes the sink fail when a commit fails.
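A minimal sketch of the behaviour proposed in the issue, assuming a simplified sink: pending transactions are kept in checkpoint order, committed when a checkpoint completes, and a commit failure is either logged and dropped (the proposed default) or rethrown when a flag is set. `CommitPolicySketch`, `Committer` and `failOnCommitFailure` are hypothetical names for illustration, not Flink's `TwoPhaseCommitSinkFunction` API.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch, not Flink's TwoPhaseCommitSinkFunction: commits pending
 * transactions when a checkpoint completes and either swallows or propagates
 * commit failures depending on a flag.
 */
class CommitPolicySketch {

    /** Stand-in for the sink-specific commit call; may throw. */
    interface Committer {
        void commit(String txn) throws Exception;
    }

    // Insertion order == checkpoint order, so iteration visits the oldest transaction first.
    private final LinkedHashMap<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
    private final boolean failOnCommitFailure;
    private final Committer committer;
    private final Consumer<String> warn;

    CommitPolicySketch(boolean failOnCommitFailure, Committer committer, Consumer<String> warn) {
        this.failOnCommitFailure = failOnCommitFailure;
        this.committer = committer;
        this.warn = warn;
    }

    void addPending(long checkpointId, String txn) {
        pendingCommitTransactions.put(checkpointId, txn);
    }

    int pendingCount() {
        return pendingCommitTransactions.size();
    }

    /** Commits every transaction belonging to a checkpoint up to and including checkpointId. */
    void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, String>> it = pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                break; // later checkpoints are not complete yet
            }
            try {
                committer.commit(entry.getValue());
            } catch (Exception e) {
                if (failOnCommitFailure) {
                    throw e; // fail the job; the transaction stays pending
                }
                warn.accept("Ignoring failed commit of " + entry.getValue() + ": " + e.getMessage());
            }
            it.remove(); // committed, or abandoned after a tolerated failure
        }
    }
}
```

Rethrowing leaves the failed transaction pending, so a sink that restores it from state would retry the same commit after recovery, which is the loop the issue describes; the lenient mode trades that loop for a logged warning.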



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-11-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235151#comment-16235151 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4910
  
LGTM, thanks for the work @GJL. Merging this ..




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16226537#comment-16226537 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147939418
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

Yes, that's right. The code of this test is also in `FlinkKafkaProducerTest`.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225160#comment-16225160 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147741136
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

The test does not rely on any mocks anymore. I am not 100% happy with it 
because we use log4j `1.x`, which is no longer maintained, and in log4j `2.x` 
the APIs have changed a lot: 
http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent
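One way to keep time-dependent tests deterministic without mocks, assuming the sink reads time from an injected `java.time.Clock` (the diff adds a `java.time.Clock` import): a small hand-written `Clock` subclass that the test advances explicitly. `ManualClock` is a hypothetical helper, not something the PR is confirmed to contain.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;

/**
 * Hypothetical hand-written test clock: time only moves when the test calls advance(),
 * so timeout logic that reads an injected java.time.Clock can be tested without Mockito.
 */
class ManualClock extends Clock {
    private Instant now;
    private final ZoneId zone;

    ManualClock(Instant start, ZoneId zone) {
        this.now = start;
        this.zone = zone;
    }

    void advance(Duration duration) {
        now = now.plus(duration);
    }

    @Override
    public ZoneId getZone() {
        return zone;
    }

    @Override
    public Clock withZone(ZoneId zone) {
        // Returns a snapshot: the copy does not follow later advance() calls on this clock.
        return new ManualClock(now, zone);
    }

    @Override
    public Instant instant() {
        return now;
    }
}
```

`Clock.millis()` is derived from `instant()`, so code under test that calls either method sees the time the test has set.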




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225146#comment-16225146 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147739898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`. The method name is now 
the same, though.
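A sketch of the rule the proposed flag name describes, assuming a commit failure should only be swallowed once the transaction has outlived its timeout (at that point the external system has presumably aborted it, so a retry can never succeed). `RecoverAndCommitSketch`, its setter names and the `elapsedMillis` parameter are hypothetical; the PR may structure this differently.

```java
/**
 * Hypothetical sketch: during recovery, a commit failure is swallowed only when the
 * flag is enabled AND the transaction is older than the configured timeout.
 */
class RecoverAndCommitSketch {

    /** Stand-in for the sink-specific commit call; may throw. */
    interface Committer<T> {
        void commit(T txn) throws Exception;
    }

    private long transactionTimeoutMillis = Long.MAX_VALUE; // "no timeout", as in the diff's default
    private boolean ignoreFailuresAfterTransactionTimeout;

    RecoverAndCommitSketch setTransactionTimeout(long millis) {
        this.transactionTimeoutMillis = millis;
        return this;
    }

    RecoverAndCommitSketch ignoreFailuresAfterTransactionTimeout() {
        this.ignoreFailuresAfterTransactionTimeout = true;
        return this;
    }

    <T> void recoverAndCommit(T txn, long elapsedMillis, Committer<T> committer) throws Exception {
        try {
            committer.commit(txn);
        } catch (Exception e) {
            boolean timedOut = elapsedMillis > transactionTimeoutMillis;
            if (ignoreFailuresAfterTransactionTimeout && timedOut) {
                return; // presumably aborted externally; a real sink would log a warning here
            }
            throw e; // within the timeout the failure is unexpected, so propagate it
        }
    }
}
```

Gating on the timeout keeps genuine commit errors visible while still breaking the recovery loop for transactions that can no longer be committed.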




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225119#comment-16225119 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147736898
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, I like `ignoreFailuresAfterTransactionTimeout`




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225114#comment-16225114 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147735664
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

The tests that I removed were already in `FlinkKafkaProducerTests`. 
Probably a copy-paste error.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225068#comment-16225068 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147727170
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

I tend to agree with @pnowojski about the API surface, but I think in this 
case it is a valid safety net for possible future transactional sinks.
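The `transactionTimeoutWarningRatio` javadoc in the hunk above can be read as the following check. This is a minimal sketch assuming millisecond durations and rejecting ratios above 1; `TimeoutWarning.shouldWarn` is a hypothetical helper name.

```java
/**
 * Hypothetical helper expressing the documented rule: warn once the elapsed time reaches
 * warningRatio * transactionTimeout; a negative ratio disables the warning.
 */
final class TimeoutWarning {

    private TimeoutWarning() {
    }

    static boolean shouldWarn(long elapsedMillis, long transactionTimeoutMillis, double warningRatio) {
        if (warningRatio < 0) {
            return false; // negative value disables warnings
        }
        if (warningRatio > 1) {
            throw new IllegalArgumentException("warningRatio must be in [0,1], or negative to disable");
        }
        return elapsedMillis >= warningRatio * transactionTimeoutMillis;
    }
}
```

With the default timeout of `Long.MAX_VALUE` and any positive ratio, the threshold in this sketch is so large that the warning effectively only fires once a real timeout has been configured.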




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225064#comment-16225064 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147654876
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 ---
@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}
 
-   @Test(timeout = 3L)
--- End diff --

Why are these removed? Did they never actually test anything?




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225030#comment-16225030 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147717560
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

True, you are right. One would need to implement the `org.slf4j.Logger` 
interface.
- http://projects.lidalia.org.uk/slf4j-test/ provides an `org.slf4j.Logger` 
implementation for unit testing, but this approach comes with other problems 
(e.g. https://github.com/Mahoney/slf4j-test/issues/15, [NOP if log level is disabled](https://github.com/Mahoney/slf4j-test/blob/master/src/main/java/uk/org/lidalia/slf4jtest/TestLogger.java#L449)).

Right now I don't see a way to do it properly. Any help is welcome.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224867#comment-16224867 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147693238
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

Are you sure? `Logger.warn(String, Object...)`, `Logger.warn(String, 
Object)`, `Logger.warn(String, Object, Object)` and `Logger.warn(String)` are 
different methods.

This shows that you are not testing the effect (a warning message being 
logged somewhere) but whether one particular method was called.
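To test the effect rather than a particular overload, a test can install a handler that records whatever actually reaches the logging backend. The sketch swaps in `java.util.logging` only because it ships with the JDK; it is not what Flink uses (Flink logs through SLF4J), and `TimeoutChecker` is a hypothetical class under test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Collects published records so a test can assert on what was actually logged. */
class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
        records.add(record);
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
}

/** Hypothetical code under test: free to use whichever logging overload it likes. */
class TimeoutChecker {
    private static final Logger LOG = Logger.getLogger(TimeoutChecker.class.getName());

    static void check(long elapsedMillis, long timeoutMillis) {
        if (elapsedMillis >= timeoutMillis / 2) {
            LOG.log(Level.WARNING, "Transaction open for {0} ms (timeout {1} ms)",
                    new Object[] {elapsedMillis, timeoutMillis});
        }
    }
}
```

Because the assertion inspects the captured `LogRecord`, it keeps passing if the code switches between logging overloads. With SLF4J the same idea needs a capturing appender in the concrete backend, which is where the log4j `1.x`/`2.x` API differences discussed in this thread come in.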




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224868#comment-16224868 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147693317
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

I'm not convinced, since this will make our public API larger and more 
difficult to maintain. However, if @aljoscha is ok with that, I will yield ;)




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224866#comment-16224866 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147692599
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Part of my concern was that the name is quite long (same applies for the 
methods). Maybe `ignoreFailuresAfterTimeout`? Or 
`ignoreFailuresAfterTransactionTimeout`?





[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224816#comment-16224816 ]

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147687514
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -362,4 +483,39 @@ public void setContext(Optional context) {
this.context = context;
}
}
+
+   /**
+* Adds metadata (currently only the start time of the transaction) to the transaction object.
+*/
+   @VisibleForTesting
+   static class TransactionHolder {
--- End diff --

I will leave it as is. It's an internal class and it is clear what it does.
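To make the holder concrete: a minimal sketch, assuming the holder only needs the transaction handle plus a start timestamp in milliseconds, and that the age is measured with an injected `java.time.Clock`. The field names follow the diff (`handle`, `transactionStartTime`); `elapsedTime(Clock)` is a hypothetical helper, not necessarily what the pull request contains.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical sketch: pairs a transaction handle with the time it was started. */
final class TransactionHolder<TXN> {
    final TXN handle;
    final long transactionStartTime;

    TransactionHolder(TXN handle, long transactionStartTime) {
        this.handle = handle;
        this.transactionStartTime = transactionStartTime;
    }

    /** Milliseconds elapsed since the transaction was started, according to the given clock. */
    long elapsedTime(Clock clock) {
        return clock.millis() - transactionStartTime;
    }

    @Override
    public String toString() {
        return "TransactionHolder{handle=" + handle + ", transactionStartTime=" + transactionStartTime + "}";
    }
}

class TransactionHolderDemo {
    public static void main(String[] args) {
        // A fixed clock makes the elapsed time deterministic.
        Clock clock = Clock.fixed(Instant.ofEpochMilli(5_000), ZoneOffset.UTC);
        TransactionHolder<String> holder = new TransactionHolder<>("txn-1", 1_000);
        System.out.println(holder.elapsedTime(clock)); // prints 4000
    }
}
```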




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224814#comment-16224814
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147687302
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -922,6 +945,22 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
producersPool = new ProducersPool();
}
 
+   /**
+* Disables the propagation of exceptions thrown when committing presumably timed out Kafka
+* transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
+* never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
+* will still be logged to inform the user that data loss might have occurred.
+*
+* Note that we use {@link System#currentTimeMillis()} to track the age of a transaction.
+* Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
+* attempt at least one commit of the transaction before giving up.
+*/
+   @Override
+   public FlinkKafkaProducer011 disableFailurePropagationAfterTransactionTimeout() {
--- End diff --

Thanks. Moved it.
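The `@Override` that returns `FlinkKafkaProducer011` works because Java allows covariant return types, so a fluent call chain keeps the subclass type. A minimal sketch with hypothetical classes `BaseSink` and `KafkaLikeSink` (these are not Flink classes):

```java
/** Hypothetical base class exposing a fluent switch that returns this. */
abstract class BaseSink {
    protected boolean failurePropagationAfterTransactionTimeoutDisabled;

    BaseSink disableFailurePropagationAfterTransactionTimeout() {
        this.failurePropagationAfterTransactionTimeoutDisabled = true;
        return this;
    }
}

/** Hypothetical subclass: the covariant override narrows the return type. */
class KafkaLikeSink extends BaseSink {
    private String topic = "default";

    @Override
    KafkaLikeSink disableFailurePropagationAfterTransactionTimeout() {
        super.disableFailurePropagationAfterTransactionTimeout();
        return this;
    }

    KafkaLikeSink withTopic(String topic) {
        this.topic = topic;
        return this;
    }

    String topic() {
        return topic;
    }
}

class CovariantReturnDemo {
    public static void main(String[] args) {
        // Without the override the chain would only see BaseSink, and withTopic would not compile.
        KafkaLikeSink sink = new KafkaLikeSink()
                .disableFailurePropagationAfterTransactionTimeout()
                .withTopic("events");
        System.out.println(sink.topic() + " " + sink.failurePropagationAfterTransactionTimeoutDisabled);
    }
}
```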




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224805#comment-16224805
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147686546
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
--- End diff --

I agree with you that overusing mocks is an anti-pattern. I changed it to a real implementation; please have a look. I am not sure this is better now, because after all, the only mocked method on `Clock` was `.millis()`.
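For reference, a hand-written test clock can be as small as the following sketch. `ManualClock` is a hypothetical name; it extends `java.time.Clock` and only moves when the test calls `advance`, which keeps timeout checks deterministic without Mockito. It assumes single-threaded use.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical test clock that only moves when advance() is called (single-threaded use). */
final class ManualClock extends Clock {
    private long currentMillis;
    private final ZoneId zone;

    ManualClock(long startMillis) {
        this(startMillis, ZoneOffset.UTC);
    }

    private ManualClock(long startMillis, ZoneId zone) {
        this.currentMillis = startMillis;
        this.zone = zone;
    }

    /** Moves the clock forward, e.g. past a transaction timeout. */
    void advance(long deltaMillis) {
        currentMillis += deltaMillis;
    }

    @Override
    public long millis() {
        return currentMillis;
    }

    @Override
    public Instant instant() {
        return Instant.ofEpochMilli(currentMillis);
    }

    @Override
    public ZoneId getZone() {
        return zone;
    }

    /** Returns an independent snapshot with the new zone; later advance() calls here do not affect it. */
    @Override
    public Clock withZone(ZoneId newZone) {
        return new ManualClock(currentMillis, newZone);
    }
}

class ManualClockDemo {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock(0);
        long transactionStart = clock.millis();
        clock.advance(1_500);
        System.out.println(clock.millis() - transactionStart); // prints 1500
    }
}
```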




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224785#comment-16224785
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147681959
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

If exceeding the `transactionTimeout` can result in data loss, I prefer a flag here (i.e., in `TwoPhaseCommitSinkFunction`). You may want to be warned before something bad happens so that parameters can be tuned.
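Assuming the semantics in the Javadoc of `transactionTimeoutWarningRatio` (warn once the elapsed time reaches `ratio * transactionTimeout`; a negative ratio disables the warning), the check could look roughly like this. `TimeoutWarningCheck` and `shouldWarn` are hypothetical names.

```java
/** Hypothetical check mirroring the documented meaning of transactionTimeoutWarningRatio. */
final class TimeoutWarningCheck {
    private final long transactionTimeoutMillis;
    private final double warningRatio;

    TimeoutWarningCheck(long transactionTimeoutMillis, double warningRatio) {
        if (transactionTimeoutMillis <= 0) {
            throw new IllegalArgumentException("transactionTimeout must be positive");
        }
        if (warningRatio > 1) {
            throw new IllegalArgumentException("warningRatio must be in [0, 1], or negative to disable");
        }
        this.transactionTimeoutMillis = transactionTimeoutMillis;
        this.warningRatio = warningRatio;
    }

    /** True if a transaction that has been open for elapsedMillis should trigger a warning. */
    boolean shouldWarn(long elapsedMillis) {
        if (warningRatio < 0) {
            return false; // a negative ratio disables warnings
        }
        return elapsedMillis >= warningRatio * transactionTimeoutMillis;
    }

    public static void main(String[] args) {
        TimeoutWarningCheck check = new TimeoutWarningCheck(60_000, 0.5);
        System.out.println(check.shouldWarn(29_999)); // prints false
        System.out.println(check.shouldWarn(30_000)); // prints true
    }
}
```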




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224778#comment-16224778
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147681520
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Renamed to `failurePropagationAfterTransactionTimeoutDisabled`, as this is closer to the name of the method that controls this field.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224771#comment-16224771
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147680902
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
--- End diff --

Will rename.
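The rule in the Javadoc of `beginTransaction0()` (a single private call site, so the start time is always recorded together with the transaction) can be sketched as follows. All names are hypothetical and checked exceptions are omitted for brevity. Java cannot stop a subclass from calling its own protected method, so the rule is a convention rather than something the compiler enforces.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

/**
 * Hypothetical sketch: subclasses implement beginTransaction(), and by convention only
 * beginTransactionInternal() calls it, so every transaction is paired with its start time.
 */
abstract class AbstractTimedSink<TXN> {

    /** Simplified holder: transaction handle plus start time in milliseconds. */
    static final class Timed<T> {
        final T handle;
        final long startMillis;

        Timed(T handle, long startMillis) {
            this.handle = handle;
            this.startMillis = startMillis;
        }
    }

    private final Clock clock;
    private Timed<TXN> currentTransaction;

    AbstractTimedSink(Clock clock) {
        this.clock = clock;
    }

    /** Implemented by subclasses; not meant to be called directly. */
    protected abstract TXN beginTransaction();

    /** The single call site of beginTransaction(); the timestamp is taken right after it. */
    private Timed<TXN> beginTransactionInternal() {
        return new Timed<>(beginTransaction(), clock.millis());
    }

    void startNewTransaction() {
        currentTransaction = beginTransactionInternal();
    }

    Timed<TXN> currentTransaction() {
        return currentTransaction;
    }
}

/** Hypothetical subclass whose "transactions" are just increasing integers. */
class CountingSink extends AbstractTimedSink<Integer> {
    private int next;

    CountingSink(Clock clock) {
        super(clock);
    }

    @Override
    protected Integer beginTransaction() {
        return next++;
    }
}

class CountingSinkDemo {
    public static void main(String[] args) {
        CountingSink sink = new CountingSink(Clock.fixed(Instant.ofEpochMilli(42), ZoneOffset.UTC));
        sink.startNewTransaction();
        System.out.println(sink.currentTransaction().handle + " @ " + sink.currentTransaction().startMillis);
    }
}
```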




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224763#comment-16224763
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147679890
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
+   return new TransactionHolder<>(beginTransaction(), clock.millis());
+   }
+
+   /**
+* This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that
+* the configuration parameters {@link #transactionTimeout} and
+* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
+*/
+   private void recoverAndCommit(TransactionHolder transactionHolder) {
+   try {
+   logWarningIfTimeoutAlmostReached(transactionHolder);
+   recoverAndCommit(transactionHolder.handle);
+   } catch (final Exception e) {
+   final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
+   if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) {
+   log.error("Error while committing transaction {}. " +
--- End diff --

Good. Will add a statement to the log message.
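A simplified sketch of the catch-and-decide logic in `recoverAndCommit(TransactionHolder)`. Assumptions: `RecoveryCommitter` and `recoverAndCommitInternal` are hypothetical names, the user commit is modelled as a `Consumer` (so only unchecked exceptions are caught, whereas the reviewed code catches `Exception`), and `java.util.logging` stands in for SLF4J. The log message states explicitly that data loss might have occurred.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch: a failed commit is swallowed (and logged) only if propagation is
 * disabled AND the transaction is older than the timeout; otherwise the exception is rethrown.
 */
final class RecoveryCommitter<TXN> {
    private static final Logger LOG = Logger.getLogger(RecoveryCommitter.class.getName());

    private final Clock clock;
    private final long transactionTimeoutMillis;
    private final boolean failurePropagationAfterTransactionTimeoutDisabled;
    // Stands in for the abstract recoverAndCommit(TXN) of the real sink.
    private final Consumer<TXN> recoverAndCommit;

    RecoveryCommitter(Clock clock, long transactionTimeoutMillis,
            boolean failurePropagationAfterTransactionTimeoutDisabled, Consumer<TXN> recoverAndCommit) {
        this.clock = clock;
        this.transactionTimeoutMillis = transactionTimeoutMillis;
        this.failurePropagationAfterTransactionTimeoutDisabled = failurePropagationAfterTransactionTimeoutDisabled;
        this.recoverAndCommit = recoverAndCommit;
    }

    /** Returns true if the commit succeeded, false if a failure was logged and ignored. */
    boolean recoverAndCommitInternal(TXN transaction, long transactionStartMillis) {
        try {
            recoverAndCommit.accept(transaction);
            return true;
        } catch (RuntimeException e) {
            long elapsedTime = clock.millis() - transactionStartMillis;
            if (failurePropagationAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeoutMillis) {
                LOG.log(Level.SEVERE, "Error while committing transaction " + transaction
                        + ". Transaction has been open for longer than the transaction timeout ("
                        + transactionTimeoutMillis + " ms). Commit failure is ignored; data loss might have occurred.", e);
                return false;
            }
            throw e;
        }
    }
}

class RecoveryCommitterDemo {
    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.ofEpochMilli(10_000), ZoneOffset.UTC);
        RecoveryCommitter<String> committer = new RecoveryCommitter<>(clock, 1_000, true,
                txn -> { throw new IllegalStateException("transaction " + txn + " expired"); });
        // Started at t=0, so 10 s old: older than the 1 s timeout, the failure is logged, not thrown.
        System.out.println(committer.recoverAndCommitInternal("txn-1", 0)); // prints false
    }
}
```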




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224760#comment-16224760
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147679597
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
+   return new TransactionHolder<>(beginTransaction(), clock.millis());
+   }
+
+   /**
+* This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that
+* the configuration parameters {@link #transactionTimeout} and
+* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
+*/
+   private void recoverAndCommit(TransactionHolder transactionHolder) {
--- End diff --

Will rename.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224759#comment-16224759
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147679516
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

Ok, will do.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224756#comment-16224756
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147679162
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
--- End diff --

Ok, will do.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224753#comment-16224753
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147678943
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -442,13 +445,31 @@ public FlinkKafkaProducer011(
throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}
 
-   if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+   if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
}
 
+   // Enable transactionTimeoutWarnings to avoid silent data loss
+   // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
+   // The KafkaProducer may not throw an exception if the transaction failed to commit
+   if (semantic == Semantic.EXACTLY_ONCE) {
+   final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
--- End diff --

Couldn't find any.
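The switch from `contains` to `containsKey` matters because `Properties` inherits `Hashtable.contains(Object)`, which searches values, not keys. The sketch below shows this, plus a hypothetical helper for reading the timeout back as an `Object` that may have been stored as a number or a string. The key string is the value of Kafka's `ProducerConfig.TRANSACTION_TIMEOUT_CONFIG`; the helper names are assumptions.

```java
import java.util.Properties;

/** Sketch: key vs. value lookups on java.util.Properties, and reading the timeout back. */
class PropertiesLookupDemo {
    // Value of Kafka's ProducerConfig.TRANSACTION_TIMEOUT_CONFIG.
    static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";

    /** Hypothetical helper: the config value may have been put as a Number or as a String. */
    static long timeoutFromConfig(Properties config) {
        Object value = config.get(TRANSACTION_TIMEOUT_CONFIG);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong(((String) value).trim());
        }
        throw new IllegalArgumentException(TRANSACTION_TIMEOUT_CONFIG + " is missing or has type "
                + (value == null ? "null" : value.getClass().getName()));
    }

    /** Same bounds as the checkState(...) in the diff: the timeout must fit into a positive int. */
    static int toIntMillis(long timeoutMillis) {
        if (timeoutMillis <= 0 || timeoutMillis >= Integer.MAX_VALUE) {
            throw new IllegalStateException("timeout does not fit into 32 bit integer");
        }
        return (int) timeoutMillis;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(TRANSACTION_TIMEOUT_CONFIG, 900_000);

        // Hashtable.contains(Object) checks VALUES, so this is false even though the key is set.
        System.out.println(config.contains(TRANSACTION_TIMEOUT_CONFIG));    // false
        // containsKey(Object) checks keys, which is what the condition intends.
        System.out.println(config.containsKey(TRANSACTION_TIMEOUT_CONFIG)); // true
        System.out.println(timeoutFromConfig(config));                      // 900000
    }
}
```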




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224752#comment-16224752
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147678610
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
@@ -194,29 +348,4 @@ public String toString() {
return String.format("FileTransaction[%s]", tmpFile.getName());
}
}
-
-   private static class TestContext implements AutoCloseable {
--- End diff --

I am using
```
@Rule
public TemporaryFolder folder = new TemporaryFolder();
```
to create test folders and files, which are cleaned up automatically. Since the rule must be a public field of the test class, it would have required more work to keep the `TestContext`.
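A JUnit `TemporaryFolder` rule needs JUnit on the classpath. For comparison, a plain-JDK analogue in the spirit of the removed `TestContext` (which implemented `AutoCloseable`) could look like the sketch below. `TemporaryDirectory` is a hypothetical name and is not part of JUnit.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Hypothetical plain-JDK analogue of a TemporaryFolder rule: creates a temporary
 * directory and deletes it, including its contents, when closed.
 */
final class TemporaryDirectory implements AutoCloseable {
    private final Path root;

    TemporaryDirectory(String prefix) throws IOException {
        this.root = Files.createTempDirectory(prefix);
    }

    Path root() {
        return root;
    }

    /** Creates a direct sub-directory, e.g. for the sink's target and tmp directories. */
    Path newFolder(String name) throws IOException {
        return Files.createDirectory(root.resolve(name));
    }

    @Override
    public void close() throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}

class TemporaryDirectoryDemo {
    public static void main(String[] args) throws IOException {
        Path root;
        try (TemporaryDirectory dir = new TemporaryDirectory("two-phase-commit-test")) {
            root = dir.root();
            Path target = dir.newFolder("target");
            Files.write(target.resolve("part-0"), "data".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        System.out.println(Files.exists(root)); // prints false
    }
}
```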




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224744#comment-16224744
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147677068
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

The test asserts that the log message contains the substring `This is close to or even exceeding the transaction timeout`. All of your changes to the code would still pass the test, except for the second case, because there is an assertion on the elapsed time. Mockito is used here so that the argument passed to `.warn` can be captured. IMO this is not evil, as no behavior is mocked and actual effects are tested.



> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> 
>
> Key: FLINK-7784
> URL: https://issues.apache.org/jira/browse/FLINK-7784
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails 
> (either when doing it via the completed checkpoint notification or when 
> trying to commit after restoring after failure). This means that the job will 
> go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on 
> processing and this should be the default. We can provide an option that 
> allows failing the sink on failing commits.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224682#comment-16224682
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147672407
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
--- End diff --

Please do not use Mockito for classes that are this easy to implement and 
mock the old-fashioned way (creating a `SettableClock` seems to be super easy).
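For illustration, a hand-written clock of the kind suggested above could look like the minimal sketch below. `SettableClock` is the name floated in the comment, but its methods (`setEpochMillis`, `advance`) are hypothetical; the sketch only assumes that the code under test reads time through `java.time.Clock#millis()`, as the `clock.millis()` calls in the diff do.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/**
 * Hypothetical hand-written test clock: the test sets the current time
 * explicitly instead of stubbing Clock with Mockito. Not thread-safe.
 */
final class SettableClock extends Clock {

    private final ZoneId zone;
    private long epochMillis;

    SettableClock() {
        this(ZoneOffset.UTC, 0L);
    }

    SettableClock(ZoneId zone, long epochMillis) {
        this.zone = zone;
        this.epochMillis = epochMillis;
    }

    /** Sets the absolute time returned by subsequent millis()/instant() calls. */
    void setEpochMillis(long epochMillis) {
        this.epochMillis = epochMillis;
    }

    /** Moves the clock forward by the given number of milliseconds. */
    void advance(long deltaMillis) {
        this.epochMillis += deltaMillis;
    }

    @Override
    public ZoneId getZone() {
        return zone;
    }

    /** Returns an independent copy frozen at the current time in the given zone. */
    @Override
    public Clock withZone(ZoneId zone) {
        return new SettableClock(zone, epochMillis);
    }

    @Override
    public long millis() {
        return epochMillis;
    }

    @Override
    public Instant instant() {
        return Instant.ofEpochMilli(epochMillis);
    }
}
```

A test would pass such a clock into the sink function and call `advance(...)` to simulate a transaction outliving its timeout, with no stubbing involved.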




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224668#comment-16224668
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147655000
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -922,6 +945,22 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
producersPool = new ProducersPool();
}
 
+   /**
+* Disables the propagation of exceptions thrown when committing presumably timed out Kafka
+* transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
+* never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
+* will still be logged to inform the user that data loss might have occurred.
+*
+* Note that we use {@link System#currentTimeMillis()} to track the age of a transaction.
+* Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
+* attempt at least one commit of the transaction before giving up.
+*/
+   @Override
+   public FlinkKafkaProducer011 disableFailurePropagationAfterTransactionTimeout() {
--- End diff --

nit: Please move this method to the top, somewhere around 
`setLogFailuresOnly`




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224670#comment-16224670
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147659966
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
--- End diff --

Please rename this field; it clashes with the `currentTransaction()` method, 
which is confusing, especially since they have different types.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224666#comment-16224666
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147656635
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
--- End diff --

rename to `propagateTransactionTimeouts` or `ignoreTransactionTimeouts`




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224669#comment-16224669
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147662205
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -362,4 +483,39 @@ public void setContext(Optional context) {
this.context = context;
}
}
+
+   /**
+* Adds metadata (currently only the start time of the transaction) to the transaction object.
+*/
+   @VisibleForTesting
+   static class TransactionHolder {
--- End diff --

`TransactionWrapper`? 
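Whatever the final name (`TransactionHolder` or `TransactionWrapper`), the class described in the Javadoc above only pairs a transaction handle with its start time. A minimal sketch, assuming the field names `handle` and `transactionStartTime` that appear elsewhere in the diff; the `elapsedTime(Clock)` helper is hypothetical:

```java
import java.time.Clock;
import java.util.Objects;

/**
 * Sketch of a wrapper that pairs a transaction handle with the time the
 * transaction was started. elapsedTime(Clock) is a hypothetical helper.
 */
final class TransactionHolder<TXN> {

    final TXN handle;

    /** Start time in epoch milliseconds, as returned by Clock#millis(). */
    final long transactionStartTime;

    TransactionHolder(TXN handle, long transactionStartTime) {
        this.handle = Objects.requireNonNull(handle, "handle");
        this.transactionStartTime = transactionStartTime;
    }

    /** Milliseconds the transaction has been open, according to the given clock. */
    long elapsedTime(Clock clock) {
        return clock.millis() - transactionStartTime;
    }

    @Override
    public String toString() {
        return "TransactionHolder{handle=" + handle
                + ", transactionStartTime=" + transactionStartTime + '}';
    }
}
```

Keeping the start time next to the handle means the timeout check needs no separate bookkeeping map keyed by transaction.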




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224674#comment-16224674
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147659714
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
--- End diff --

The `0` in the method's name doesn't help at all. Please rename it to 
either `beginTransactionHolder`, `beginTransactionWrapper`, 
`beginTransactionInternal`, `beginTransactionAndStartTimeoutTimer` or 
`beginTransactionAndMarkTime`.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224673#comment-16224673
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147670239
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -35,60 +42,101 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-   TestContext context;
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   private FileBasedSinkFunction sinkFunction;
+
+   private OneInputStreamOperatorTestHarness harness;
+
+   private AtomicBoolean throwException = new AtomicBoolean();
+
+   private File targetDirectory;
+
+   private File tmpDirectory;
+
+   @Mock
+   private Clock mockClock;
+
+   @Mock
+   private Logger mockLogger;
--- End diff --

As above. Will all of those `mockLogger` tests fail if you replace:
```
log.warn("Transaction {} has been open for {} ms. " +
        "This is close to or even exceeding the transaction timeout of {} ms.",
        transactionHolder.handle,
        elapsedTime,
        transactionTimeout);
```
with 
```
log.warn("Transaction {} has been open for too long. " +
        "This is close to or even exceeding the transaction timeout of {} ms.",
        transactionHolder.handle,
        transactionTimeout);
```
or
```
log.warn(String.format("Transaction {} has been open for {} ms. " +
        "This is close to or even exceeding the transaction timeout of {} ms.",
        transactionHolder.handle,
        elapsedTime,
        transactionTimeout));
```
?
If so, then this is a perfect example of why I consider Mockito to be the 
definition of evil. In 99% of cases, using Mockito in tests means duplicating 
production code in the tests, basically repeating the implementation, which is 
super fragile and breaks on even the tiniest refactors. Good tests should 
instead check/assert the actual effects, not whether specific methods were 
called and how many times they were called.
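Note that the third variant is not a pure refactor: `String.format` only understands `%`-style specifiers, and `java.util.Formatter` silently ignores surplus arguments, so the SLF4J-style `{}` placeholders survive verbatim. The sketch below (class and method names are hypothetical) shows that both the broken and the correct message still contain the substring a log-capturing test might assert on:

```java
/**
 * Hypothetical demo: String.format with SLF4J-style "{}" placeholders leaves
 * the braces untouched and drops the arguments, while %-specifiers work.
 */
final class PlaceholderDemo {

    static String slf4jStyleWithStringFormat(String handle, long elapsedTime, long timeout) {
        // "{}" is not a Formatter specifier; extra arguments are ignored.
        return String.format("Transaction {} has been open for {} ms. " +
                "This is close to or even exceeding the transaction timeout of {} ms.",
                handle, elapsedTime, timeout);
    }

    static String printfStyle(String handle, long elapsedTime, long timeout) {
        return String.format("Transaction %s has been open for %d ms. " +
                "This is close to or even exceeding the transaction timeout of %d ms.",
                handle, elapsedTime, timeout);
    }
}
```

A substring-only assertion cannot tell these two messages apart; asserting on the formatted values (such as the elapsed time) can.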




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224675#comment-16224675
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147663332
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -194,29 +348,4 @@ public String toString() {
return String.format("FileTransaction[%s]", tmpFile.getName());
}
}
-
-   private static class TestContext implements AutoCloseable {
--- End diff --

Why did you remove this encapsulation?
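For context, the encapsulation in question bundles per-test resources into an `AutoCloseable`, so each test can use try-with-resources instead of shared mutable fields. The removed class body is not shown in the diff; the sketch below is a hypothetical illustration of the pattern, not the removed code, and it only manages two temporary directories:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical per-test context: owns its resources and releases them in
 * close(), so cleanup happens even if the test body throws.
 */
final class TestContext implements AutoCloseable {

    final Path tmpDirectory;
    final Path targetDirectory;

    TestContext() throws IOException {
        this.tmpDirectory = Files.createTempDirectory("tmp");
        this.targetDirectory = Files.createTempDirectory("target");
    }

    @Override
    public void close() throws IOException {
        deleteRecursively(tmpDirectory);
        deleteRecursively(targetDirectory);
    }

    private static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> paths;
        // Files.walk holds directory handles open, so close the stream promptly.
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse lexicographic order lists children before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }
}
```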




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224671#comment-16224671
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147661454
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
+   return new TransactionHolder<>(beginTransaction(), clock.millis());
+   }
+
+   /**
+* This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that
+* the configuration parameters {@link #transactionTimeout} and
+* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
+*/
+   private void recoverAndCommit(TransactionHolder transactionHolder) {
+   try {
+   logWarningIfTimeoutAlmostReached(transactionHolder);
+   recoverAndCommit(transactionHolder.handle);
+   } catch (final Exception e) {
+   final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
+   if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) {
+   log.error("Error while committing transaction {}. " +
--- End diff --

`"Error while committing transaction {}. Data loss might have occurred"` 
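The control flow under review (swallow a failed recovery commit only when propagation is disabled and the transaction has outlived its timeout, otherwise rethrow) can be sketched in isolation as follows. `Committer`, the `Consumer<String>` used as a logger, and the flag name `ignoreTransactionTimeouts` (one of the names suggested in this review) are stand-ins, not Flink API:

```java
import java.time.Clock;
import java.util.function.Consumer;

/** Minimal sketch of the catch branch in the diff; not Flink's implementation. */
final class RecoverAndCommitSketch {

    /** Hypothetical stand-in for recoverAndCommit(TXN). */
    interface Committer<TXN> {
        void recoverAndCommit(TXN handle) throws Exception;
    }

    static <TXN> void recoverAndCommitInternal(
            TXN handle,
            long transactionStartTime,
            long transactionTimeout,
            boolean ignoreTransactionTimeouts,
            Clock clock,
            Committer<TXN> committer,
            Consumer<String> errorLog) throws Exception {
        try {
            committer.recoverAndCommit(handle);
        } catch (Exception e) {
            long elapsedTime = clock.millis() - transactionStartTime;
            if (ignoreTransactionTimeouts && elapsedTime > transactionTimeout) {
                // Swallow the failure, but make the possible data loss visible.
                errorLog.accept("Error while committing transaction " + handle
                        + ". Data loss might have occurred. Transaction has been open for "
                        + elapsedTime + " ms, exceeding the transaction timeout of "
                        + transactionTimeout + " ms: " + e);
            } else {
                // Not timed out (or ignoring disabled): propagate as before.
                throw e;
            }
        }
    }
}
```

Failures of transactions that have not yet timed out still propagate, so genuine errors keep failing the job.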




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224672#comment-16224672
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147661097
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
-   LOG.info("{} - no state to restore", name());
+   log.info("{} - no state to restore", name());
 
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
 
-   currentTransaction = beginTransaction();
-   LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   currentTransaction = beginTransaction0();
+   log.debug("{} - started new transaction '{}'", name(), currentTransaction);
+   }
+
+   /**
+* This method must be the only place to call {@link #beginTransaction()} to ensure that the
+* {@link TransactionHolder} is created at the same time.
+*/
+   private TransactionHolder beginTransaction0() throws Exception {
+   return new TransactionHolder<>(beginTransaction(), clock.millis());
+   }
+
+   /**
+* This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that
+* the configuration parameters {@link #transactionTimeout} and
+* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
+*/
+   private void recoverAndCommit(TransactionHolder transactionHolder) {
--- End diff --

Ditto: overloading adds confusion, because it suggests that both methods 
(`recoverAndCommit(TXN)` and `recoverAndCommit(TransactionHolder)`) are equally 
valid and could be used interchangeably. 

As above, rename it to `recoverAndCommitHolder`, `recoverAndCommitWrapper`, 
`recoverAndCommitInternal` or `recoverCommitAndHandleTimeout`.




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224667#comment-16224667
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147653427
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -442,13 +445,31 @@ public FlinkKafkaProducer011(
throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}
 
-   if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+   if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
}
 
+   // Enable transactionTimeoutWarnings to avoid silent data loss
+   // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
+   // The KafkaProducer may not throw an exception if the transaction failed to commit
+   if (semantic == Semantic.EXACTLY_ONCE) {
+   final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
--- End diff --

nit: Don't we already have similar (or the same) logic for parsing/reading 
numeric values implemented somewhere?
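One way to centralize that logic is a small helper that accepts the value whether it was stored as a `Number` (via `put`) or as a `String` (via `setProperty` or a loaded file). The helper name and its error handling are assumptions. It uses `containsKey` because `Properties#contains`, inherited from `Hashtable`, checks values rather than keys, which is the bug the `contains` to `containsKey` change in the diff fixes:

```java
import java.util.Properties;

/** Hypothetical helper for reading a numeric value from producer Properties. */
final class PropertiesUtil {

    static long getLong(Properties props, String key, long defaultValue) {
        // containsKey, not contains: Properties.contains(x) looks at values.
        if (!props.containsKey(key)) {
            return defaultValue;
        }
        Object value = props.get(key);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        "Property '" + key + "' is not a valid long: " + value, e);
            }
        }
        throw new IllegalArgumentException(
                "Property '" + key + "' has unsupported type " + value.getClass().getName());
    }
}
```

For example, `getLong(producerConfig, "transaction.timeout.ms", -1L)` would return the configured timeout regardless of how it was put into the `Properties`.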




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224676#comment-16224676
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4910#discussion_r147658988
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -58,18 +61,37 @@
extends RichSinkFunction
implements CheckpointedFunction, CheckpointListener {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+   private final Logger log;
 
-   protected final ListStateDescriptor> stateDescriptor;
+   private final Clock clock;
 
-   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
+   protected final LinkedHashMap pendingCommitTransactions = new LinkedHashMap<>();
 
-   @Nullable
-   protected TXN currentTransaction;
protected Optional userContext;
 
protected ListState> state;
 
+   private final ListStateDescriptor> stateDescriptor;
+
+   private TransactionHolder currentTransaction;
+
+   /**
+* Specifies the maximum time a transaction should remain open.
+*/
+   private long transactionTimeout = Long.MAX_VALUE;
+
+   /**
+* If true, any exception thrown in {@link #recoverAndCommit(Object)} 
will be caught instead of
+* propagated.
+*/
+   private boolean failureOnCommitAfterTransactionTimeoutDisabled;
+
+   /**
+* If a transaction's elapsed time reaches this percentage of the 
transactionTimeout, a warning
+* message will be logged. Value must be in range [0,1]. Negative value 
disables warnings.
+*/
+   private double transactionTimeoutWarningRatio = -1;
--- End diff --

I would prefer to implement this sanity check in `FlinkKafkaProducer011`, 
since it's a workaround for a Kafka bug and unlikely to be useful in the general case. 
After the Kafka `0.11.0.2` or `1.0.0` release, we won't need this code anymore.
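A minimal sketch of how the `transactionTimeout` and `transactionTimeoutWarningRatio` fields from the diff could be combined. It assumes times in milliseconds and follows the Javadoc in the diff (ratio in [0,1], a negative value disables warnings). The class and method names are hypothetical, not the code under review.

```java
// Hypothetical sketch of the warning check implied by the diff's Javadoc.
// All times are in milliseconds.
class TransactionTimeoutWarning {
    private final long transactionTimeout;   // the diff defaults this to Long.MAX_VALUE
    private final double warningRatio;       // the diff defaults this to -1 (disabled)

    TransactionTimeoutWarning(long transactionTimeout, double warningRatio) {
        // Javadoc in the diff: ratio must be in [0,1]; negative values disable warnings.
        if (warningRatio > 1) {
            throw new IllegalArgumentException("warningRatio must not exceed 1: " + warningRatio);
        }
        this.transactionTimeout = transactionTimeout;
        this.warningRatio = warningRatio;
    }

    // True if a transaction opened at startTime has reached the warning threshold at now.
    boolean shouldWarn(long startTime, long now) {
        if (warningRatio < 0) {
            return false; // warnings disabled
        }
        long elapsed = now - startTime;
        return elapsed >= warningRatio * transactionTimeout;
    }
}
```

With a timeout of 60000 ms and a ratio of 0.8, `shouldWarn` becomes true once a transaction is 48000 ms old; the default ratio of -1 never warns.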




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224573#comment-16224573
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4910
  
The changes look very good! @pnowojski could you please also have a look at this?




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220959#comment-16220959
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4910
  
@aljoscha 




[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220957#comment-16220957
 ] 

ASF GitHub Bot commented on FLINK-7784:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4910

[FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSinkFunction when failing to commit during job recovery

## What is the purpose of the change
This makes it possible to configure the TwoPhaseCommitSinkFunction's behaviour with respect to transaction timeouts.

## Brief change log
  - *Introduce transaction timeouts to TwoPhaseCommitSinkFunction.*
  - *The timeout can be used to generate warnings when a transaction's age approaches the timeout.*
  - *If an exception is thrown during job recovery, the sink can be configured not to propagate the exception and instead log it at ERROR level.*
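A sketch of the recovery behaviour in the last bullet, under the assumption (suggested by the field name `failureOnCommitAfterTransactionTimeoutDisabled` in the diff) that failures are only swallowed once the transaction is older than the timeout. `RecoveryCommitter`, `ignoreFailuresAfterTimeout` and `commitOnRestore` are hypothetical names; the `Consumer` stands in for `recoverAndCommit`, and the ERROR-level log is approximated with `System.err`.

```java
import java.util.function.Consumer;

// Hypothetical sketch, not Flink code: committing a transaction restored from state.
class RecoveryCommitter<TXN> {
    private final long transactionTimeout;              // milliseconds
    private final boolean ignoreFailuresAfterTimeout;   // hypothetical option name
    private final Consumer<TXN> recoverAndCommit;       // stands in for recoverAndCommit(TXN)

    RecoveryCommitter(long transactionTimeout, boolean ignoreFailuresAfterTimeout,
                      Consumer<TXN> recoverAndCommit) {
        this.transactionTimeout = transactionTimeout;
        this.ignoreFailuresAfterTimeout = ignoreFailuresAfterTimeout;
        this.recoverAndCommit = recoverAndCommit;
    }

    // Returns true if the commit succeeded, false if a failure was logged and swallowed.
    boolean commitOnRestore(TXN txn, long transactionStartTime, long now) {
        try {
            recoverAndCommit.accept(txn);
            return true;
        } catch (RuntimeException e) {
            long age = now - transactionStartTime;
            if (ignoreFailuresAfterTimeout && age > transactionTimeout) {
                // The external system has probably aborted the transaction already,
                // so failing the job would only repeat the same failure after restart.
                System.err.println("ERROR: could not commit " + txn + " (age " + age
                        + " ms > timeout " + transactionTimeout + " ms), ignoring: " + e);
                return false;
            }
            throw e; // young transaction or option disabled: propagate and fail the job
        }
    }
}
```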


## Verifying this change
This change added tests and can be verified as follows:
  - *Extended unit tests for TwoPhaseCommitSinkFunction to test added functionality*
  - *Manually verified the change by running a job with a FlinkKafkaProducer011 with checkpoint interval 27000 and transaction.timeout.ms = 3. Warnings were generated correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7784

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4910


commit 23ae221eb854ce12572988aed5c018aac8919af7
Author: gyao 
Date:   2017-10-26T17:17:55Z

[FLINK-7784] [kafka011-producer] Make TwoPhaseCommitSinkFunction aware of transaction timeouts.

TwoPhaseCommitSinkFunction allows configuring a transaction timeout. The
timeout can be used to log warnings if the transaction's age is approaching
the timeout, and it can be used to swallow exceptions that are likely
irrecoverable. This commit also integrates these changes into the
FlinkKafkaProducer011.

commit 43103c1fb61a6bc1aec6b19c0253fcca281cfba5
Author: gyao 
Date:   2017-10-26T17:25:35Z

[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests






[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-23 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215173#comment-16215173
 ] 

Aljoscha Krettek commented on FLINK-7784:
-

Not by default, but with a configuration setting.



[jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit

2017-10-23 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215088#comment-16215088
 ] 

Piotr Nowojski commented on FLINK-7784:
---

Are we sure that we want to ignore such failures by default? The same issue 
that's described here for {{TwoPhaseCommitSinkFunction}} also applies to 
{{BucketingSink}}, and in both places, if we ignore those failures in case of 
intermittent errors, there will be data loss :(
