[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-03-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR adds support for batch queries in the SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a "dynamic" prefix.
 - Added the logic for retrieving batch query results via 
`DataSet.collect()` (see the sketch after this list).
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.
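
For context, a minimal sketch of what collecting a batch query result via `DataSet.collect()` looks like with the Flink 1.5-era Table API (the table name and query are hypothetical, and the SQL Client wraps this machinery internally rather than exposing it):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.List;

public class BatchCollectSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes a table "Orders" was registered beforehand, e.g. via an environment file.
        Table result = tEnv.sqlQuery("SELECT o_user, SUM(amount) FROM Orders GROUP BY o_user");

        // Materialize the finite result on the client; this is what makes a
        // "static" (two-phase) result view possible for batch queries.
        List<Row> rows = tEnv.toDataSet(result, Row.class).collect();
        rows.forEach(System.out::println);
    }
}
```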

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5660


commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36
Author: Xingcan Cui 
Date:   2018-03-07T17:12:55Z

[FLINK-8861][table]Add support for batch queries in SQL Client




---


[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389906#comment-16389906
 ] 

ASF GitHub Bot commented on FLINK-8861:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client


> Add support for batch queries in SQL Client
> ---
>
> Key: FLINK-8861
> URL: https://issues.apache.org/jira/browse/FLINK-8861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Similar to streaming queries, it should be possible to execute batch queries 
> in the SQL Client and collect the results using {{DataSet.collect()}} for 
> debugging purposes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8892) Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11

2018-03-07 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8892.
---
   Resolution: Duplicate
Fix Version/s: (was: 1.6.0)

> Travis Build: Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11
> ---
>
> Key: FLINK-8892
> URL: https://issues.apache.org/jira/browse/FLINK-8892
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Priority: Critical
> Attachments: image-2018-03-07-10-05-38-456.png
>
>
> {code:java}
> ..
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.455 sec - 
> in org.apache.flink.test.broadcastvars.BroadcastBranchingITCase
> Aborted (core dumped)
> Results :
> Tests run: 1413, Failures: 0, Errors: 0, Skipped: 24
> 08:03:40.358 [INFO] 
> 
> 08:03:40.358 [INFO] BUILD FAILURE
> 08:03:40.359 [INFO] 
> 
> 08:03:40.359 [INFO] Total time: 27:06 min
> 08:03:40.359 [INFO] Finished at: 2018-03-07T08:03:40+00:00
> 08:03:40.781 [INFO] Final Memory: 81M/705M
> 08:03:40.781 [INFO] 
> 
> 08:03:40.782 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: ExecutionException: 
> org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred 
> in starting fork, check output in log -> [Help 1]
> 08:03:40.782 [ERROR] 
> 08:03:40.782 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 08:03:40.782 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 08:03:40.782 [ERROR] 
> 08:03:40.783 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 08:03:40.783 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (7889).
> ./tools/travis_mvn_watchdog.sh: line 532:  7889 Terminated  
> watchdog
> PRODUCED build artifacts.
> build_info  mvn-1.log  mvn-2.log  mvn.out
> COMPRESSING build artifacts.
> {code}
> Failed builds: 
> - https://travis-ci.org/apache/flink/jobs/350178841
> - https://travis-ci.org/apache/flink/jobs/349445796



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
--- End diff --

how is the capacity range determined - is it recommended by RocksDB?

the msg should be: "capacity should be between " + MIN + " and " + MAX
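
For illustration, the check with the suggested message would read as follows (a sketch against the constants in the diff above):

```java
// Suggested wording: report the actual valid range instead of a hard-coded "100".
Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
    "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
```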


---


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390155#comment-16390155
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972736
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390157#comment-16390157
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972678
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390152#comment-16390152
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972120
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390156#comment-16390156
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972454
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390154#comment-16390154
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972943
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972120
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[jira] [Closed] (FLINK-8877) Configure Kryo's log level based on Flink's log level

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8877.
---

> Configure Kryo's log level based on Flink's log level
> -
>
> Key: FLINK-8877
> URL: https://issues.apache.org/jira/browse/FLINK-8877
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Kryo uses its embedded MinLog for logging.
> When Flink is set to trace, Kryo should be set to trace as well. Other log 
> levels should not be used, as even debug logging in Kryo results in excessive 
> logging.
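
A minimal sketch of the idea (the hook location and class name are hypothetical; MinLog's {{Log}} API is as shown):

{code:java}
import com.esotericsoftware.minlog.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KryoLoggingSetup {
    private static final Logger LOG = LoggerFactory.getLogger(KryoLoggingSetup.class);

    static void configureKryoLogging() {
        if (LOG.isTraceEnabled()) {
            // Mirror Flink's trace level into Kryo's embedded MinLog.
            Log.TRACE();
        } else {
            // Anything below trace produces excessive output from Kryo, so stay silent.
            Log.set(Log.LEVEL_NONE);
        }
    }
}
{code}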



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8876.
-
Resolution: Fixed

Fixed by resolving all sub-issues

> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest adding concurrency checks that are 
> active only when debug logging is enabled and during test runs.
> This is inspired by Kryo's checks for concurrent access.
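
A minimal sketch of such a guard (illustrative names and system property, not Flink's actual implementation):

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

abstract class GuardedSerializer<T> {

    // Hypothetical switch: enable the guard only for debug logging / test runs.
    private static final boolean CONCURRENCY_CHECKS =
        Boolean.getBoolean("flink.serializer.concurrencyChecks");

    private final AtomicBoolean inUse = new AtomicBoolean(false);

    final void serialize(T record) throws IOException {
        if (CONCURRENCY_CHECKS && !inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Serializer accessed concurrently by multiple threads");
        }
        try {
            doSerialize(record);
        } finally {
            if (CONCURRENCY_CHECKS) {
                inUse.set(false);
            }
        }
    }

    abstract void doSerialize(T record) throws IOException;
}
{code}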



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8876.
---

> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest adding concurrency checks that are 
> active only when debug logging is enabled and during test runs.
> This is inspired by Kryo's checks for concurrent access.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8878) Check for concurrent access to Kryo Serializer

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8878.
-
Resolution: Fixed

Fixed in
  - 1.5.0 via 8a77dbf16febea72d389b2dc497e63cb768a3d2d
  - 1.6.0 via 57ff6e8930db0bfdd8e7cbb8418d9a4b46ca4a61

> Check for concurrent access to Kryo Serializer
> --
>
> Key: FLINK-8878
> URL: https://issues.apache.org/jira/browse/FLINK-8878
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> On debug log level and during tests, the {{KryoSerializer}} should check 
> whether it is concurrently accessed, and throw an exception in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8877) Configure Kryo's log level based on Flink's log level

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8877.
-
Resolution: Fixed

Fixed in
  - 1.5.0 via b0418b41f8fa02d3217b760c5bdfcdd7efdc1eac
  - 1.6.0 via 06110d27d5fcbf939610e2adc780e7ad1c467f6f

> Configure Kryo's log level based on Flink's log level
> -
>
> Key: FLINK-8877
> URL: https://issues.apache.org/jira/browse/FLINK-8877
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Kryo uses its embedded MinLog for logging.
> When Flink is set to trace, Kryo should be set to trace as well. Other log 
> levels should not be used, as even debug logging in Kryo results in excessive 
> logging.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8878) Check for concurrent access to Kryo Serializer

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8878.
---

> Check for concurrent access to Kryo Serializer
> --
>
> Key: FLINK-8878
> URL: https://issues.apache.org/jira/browse/FLINK-8878
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> On debug log level and during tests, the {{KryoSerializer}} should check 
> whether it is concurrently accessed, and throw an exception in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389934#comment-16389934
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Merging this after fixing the comment...


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest adding concurrency checks that are 
> active only when debug logging is enabled and during test runs.
> This is inspired by Kryo's checks for concurrent access.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5645: FLINK-8876 Improve concurrent access handling in stateful...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645
  
Merging this after fixing the comment...


---


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389933#comment-16389933
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() and
+   // deserialize() do the checks and the current mechanism does not handle
--- End diff --

oh, right, fixing...


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest adding concurrency checks that are 
> active only when debug logging is enabled and during test runs.
> This is inspired by Kryo's checks for concurrent access.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---


---


[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172963487
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972943
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972678
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
[quoted diff context — license header, imports, and the opening of ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972454
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
[quoted diff context — license header, imports, and the opening of ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972736
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
[quoted diff context — license header, imports, and the opening of ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390175#comment-16390175
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172977111
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
[quoted diff context — license header, imports, and the opening of ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390204#comment-16390204
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978556
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
--- End diff --

I find this a bit confusing to use `Duration` here, because it really does 
not hold a duration, but an absolute point in time (in the future) evaluated 
against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, 
which also makes the `isOverdue()` check (the most frequent one) cheaper.

You can (and should) still use `Duration` for the arithmetic (adding time, 
etc) - simply convert to nanos.
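
A minimal sketch of that suggestion (hypothetical code, not the PR's implementation): keep the absolute nano timestamp internally and touch `Duration` only at the edges.

{code}
import java.time.Duration;

/** Deadline backed by an absolute System.nanoTime() value. */
public final class NanoDeadline {
    private final long deadlineNanos;

    private NanoDeadline(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    public static NanoDeadline now() {
        return new NanoDeadline(System.nanoTime());
    }

    public NanoDeadline plus(Duration duration) {
        return new NanoDeadline(deadlineNanos + duration.toNanos());
    }

    /** Hot-path check: a single subtraction and comparison, no allocation. */
    public boolean isOverdue() {
        // Subtract first so the comparison stays correct even if nanoTime() wraps.
        return System.nanoTime() - deadlineNanos > 0;
    }

    public Duration timeLeft() {
        return Duration.ofNanos(deadlineNanos - System.nanoTime());
    }
}
{code}

Typical use would be `NanoDeadline d = NanoDeadline.now().plus(Duration.ofMinutes(5));` followed by polling `d.isOverdue()` in the retry loop.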



> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either the Kafka consumer 
> kept advancing its offset between a restart and the next checkpoint failure 
> (a minute's worth), or the state of the operator that held the partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390206#comment-16390206
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978813
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.time;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
+
+   private Deadline(Duration time) {
+   this.time = time;
+   }
+
+   public Deadline plus(Duration other) {
+   return new Deadline(time.plus(other));
+   }
+
+   /**
+* Returns the time left between the deadline and now.
+*/
+   public Duration timeLeft() {
+   return time.minus(Duration.ofNanos(System.nanoTime()));
--- End diff --

Is this expected to go negative, or simply stay at 0 when overdue?
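
For reference, `java.time.Duration` arithmetic does go negative, so without an explicit clamp the quoted `timeLeft()` returns negative durations once the deadline has passed. A tiny illustration (hypothetical snippet, not from the PR):

{code}
import java.time.Duration;

public class TimeLeftSemantics {
    public static void main(String[] args) {
        // Duration.minus(...) happily produces negative results.
        Duration left = Duration.ofMillis(5).minus(Duration.ofMillis(20));
        System.out.println(left.isNegative()); // true
        // Clamping at zero is one possible contract for timeLeft():
        System.out.println(left.isNegative() ? Duration.ZERO : left); // PT0S
    }
}
{code}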


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either the Kafka consumer 
> kept advancing its offset between a restart and the next checkpoint failure 
> (a minute's worth), or the state of the operator that held the partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980021
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
[quoted diff context — license header, imports, and the opening of the reworked ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980585
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+[license header and imports elided]
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
--- End diff --

I have similar comments for this test as for #5654 - the test is fine in 
general, but it can probably be simplified slightly, plus a bit of code style 
cleanup.


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172979459
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
}
}
 
+   /**
+* Retry the given operation with the given delay in between successful 
completions where the
+* result does not match a given predicate.
+*
+* @param operation to retry
+* @param retryDelay delay between retries
+* @param deadline A deadline that specifies at what point we should 
stop retrying
+* @param acceptancePredicate Predicate to test whether the result is 
acceptable
+* @param scheduledExecutor executor to be used for the retry operation
+* @param <T> type of the result
+* @return Future which retries the given operation a given amount of 
times and delays the retry
+*   in case the predicate isn't matched
+*/
+   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+   final Supplier<CompletableFuture<T>> operation,
+   final Time retryDelay,
--- End diff --

Deadline uses `Duration`, this method uses `Time`.
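
The javadoc above describes a retry-until-accepted loop; below is a compact synchronous sketch of the same idea (hypothetical plain-Java code with assumed names — the PR's version is asynchronous and returns a CompletableFuture):

{code}
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class RetryUntilAccepted {

    /** Re-runs the operation until the predicate accepts the result or the deadline passes. */
    static <T> T retryUntilAccepted(
            Supplier<T> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate) throws InterruptedException {

        while (true) {
            T result = operation.get();
            if (acceptancePredicate.test(result)) {
                return result;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException("deadline exceeded before the result was accepted");
            }
            Thread.sleep(retryDelay.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        long deadline = System.nanoTime() + Duration.ofSeconds(2).toNanos();
        int accepted = retryUntilAccepted(
                attempts::incrementAndGet, Duration.ofMillis(50), deadline, r -> r >= 3);
        System.out.println("accepted result: " + accepted); // 3
    }
}
{code}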


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978813
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+[license header, package declaration, and imports elided — identical to the quote earlier in this digest]
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
+
+   private Deadline(Duration time) {
+   this.time = time;
+   }
+
+   public Deadline plus(Duration other) {
+   return new Deadline(time.plus(other));
+   }
+
+   /**
+* Returns the time left between the deadline and now.
+*/
+   public Duration timeLeft() {
+   return time.minus(Duration.ofNanos(System.nanoTime()));
--- End diff --

Is this expected to go negative, or simply stay at 0 when overdue?


---


[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172978556
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java ---
@@ -0,0 +1,68 @@
+[license header, package declaration, and imports elided — identical to the quote earlier in this digest]
+
+/**
+ * This class stores a deadline, as obtained via {@link #now()} or from 
{@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+   private final Duration time;
--- End diff --

I find this a bit confusing to use `Duration` here, because it really does 
not hold a duration, but an absolute point in time (in the future) evaluated 
against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, 
which also makes the `isOverdue()` check (the most frequent one) cheaper.

You can (and should) still use `Duration` for the arithmetic (adding time, 
etc) - simply convert to nanos.



---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tweise @tzulitai 

I would suggest to solve this the following way, which should be both 
simple and cover our cases:

  - We extend the current periodic watermark generators for idleness. We 
can do that, for example, by maintaining a record counter and remembering the 
last counter value together with a System.nanoTime() timestamp each time the 
periodic watermark-generation call happens. If no record has come in for too 
long, return a special watermark object that indicates "idle", or change the 
return type to return either 'none', 'idle', or 'watermark'.

  - The Kinesis Consumer needs per-shard watermarks, the same way the Kafka 
Consumer does. That part needs to be added to the Kinesis consumer anyway.

That way, we automatically get per-shard idleness in Kinesis and 
per-partition idleness in Kafka without doing anything specific for the source 
connectors.

We can then also remove the idleness logic from the source context - it 
would be duplicated there.
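
A rough sketch of that counter-based detection (a hypothetical, standalone illustration; the IDLE sentinel and method names are assumptions, not an existing Flink API):

{code}
/** Tracks, between periodic watermark calls, whether records are still arriving. */
final class IdlenessTracker {

    /** Hypothetical sentinel meaning "this partition/shard is currently idle". */
    static final long IDLE = Long.MIN_VALUE;

    private final long idleTimeoutNanos;
    private long recordCount;           // incremented for every record seen
    private long lastSeenCount = -1L;   // counter value at the previous periodic call
    private long lastChangeNanos = System.nanoTime();

    IdlenessTracker(long idleTimeoutNanos) {
        this.idleTimeoutNanos = idleTimeoutNanos;
    }

    /** Call from the per-record path, e.g. where timestamps are extracted. */
    void onRecord() {
        recordCount++;
    }

    /** Call from the periodic path; returns the watermark, or IDLE if no record arrived for too long. */
    long onPeriodicEmit(long currentWatermark) {
        long now = System.nanoTime();
        if (recordCount != lastSeenCount) {
            // Records arrived since the last call: remember the counter and reset the clock.
            lastSeenCount = recordCount;
            lastChangeNanos = now;
            return currentWatermark;
        }
        return (now - lastChangeNanos > idleTimeoutNanos) ? IDLE : currentWatermark;
    }
}
{code}

Running one tracker per Kafka partition or Kinesis shard then yields the per-partition and per-shard idleness described above.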


---


[GitHub] flink pull request #5620: [FLINK-8824] [kafka] Replace getCanonicalName with...

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5620


---


[GitHub] flink pull request #5643: can integrate and support on apache kudu ?

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5643


---


[GitHub] flink pull request #5646: [hotfix] [javadocs] minor javadoc fix in Timestamp...

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5646


---


[GitHub] flink pull request #5614: [FLINK-8827] When FLINK_CONF_DIR contains spaces, ...

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5614


---


[GitHub] flink issue #5655: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour ...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5655
  
This code is (except for lambdas) identical to #5654.
Please apply the same changes here as to the other PR (with regard to the 
review comments).


---


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390207#comment-16390207
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172979459
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -223,6 +224,81 @@
[quoted javadoc elided — identical to the quote in the message above]
+   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+   final Supplier<CompletableFuture<T>> operation,
+   final Time retryDelay,
--- End diff --

Deadline uses `Duration`, this method uses `Time`.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either the Kafka consumer 
> kept advancing its offset between a restart and the next checkpoint failure 
> (a minute's worth), or the state of the operator that held the partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390205#comment-16390205
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980021
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
[quoted diff context — license header, imports, and the opening of the reworked ZooKeeperHighAvailabilityITCase.java — truncated before the review comment]

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390208#comment-16390208
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5656#discussion_r172980585
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,333 @@
+[license header and imports elided]
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestLogger {
--- End diff --

I have similar comments for this test as for #5654 - the test is fine in 
general, but it can probably be simplified slightly, plus a bit of code style 
cleanup.


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing 

[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390163#comment-16390163
 ] 

ASF GitHub Bot commented on FLINK-8824:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5620


> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The connector uses {{getCanonicalName()}} in all places, rather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc., but it is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390162#comment-16390162
 ] 

ASF GitHub Bot commented on FLINK-8827:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5614


> When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
> -
>
> Key: FLINK-8827
> URL: https://issues.apache.org/jira/browse/FLINK-8827
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago)
>Reporter: Donghui Xu
>Priority: Major
>
> When the path of FLINK_CONF_DIR includes spaces, executing zookeeper-related 
> scripts fails with the following error message: Expect binary expression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172977111
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private static LocalFlinkMiniCluster cluster = null;
+
+   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
+   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+   private static 

[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390174#comment-16390174
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5655
  
This code is (except for lambdas) identical to #5654
Please apply the same changes here as to the other PR (with regard to the 
review comments).


> State loss after multiple restart attempts
> --
>
> Key: FLINK-8487
> URL: https://issues.apache.org/jira/browse/FLINK-8487
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
> .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
> .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
> .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
> ..
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing its offset between a start and the next 
> checkpoint failure (a minute's worth), or the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8879) Add concurrent access check to AvroSerializer

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8879.
---

> Add concurrent access check to AvroSerializer
> -
>
> Key: FLINK-8879
> URL: https://issues.apache.org/jira/browse/FLINK-8879
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> At debug log level and during tests, the AvroSerializer should check whether 
> it is accessed concurrently and throw an exception in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8879) Add concurrent access check to AvroSerializer

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8879.
-
Resolution: Fixed

Fixed in
  - 1.5.0 via 6ec1b784e5fea4d9d5208d44caf6fefde14f4aa8
  - 1.6.0 via be7c89596a3b9cd8805a90aaf32336ec2759a1f7

> Add concurrent access check to AvroSerializer
> -
>
> Key: FLINK-8879
> URL: https://issues.apache.org/jira/browse/FLINK-8879
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> At debug log level and during tests, the AvroSerializer should check whether 
> it is accessed concurrently and throw an exception in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8863) Add user-defined function support in SQL Client

2018-03-07 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-8863:
--

Assignee: Xingcan Cui

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue.
> I would suggest introducing a {{functions}} top-level property. The 
> declaration could look similar to:
> {code}
> functions:
>   - name: testFunction
>     from: class                    <-- optional, default: class
>     class: org.my.MyScalarFunction
>     constructor:                   <-- optional, needed for certain types of functions
>       - 42.0
>       - class: org.my.Class        <-- possibility to create objects via properties
>         constructor:
>           - 1
>           - true
>           - false
>           - "whatever"
>           - type: INT
>             value: 1
> {code}
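
For illustration, a UDF matching the {{testFunction}} declaration above might 
look like the following minimal sketch (the class name comes from the example; 
the {{eval()}} logic and the mapping of the 42.0 constructor entry onto a 
constructor parameter are assumptions):

{code}
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF for the declaration above: the single constructor
// entry 42.0 would be passed to this constructor upon registration.
public class MyScalarFunction extends ScalarFunction {

    private final double offset;

    public MyScalarFunction(double offset) {
        this.offset = offset;
    }

    // Invoked by the Table API / SQL runtime for each input value.
    public double eval(double value) {
        return value + offset;
    }
}
{code}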



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai @StephanEwen the current idleness detection in the source context 
isn't a replacement for what is required to deal with an inactive partition (or 
Kinesis shard). When a connector subtask consumes multiple partitions and one 
is idle, then it should still be possible to generate a watermark. This can 
only be solved outside of the connector when the multiple source partitions are 
visible (like it would be for an operator with multiple input streams).


---


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389940#comment-16389940
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai @StephanEwen the current idleness detection in the source context 
isn't a replacement for what is required to deal with an inactive partition (or 
Kinesis shard). When a connector subtask consumes multiple partitions and one 
is idle, then it should still be possible to generate a watermark. This can 
only be solved outside of the connector when the multiple source partitions are 
visible (like it would be for an operator with multiple input streams).


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}; therefore, the overall min watermark across all partitions 
> of a consumer subtask will never advance.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
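
A minimal illustration of why one idle partition pins the subtask watermark 
(the values are made up):

{code}
// The emitted watermark is the minimum over all per-partition watermarks,
// so a partition stuck at Long.MIN_VALUE keeps the result at Long.MIN_VALUE.
long[] partitionWatermarks = {5_000L, 7_200L, Long.MIN_VALUE}; // third partition idle

long subtaskWatermark = Long.MAX_VALUE;
for (long w : partitionWatermarks) {
    subtaskWatermark = Math.min(subtaskWatermark, w);
}
// subtaskWatermark == Long.MIN_VALUE -> downstream event time never advances
{code}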



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390201#comment-16390201
 ] 

ASF GitHub Bot commented on FLINK-8799:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5617#discussion_r172979651
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor(
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
+
+   String yarnQueueConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE);
--- End diff --

The difficulty of this ticket (FLINK-8799) is that `flinkConfiguration` is 
mutable. As long as a reference to `flinkConfiguration` can possibly leak, this 
class remains mutable. Also, there are some private methods that mutate the 
configuration. There are several places where we would need to make defensive 
copies, e.g., in the constructor. 
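
A minimal sketch of the defensive-copy approach, assuming the copy constructor 
of {{org.apache.flink.configuration.Configuration}} (the class name 
ImmutableDescriptor is illustrative, not the actual descriptor):

{code}
import org.apache.flink.configuration.Configuration;

public class ImmutableDescriptor {

    private final Configuration flinkConfiguration;

    public ImmutableDescriptor(Configuration configuration) {
        // Copy on the way in, so later mutations by the caller
        // cannot leak into this descriptor.
        this.flinkConfiguration = new Configuration(configuration);
    }

    public Configuration getFlinkConfiguration() {
        // Copy on the way out, so no reference to the internal
        // configuration escapes.
        return new Configuration(flinkConfiguration);
    }
}
{code}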


> Make AbstractYarnClusterDescriptor immutable
> 
>
> Key: FLINK-8799
> URL: https://issues.apache.org/jira/browse/FLINK-8799
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its 
> internal configuration is modified from different places, which makes it 
> difficult to reason about the code. For example, it should not be possible to 
> modify the {{zookeeperNamespace}} using a setter method. A user of this class 
> should be forced to provide all information prior to creating the instance, 
> e.g., by passing a {{org.apache.flink.configuration.Configuration}} object.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8824.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in
  - 1.5.0 via 70347bcc788dea1d4f64539701e60bafcc772946
  - 1.6.0 via 75a4aaea8b051aa6dae52d421f7b4d7eeab99486

> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> The connector uses {{getCanonicalName()}} in all places, rather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc., but it is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390245#comment-16390245
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tweise @tzulitai 

I would suggest to solve this the following way, which should be both 
simple and cover our cases:

  - We extend the current periodic watermark generators for idleness. We 
can do that, for example, by maintaining a record counter and remembering the 
last counter value and a System.nanoTime() timestamp each time the periodic 
check for whether to generate a watermark is called. If no record came for too 
long, return a special watermark object that indicates "idle", or change the 
return type to return either 'none', 'idle', or 'watermark' (see the sketch 
below).

  - The Kinesis Consumer needs per-shard watermarks, the same way as the Kafka 
Consumer does. That part needs to be added to the Kinesis consumer anyways.

That way, we automatically get per-shard idleness in Kinesis and 
per-partition idleness in Kafka without doing anything specific for the source 
connectors.

We can then also remove the idleness logic from the source context - it 
would be duplicated there.
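
A sketch of the counter-based idleness check described above (all names are 
illustrative, not actual Flink API; the timeout handling is an assumption):

{code}
// Remembers the record counter value and a System.nanoTime() timestamp
// from the previous periodic check; reports IDLE when no record arrived
// within the configured timeout.
public class IdlenessAwareWatermarkTracker {

    public enum Result { NONE, IDLE, WATERMARK }

    private final long idleTimeoutNanos;

    private long recordCount;
    private long lastSeenCount;
    private long lastCheckNanos = System.nanoTime();
    private long maxTimestamp = Long.MIN_VALUE;

    public IdlenessAwareWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutNanos = idleTimeoutMillis * 1_000_000L;
    }

    /** Invoked for every record the partition/shard delivers. */
    public void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Invoked periodically instead of a plain watermark query. */
    public Result check() {
        long now = System.nanoTime();
        if (recordCount != lastSeenCount) {
            // Progress was made since the previous check.
            lastSeenCount = recordCount;
            lastCheckNanos = now;
            return maxTimestamp > Long.MIN_VALUE ? Result.WATERMARK : Result.NONE;
        }
        // No record came for too long -> report idleness.
        return now - lastCheckNanos >= idleTimeoutNanos ? Result.IDLE : Result.NONE;
    }

    public long currentWatermark() {
        return maxTimestamp;
    }
}
{code}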


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}; therefore, the overall min watermark across all partitions 
> of a consumer subtask will never advance.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...

2018-03-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5659

[FLINK-8661] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR added support for batch queries in SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for the batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a prefix "dynamic".
 - Added the logic for retrieving batch query results using 
`Dataset.collect()`.
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5659.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5659


commit 8f21a38dd60ab8c41893ec8c52bfcc47fc54648e
Author: Xingcan Cui 
Date:   2018-03-07T17:12:55Z

[FLINK-8661][table]Add support for batch queries in SQL Client




---


[jira] [Created] (FLINK-8892) Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11

2018-03-07 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8892:
---

 Summary: Travis Build: Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) 
on project flink-tests_2.11
 Key: FLINK-8892
 URL: https://issues.apache.org/jira/browse/FLINK-8892
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.6.0
Reporter: Bowen Li
 Fix For: 1.6.0
 Attachments: image-2018-03-07-10-05-38-456.png


{code:java}
..
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.455 sec - in 
org.apache.flink.test.broadcastvars.BroadcastBranchingITCase
Aborted (core dumped)
Results :
Tests run: 1413, Failures: 0, Errors: 0, Skipped: 24
08:03:40.358 [INFO] ------------------------------------------------------------------------
08:03:40.358 [INFO] BUILD FAILURE
08:03:40.359 [INFO] ------------------------------------------------------------------------
08:03:40.359 [INFO] Total time: 27:06 min
08:03:40.359 [INFO] Finished at: 2018-03-07T08:03:40+00:00
08:03:40.781 [INFO] Final Memory: 81M/705M
08:03:40.781 [INFO] ------------------------------------------------------------------------
08:03:40.782 [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) 
on project flink-tests_2.11: ExecutionException: 
org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred in 
starting fork, check output in log -> [Help 1]
08:03:40.782 [ERROR] 
08:03:40.782 [ERROR] To see the full stack trace of the errors, re-run Maven 
with the -e switch.
08:03:40.782 [ERROR] Re-run Maven using the -X switch to enable full debug 
logging.
08:03:40.782 [ERROR] 
08:03:40.783 [ERROR] For more information about the errors and possible 
solutions, please read the following articles:
08:03:40.783 [ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
MVN exited with EXIT CODE: 1.
Trying to KILL watchdog (7889).
./tools/travis_mvn_watchdog.sh: line 532:  7889 Terminated  watchdog
PRODUCED build artifacts.
build_info  mvn-1.log  mvn-2.log  mvn.out
COMPRESSING build artifacts.
{code}


Failed builds: 

- https://travis-ci.org/apache/flink/jobs/350178841
- https://travis-ci.org/apache/flink/jobs/349445796



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8661) Replace Collections.EMPTY_MAP with Collections.emptyMap()

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389898#comment-16389898
 ] 

ASF GitHub Bot commented on FLINK-8661:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5659

[FLINK-8661] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR added support for batch queries in SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for the batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a prefix "dynamic".
 - Added the logic for retrieving batch query results using 
`Dataset.collect()`.
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5659.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5659


commit 8f21a38dd60ab8c41893ec8c52bfcc47fc54648e
Author: Xingcan Cui 
Date:   2018-03-07T17:12:55Z

[FLINK-8661][table]Add support for batch queries in SQL Client




> Replace Collections.EMPTY_MAP with Collections.emptyMap()
> -
>
> Key: FLINK-8661
> URL: https://issues.apache.org/jira/browse/FLINK-8661
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> The use of Collections.EMPTY_SET and Collections.EMPTY_MAP often causes 
> unchecked assignment and it should be replaced with Collections.emptySet() 
> and Collections.emptyMap() .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390153#comment-16390153
 ] 

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172963487
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
+
+   private static final int NUM_JMS = 1;
+   private static final int NUM_TMS = 1;
+   private static final int NUM_SLOTS_PER_TM = 1;
+
+   @ClassRule
+   public static final TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+   private static File haStorageDir;
+
+   private static TestingServer zkServer;
+
+   private 

[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...

2018-03-07 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5659


---


[jira] [Commented] (FLINK-8661) Replace Collections.EMPTY_MAP with Collections.emptyMap()

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389900#comment-16389900
 ] 

ASF GitHub Bot commented on FLINK-8661:
---

Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5659


> Replace Collections.EMPTY_MAP with Collections.emptyMap()
> -
>
> Key: FLINK-8661
> URL: https://issues.apache.org/jira/browse/FLINK-8661
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> The use of Collections.EMPTY_SET and Collections.EMPTY_MAP often causes 
> unchecked assignment and it should be replaced with Collections.emptySet() 
> and Collections.emptyMap() .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390026#comment-16390026
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935214
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
--- End diff --

need synchronization on put() and flush()
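
A sketch of the synchronization this comment asks for, showing only the two 
methods of the wrapper quoted above (whether multiple threads actually access 
the wrapper is an assumption of this illustration):

{code}
// Guarding put() and flush() with the wrapper's own monitor, so the batch
// cannot be flushed while another thread is still appending to it.
// (synchronized is reentrant, so flush() may safely be called from put().)
public synchronized void put(ColumnFamilyHandle handle, byte[] key, byte[] value)
        throws RocksDBException {

    batch.put(handle, key, value);

    if (++currentSize == capacity) {
        flush();
    }
}

public synchronized void flush() throws RocksDBException {
    db.write(options, batch);
    batch.clear();
    currentSize = 0;
}
{code}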


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390024#comment-16390024
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

how is the capacity range determined - is it recommended by RocksDB?

the msg should be: "capacity should be between " + MIN + " and " + MAX


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390025#comment-16390025
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935414
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

can batch be null?


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935414
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

can batch be null?


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935214
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
--- End diff --

need synchronization on put() and flush()


---


[GitHub] flink pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescrip...

2018-03-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5617#discussion_r172979651
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor(
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
+
+   String yarnQueueConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE);
--- End diff --

The difficulty of this ticket (FLINK-8799) is that `flinkConfiguration` is 
mutable. As long as a reference to `flinkConfiguration` can possibly leak, this 
class remains mutable. Also, there are some private methods that mutate the 
configuration. There are several places where we would need to make defensive 
copies, e.g., in the constructor. 


---


[jira] [Resolved] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8827.
-
   Resolution: Fixed
Fix Version/s: 1.6.0
   1.5.0

Fixed in
  - 1.5.0 via af9a68c5af85feb51de5cac735de1473f008f0fe
  - 1.6.0 via 6b7e9896eb6a4e2fc628403c9159b7652db2e311

> When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
> -
>
> Key: FLINK-8827
> URL: https://issues.apache.org/jira/browse/FLINK-8827
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago)
>Reporter: Donghui Xu
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> When the path of FLINK_CONF_DIR includes spaces, executing zookeeper-related 
> scripts fails with the following error message: Expect binary expression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8824.
---

> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> The connector uses {{getCanonicalName()}} in all places, gather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc, but is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed

2018-03-07 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8827.
---

> When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
> -
>
> Key: FLINK-8827
> URL: https://issues.apache.org/jira/browse/FLINK-8827
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago)
>Reporter: Donghui Xu
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> When the path of FLINK_CONF_DIR includes spaces, executing zookeeper-related 
> scripts fails with the following error message: Expect binary expression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-03-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390255#comment-16390255
 ] 

Stephan Ewen commented on FLINK-8867:
-

I would suggest that the fact that these follow-up exceptions disguise the 
original exception is itself an issue to be fixed.

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> In our setup, we put an entry in our Flink_conf file for the default scheme:
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then the application with the rocksdb state backend fails with the following 
> exception. When we remove this config, it works fine. It works fine with 
> other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-03-07 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390365#comment-16390365
 ] 

Thomas Weise commented on FLINK-5697:
-

For idleness detection see: [https://github.com/apache/flink/pull/5634]

 

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> It would be nice to let the Kinesis consumer be on par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition / 
> shard watermarks are something we can also add to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7129) Support dynamically changing CEP patterns

2018-03-07 Thread Che Lui Shum (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390326#comment-16390326
 ] 

Che Lui Shum commented on FLINK-7129:
-

Hi [~fhueske] and [~dawidwys], may I ask if there is any update on this feature?

> Support dynamically changing CEP patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> An umbrella task for introducing mechanism for injecting patterns through 
> coStream



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream

2018-03-07 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390467#comment-16390467
 ] 

Rong Rong commented on FLINK-8690:
--

I created an initial (far from perfect) version of this support that adds 
special treatment via a distinct rule in DataStreamNormRuleSet, so that the 
logical plan for DataStream directly converts a distinct LogicalAggregate to 
FlinkLogicalAggregate, while DataSet still goes through 
AggregateExpandDistinctAggregatesRule. Please see it here:
https://github.com/walterddr/flink/commit/ef9777cd8859180f38900393967deaabf39b7453
 

[~hequn8128] mentioned that we can override the match() function to achieve 
this. However, in order to generate separate logical plans for DataSet and 
DataStream, the only solution I can think of right now is to add a new rule to 
the NormRuleSet. Do you think this is the right path?

Any comments or suggestions are highly appreciated :-)

> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
This is a good proposal; it should also survive the general connector 
refactoring that will be necessary to address other code duplication. The Kinesis 
ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a 
reference back to this thread. I would be happy to add the watermark support 
based on the revised generator. Perhaps it would also be good to recognize the 
special "idle" watermark in SourceContext?


---


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390362#comment-16390362
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
This is a good proposal; it should also survive the general connector 
refactoring that will be necessary to address other code duplication. The Kinesis 
ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a 
reference back to this thread. I would be happy to add the watermark support 
based on the revised generator. Perhaps it would also be good to recognize the 
special "idle" watermark in SourceContext?


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to how idle sources block watermark progression in downstream 
> operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never advance.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
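
To make the failure mode concrete, here is a tiny self-contained sketch 
(illustrative Java only, not Flink code) of why a single idle partition pins 
the emitted watermark:

{code:java}
public class IdlePartitionWatermarkDemo {

    public static void main(String[] args) {
        // per-partition watermarks; an idle partition stays at Long.MIN_VALUE
        long[] partitionWatermarks = {Long.MIN_VALUE, 1_000L, 2_000L};

        // the consumer subtask emits the minimum across all partitions
        long overall = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            overall = Math.min(overall, w);
        }

        // prints true: the overall watermark is stuck at Long.MIN_VALUE
        System.out.println(overall == Long.MIN_VALUE);
    }
}
{code}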



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390632#comment-16390632
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173049537
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
--- End diff --

About the capacity range, I didn't find a specific value recommended by 
RocksDB, but from the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ)
```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```
I found that they use the word `hundreds`.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048697
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

Hmm... currently, it is only used in a single thread. For the best 
performance, I would rather not add synchronization to it; instead, I'd like to 
add an annotation to this class stating that it is not thread safe. We could 
introduce a new class that is thread safe if we really need it. What do you think? 
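
For illustration, here is a minimal single-threaded usage sketch of the 
wrapper from this diff (the `restore` helper, the already-opened `db`, 
`writeOptions`, `handle`, and the `restoredEntries` iterable are assumptions 
for the sketch, not part of this PR):

```java
// imports: java.util.Map plus the RocksDB classes already imported above.
// single-threaded restore loop; flush() pushes the remaining buffered
// writes before the wrapper is closed
static void restore(RocksDB db, WriteOptions writeOptions, ColumnFamilyHandle handle,
        Iterable<Map.Entry<byte[], byte[]>> restoredEntries) throws RocksDBException {
    try (RocksDBWriteBatchWrapper writer = new RocksDBWriteBatchWrapper(db, writeOptions, 500)) {
        for (Map.Entry<byte[], byte[]> entry : restoredEntries) {
            writer.put(handle, entry.getKey(), entry.getValue());
        }
        writer.flush();
    }
}
```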


---


[jira] [Commented] (FLINK-4811) Checkpoint Overview should list failed checkpoints

2018-03-07 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390689#comment-16390689
 ] 

Sihua Zhou commented on FLINK-4811:
---

This JIRA's state is still Unresolved; I think this has already been addressed, right?

> Checkpoint Overview should list failed checkpoints
> --
>
> Key: FLINK-4811
> URL: https://issues.apache.org/jira/browse/FLINK-4811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: Stephan Ewen
>Priority: Major
>
> To let users understand what is happening with failed/skipped checkpoints, 
> the web UI should display failed checkpoints as well, with their error cause.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048763
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

You are right, this `if` can be removed.


---


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390623#comment-16390623
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048763
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

You are right, this `if` can be removed.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390622#comment-16390622
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173048697
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --

Hmm... currently, it is only used in a single thread. For the best 
performance, I would rather not add synchronization to it; instead, I'd like to 
add an annotation to this class stating that it is not thread safe. We could 
introduce a new class that is thread safe if we really need it. What do you think? 


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8790) Improve performance for recovery from incremental checkpoint

2018-03-07 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8790:
--
Fix Version/s: (was: 1.5.0)
   1.6.0

> Improve performance for recovery from incremental checkpoint
> 
>
> Key: FLINK-8790
> URL: https://issues.apache.org/jira/browse/FLINK-8790
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> When there are multiple state handles to be restored, we can improve the 
> performance as follows:
> 1. Choose the best state handle to init the target db.
> 2. Use the other state handles to create temp dbs, and clip each db according 
> to the target key group range (via rocksdb.deleteRange()); this helps us get 
> rid of the `key group check` in the `data insertion loop` and also avoids 
> traversing useless records.
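
A rough sketch of step 2, assuming RocksDB's Java {{deleteRange()}} and 
hypothetical key bounds (the actual key-group byte layout is Flink-internal):

{code:java}
// clip a temp db to the target key-group range [targetStart, targetEnd):
// keys outside the range are dropped, so the later insertion loop needs
// neither a per-record key-group check nor a scan over useless records
static void clipToKeyGroupRange(
        RocksDB tempDb,
        ColumnFamilyHandle handle,
        byte[] smallestKey,   // lower bound below all existing keys
        byte[] targetStart,   // first key of the target key-group range
        byte[] targetEnd,     // first key past the target key-group range
        byte[] upperBound)    // upper bound past all existing keys
        throws RocksDBException {
    tempDb.deleteRange(handle, smallestKey, targetStart);
    tempDb.deleteRange(handle, targetEnd, upperBound);
}
{code}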



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-03-07 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390847#comment-16390847
 ] 

Shashank Agarwal commented on FLINK-8867:
-

[~StephanEwen] [~srichter] You can check the full logs in this issue:

https://issues.apache.org/jira/browse/FLINK-7756

Actually, when we were debugging that issue, we found that the root cause is this one. 

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> In our setup, we put an entry in our Flink conf file for the default scheme.
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then applications with the RocksDB state backend fail with the following 
> exception. When we remove this config, everything works fine. It also works 
> fine with other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390813#comment-16390813
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@zentol it seems Travis CI has some problems; the build always fails. 
Please review my latest change.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@zentol it seems Travis CI has some problems; the build always fails. 
Please review my latest change.


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r173049537
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+                                   @Nonnull WriteOptions options,
+                                   int capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+           "capacity should at least greater than 100");
--- End diff --

About the capacity range, I didn't find a specific value recommended by 
RocksDB, but from the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ)
```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```
I found that they use the word `hundreds`.


---


[jira] [Created] (FLINK-8893) NPE when netty try to allocate directBuffer

2018-03-07 Thread aitozi (JIRA)
aitozi created FLINK-8893:
-

 Summary: NPE when netty try to allocate directBuffer
 Key: FLINK-8893
 URL: https://issues.apache.org/jira/browse/FLINK-8893
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.3.2
Reporter: aitozi


Job failed with this exception 

{code:java}
Caused by: java.lang.NullPointerException
at io.netty.buffer.PoolChunk.initBufWithSubpage(PoolChunk.java:381)
at io.netty.buffer.PoolChunk.initBufWithSubpage(PoolChunk.java:369)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:194)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at 
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at 
org.apache.flink.runtime.io.network.netty.NettyBufferPool.directBuffer(NettyBufferPool.java:278)
at 
org.apache.flink.runtime.io.network.netty.NettyMessage.allocateBuffer(NettyMessage.java:72)
at 
org.apache.flink.runtime.io.network.netty.NettyMessage.access$000(NettyMessage.java:50)
at 
org.apache.flink.runtime.io.network.netty.NettyMessage$BufferResponse.write(NettyMessage.java:227)
... 26 more
{code}

After some research, this may be caused by allocating from multiple threads. 

https://github.com/netty/netty/issues/4198
https://github.com/netty/netty/pull/4388




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389832#comment-16389832
 ] 

Stephan Ewen commented on FLINK-8828:
-

[~fhueske] and [~twalthr] Please have a look at [~jelmer]'s comment here, I 
think he has a very good point.

Concerning the method name, could we use a method {{collectStream}} or 
{{collectElements}} or something like that?

Alternatively, could we overload the {{collect}} method and rename the current 
{{collect()}} method eventually?

> Add collect method to DataStream / DataSet scala api
> 
>
> Key: FLINK-8828
> URL: https://issues.apache.org/jira/browse/FLINK-8828
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataSet API, DataStream API, Scala API
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>
> A collect function is a method that takes a Partial Function as its parameter 
> and applies it to all the elements in the collection to create a new 
> collection which satisfies the Partial Function.
> It can be found on all [core scala collection 
> classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html]
>  as well as on spark's [rdd 
> interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD]
> To understand its utility imagine the following scenario :
> Given a DataStream that produces events of type _Purchase_ and _View_ 
> Transform this stream into a stream of purchase amounts over 1000 euros.
> Currently an implementation might look like
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount){noformat}
> Or alternatively you could do this
> {noformat}
> dataStream.flatMap(_ match {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }){noformat}
> But with collect implemented it could look like
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }{noformat}
>  
> Which is a lot nicer to both read and write



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172922843
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() 
and
+   // deserialize() do the checks and the current mechanism does 
not handle
--- End diff --

looks like the end of the comment is missing


---


[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389855#comment-16389855
 ] 

ASF GitHub Bot commented on FLINK-8876:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172922843
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 
@Override
public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-   T value = deserialize(source);
-   serialize(value, target);
+   // we do not have concurrency checks here, because serialize() 
and
+   // deserialize() do the checks and the current mechanism does 
not handle
--- End diff --

looks like the end of the comment is missing


> Improve concurrent access handling in stateful serializers
> --
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Some stateful serializers produce incorrect results when accidentally 
> accessed by multiple threads concurrently.
>  To better catch these cases, I suggest to add concurrency checks that are 
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
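
A minimal sketch of such a check (illustrative only, not the actual serializer 
code; {{CHECKS_ENABLED}} stands in for the debug-logging/test-run switch):

{code:java}
public final class ConcurrencyCheckedSerializerSketch {

    private static final boolean CHECKS_ENABLED = true;

    private Thread currentOwner;

    public void serialize(Object value) {
        enterExclusiveThread();
        try {
            // ... actual (non thread-safe) serialization work ...
        } finally {
            exitExclusiveThread();
        }
    }

    private void enterExclusiveThread() {
        if (CHECKS_ENABLED) {
            // fail fast when a second thread enters while another holds the serializer
            if (currentOwner != null && currentOwner != Thread.currentThread()) {
                throw new IllegalStateException("Concurrent access to stateful serializer detected");
            }
            currentOwner = Thread.currentThread();
        }
    }

    private void exitExclusiveThread() {
        if (CHECKS_ENABLED) {
            currentOwner = null;
        }
    }
}
{code}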



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5658: [FLINK-8856] [TaskManager] Move all cancellation i...

2018-03-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5658

 [FLINK-8856] [TaskManager] Move all cancellation interrupt calls to 
TaskCanceller thread

## What is the purpose of the change

This cleans up the code and guards against a JVM bug where `interrupt()` 
calls
block/deadlock if the thread is engaged in certain I/O operations.

In addition, this makes sure that the process really goes away when the 
cancellation
timeout expires, rather than relying on the TaskManager to be able to 
properly handle
the fatal error notification.

Some minor robustness enhancements related to this change are included in 
this PR.

## Verifying this change

The change is motivated by an occasional JVM bug that I could not 
purposefully trigger in tests, so there is no test guarding against a rollback 
to the prior state. All tests were passing prior to this change and are passing 
after it.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fix_task_interrupt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5658.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5658


commit 36726fc5c277c649dc360975a34bf7be0afd7a0e
Author: Stephan Ewen 
Date:   2018-03-06T14:54:13Z

[hotfix] [taskmanager] Fix checkstyle in Task and TaskTest

commit 385b2032bcaa397d21d28e90efa89a44e12ebe99
Author: Stephan Ewen 
Date:   2018-03-06T14:18:33Z

[FLINK-8856] [TaskManager] Move all cancellation interrupt calls to 
TaskCanceller thread

This cleans up the code and guards against a JVM bug where 'interrupt()' 
calls
block/deadlock if the thread is engaged in certain I/O operations.

In addition, this makes sure that the process really goes away when the 
cancellation
timeout expires, rather than relying on the TaskManager to be able to 
properly handle
the fatal error notification.

commit 3b18d7d9eccc936dc53b05b787f3fd4c19171d4f
Author: Stephan Ewen 
Date:   2018-03-06T15:36:13Z

[FLINK-8883] [core] Make ThreadDeath a fatal error in ExceptionUtils

commit f3884088b210a061dba4d83323884bece1d31864
Author: Stephan Ewen 
Date:   2018-03-06T16:14:54Z

[FLINK-8885] [TaskManager] DispatcherThreadFactory registers a fatal error 
exception handler

In case dispatcher threads let an exception bubble out (do not handle it), the 
exception handler terminates the process, to ensure we don't leave broken 
processes behind.

commit 5236bb73a2482ccdf016d1b9bea5cd0f17f2f620
Author: Stephan Ewen 
Date:   2018-03-06T16:18:38Z

[hotfix] [runtime] Harden FatalExitExceptionHandler

In case the logging framework throws an exception when handling the 
exception,
we still kill the process, as intended.




---


[jira] [Commented] (FLINK-8856) Move all interrupt() calls to TaskCanceler

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389860#comment-16389860
 ] 

ASF GitHub Bot commented on FLINK-8856:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5658

 [FLINK-8856] [TaskManager] Move all cancellation interrupt calls to 
TaskCanceller thread

## What is the purpose of the change

This cleans up the code and guards against a JVM bug where `interrupt()` 
calls
block/deadlock if the thread is engaged in certain I/O operations.

In addition, this makes sure that the process really goes away when the 
cancellation
timeout expires, rather than relying on the TaskManager to be able to 
properly handle
the fatal error notification.

Some minor robustness enhancements related to this change are included in 
this PR.

## Verifying this change

The change is motivated by an occasional JVM bug that I could not 
purposefully trigger in tests, so there is no test guarding against a rollback 
to the prior state. All tests were passing prior to this change and are passing 
after it.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fix_task_interrupt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5658.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5658


commit 36726fc5c277c649dc360975a34bf7be0afd7a0e
Author: Stephan Ewen 
Date:   2018-03-06T14:54:13Z

[hotfix] [taskmanager] Fix checkstyle in Task and TaskTest

commit 385b2032bcaa397d21d28e90efa89a44e12ebe99
Author: Stephan Ewen 
Date:   2018-03-06T14:18:33Z

[FLINK-8856] [TaskManager] Move all cancellation interrupt calls to 
TaskCanceller thread

This cleans up the code and guards against a JVM bug where 'interrupt()' 
calls
block/deadlock if the thread is engaged in certain I/O operations.

In addition, this makes sure that the process really goes away when the 
cancellation
timeout expires, rather than relying on the TaskManager to be able to 
properly handle
the fatal error notification.

commit 3b18d7d9eccc936dc53b05b787f3fd4c19171d4f
Author: Stephan Ewen 
Date:   2018-03-06T15:36:13Z

[FLINK-8883] [core] Make ThreadDeath a fatal error in ExceptionUtils

commit f3884088b210a061dba4d83323884bece1d31864
Author: Stephan Ewen 
Date:   2018-03-06T16:14:54Z

[FLINK-8885] [TaskManager] DispatcherThreadFactory registers a fatal error 
exception handler

In case dispatcher threads let an exception bubble out (do not handle it), the 
exception handler terminates the process, to ensure we don't leave broken 
processes behind.

commit 5236bb73a2482ccdf016d1b9bea5cd0f17f2f620
Author: Stephan Ewen 
Date:   2018-03-06T16:18:38Z

[hotfix] [runtime] Harden FatalExitExceptionHandler

In case the logging framework throws an exception when handling the 
exception,
we still kill the process, as intended.




> Move all interrupt() calls to TaskCanceler
> --
>
> Key: FLINK-8856
> URL: https://issues.apache.org/jira/browse/FLINK-8856
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> We need this to work around the following JVM bug: 
> https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8138622
> To circumvent this problem, the {{TaskCancelerWatchDog}} must not call 
> {{interrupt()}} at all, but only join on the executing thread (with timeout) 
> and cause a hard exit once cancellation takes too long.
> A user affected by this problem reported this in FLINK-8834
> Personal note: The Thread.join(...) method 
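
A minimal sketch of the described watchdog behavior (illustrative names, not 
the actual {{TaskCancelerWatchDog}} code):

{code:java}
// join on the executing thread with a timeout instead of interrupting it;
// if the thread is still alive afterwards, hard-exit the process
final class WatchDogSketch implements Runnable {

    private final Thread executerThread;
    private final long timeoutMillis;

    WatchDogSketch(Thread executerThread, long timeoutMillis) {
        this.executerThread = executerThread;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void run() {
        try {
            executerThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (executerThread.isAlive()) {
            // cancellation took too long; do not rely on interrupt(),
            // which can block in certain I/O due to the JVM bug above
            Runtime.getRuntime().halt(-1);
        }
    }
}
{code}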

[jira] [Updated] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-07 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8845:
--
Summary: Use WriteBatch to improve performance for recovery in RocksDB 
backend  (was:  Introduce `parallel recovery` mode for full checkpoint 
(savepoint))

> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, 
> we can restore from a full checkpoint (savepoint) in parallel. This can also 
> be extended to incremental checkpoints easily, but for the sake of simplicity, 
> we do this in two separate tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5652
  
yup. But one profile is already scratching the 50m limit as is :/


---


[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...

2018-03-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5648#discussion_r172775779
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -404,16 +404,25 @@ public void start() throws Exception {
final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
if (jobManagerRunner != null) {
-   return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
-   } else {
-   final JobDetails jobDetails = 
archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-   if (jobDetails != null) {
-   return 
CompletableFuture.completedFuture(jobDetails.getStatus());
-   } else {
-   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   try {
+   return 
jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

This is an asynchronous call that isn't throwing the exception. You have to 
add a handler to the returned `CompletableFuture`. It also only properly 
resolves one of the exceptions, and IMO shouldn't catch `Exception` but the 
specific exceptions we want the workaround to work for, so as not to hide other 
issues.

In any case, I'm not sure if adding workarounds to the Dispatcher is the 
right way to go. These issues revealed that some scenarios are not properly 
handled, and I would prefer waiting for @tillrohrmann to really fix this in the 
Dispatcher and related components.

We can temporarily handle both exceptions in the `MiniClusterClient` by 
adding a *single* retry (with a short sleep) if a **specific** exception occurs.
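
As a self-contained illustration of the first point (generic 
`CompletableFuture` usage, not the Dispatcher code):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncExceptionSketch {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // the failure happens on another thread, so a try/catch around
            // supplyAsync() at the call site never observes it
            throw new IllegalStateException("boom");
        });

        // exceptions have to be handled on the returned future itself
        future.exceptionally(t -> "recovered: " + t.getMessage())
                .thenAccept(System.out::println)
                .get(); // block so the demo prints before the JVM exits
    }
}
```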


---


[jira] [Created] (FLINK-8889) Do not override config values in TestBaseUtils#startCluster

2018-03-07 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8889:
---

 Summary: Do not override config values in 
TestBaseUtils#startCluster
 Key: FLINK-8889
 URL: https://issues.apache.org/jira/browse/FLINK-8889
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.5.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.0


{{TestBaseUtils#startCluster}} is a utility method to easily start a cluster. 
This method receives a Configuration to be used for the cluster; however, it 
overrides several values regardless of whether they were set or not.

It also sets various options to their default value which shouldn't be 
necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8890) Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()

2018-03-07 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8890:

Summary: Compare checkpoints with order in 
CompletedCheckpoint.checkpointsMatch()  (was: Compare checkpoints in order in 
CompletedCheckpoint.checkpointsMatch())

> Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()
> 
>
> Key: FLINK-8890
> URL: https://issues.apache.org/jira/browse/FLINK-8890
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.3.3, 1.5.0, 1.4.2
>
>
> This method is used, among other things, to check if a list of restored 
> checkpoints is stable after several restore attempts in the ZooKeeper 
> checkpoint store. The order of checkpoints is somewhat important because we 
> want the latest checkpoint to stay the latest checkpoint.
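
A tiny sketch of why order matters here (illustrative, not the actual 
{{checkpointsMatch()}} implementation):

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class OrderSensitiveCompareSketch {

    public static void main(String[] args) {
        List<Long> restoredA = Arrays.asList(1L, 2L, 3L); // checkpoint ids, latest last
        List<Long> restoredB = Arrays.asList(3L, 2L, 1L); // same ids, latest changed

        // an order-insensitive comparison considers these equal
        System.out.println(new HashSet<>(restoredA).equals(new HashSet<>(restoredB))); // true

        // an order-sensitive comparison catches that the latest checkpoint changed
        System.out.println(restoredA.equals(restoredB)); // false
    }
}
{code}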



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8843) Decouple bind REST address from advertised address

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389228#comment-16389228
 ] 

ASF GitHub Bot commented on FLINK-8843:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5632
  
hi @zentol, it seems @tillrohrmann has no free time recently. Would you 
please review this?


> Decouple bind REST address from advertised address
> --
>
> Key: FLINK-8843
> URL: https://issues.apache.org/jira/browse/FLINK-8843
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> The {{RestServerEndpoint}} is currently bound to the same address which is 
> also advertised to the client, namely {{RestOptions#REST_ADDRESS}}. It would 
> be better to start the {{RestServerEndpoint}} listening on all addresses by 
> binding to {{0.0.0.0}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...

2018-03-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5652
  
All legacy tests going through the `MiniClusterResource` will take longer. 
I don't know by how much, but we now have to start multiple actor systems and 
the JM<->TM communication is no longer local.


---


[GitHub] flink issue #5632: [FLINK-8843][REST] Decouple bind REST address from advert...

2018-03-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5632
  
hi @zentol, it seems @tillrohrmann has no free time recently. Would you 
please review this?


---

