[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR adds support for batch queries in the SQL Client.

## Brief change log

- Added a `StaticResult` and a `BatchResult` for batch query results.
- Added related methods for static results to `ResultStore` and renamed the existing methods with the prefix "dynamic".
- Added the logic for retrieving batch query results via `DataSet.collect()`.
- Adapted the viewing logic for static results to a "two-phase" table result view.
- Added a first-page option to `CliTableResultView.java`.
- Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case `testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5660.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5660

commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36
Author: Xingcan Cui
Date: 2018-03-07T17:12:55Z

[FLINK-8861][table]Add support for batch queries in SQL Client

---
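The "two-phase" idea in the change log (first materialize the complete batch result, then page through it in the table view) can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions: `PagedStaticResult`, `page()`, and `firstPage()` are hypothetical names, not the `StaticResult`/`BatchResult` API added by the PR, and a plain `List` stands in for the rows that `DataSet.collect()` would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: names are illustrative, not the PR's StaticResult/BatchResult API.
class StaticResultSketch {

    /** Phase one snapshots the collected rows; phase two serves fixed-size pages. */
    static final class PagedStaticResult<T> {
        private final List<T> rows;
        private final int pageSize;

        PagedStaticResult(List<T> collectedRows, int pageSize) {
            if (pageSize <= 0) {
                throw new IllegalArgumentException("pageSize must be positive, but was " + pageSize);
            }
            // Copy once: a batch result is complete, so the snapshot never changes afterwards.
            this.rows = Collections.unmodifiableList(new ArrayList<>(collectedRows));
            this.pageSize = pageSize;
        }

        /** Number of pages; an empty result still shows one (empty) page. */
        int pageCount() {
            return Math.max(1, (rows.size() + pageSize - 1) / pageSize);
        }

        /** Returns the rows on the given 1-based page. */
        List<T> page(int pageNumber) {
            if (pageNumber < 1 || pageNumber > pageCount()) {
                throw new IndexOutOfBoundsException("page " + pageNumber + " of " + pageCount());
            }
            int from = (pageNumber - 1) * pageSize;
            int to = Math.min(from + pageSize, rows.size());
            return rows.subList(from, to);
        }

        /** Shortcut for jumping straight to the first page. */
        List<T> firstPage() {
            return page(1);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the rows a batch query would return via DataSet.collect().
        List<String> collected = Arrays.asList("a", "b", "c", "d", "e");
        PagedStaticResult<String> result = new PagedStaticResult<>(collected, 2);
        System.out.println(result.pageCount()); // 3
        System.out.println(result.firstPage()); // [a, b]
        System.out.println(result.page(3));     // [e]
    }
}
```

Because the whole result is held in memory on the client, this approach only suits results small enough to collect, which fits the debugging purpose stated in FLINK-8861.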
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389906#comment-16389906 ]

ASF GitHub Bot commented on FLINK-8861:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client

> Add support for batch queries in SQL Client
> ---
>
>                 Key: FLINK-8861
>                 URL: https://issues.apache.org/jira/browse/FLINK-8861
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the
> implementation plan mentioned in
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Similar to streaming queries, it should be possible to execute batch queries
> in the SQL Client and collect the results using {{DataSet.collect()}} for
> debugging purposes.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8892) Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11
[ https://issues.apache.org/jira/browse/FLINK-8892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler closed FLINK-8892.
---
       Resolution: Duplicate
    Fix Version/s:     (was: 1.6.0)

> Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11
> ---
>
>                 Key: FLINK-8892
>                 URL: https://issues.apache.org/jira/browse/FLINK-8892
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>    Affects Versions: 1.6.0
>            Reporter: Bowen Li
>            Priority: Critical
>         Attachments: image-2018-03-07-10-05-38-456.png
>
>
> {code:java}
> ..
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.455 sec - in org.apache.flink.test.broadcastvars.BroadcastBranchingITCase
> Aborted (core dumped)
> Results :
> Tests run: 1413, Failures: 0, Errors: 0, Skipped: 24
> 08:03:40.358 [INFO]
> 08:03:40.358 [INFO] BUILD FAILURE
> 08:03:40.359 [INFO]
> 08:03:40.359 [INFO] Total time: 27:06 min
> 08:03:40.359 [INFO] Finished at: 2018-03-07T08:03:40+00:00
> 08:03:40.781 [INFO] Final Memory: 81M/705M
> 08:03:40.781 [INFO]
> 08:03:40.782 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11: ExecutionException: org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred in starting fork, check output in log -> [Help 1]
> 08:03:40.782 [ERROR]
> 08:03:40.782 [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> 08:03:40.782 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> 08:03:40.782 [ERROR]
> 08:03:40.783 [ERROR] For more information about the errors and possible solutions, please read the following articles:
> 08:03:40.783 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (7889).
> ./tools/travis_mvn_watchdog.sh: line 532: 7889 Terminated watchdog
> PRODUCED build artifacts.
> build_info mvn-1.log mvn-2.log mvn.out
> COMPRESSING build artifacts.
> {code}
> Failed builds:
> - https://travis-ci.org/apache/flink/jobs/350178841
> - https://travis-ci.org/apache/flink/jobs/349445796
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683

--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+	private final static int MIN_CAPACITY = 100;
+	private final static int MAX_CAPACITY = 1;
+
+	private final RocksDB db;
+
+	private final WriteBatch batch;
+
+	private final WriteOptions options;
+
+	private final int capacity;
+
+	private int currentSize;
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+		@Nonnull WriteOptions options,
+		int capacity) {
+
+		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+			"capacity should at least greater than 100");
--- End diff --

How is the capacity range determined? Is it recommended by RocksDB?

The message should be: "capacity should be between " + MIN + " and " + MAX

---
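The reviewer's two points (a justified capacity range and an error message that names both bounds) can be illustrated with a minimal sketch of a capacity-bounded write batch. It assumes an in-memory `Map` in place of RocksDB, so the flush step stands in for `RocksDB.write(WriteOptions, WriteBatch)`. `BufferedBatchWriter` is a hypothetical class, and `MAX_CAPACITY = 10_000` is an assumed value: the upper bound quoted in the diff (`1`) cannot satisfy `capacity >= 100`, so no capacity would pass that check as quoted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; an in-memory map replaces RocksDB, and MAX_CAPACITY is assumed.
class WriteBatchSketch {

    static final class BufferedBatchWriter implements AutoCloseable {
        static final int MIN_CAPACITY = 100;
        static final int MAX_CAPACITY = 10_000; // assumption, not taken from the PR

        private final Map<String, String> store;
        private final List<String[]> pending = new ArrayList<>();
        private final int capacity;
        private int flushCount;

        BufferedBatchWriter(Map<String, String> store, int capacity) {
            // The message states both bounds, as suggested in the review.
            if (capacity < MIN_CAPACITY || capacity > MAX_CAPACITY) {
                throw new IllegalArgumentException("capacity should be between "
                        + MIN_CAPACITY + " and " + MAX_CAPACITY + ", but was " + capacity);
            }
            this.store = store;
            this.capacity = capacity;
        }

        /** Buffers a put and flushes automatically once the batch is full. */
        void put(String key, String value) {
            pending.add(new String[] {key, value});
            if (pending.size() >= capacity) {
                flush();
            }
        }

        /** Applies all buffered puts to the store in one step. */
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            for (String[] kv : pending) {
                store.put(kv[0], kv[1]);
            }
            pending.clear();
            flushCount++;
        }

        int flushCount() {
            return flushCount;
        }

        @Override
        public void close() {
            // Flush the remainder so no buffered write is lost.
            flush();
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new LinkedHashMap<>();
        try (BufferedBatchWriter writer = new BufferedBatchWriter(store, 100)) {
            for (int i = 0; i < 250; i++) {
                writer.put("k" + i, "v" + i);
            }
            System.out.println(store.size()); // 200: two full batches flushed, 50 still buffered
        }
        System.out.println(store.size()); // 250 after close()
        try {
            new BufferedBatchWriter(store, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // capacity should be between 100 and 10000, but was 1
        }
    }
}
```

Flink's `Preconditions.checkArgument` also accepts an error-message argument, so the same message could be passed there instead of throwing directly.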
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390155#comment-16390155 ]

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972736

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ */
+public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
+
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	private static final int NUM_JMS = 1;
+	private static final int NUM_TMS = 1;
+	private static final int NUM_SLOTS_PER_TM = 1;
+
+	@ClassRule
+	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private static File haStorageDir;
+
+	private static TestingServer zkServer;
+
+	private
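The test declares a five-minute `TEST_TIMEOUT` and imports `scala.concurrent.duration.Deadline`, but the quoted context ends before either is used. A common pattern for such fields in integration tests is to poll a condition until a deadline derived from the overall timeout expires. The sketch below assumes that pattern and uses `java.time.Duration` with `System.nanoTime()` in place of Scala's `Deadline`; `waitUntil` is a hypothetical helper, not part of Flink's test utilities.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper; java.time replaces scala.concurrent.duration.Deadline here.
class DeadlinePollingSketch {

    /**
     * Polls {@code condition} until it holds or the deadline passes.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // deadline passed without the condition holding
            }
            try {
                // Sleep one poll interval, but never past the deadline.
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, pollInterval.toNanos()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger checks = new AtomicInteger();
        // Five minutes mirrors TEST_TIMEOUT; the condition becomes true on the third check.
        boolean met = waitUntil(() -> checks.incrementAndGet() >= 3,
                Duration.ofMinutes(5), Duration.ofMillis(10));
        System.out.println(met + " after " + checks.get() + " checks"); // true after 3 checks

        boolean timedOut = !waitUntil(() -> false, Duration.ofMillis(50), Duration.ofMillis(10));
        System.out.println("timed out: " + timedOut); // timed out: true
    }
}
```

Computing the deadline once from a monotonic clock keeps the total wait bounded by the timeout regardless of how many polls occur.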
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390157#comment-16390157 ]

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972678

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390152#comment-16390152 ]

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972120

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390156#comment-16390156 ]

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972454

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390154#comment-16390154 ]

ASF GitHub Bot commented on FLINK-8487:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5654#discussion_r172972943

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972120 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static
[jira] [Closed] (FLINK-8877) Configure Kryo's log level based on Flink's log level
[ https://issues.apache.org/jira/browse/FLINK-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-8877.

> Configure Kryo's log level based on Flink's log level
>
> Key: FLINK-8877
> URL: https://issues.apache.org/jira/browse/FLINK-8877
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Kryo uses its embedded MinLog for logging.
> When Flink is set to trace, Kryo should be set to trace as well. Other log
> levels should not be used, as even debug logging in Kryo results in excessive
> logging.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
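The policy described in FLINK-8877 is a one-way mapping: forward Flink's TRACE level to Kryo and otherwise leave Kryo's level untouched. A minimal sketch of that decision, assuming hypothetical `FlinkLevel` and `KryoLevel` enums as stand-ins for Flink's logger levels and for the levels of Kryo's embedded MinLog (this is not Flink's actual code):

```java
import java.util.Optional;

/** Hypothetical stand-in for the log levels Flink's logger can be set to. */
enum FlinkLevel { TRACE, DEBUG, INFO, WARN, ERROR }

/** Hypothetical stand-in for the levels offered by Kryo's embedded MinLog. */
enum KryoLevel { TRACE, DEBUG, INFO, WARN, ERROR, NONE }

final class KryoLogLevelPolicy {

    private KryoLogLevelPolicy() {}

    /**
     * Returns the level Kryo should be switched to, or empty if Kryo's level
     * should be left as it is. Only TRACE is forwarded: per the issue, even
     * DEBUG logging in Kryo produces excessive output.
     */
    static Optional<KryoLevel> kryoLevelFor(FlinkLevel flinkLevel) {
        if (flinkLevel == FlinkLevel.TRACE) {
            return Optional.of(KryoLevel.TRACE);
        }
        return Optional.empty();
    }
}
```

Applying the result would go through MinLog's static level setters (for example `com.esotericsoftware.minlog.Log.TRACE()`); that wiring is left out here so the sketch stays dependency-free.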
[jira] [Resolved] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-8876.
Resolution: Fixed

Fixed by resolving all sub-issues

> Improve concurrent access handling in stateful serializers
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Some stateful serializers produce incorrect results when accidentally
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
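The checks proposed in FLINK-8876 can be sketched as an owner marker that a serializer claims on entry and releases on exit. This is a minimal sketch under stated assumptions, not Flink's implementation: the class name is hypothetical, `java.util.logging`'s FINE level stands in for "debug logging", and the `example.serializer.checkConcurrency` system property is a hypothetical switch standing in for "test runs".

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical debug-only guard that detects two threads using the same
 * stateful serializer instance at the same time.
 */
final class ConcurrentAccessGuard {

    private static final Logger LOG = Logger.getLogger(ConcurrentAccessGuard.class.getName());

    /** Hypothetical system property used to force the check on during tests. */
    static final String TEST_PROPERTY = "example.serializer.checkConcurrency";

    /** Thread currently inside a guarded method, or null if the serializer is idle. */
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    private final boolean enabled;

    /** Enables the check only with debug (FINE) logging or when the test property is set. */
    ConcurrentAccessGuard() {
        this(LOG.isLoggable(Level.FINE) || Boolean.getBoolean(TEST_PROPERTY));
    }

    ConcurrentAccessGuard(boolean enabled) {
        this.enabled = enabled;
    }

    /** Claims the serializer for the calling thread; throws if another thread holds it. */
    void enter() {
        if (!enabled) {
            return; // production path: only this branch is paid per call
        }
        Thread self = Thread.currentThread();
        while (true) {
            Thread current = owner.get();
            if (current == self) {
                return; // same thread again: allowed, nesting depth is not tracked
            }
            if (current != null) {
                throw new IllegalStateException("Concurrent access to serializer: thread '"
                        + self.getName() + "' entered while '" + current.getName() + "' was active");
            }
            if (owner.compareAndSet(null, self)) {
                return;
            }
            // Another thread claimed the guard between get() and CAS; loop to report it.
        }
    }

    /** Releases the serializer if the calling thread holds it. */
    void exit() {
        if (enabled) {
            owner.compareAndSet(Thread.currentThread(), null);
        }
    }
}
```

A serializer would call `enter()` at the start of `serialize()` and `deserialize()` and `exit()` in a `finally` block, so the check costs a single branch when it is disabled.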
[jira] [Closed] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-8876.

> Improve concurrent access handling in stateful serializers
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Some stateful serializers produce incorrect results when accidentally
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
[jira] [Resolved] (FLINK-8878) Check for concurrent access to Kryo Serializer
[ https://issues.apache.org/jira/browse/FLINK-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-8878.
Resolution: Fixed

Fixed in
- 1.5.0 via 8a77dbf16febea72d389b2dc497e63cb768a3d2d
- 1.6.0 via 57ff6e8930db0bfdd8e7cbb8418d9a4b46ca4a61

> Check for concurrent access to Kryo Serializer
>
> Key: FLINK-8878
> URL: https://issues.apache.org/jira/browse/FLINK-8878
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> On debug log level and during tests, the {{KryoSerializer}} should check
> whether it is concurrently accessed, and throw an exception in that case.
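For FLINK-8878, the same idea applied inside a serializer might look like the sketch below. `BufferReusingStringSerializer` is a hypothetical stand-in for a stateful serializer such as `KryoSerializer`: it reuses one scratch buffer, which is the kind of shared state that breaks under concurrent use. The check shown is a cheap, best-effort variant (a plain volatile read, compare, and write rather than a CAS), chosen here as an assumption; it can miss two threads that race past the null check together, but it never reports sequential use as concurrent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stateful serializer that throws when used by two threads at once. */
final class BufferReusingStringSerializer {

    /** Hypothetical switch; in practice it would follow the debug log level or a test flag. */
    private final boolean checkConcurrency;

    /** Reused scratch buffer: the shared mutable state that makes sharing unsafe. */
    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();

    /** Thread currently inside serialize()/deserialize(), or null. */
    private volatile Thread currentThread;

    BufferReusingStringSerializer(boolean checkConcurrency) {
        this.checkConcurrency = checkConcurrency;
    }

    byte[] serialize(String value) throws IOException {
        enterExclusive();
        try {
            scratch.reset();
            DataOutputStream out = new DataOutputStream(scratch);
            out.writeUTF(value);
            out.flush();
            return scratch.toByteArray();
        } finally {
            exitExclusive();
        }
    }

    String deserialize(byte[] bytes) throws IOException {
        enterExclusive();
        try {
            return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
        } finally {
            exitExclusive();
        }
    }

    private void enterExclusive() {
        if (!checkConcurrency) {
            return;
        }
        // Best-effort get-check-set: cheaper than a CAS, may miss some races.
        Thread self = Thread.currentThread();
        Thread previous = currentThread;
        if (previous == null) {
            currentThread = self;
        } else if (previous != self) {
            throw new IllegalStateException("Concurrent access to serializer by threads '"
                    + self.getName() + "' and '" + previous.getName() + "'");
        }
    }

    private void exitExclusive() {
        if (checkConcurrency) {
            currentThread = null;
        }
    }
}
```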
[jira] [Resolved] (FLINK-8877) Configure Kryo's log level based on Flink's log level
[ https://issues.apache.org/jira/browse/FLINK-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-8877.
Resolution: Fixed

Fixed in
- 1.5.0 via b0418b41f8fa02d3217b760c5bdfcdd7efdc1eac
- 1.6.0 via 06110d27d5fcbf939610e2adc780e7ad1c467f6f

> Configure Kryo's log level based on Flink's log level
>
> Key: FLINK-8877
> URL: https://issues.apache.org/jira/browse/FLINK-8877
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Kryo uses its embedded MinLog for logging.
> When Flink is set to trace, Kryo should be set to trace as well. Other log
> levels should not be used, as even debug logging in Kryo results in excessive
> logging.
[jira] [Closed] (FLINK-8878) Check for concurrent access to Kryo Serializer
[ https://issues.apache.org/jira/browse/FLINK-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-8878.

> Check for concurrent access to Kryo Serializer
>
> Key: FLINK-8878
> URL: https://issues.apache.org/jira/browse/FLINK-8878
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> On debug log level and during tests, the {{KryoSerializer}} should check
> whether it is concurrently accessed, and throw an exception in that case.
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389934#comment-16389934 ]

ASF GitHub Bot commented on FLINK-8876:

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5645

Merging this after fixing the comment...

> Improve concurrent access handling in stateful serializers
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Some stateful serializers produce incorrect results when accidentally
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
[GitHub] flink issue #5645: FLINK-8876 Improve concurrent access handling in stateful...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5645 Merging this after fixing the comment... ---
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389933#comment-16389933 ]

ASF GitHub Bot commented on FLINK-8876:

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		T value = deserialize(source);
-		serialize(value, target);
+		// we do not have concurrency checks here, because serialize() and
+		// deserialize() do the checks and the current mechanism does not handle
--- End diff --

oh, right, fixing...

> Improve concurrent access handling in stateful serializers
>
> Key: FLINK-8876
> URL: https://issues.apache.org/jira/browse/FLINK-8876
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Fix For: 1.5.0, 1.6.0
>
> Some stateful serializers produce incorrect results when accidentally
> accessed by multiple threads concurrently.
> To better catch these cases, I suggest adding concurrency checks that are
> active only when debug logging is enabled, and during test runs.
> This is inspired by Kryo's checks for concurrent access.
[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5645#discussion_r172939401

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		T value = deserialize(source);
-		serialize(value, target);
+		// we do not have concurrency checks here, because serialize() and
+		// deserialize() do the checks and the current mechanism does not handle
--- End diff --

oh, right, fixing...

---
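The review comment on `AvroSerializer.copy(DataInputView, DataOutputView)` explains that `copy()` carries no check of its own because it only delegates to `deserialize()` and `serialize()`, which already perform the checks. The comment is cut off in the diff; the sketch below assumes the unhandled case is nested calls from the same thread, modeled with a simple non-reentrant owner marker whose `exit()` always clears the owner. `DelegatingCopySerializer` is hypothetical and uses `java.io.DataInput`/`DataOutput` in place of Flink's `DataInputView`/`DataOutputView`.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hypothetical serializer whose copy() has no concurrency check of its own
 * because it only delegates to the two checked methods.
 */
final class DelegatingCopySerializer {

    /** Non-reentrant owner marker: exit() clears it unconditionally. */
    private volatile Thread owner;

    void enter() {
        Thread self = Thread.currentThread();
        Thread previous = owner;
        if (previous != null && previous != self) {
            throw new IllegalStateException("Concurrent access by thread '" + self.getName() + "'");
        }
        owner = self;
    }

    void exit() {
        owner = null; // also drops ownership taken by an enclosing call on this thread
    }

    boolean isOwned() {
        return owner != null;
    }

    String deserialize(DataInput source) throws IOException {
        enter();
        try {
            return source.readUTF();
        } finally {
            exit();
        }
    }

    void serialize(String value, DataOutput target) throws IOException {
        enter();
        try {
            target.writeUTF(value);
        } finally {
            exit();
        }
    }

    /** No check here: deserialize() and serialize() each check on their own. */
    void copy(DataInput source, DataOutput target) throws IOException {
        String value = deserialize(source);
        serialize(value, target);
    }
}
```

With such a marker, guarding `copy()` itself would add little: the nested `exit()` inside `deserialize()` clears the owner while `copy()` is still running. Calling `enter()` twice from one thread and then `exit()` once leaves `isOwned()` false, which is exactly that gap.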
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172963487 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972943 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972678 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972454 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172972736 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390175#comment-16390175 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172977111 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390204#comment-16390204 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978556 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; --- End diff -- I find this a bit confusing to use `Duration` here, because it really does not hold a duration, but an absolute point in time (in the future) evaluated against `System.nanoTime()`. I would simply use a `long deadlineNanos` here, which also makes the `isOverdue()` check (the most frequent one) cheaper. You can (and should) still use `Duration` for the arithmetic (adding time, etc) - simply convert to nanos. 
> State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15-minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing its offset between a start and the next > checkpoint failure (a minute's worth) or the operator that had partial > aggregates was lost.
> The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
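The reviewer's suggestion (store an absolute `System.nanoTime()` value in a `long`, and keep `Duration` only for the arithmetic) could be sketched as follows. This is a hypothetical illustration, not the code in PR #5656: the class name `NanoDeadline` is made up, and its methods only borrow the `now()`, `plus(Duration)`, `timeLeft()` and `isOverdue()` names that appear in the diff and the review.

```java
import java.time.Duration;

/**
 * Hypothetical sketch: a deadline stored as an absolute System.nanoTime() value,
 * as suggested in the review. Duration is used only at the API boundary.
 */
final class NanoDeadline {

    // Absolute point in time on the System.nanoTime() clock, not a duration.
    private final long deadlineNanos;

    private NanoDeadline(long deadlineNanos) {
        this.deadlineNanos = deadlineNanos;
    }

    /** A deadline that is due right now. */
    static NanoDeadline now() {
        return new NanoDeadline(System.nanoTime());
    }

    /** Shifts the deadline; the arithmetic is expressed with Duration and converted to nanos. */
    NanoDeadline plus(Duration other) {
        return new NanoDeadline(deadlineNanos + other.toNanos());
    }

    /** Time remaining until the deadline; negative once the deadline has passed. */
    Duration timeLeft() {
        return Duration.ofNanos(deadlineNanos - System.nanoTime());
    }

    /** The frequent check is a single long subtraction, with no Duration allocation. */
    boolean isOverdue() {
        // Compare via subtraction: nanoTime values may overflow, their differences stay meaningful.
        return deadlineNanos - System.nanoTime() <= 0;
    }
}
```

Keeping `isOverdue()` to one `long` subtraction is what makes the most frequent check cheap; the `a - b <= 0` comparison follows the overflow advice in the `System.nanoTime()` Javadoc.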
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390206#comment-16390206 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172978813 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.time; + + +import org.apache.flink.annotation.Internal; + +import java.time.Duration; + +/** + * This class stores a deadline, as obtained via {@link #now()} or from {@link #plus(Duration)}. + */ +@Internal +public class Deadline { + private final Duration time; + + private Deadline(Duration time) { + this.time = time; + } + + public Deadline plus(Duration other) { + return new Deadline(time.plus(other)); + } + + /** +* Returns the time left between the deadline and now. +*/ + public Duration timeLeft() { + return time.minus(Duration.ofNanos(System.nanoTime())); --- End diff -- Is this expected to go negative, or simply stay at 0 when overdue? 
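Whether `timeLeft()` should go negative is a contract decision. A minimal sketch of both options, assuming the deadline is kept as an absolute `System.nanoTime()` value; the class `TimeLeft` and its two methods are hypothetical, and `nowNanos` is passed in only so the example is deterministic (real callers would pass `System.nanoTime()`).

```java
import java.time.Duration;

/** Hypothetical helpers contrasting the two possible timeLeft() contracts. */
final class TimeLeft {

    private TimeLeft() {}

    /** Raw difference: negative once the deadline has passed, so callers can see by how much. */
    static Duration rawTimeLeft(long deadlineNanos, long nowNanos) {
        return Duration.ofNanos(deadlineNanos - nowNanos);
    }

    /** Clamped difference: never below zero, safe to hand to APIs that reject negative waits. */
    static Duration clampedTimeLeft(long deadlineNanos, long nowNanos) {
        Duration left = rawTimeLeft(deadlineNanos, nowNanos);
        return left.isNegative() ? Duration.ZERO : left;
    }
}
```

A negative result tells callers how far past the deadline they are, while the clamped form can be passed directly to APIs such as `Thread.sleep(long)`, which throws `IllegalArgumentException` for negative values.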
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980021 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); +
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980585 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { --- End diff -- I have similar comments for this test as for #5654: the test is fine in general, but can probably be simplified slightly, plus a bit of code style cleanup. ---
[GitHub] flink pull request #5656: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172979459 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param <T> type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static <T> CompletableFuture<T> retrySuccesfulWithDelay( + final Supplier<CompletableFuture<T>> operation, + final Time retryDelay, --- End diff -- Deadline uses `Duration`, this method uses `Time`. ---
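The review remark points at a unit mismatch: `Deadline` is built on `java.time.Duration`, while `retrySuccesfulWithDelay` takes Flink's `Time`. One way to resolve it is to use `Duration` throughout. The following is a hypothetical, JDK-only sketch of the loop the Javadoc describes (retry after a delay while a successful result fails the predicate, stop at a deadline); it is not Flink's `FutureUtils` implementation. The class `RetrySketch`, the method name `retryUntilAccepted`, the `long deadlineNanos` parameter (a `System.nanoTime()` value) and the use of `ScheduledExecutorService` instead of Flink's `ScheduledExecutor` are all assumptions.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of a "retry while the result is not yet acceptable" helper. */
final class RetrySketch {

    private RetrySketch() {}

    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retryDelay, deadlineNanos, acceptancePredicate, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error != null) {
                // Only successful-but-unacceptable results are retried; failures are propagated.
                result.completeExceptionally(error);
            } else if (acceptancePredicate.test(value)) {
                result.complete(value);
            } else if (deadlineNanos - System.nanoTime() <= retryDelay.toNanos()) {
                // The next attempt would start at or after the deadline: give up.
                result.completeExceptionally(
                        new TimeoutException("Deadline passed before an acceptable result was produced"));
            } else {
                // Schedule the next attempt after the configured delay.
                scheduler.schedule(
                        () -> attempt(operation, retryDelay, deadlineNanos, acceptancePredicate, scheduler, result),
                        retryDelay.toNanos(),
                        TimeUnit.NANOSECONDS);
            }
        });
    }
}
```

With `Duration` used for both the delay and the deadline arithmetic, callers never convert between two time types; this sketch does not reflect Flink's actual `FutureUtils` signature.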
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 @tweise @tzulitai I would suggest solving this in the following way, which should be both simple and cover our cases: - We extend the current periodic watermark generators to detect idleness. For example, we can maintain a record counter and, each time the generator is asked whether to emit a watermark, remember the current counter value and a System.nanoTime() timestamp. If no record has arrived for too long, return a special watermark object that indicates "idle", or change the return type to return either 'none', 'idle', or 'watermark'. - The Kinesis Consumer needs per-shard watermarks, the same way the Kafka Consumer does. That part needs to be added to the Kinesis consumer anyway. That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors. We can then also remove the idleness logic from the source context, where it would be duplicated. ---
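The idleness proposal can be sketched per shard (or per Kafka partition) as below. Everything here is hypothetical: the `WatermarkDecision` enum stands in for the suggested 'none' / 'idle' / 'watermark' return type, the watermark is simply the largest timestamp seen (no out-of-orderness bound), and `combine` assumes idle shards are simply excluded from the minimum. Time is passed in as `nowNanos` so the sketch is testable; in practice it would be `System.nanoTime()`.

```java
import java.time.Duration;
import java.util.OptionalLong;

/** Possible outcomes of one periodic check, mirroring the 'none' / 'idle' / 'watermark' idea. */
enum WatermarkDecision { NONE, IDLE, WATERMARK }

/**
 * Hypothetical per-shard periodic watermark generator with idleness detection.
 * Progress is tracked with a record counter plus the time of the last check that saw new records.
 */
final class IdleAwareWatermarkGenerator {

    private final long idleTimeoutNanos;
    private long recordCount;        // incremented for every record
    private long lastSeenCount;      // recordCount observed at the previous periodic check
    private long lastProgressNanos;  // time of the last check that saw new records
    private long maxTimestamp = Long.MIN_VALUE;

    IdleAwareWatermarkGenerator(Duration idleTimeout, long nowNanos) {
        this.idleTimeoutNanos = idleTimeout.toNanos();
        this.lastProgressNanos = nowNanos;
    }

    /** Called for every record; only bumps a counter and tracks the max event timestamp. */
    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically; nowNanos would be System.nanoTime() in practice. */
    WatermarkDecision onPeriodicCheck(long nowNanos) {
        if (recordCount != lastSeenCount) {
            lastSeenCount = recordCount;
            lastProgressNanos = nowNanos;
            return WatermarkDecision.WATERMARK;
        }
        if (nowNanos - lastProgressNanos >= idleTimeoutNanos) {
            return WatermarkDecision.IDLE;
        }
        return WatermarkDecision.NONE;
    }

    long currentWatermark() {
        return maxTimestamp;
    }

    /** Combines shards: minimum over non-idle shards; empty if every shard is idle. */
    static OptionalLong combine(long[] shardWatermarks, boolean[] shardIdle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < shardWatermarks.length; i++) {
            if (!shardIdle[i]) {
                anyActive = true;
                min = Math.min(min, shardWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

Since the generator only compares a counter and a timestamp at each periodic call, the per-record cost stays at one increment and one `max`, which is why this could live in the generic periodic generators rather than in each source connector.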
[GitHub] flink pull request #5620: [FLINK-8824] [kafka] Replace getCanonicalName with...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5620 ---
[GitHub] flink pull request #5643: can integrate and support on apache kudu ?
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5643 ---
[GitHub] flink pull request #5646: [hotfix] [javadocs] minor javadoc fix in Timestamp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5646 ---
[GitHub] flink pull request #5614: [FLINK-8827] When FLINK_CONF_DIR contains spaces, ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5614 ---
[GitHub] flink issue #5655: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5655 This code is (except for lambdas) identical to #5654. Please apply the same changes here as to the other PR (with regard to the review comments). ---
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390207#comment-16390207 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172979459 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +224,81 @@ } } + /** +* Retry the given operation with the given delay in between successful completions where the +* result does not match a given predicate. +* +* @param operation to retry +* @param retryDelay delay between retries +* @param deadline A deadline that specifies at what point we should stop retrying +* @param acceptancePredicate Predicate to test whether the result is acceptable +* @param scheduledExecutor executor to be used for the retry operation +* @param <T> type of the result +* @return Future which retries the given operation a given amount of times and delays the retry +* in case the predicate isn't matched +*/ + public static <T> CompletableFuture<T> retrySuccesfulWithDelay( + final Supplier<CompletableFuture<T>> operation, + final Time retryDelay, --- End diff -- Deadline uses `Duration`, while this method uses `Time`. > State loss after multiple restart attempts > -- > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled.
> {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing its offset between a start and the next > checkpoint failure (a minute's worth) or the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
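The review remark above concerns mixing `Duration` and `Time`. As background, here is a minimal JDK-only sketch of the pattern the Javadoc describes: retry while the operation succeeds with a result the predicate rejects, waiting a delay between attempts, until a deadline passes. It uses `Duration` and a `System.nanoTime()`-based deadline throughout. `RetryUtil.retryUntilAccepted` is a hypothetical name. This is not Flink's `FutureUtils` code, and treating exceptional completions as final (not retried) is an assumption.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of "retry successful results until accepted", not Flink's FutureUtils. */
final class RetryUtil {

    private RetryUtil() {}

    /** Retries unaccepted successful results after retryDelay until deadlineNanos (nanoTime scale). */
    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduledExecutor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retryDelay, deadlineNanos, acceptancePredicate, scheduledExecutor, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            Duration retryDelay,
            long deadlineNanos,
            Predicate<T> acceptancePredicate,
            ScheduledExecutorService scheduledExecutor,
            CompletableFuture<T> result) {
        final CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
            return;
        }
        future.whenComplete((value, failure) -> {
            try {
                if (failure != null) {
                    result.completeExceptionally(failure);      // failures are not retried here
                } else if (acceptancePredicate.test(value)) {
                    result.complete(value);
                } else if (System.nanoTime() - deadlineNanos >= 0) {
                    result.completeExceptionally(
                        new TimeoutException("Deadline passed without an acceptable result"));
                } else {
                    // Same time type (Duration) for the delay as for the deadline arithmetic.
                    scheduledExecutor.schedule(
                        () -> attempt(operation, retryDelay, deadlineNanos,
                            acceptancePredicate, scheduledExecutor, result),
                        retryDelay.toNanos(),
                        TimeUnit.NANOSECONDS);
                }
            } catch (RuntimeException e) {
                result.completeExceptionally(e);                // e.g. throwing predicate, rejected schedule
            }
        });
    }
}
```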
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390205#comment-16390205 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980021 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { + + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(1L); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390208#comment-16390208 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5656#discussion_r172980585 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,333 @@ +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestLogger { --- End diff -- I have similar comments for this test as for #5654: the test is fine in general, but can probably be simplified slightly, plus a bit of code style cleanup.
[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
[ https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390163#comment-16390163 ] ASF GitHub Bot commented on FLINK-8824: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5620 > In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()' > -- > > Key: FLINK-8824 > URL: https://issues.apache.org/jira/browse/FLINK-8824 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The connector uses {{getCanonicalName()}} in all places, rather than > {{getClassName()}}. > {{getCanonicalName()}}'s intention is to normalize class names for arrays, > etc., but it is problematic when instantiating classes from class names.
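A small JDK-only illustration of why `getCanonicalName()` is a problem when the name is later used to load the class. For a nested class it returns the source-level dotted form, which `Class.forName` cannot resolve. `Class.getName()` returns the binary name, which it can resolve. (In the JDK the method is `Class.getName()`; the issue title's `getClassName()` presumably refers to it.) `ClassNameDemo` is a hypothetical class used only for illustration.

```java
/** Illustration only: how getName() and getCanonicalName() differ for class loading. */
class ClassNameDemo {

    /** Nested class: getName() uses '$' before "Nested", getCanonicalName() uses '.'. */
    static class Nested {}

    /** Returns true if Class.forName can resolve the given name. */
    static boolean canLoad(String name) {
        if (name == null) {
            return false; // e.g. getCanonicalName() of an anonymous class is null
        }
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?> nested = Nested.class;
        System.out.println(nested.getName() + " -> " + canLoad(nested.getName()));
        System.out.println(nested.getCanonicalName() + " -> " + canLoad(nested.getCanonicalName()));

        Class<?> array = String[].class; // "[Ljava.lang.String;" vs "java.lang.String[]"
        System.out.println(array.getName() + " -> " + canLoad(array.getName()));
        System.out.println(array.getCanonicalName() + " -> " + canLoad(array.getCanonicalName()));
    }
}
```

In both cases only the `getName()` form round-trips through `Class.forName`.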
[jira] [Commented] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
[ https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390162#comment-16390162 ] ASF GitHub Bot commented on FLINK-8827: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5614 > When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed > - > > Key: FLINK-8827 > URL: https://issues.apache.org/jira/browse/FLINK-8827 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago) >Reporter: Donghui Xu >Priority: Major > > When the path in FLINK_CONF_DIR contains spaces, executing the ZooKeeper-related > scripts fails with the following error message: Expect binary expression.
[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172977111 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390174#comment-16390174 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5655 This code is (except for lambdas) identical to #5654. Please apply the same changes here as to the other PR (with regard to the review comments).
[jira] [Closed] (FLINK-8879) Add concurrent access check to AvroSerializer
[ https://issues.apache.org/jira/browse/FLINK-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8879. --- > Add concurrent access check to AvroSerializer > - > > Key: FLINK-8879 > URL: https://issues.apache.org/jira/browse/FLINK-8879 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > On debug log level and during tests, the AvroSerializer should check whether > it is concurrently accessed, and throw an exception in that case.
[jira] [Resolved] (FLINK-8879) Add concurrent access check to AvroSerializer
[ https://issues.apache.org/jira/browse/FLINK-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8879. - Resolution: Fixed Fixed in - 1.5.0 via 6ec1b784e5fea4d9d5208d44caf6fefde14f4aa8 - 1.6.0 via be7c89596a3b9cd8805a90aaf32336ec2759a1f7
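A minimal sketch of such a check, assuming a hypothetical guard class (not the actual `AvroSerializer` change). The first thread to enter records itself in an `AtomicReference`, and a second thread that enters while the first is still inside triggers an exception. Re-entrant calls from the same thread are allowed, and the `checkEnabled` flag stands in for "debug log level or tests".

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical guard detecting concurrent use of a non-thread-safe serializer. */
class ConcurrentAccessGuard {

    // Stand-in for "debug log level or tests": when false, no check is performed.
    private final boolean checkEnabled;

    // The thread currently inside the guarded section, or null if none.
    private final AtomicReference<Thread> currentThread = new AtomicReference<>();

    ConcurrentAccessGuard(boolean checkEnabled) {
        this.checkEnabled = checkEnabled;
    }

    /** Applies the action, throwing if another thread is inside the guard at the same time. */
    <I, O> O guarded(I input, Function<I, O> action) {
        if (!checkEnabled) {
            return action.apply(input);
        }
        Thread self = Thread.currentThread();
        if (currentThread.get() == self) {
            // Re-entrant call from the same thread (e.g. nested serialization) is allowed.
            return action.apply(input);
        }
        if (!currentThread.compareAndSet(null, self)) {
            throw new IllegalStateException(
                "Concurrent access to serializer from thread " + self.getName());
        }
        try {
            return action.apply(input);
        } finally {
            currentThread.set(null);
        }
    }
}
```

A serializer would wrap its `serialize`/`deserialize` bodies in `guarded(...)`. The check only fires when two calls actually overlap, so it catches misuse opportunistically rather than proving its absence.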
[jira] [Assigned] (FLINK-8863) Add user-defined function support in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui reassigned FLINK-8863: -- Assignee: Xingcan Cui > Add user-defined function support in SQL Client > --- > > Key: FLINK-8863 > URL: https://issues.apache.org/jira/browse/FLINK-8863 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > It should be possible to declare user-defined functions in the SQL client. > For now, we limit the registration to classes that implement > {{ScalarFunction}}, {{TableFunction}}, or {{AggregateFunction}}. Functions that > are implemented in SQL are not part of this issue. > I would suggest introducing a {{functions}} top-level property. The > declaration could look similar to: > {code} > functions: > - name: testFunction > from: class <-- optional, default: class > class: org.my.MyScalarFunction > constructor: <-- optional, needed for > certain types of functions > - 42.0 > - class: org.my.Class <-- possibility to create > objects via properties > constructor: > - 1 > - true > - false > - "whatever" > - type: INT > value: 1 > {code}
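The `class` and `constructor` properties in the proposal imply instantiating a class reflectively from its name and a list of literal arguments. Below is a JDK-only sketch of that step. It assumes the arguments have already been parsed into Java objects (e.g. `42.0` into a `Double`). `ReflectiveFactory` and `MyScalarFunction` are hypothetical; real Flink function base classes and the property parsing are not shown.

```java
import java.lang.reflect.Constructor;
import java.util.List;

/** Hypothetical helper: instantiate a class from its name and already-parsed constructor arguments. */
final class ReflectiveFactory {

    private ReflectiveFactory() {}

    static Object instantiate(String className, List<?> args) {
        try {
            Class<?> clazz = Class.forName(className);
            for (Constructor<?> ctor : clazz.getConstructors()) {
                if (matches(ctor.getParameterTypes(), args)) {
                    return ctor.newInstance(args.toArray());
                }
            }
            throw new IllegalArgumentException("No public constructor of " + className + " accepts " + args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    /** True if each argument can be passed to the corresponding parameter (with unboxing, no widening). */
    private static boolean matches(Class<?>[] params, List<?> args) {
        if (params.length != args.size()) {
            return false;
        }
        for (int i = 0; i < params.length; i++) {
            Object arg = args.get(i);
            if (arg == null) {
                if (params[i].isPrimitive()) {
                    return false; // null cannot be unboxed
                }
            } else if (!wrap(params[i]).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class, e.g. double.class to Double.class. */
    private static Class<?> wrap(Class<?> type) {
        if (type == int.class) return Integer.class;
        if (type == long.class) return Long.class;
        if (type == double.class) return Double.class;
        if (type == float.class) return Float.class;
        if (type == boolean.class) return Boolean.class;
        if (type == short.class) return Short.class;
        if (type == byte.class) return Byte.class;
        if (type == char.class) return Character.class;
        return type;
    }
}

/** Hypothetical function class, mirroring "class: org.my.MyScalarFunction" with "constructor: - 42.0". */
class MyScalarFunction {
    private final double factor;

    public MyScalarFunction(double factor) {
        this.factor = factor;
    }

    public double eval(double x) {
        return x * factor;
    }
}
```

The matching is deliberately strict (an `Integer` does not match a `double` parameter). Explicitly typed values such as `type: INT` in the proposal suggest the real design may convert arguments before this step.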
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, it should still be possible to generate a watermark. This can only be solved outside of the connector if the individual source partitions are visible (as they would be for an operator with multiple input streams). ---
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389940#comment-16389940 ] ASF GitHub Bot commented on FLINK-5479: --- Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, it should still be possible to generate a watermark. This can only be solved outside of the connector if the individual source partitions are visible (as they would be for an operator with multiple input streams). > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also blocked from advancing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, so the overall min watermark across all partitions > of a consumer subtask will never advance. > It's normally not common for Kafka partitions to produce no > data, but it'll probably be good to handle this case as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}.
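To make the problem in the description concrete: the subtask's watermark is the minimum over its partitions, so one idle partition stuck at `Long.MIN_VALUE` pins it forever. Below is a minimal sketch of the localized fix. It assumes each partition can be flagged idle (however that is detected) and excludes idle partitions from the minimum. `PartitionWatermarks` and its methods are hypothetical, not the `AbstractFetcher` API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition watermark tracker that ignores idle partitions. */
class PartitionWatermarks {

    private final Map<String, Long> watermarks = new HashMap<>();
    private final Map<String, Boolean> idle = new HashMap<>();

    void addPartition(String partition) {
        watermarks.put(partition, Long.MIN_VALUE); // the stuck value from the description
        idle.put(partition, false);
    }

    void updateWatermark(String partition, long watermark) {
        watermarks.merge(partition, watermark, Math::max); // watermarks never go backwards
        idle.put(partition, false);                        // a partition that advances is active again
    }

    void markIdle(String partition) {
        idle.put(partition, true);
    }

    /** Minimum over active partitions; Long.MIN_VALUE if there is no active partition. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (!idle.get(e.getKey())) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    /** True if every partition is idle, i.e. the whole subtask could be marked idle. */
    boolean allIdle() {
        return !idle.isEmpty() && !idle.containsValue(false);
    }
}
```

Without `markIdle`, the combined watermark stays at `Long.MIN_VALUE` as long as any partition has never produced data, which is exactly the reported behavior.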
[jira] [Commented] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable
[ https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390201#comment-16390201 ] ASF GitHub Bot commented on FLINK-8799: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5617#discussion_r172979651 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor( userJarInclusion = getUserJarInclusionMode(flinkConfiguration); this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); + + String yarnQueueConfigValue = flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE); --- End diff -- The difficulty of this ticket (FLINK-8799) is that `flinkConfiguration` is mutable. As long as a reference to `flinkConfiguration` can possibly leak, this class remains mutable. Also, there are some private methods that mutate the configuration. There are several places where we would need to make defensive copies, e.g., in the constructor. > Make AbstractYarnClusterDescriptor immutable > > > Key: FLINK-8799 > URL: https://issues.apache.org/jira/browse/FLINK-8799 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Major > Fix For: 1.6.0 > > > {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its > internal configuration is modified from different places, which makes it > difficult to reason about the code. For example, it should not be possible to > modify the {{zookeeperNamespace}} using a setter method. A user of this class > should be forced to provide all information prior to creating the instance, > e.g., by passing a {{org.apache.flink.configuration.Configuration}} object.
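A JDK-only sketch of the defensive-copy idea discussed above. It uses a plain `Map<String, String>` as a hypothetical stand-in for Flink's mutable `Configuration`; the class name and the keys `queue` and `zookeeper.namespace` are illustrative only. The pattern: copy once in the constructor, derive fields from the copy, never hand out the internal instance, and replace setters with methods that return a new instance.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable descriptor holding a private copy of a mutable configuration. */
final class ImmutableDescriptor {

    private final Map<String, String> config; // private copy, never exposed directly
    private final String yarnQueue;           // derived once from the copy

    ImmutableDescriptor(Map<String, String> flinkConfiguration) {
        // Defensive copy: later changes to the caller's map do not affect this instance.
        this.config = new HashMap<>(flinkConfiguration);
        this.yarnQueue = config.getOrDefault("queue", "default");
    }

    String yarnQueue() {
        return yarnQueue;
    }

    /** Read-only view; callers cannot mutate the internal map through it. */
    Map<String, String> configuration() {
        return Collections.unmodifiableMap(config);
    }

    /** Replaces a setter: returns a new instance instead of mutating this one. */
    ImmutableDescriptor withZookeeperNamespace(String namespace) {
        Map<String, String> copy = new HashMap<>(config);
        copy.put("zookeeper.namespace", namespace);
        return new ImmutableDescriptor(copy);
    }
}
```

Private methods that currently mutate the configuration would, under this scheme, work on a local copy and feed the result into a new instance.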
[jira] [Resolved] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
[ https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8824. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in - 1.5.0 via 70347bcc788dea1d4f64539701e60bafcc772946 - 1.6.0 via 75a4aaea8b051aa6dae52d421f7b4d7eeab99486
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390245#comment-16390245 ] ASF GitHub Bot commented on FLINK-5479: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 @tweise @tzulitai I would suggest solving this as follows, which should be simple and cover our cases: - We extend the current periodic watermark generators to detect idleness. We can do that, for example, by maintaining a record counter and, each time the generator is asked whether to emit a watermark, remembering the current counter value and a System.nanoTime() timestamp. If no record has arrived for too long, return a special watermark object that indicates "idle". Alternatively, change the return type to return either 'none', 'idle', or 'watermark'. - The Kinesis Consumer needs per-shard watermarks, in the same way as the Kafka Consumer has per-partition watermarks. That part needs to be added to the Kinesis consumer anyway. That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors. We can then also remove the idleness logic from the source context, since it would be duplicated there. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to how idle sources block watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also blocked from advancing watermarks > when a partition is idle.
The watermark of idle partitions is always > {{Long.MIN_VALUE}}, so the overall minimum watermark across all partitions > of a consumer subtask never advances. > Kafka partitions that produce no data are not a common case, but it would > probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
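The idleness proposal above can be sketched as follows. This is a minimal, hypothetical illustration under stated assumptions: `IdleAwareWatermarks`, `PartitionGenerator`, `Kind`, and `combinedWatermark` are invented names, not Flink APIs, and the caller passes in the current time (which would come from `System.nanoTime()`) so the logic is deterministic. It shows the record-counter check returning 'none', 'idle', or 'watermark', and a per-subtask minimum that skips idle partitions so an empty partition stuck at `Long.MIN_VALUE` no longer holds the watermark back.

```java
import java.util.List;

/** Hypothetical sketch of idleness-aware per-partition watermarks. */
final class IdleAwareWatermarks {

    /** Result of one periodic check, following the 'none'/'idle'/'watermark' idea. */
    enum Kind { NONE, IDLE, WATERMARK }

    /** Per-partition (or per-shard) generator. */
    static final class PartitionGenerator {
        private final long idleTimeoutNanos;
        private long recordCount;          // incremented for every record
        private long lastSeenCount;        // counter value at the last check
        private long lastProgressNanos;    // time the counter last changed
        private long maxTimestamp = Long.MIN_VALUE;

        PartitionGenerator(long idleTimeoutNanos, long nowNanos) {
            this.idleTimeoutNanos = idleTimeoutNanos;
            this.lastProgressNanos = nowNanos;
        }

        void onRecord(long eventTimestamp) {
            recordCount++;
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        /** Called periodically; nowNanos would come from System.nanoTime(). */
        Kind check(long nowNanos) {
            if (recordCount != lastSeenCount) {
                lastSeenCount = recordCount;
                lastProgressNanos = nowNanos;
                return Kind.WATERMARK;
            }
            if (nowNanos - lastProgressNanos >= idleTimeoutNanos) {
                return Kind.IDLE;
            }
            return Kind.NONE;
        }

        long currentWatermark() {
            return maxTimestamp;
        }
    }

    /**
     * Minimum watermark over all non-idle partitions. Returns Long.MIN_VALUE
     * when every partition is idle (the whole subtask could then be marked idle).
     */
    static long combinedWatermark(List<PartitionGenerator> partitions, long nowNanos) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionGenerator p : partitions) {
            if (p.check(nowNanos) != Kind.IDLE) {
                anyActive = true;
                min = Math.min(min, p.currentWatermark());
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Because the check lives in the generator itself, the same class would serve Kafka partitions and Kinesis shards alike, which is the point of the suggestion.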
[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5659 [FLINK-8661] [table] Add support for batch queries in SQL Client ## What is the purpose of the change This PR adds support for batch queries in the SQL Client. ## Brief change log - Added a `StaticResult` and a `BatchResult` for the batch query results. - Added related methods to `ResultStore` for static results and renamed the existing methods with the prefix "dynamic". - Added the logic for retrieving batch query results via `DataSet.collect()`. - Adapted the viewing logic for static results to a "two-phase" table result view. - Added the first-page option to `CliTableResultView.java`. - Replaced some default values with `""` in `Execution.java`. ## Verifying this change This change can be verified by the added test case `testBatchQueryExecution()`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8861 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5659.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5659 commit 8f21a38dd60ab8c41893ec8c52bfcc47fc54648e Author: Xingcan CuiDate: 2018-03-07T17:12:55Z [FLINK-8661][table]Add support for batch queries in SQL Client ---
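The "two-phase" view of a static result can be pictured roughly as below. This is a hypothetical sketch, not the `StaticResult`/`BatchResult` classes from the PR: `StaticResultSketch`, `materialize`, `getPageCount`, and `retrievePage` are invented names. Phase one stores the complete batch result once it is available (for example, the list returned by `DataSet.collect()`); phase two lets a view page through the stored rows, starting from the first page.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of a materialized (static) result with paging. */
final class StaticResultSketch<T> {

    private final int pageSize;
    private List<T> rows = Collections.emptyList();
    private boolean materialized;

    StaticResultSketch(int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.pageSize = pageSize;
    }

    /** Phase one: store the full result once the batch job has finished. */
    void materialize(List<T> collected) {
        this.rows = new ArrayList<>(collected);
        this.materialized = true;
    }

    boolean isMaterialized() {
        return materialized;
    }

    /** Ceiling division; an empty result still has one (empty) page. */
    int getPageCount() {
        return Math.max(1, (rows.size() + pageSize - 1) / pageSize);
    }

    /** Phase two: return the rows of a 1-based page. */
    List<T> retrievePage(int page) {
        if (!materialized) {
            throw new IllegalStateException("result not materialized yet");
        }
        if (page < 1 || page > getPageCount()) {
            throw new IllegalArgumentException("invalid page: " + page);
        }
        int from = (page - 1) * pageSize;
        int to = Math.min(from + pageSize, rows.size());
        return Collections.unmodifiableList(rows.subList(from, to));
    }
}
```

Unlike a streaming (dynamic) result, nothing changes after phase one, so pages can be served directly from the stored list.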
[jira] [Created] (FLINK-8892) Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11
Bowen Li created FLINK-8892: --- Summary: Travis Build: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11 Key: FLINK-8892 URL: https://issues.apache.org/jira/browse/FLINK-8892 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.6.0 Reporter: Bowen Li Fix For: 1.6.0 Attachments: image-2018-03-07-10-05-38-456.png
{code:java}
..
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.455 sec - in org.apache.flink.test.broadcastvars.BroadcastBranchingITCase
Aborted (core dumped)
Results :
Tests run: 1413, Failures: 0, Errors: 0, Skipped: 24
08:03:40.358 [INFO]
08:03:40.358 [INFO] BUILD FAILURE
08:03:40.359 [INFO]
08:03:40.359 [INFO] Total time: 27:06 min
08:03:40.359 [INFO] Finished at: 2018-03-07T08:03:40+00:00
08:03:40.781 [INFO] Final Memory: 81M/705M
08:03:40.781 [INFO]
08:03:40.782 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11: ExecutionException: org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred in starting fork, check output in log -> [Help 1]
08:03:40.782 [ERROR]
08:03:40.782 [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
08:03:40.782 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
08:03:40.782 [ERROR]
08:03:40.783 [ERROR] For more information about the errors and possible solutions, please read the following articles:
08:03:40.783 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
MVN exited with EXIT CODE: 1.
Trying to KILL watchdog (7889).
./tools/travis_mvn_watchdog.sh: line 532: 7889 Terminated watchdog
PRODUCED build artifacts.
build_info
mvn-1.log
mvn-2.log
mvn.out
COMPRESSING build artifacts.
{code}
Failed builds:
- https://travis-ci.org/apache/flink/jobs/350178841
- https://travis-ci.org/apache/flink/jobs/349445796
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8661) Replace Collections.EMPTY_MAP with Collections.emptyMap()
[ https://issues.apache.org/jira/browse/FLINK-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389898#comment-16389898 ] ASF GitHub Bot commented on FLINK-8661: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5659 [FLINK-8661] [table] Add support for batch queries in SQL Client ## What is the purpose of the change This PR adds support for batch queries in the SQL Client. ## Brief change log - Added a `StaticResult` and a `BatchResult` for the batch query results. - Added related methods to `ResultStore` for static results and renamed the existing methods with the prefix "dynamic". - Added the logic for retrieving batch query results via `DataSet.collect()`. - Adapted the viewing logic for static results to a "two-phase" table result view. - Added the first-page option to `CliTableResultView.java`. - Replaced some default values with `""` in `Execution.java`. ## Verifying this change This change can be verified by the added test case `testBatchQueryExecution()`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented?
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8861 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5659.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5659 commit 8f21a38dd60ab8c41893ec8c52bfcc47fc54648e Author: Xingcan CuiDate: 2018-03-07T17:12:55Z [FLINK-8661][table]Add support for batch queries in SQL Client > Replace Collections.EMPTY_MAP with Collections.emptyMap() > - > > Key: FLINK-8661 > URL: https://issues.apache.org/jira/browse/FLINK-8661 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > The use of Collections.EMPTY_SET and Collections.EMPTY_MAP often causes > unchecked assignment and it should be replaced with Collections.emptySet() > and Collections.emptyMap() . -- This message was sent by Atlassian JIRA (v7.6.3#76005)
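The difference FLINK-8661 describes is easy to see in a small example. `EmptyCollectionsSketch` is an illustrative name only: the raw `Collections.EMPTY_MAP` constant needs an unchecked conversion when assigned to a parameterized type, while the generic factory methods `Collections.emptyMap()` and `Collections.emptySet()` infer the element types and compile without warnings.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/** Raw EMPTY_* constants versus the type-safe empty*() factories. */
final class EmptyCollectionsSketch {

    @SuppressWarnings("unchecked")
    static Map<String, Integer> rawConstant() {
        // Collections.EMPTY_MAP is a raw Map; without the annotation, javac
        // reports an unchecked conversion here (with -Xlint:unchecked).
        return Collections.EMPTY_MAP;
    }

    static Map<String, Integer> typeSafe() {
        // The generic factory infers Map<String, Integer>; no warning.
        return Collections.emptyMap();
    }

    static Set<String> typeSafeSet() {
        // Same for sets: emptySet() replaces the raw EMPTY_SET.
        return Collections.emptySet();
    }
}
```

Both variants return an immutable empty map at runtime; the change only removes the unchecked warning and the need to suppress it.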
[jira] [Commented] (FLINK-8487) State loss after multiple restart attempts
[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390153#comment-16390153 ] ASF GitHub Bot commented on FLINK-8487: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r172963487 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private
[GitHub] flink pull request #5659: [FLINK-8661] [table] Add support for batch queries...
Github user xccui closed the pull request at: https://github.com/apache/flink/pull/5659 ---
[jira] [Commented] (FLINK-8661) Replace Collections.EMPTY_MAP with Collections.emptyMap()
[ https://issues.apache.org/jira/browse/FLINK-8661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389900#comment-16389900 ] ASF GitHub Bot commented on FLINK-8661: --- Github user xccui closed the pull request at: https://github.com/apache/flink/pull/5659 > Replace Collections.EMPTY_MAP with Collections.emptyMap() > - > > Key: FLINK-8661 > URL: https://issues.apache.org/jira/browse/FLINK-8661 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > The use of Collections.EMPTY_SET and Collections.EMPTY_MAP often causes > unchecked assignment and it should be replaced with Collections.emptySet() > and Collections.emptyMap() . -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390026#comment-16390026 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935214 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- put() and flush() need synchronization. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Based on {{WriteBatch}}, we could get a 30% to 50% performance improvement when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
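If the wrapper may be shared between threads, the review suggestion amounts to guarding `put()` and `flush()` with the same monitor, so the batch contents and the size counter cannot get out of step. The sketch below is hypothetical and dependency-free: `SynchronizedBatchWriter` is an invented name, a `List` stands in for RocksDB's `WriteBatch`, and a `Consumer` stands in for `db.write(options, batch)`. Whether the real wrapper is ever used concurrently is not settled in the excerpt; if it is only used by a single restore thread, the locking is unnecessary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical thread-safe batching writer; List/Consumer stand in for RocksDB types. */
final class SynchronizedBatchWriter<T> {

    private final int capacity;
    private final Consumer<List<T>> sink;   // stands in for db.write(options, batch)
    private final List<T> batch = new ArrayList<>();

    SynchronizedBatchWriter(int capacity, Consumer<List<T>> sink) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.sink = sink;
    }

    synchronized void put(T entry) {
        batch.add(entry);
        if (batch.size() >= capacity) {
            flush(); // re-entrant: this thread already holds the monitor
        }
    }

    synchronized void flush() {
        if (batch.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(batch)); // hand over a snapshot
        batch.clear();
    }
}
```

Using the list size as the counter also removes the separate `currentSize` field, which is one less piece of state to keep consistent.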
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390024#comment-16390024 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172934683 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- How is the capacity range determined? Is it recommended by RocksDB? The message should be: "capacity should be between " + MIN + " and " + MAX > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Based on {{WriteBatch}}, we could get a 30% to 50% performance improvement when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
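A range check whose message reports both bounds, as the reviewer suggests, could look like the sketch below. `CapacityCheckSketch` is a hypothetical name, and the bounds are passed in as parameters because the final `MIN_CAPACITY`/`MAX_CAPACITY` values are not settled in the excerpt above.

```java
/** Hypothetical range check whose message states both bounds and the rejected value. */
final class CapacityCheckSketch {

    static int checkCapacity(int capacity, int min, int max) {
        if (capacity < min || capacity > max) {
            throw new IllegalArgumentException(
                "capacity should be between " + min + " and " + max + ", but was " + capacity);
        }
        return capacity;
    }
}
```

Including the rejected value makes a misconfiguration diagnosable from the exception alone. Flink's own `Preconditions.checkArgument` appears to also accept a `%s` message template plus arguments, which would avoid building the string when the check passes.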
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390025#comment-16390025 ] ASF GitHub Bot commented on FLINK-8845: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935414 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- Can batch be null? > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Based on {{WriteBatch}}, we could get a 30% to 50% performance improvement when loading > data into RocksDB. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
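In the quoted code, `batch` is a `final` field assigned in the constructor, so it cannot be null once the object exists and the null check in `close()` is redundant. The sketch below illustrates a `close()` without that check; it is hypothetical (`ClosableBatchSketch` is an invented name, and `List`s stand in for `WriteBatch` and the database). Flushing pending entries on close is an assumption about the intended behavior, not something the excerpt confirms.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical AutoCloseable batch; `batch` is final and therefore never null. */
final class ClosableBatchSketch implements AutoCloseable {

    private final List<String> batch = new ArrayList<>(); // never null after construction
    private final List<String> written;                   // stands in for the database
    private boolean closed;

    ClosableBatchSketch(List<String> target) {
        this.written = target;
    }

    void put(String entry) {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        batch.add(entry);
    }

    void flush() {
        written.addAll(batch);
        batch.clear();
    }

    @Override
    public void close() {
        if (closed) {
            return; // idempotent close
        }
        flush();       // assumption: pending entries are written on close
        closed = true; // a real wrapper would release the native WriteBatch here
    }
}
```

With `AutoCloseable`, callers can rely on try-with-resources to guarantee the final flush and release, even when an exception is thrown mid-restore.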
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935414 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- can batch be null? ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r172935214 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- need synchronization on put() and flush() ---
[GitHub] flink pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescrip...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5617#discussion_r172979651 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor( userJarInclusion = getUserJarInclusionMode(flinkConfiguration); this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); + + String yarnQueueConfigValue = flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE); --- End diff -- The difficulty of this ticket (FLINK-8799) is that `flinkConfiguration` is mutable. As long as a reference to `flinkConfiguration` can leak, this class remains mutable. Also, there are some private methods that mutate the configuration. There are several places where we would need to make defensive copies, e.g., in the constructor. ---
[jira] [Resolved] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
[ https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8827. - Resolution: Fixed Fix Version/s: 1.6.0 1.5.0 Fixed in - 1.5.0 via af9a68c5af85feb51de5cac735de1473f008f0fe - 1.6.0 via 6b7e9896eb6a4e2fc628403c9159b7652db2e311 > When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed > - > > Key: FLINK-8827 > URL: https://issues.apache.org/jira/browse/FLINK-8827 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: Red Hat Enterprise Linux Server release 6.5 (Santiago) >Reporter: Donghui Xu >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > When the path of FLINK_CONF_DIR includes spaces, executing the ZooKeeper-related > scripts fails with the following error message: Expect binary expression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
[ https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8824. --- > In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()' > -- > > Key: FLINK-8824 > URL: https://issues.apache.org/jira/browse/FLINK-8824 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > The connector uses {{getCanonicalName()}} in all places, rather than > {{getClassName()}}. > {{getCanonicalName()}}'s intention is to normalize class names for arrays, > etc., but it is problematic when instantiating classes from class names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8827) When FLINK_CONF_DIR contains spaces, execute zookeeper related scripts failed
[ https://issues.apache.org/jira/browse/FLINK-8827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8827.
[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
[ https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390255#comment-16390255 ] Stephan Ewen commented on FLINK-8867: - I would suggest that the fact that these follow-up exceptions disguise the original exception is an issue to be fixed in itself. > Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config > > > Key: FLINK-8867 > URL: https://issues.apache.org/jira/browse/FLINK-8867 > Project: Flink > Issue Type: Bug > Components: Configuration, State Backends, Checkpointing, YARN >Affects Versions: 1.4.1, 1.4.2 >Reporter: Shashank Agarwal >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > In our setup, we put an entry for the default scheme in our Flink_conf file: > {code} > fs.default-scheme: hdfs://mydomain.com:8020/flink > {code} > Then the application with the RocksDB state backend fails with the following > exception. When we remove this config, it works fine. It also works fine with > other state backends. > {code} > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 > for operator order ip stream (1/1).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator order ip stream (1/1). > ...
6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 
7 more > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException] > {code}
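Stephan's point, that follow-up exceptions should not disguise the original one, corresponds to a common Java pattern: when the cleanup after a failure also throws, attach the cleanup failure to the original exception with `Throwable.addSuppressed` and rethrow the original. The sketch below is a generic illustration, not Flink's checkpointing code; `runWithCleanupOnFailure` and `ThrowingRunnable` are hypothetical names.

```java
class PreserveOriginalException {

    // Hypothetical functional interface for actions that may throw checked exceptions.
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Runs the action; if it fails, runs the cleanup. A failure during cleanup is recorded
    // as suppressed on the original exception, so the root cause remains the primary one.
    static void runWithCleanupOnFailure(ThrowingRunnable action, ThrowingRunnable cleanup) throws Exception {
        try {
            action.run();
        } catch (Exception original) {
            try {
                cleanup.run();
            } catch (Exception cleanupFailure) {
                original.addSuppressed(cleanupFailure);
            }
            throw original;
        }
    }

    public static void main(String[] args) {
        try {
            runWithCleanupOnFailure(
                    () -> { throw new IllegalStateException("snapshot failed"); },
                    () -> { throw new Exception("could not cancel the state future"); });
        } catch (Exception e) {
            // Prints the IllegalStateException first, then the cleanup failure as suppressed.
            System.out.println("primary: " + e);
            for (Throwable suppressed : e.getSuppressed()) {
                System.out.println("suppressed: " + suppressed);
            }
        }
    }
}
```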
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390365#comment-16390365 ] Thomas Weise commented on FLINK-5697: - For idleness detection see: [https://github.com/apache/flink/pull/5634] > Add per-shard watermarks for FlinkKinesisConsumer > - > > Key: FLINK-5697 > URL: https://issues.apache.org/jira/browse/FLINK-5697 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > > It would be nice to bring the Kinesis consumer on par in functionality with > the Kafka consumer, since they share very similar abstractions. Per-partition > / shard watermarks are something we can also add to the Kinesis consumer.
[jira] [Commented] (FLINK-7129) Support dynamically changing CEP patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390326#comment-16390326 ] Che Lui Shum commented on FLINK-7129: - Hi [~fhueske] and [~dawidwys], may I ask if there is any update on this feature? > Support dynamically changing CEP patterns > - > > Key: FLINK-7129 > URL: https://issues.apache.org/jira/browse/FLINK-7129 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > > An umbrella task for introducing a mechanism for injecting patterns through > coStream.
[jira] [Commented] (FLINK-8690) Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390467#comment-16390467 ] Rong Rong commented on FLINK-8690: -- I created an initial (far from perfect) implementation that gives distinct aggregates special treatment by adding a distinct rule to DataStreamNormRuleSet, so the logical plan for DataStream directly converts a distinct logicalAggregate to FlinkLogicalAggregate, while DataSet still goes through AggregateExpandDistinctAggregatesRule. Please see it here: https://github.com/walterddr/flink/commit/ef9777cd8859180f38900393967deaabf39b7453 [~hequn8128] mentioned that we could override the match() function to achieve this. However, to generate the logical plans for DataSet and DataStream separately, the only solution I can think of is to add a new rule to the NormRuleSet. Do you think this is the right path? Any comments or suggestions are highly appreciated :-) > Update logical rule set to generate FlinkLogicalAggregate explicitly allow > distinct agg on DataStream > - > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* do not > allow distinct aggregates. > We are proposing to reuse the distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate* to support unbounded distinct aggregation on > DataStream as well.
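To make unbounded distinct aggregation on a stream more concrete, here is a minimal sketch of a per-group `COUNT(DISTINCT x)` accumulator that keeps a reference count per distinct value, so each value contributes to the result only once and retractions are handled. It illustrates the general technique only; it is not the Flink codegen referred to above, and `DistinctCountAccumulator` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator for COUNT(DISTINCT x) over an unbounded stream with retractions.
class DistinctCountAccumulator<T> {
    // How many times each distinct value is currently present in the group.
    private final Map<T, Long> occurrences = new HashMap<>();

    // Adds a value; returns true if the value is new for this group (the distinct count grew).
    boolean accumulate(T value) {
        long count = occurrences.merge(value, 1L, Long::sum);
        return count == 1L;
    }

    // Retracts a previously added value; returns true if its last occurrence was removed.
    boolean retract(T value) {
        Long count = occurrences.get(value);
        if (count == null) {
            return false; // nothing to retract
        }
        if (count == 1L) {
            occurrences.remove(value);
            return true;
        }
        occurrences.put(value, count - 1);
        return false;
    }

    // Current distinct count for the group.
    long getValue() {
        return occurrences.size();
    }
}
```

The state grows with the number of distinct values per group, which is the main cost of supporting distinct aggregation on an unbounded stream.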
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 This is a good proposal; it should also survive the general connector refactoring that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697, and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator. Perhaps it would be good to also recognize the special "idle" watermark in SourceContext?
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390362#comment-16390362 ] ASF GitHub Bot commented on FLINK-5479: --- Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 This is a good proposal; it should also survive the general connector refactoring that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697, and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator. Perhaps it would be good to also recognize the special "idle" watermark in SourceContext? > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what happens when idle sources block watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also blocked from progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}; therefore, the overall min watermark across all partitions > of a consumer subtask will never advance. > Kafka partitions that produce no data at all are not a common case, but it would probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}.
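The issue description explains why a single idle partition stalls progress: the subtask emits the minimum over all partitions, and an idle partition stays at `Long.MIN_VALUE`. Below is a minimal sketch of an idleness-aware combination, assuming a partition counts as idle once it has not reported a watermark for a fixed timeout. `PartitionWatermarkTracker` and its timeout policy are hypothetical and not necessarily the approach of pull request #5634; when every partition is idle, a real source would more likely signal idleness downstream than emit `Long.MIN_VALUE`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker that combines per-partition watermarks while ignoring idle partitions.
class PartitionWatermarkTracker {
    private final long idleTimeoutMillis;
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Map<Integer, Long> lastActivity = new HashMap<>();

    PartitionWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Registers a partition that has not produced any data yet.
    void addPartition(int partition, long nowMillis) {
        watermarks.put(partition, Long.MIN_VALUE);
        lastActivity.put(partition, nowMillis);
    }

    // Records a watermark for a partition; a partition's watermark never moves backwards.
    void onWatermark(int partition, long watermark, long nowMillis) {
        watermarks.merge(partition, watermark, Long::max);
        lastActivity.put(partition, nowMillis);
    }

    // Minimum watermark over non-idle partitions; Long.MIN_VALUE if all partitions are idle.
    long combinedWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> entry : watermarks.entrySet()) {
            boolean idle = nowMillis - lastActivity.get(entry.getKey()) >= idleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, entry.getValue());
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With a plain minimum, a partition that never emits keeps the result at `Long.MIN_VALUE` forever; here it stops counting once the timeout elapses and counts again as soon as it reports a new watermark.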
[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390632#comment-16390632 ] ASF GitHub Bot commented on FLINK-8845: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173049537 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); --- End diff -- About the capacity range, I didn't find a specific value recommended by RocksDB, but the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ) says: ``` Q: What's the fastest way to load data into RocksDB? ... 2. batch hundreds of keys into one write batch ... ``` Note that they use the word `hundreds`. > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Based on {{WriteBatch}}, we could get a 30% ~ 50% performance lift when loading > data into RocksDB.
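The batching pattern behind the wrapper (buffer writes, flush whenever the batch reaches its capacity, flush the remainder on close) can be sketched without a RocksDB dependency. In this minimal sketch, `BatchingWriter` is a hypothetical stand-in for the wrapper and the `Consumer<List<...>>` sink is a hypothetical stand-in for writing a `WriteBatch` to the database. The 100 to 1000 capacity bounds are illustrative assumptions, and the error message states the full accepted range, whereas the message quoted in the diff mentions only the lower bound.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical single-threaded writer that groups key/value pairs into batches.
class BatchingWriter<K, V> implements AutoCloseable {
    static final int MIN_CAPACITY = 100;  // illustrative bounds, not RocksDB recommendations
    static final int MAX_CAPACITY = 1000;

    private final Consumer<List<Map.Entry<K, V>>> sink; // receives each full or final batch
    private final int capacity;
    private final List<Map.Entry<K, V>> batch;

    BatchingWriter(Consumer<List<Map.Entry<K, V>>> sink, int capacity) {
        if (capacity < MIN_CAPACITY || capacity > MAX_CAPACITY) {
            throw new IllegalArgumentException("capacity must be between " + MIN_CAPACITY
                    + " and " + MAX_CAPACITY + ", but was " + capacity);
        }
        this.sink = sink;
        this.capacity = capacity;
        this.batch = new ArrayList<>(capacity);
    }

    void put(K key, V value) {
        batch.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        if (batch.size() == capacity) {
            flush();
        }
    }

    void flush() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
            batch.clear();
        }
    }

    @Override
    public void close() {
        flush(); // batch is final and assigned in the constructor, so no null check is needed
    }
}
```

Using the writer in try-with-resources guarantees that the last, partially filled batch is written.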
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048697 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { --- End diff -- Hmm... currently, it is only used in a single thread. For the best performance, I'd rather not add synchronization to it; instead, I'd like to annotate this class as not thread-safe. We could introduce a new thread-safe class if we really need one. What do you think?
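The option discussed in the comment (keep the class unsynchronized, document that it is not thread-safe, and add a separate thread-safe class only if needed) corresponds to a synchronized decorator. A minimal sketch, assuming a hypothetical `KeyValueWriter` interface; `UnsynchronizedCountingWriter` and `SynchronizedWriter` are illustrative names, not Flink classes.

```java
// Hypothetical write interface shared by both variants.
interface KeyValueWriter {
    void put(String key, String value);

    long written();
}

/** Not thread-safe: each instance must be confined to a single thread. */
class UnsynchronizedCountingWriter implements KeyValueWriter {
    private long count;

    @Override
    public void put(String key, String value) {
        count++; // unsynchronized read-modify-write; concurrent callers could lose updates
    }

    @Override
    public long written() {
        return count;
    }
}

/** Thread-safe decorator: serializes all access to a delegate that is not thread-safe. */
class SynchronizedWriter implements KeyValueWriter {
    private final KeyValueWriter delegate;

    SynchronizedWriter(KeyValueWriter delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void put(String key, String value) {
        delegate.put(key, value);
    }

    @Override
    public synchronized long written() {
        return delegate.written();
    }
}
```

Single-threaded callers, such as a restore loop, use the plain class and pay no locking cost; only code that really shares an instance across threads wraps it in the decorator.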
[jira] [Commented] (FLINK-4811) Checkpoint Overview should list failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390689#comment-16390689 ] Sihua Zhou commented on FLINK-4811: --- This JIRA issue's state is still Unresolved, but I think it has already been addressed, right? > Checkpoint Overview should list failed checkpoints > -- > > Key: FLINK-4811 > URL: https://issues.apache.org/jira/browse/FLINK-4811 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Reporter: Stephan Ewen >Priority: Major > > To let users understand what is happening with failed/skipped checkpoints, > the web UI should display failed checkpoints as well, with their error cause.
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173048763 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import javax.annotation.Nonnull; + +/** + * A wrapper class to wrap WriteBatch. 
+ */ +public class RocksDBWriteBatchWrapper implements AutoCloseable { + + private final static int MIN_CAPACITY = 100; + private final static int MAX_CAPACITY = 1; + + private final RocksDB db; + + private final WriteBatch batch; + + private final WriteOptions options; + + private final int capacity; + + private int currentSize; + + public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, + @Nonnull WriteOptions options, + int capacity) { + + Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY, + "capacity should at least greater than 100"); + + this.db = rocksDB; + this.options = options; + this.capacity = capacity; + this.batch = new WriteBatch(this.capacity); + this.currentSize = 0; + } + + public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException { + + this.batch.put(handle, key, value); + + if (++currentSize == capacity) { + flush(); + } + } + + public void flush() throws RocksDBException { + this.db.write(options, batch); + batch.clear(); + currentSize = 0; + } + + @Override + public void close() throws RocksDBException { + if (batch != null) { --- End diff -- You are right, this `if` can be removed.
[jira] [Updated] (FLINK-8790) Improve performance for recovery from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8790: -- Fix Version/s: (was: 1.5.0) 1.6.0 > Improve performance for recovery from incremental checkpoint > > > Key: FLINK-8790 > URL: https://issues.apache.org/jira/browse/FLINK-8790 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > When there are multiple state handles to be restored, we can improve the > performance as follows: > 1. Choose the best state handle to initialize the target db. > 2. Use the other state handles to create temporary dbs, and clip each db according > to the target key-group range (via rocksdb.deleteRange()). This lets us > get rid of the `key group check` in the > `data insertion loop` and also avoid traversing useless > records.
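Step 1 leaves open what "best" means. Below is a minimal sketch, assuming "best" is the handle whose key-group range overlaps the target range the most, and assuming inclusive `[start, end]` key-group ranges. `Range`, `overlap`, `pickBest`, and `rangesToDelete` are hypothetical helpers, not Flink or RocksDB APIs. `rangesToDelete` computes the parts of a handle's range that lie outside the target, which is what the clipping in step 2 would remove; RocksDB's `deleteRange` treats its end key as exclusive, so these inclusive bounds would need converting.

```java
import java.util.ArrayList;
import java.util.List;

class KeyGroupClipping {

    // Hypothetical inclusive key-group range [start, end].
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Number of key groups shared by two inclusive ranges (0 if they are disjoint).
    static int overlap(Range a, Range b) {
        return Math.max(0, Math.min(a.end, b.end) - Math.max(a.start, b.start) + 1);
    }

    // Index of the handle range with the largest overlap with the target range.
    static int pickBest(List<Range> handles, Range target) {
        int best = -1;
        int bestOverlap = -1;
        for (int i = 0; i < handles.size(); i++) {
            int o = overlap(handles.get(i), target);
            if (o > bestOverlap) {
                best = i;
                bestOverlap = o;
            }
        }
        return best;
    }

    // Parts of a handle's range that fall outside the target and would be clipped away.
    static List<Range> rangesToDelete(Range handle, Range target) {
        List<Range> result = new ArrayList<>();
        if (handle.start < target.start) {
            result.add(new Range(handle.start, Math.min(handle.end, target.start - 1)));
        }
        if (handle.end > target.end) {
            result.add(new Range(Math.max(handle.start, target.end + 1), handle.end));
        }
        return result;
    }
}
```

For a target of `[20, 50]` and handles `[0, 31]`, `[32, 63]`, and `[16, 47]`, the overlaps are 12, 19, and 28, so the third handle would initialize the target db, and only key groups 16 to 19 would be clipped from it.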
[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
[ https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390847#comment-16390847 ] Shashank Agarwal commented on FLINK-8867: - [~StephanEwen] [~srichter] You can check the full logs in issue https://issues.apache.org/jira/browse/FLINK-7756. Actually, when we were debugging that issue, we found that this is the root cause.
[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390813#comment-16390813 ] ASF GitHub Bot commented on FLINK-8756: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5573 @zentol it seems Travis CI has a problem; the build keeps failing. Please review my latest change. > Support ClusterClient.getAccumulators() in RestClusterClient > > > Key: FLINK-8756 > URL: https://issues.apache.org/jira/browse/FLINK-8756 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: vinoyang >Priority: Blocker > Fix For: 1.5.0 > >
[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5573 @zentol it seems Travis CI has a problem; the build keeps failing. Please review my latest change. ---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5650#discussion_r173049537

--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 1;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
--- End diff --

About the capacity range, I didn't find a specific value recommended by RocksDB, but the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ) says:

```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```

I found that they use the word `hundreds`.
---
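The capacity-bounded batching that `RocksDBWriteBatchWrapper` is built around can be sketched in plain Java. This is a minimal, self-contained sketch, not the PR's code: `BoundedWriteBatch` is a hypothetical name, and a `Consumer` stands in for the actual RocksDB write call so the example runs without the RocksDB JNI library. The bounds `MIN_CAPACITY = 100` and `MAX_CAPACITY = 10_000` are assumptions. As extracted in this archive, the quoted diff reads `MAX_CAPACITY = 1`, which together with `MIN_CAPACITY = 100` would make `checkArgument` reject every capacity; the value may simply have been truncated here, but it is worth checking that the range is non-empty and that the error message names both bounds.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: buffers key/value pairs and hands them to a writer once
 * the configured capacity is reached. The Consumer stands in for the real
 * RocksDB write call; this is NOT the RocksDB Java API.
 */
final class BoundedWriteBatch implements AutoCloseable {

    // Assumed bounds, chosen so that MIN_CAPACITY <= MAX_CAPACITY.
    static final int MIN_CAPACITY = 100;
    static final int MAX_CAPACITY = 10_000;

    private final Consumer<List<Map.Entry<byte[], byte[]>>> writer;
    private final int capacity;
    private final List<Map.Entry<byte[], byte[]>> pending = new ArrayList<>();

    BoundedWriteBatch(Consumer<List<Map.Entry<byte[], byte[]>>> writer, int capacity) {
        // Check both bounds and report both in the message.
        if (capacity < MIN_CAPACITY || capacity > MAX_CAPACITY) {
            throw new IllegalArgumentException("capacity must be between " + MIN_CAPACITY
                + " and " + MAX_CAPACITY + ", but was " + capacity);
        }
        this.writer = writer;
        this.capacity = capacity;
    }

    void put(byte[] key, byte[] value) {
        pending.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        if (pending.size() >= capacity) {
            flush(); // batch is full: write it in one go
        }
    }

    void flush() {
        if (!pending.isEmpty()) {
            writer.accept(new ArrayList<>(pending)); // hand over a snapshot of the batch
            pending.clear();
        }
    }

    @Override
    public void close() {
        flush(); // write the remaining, partially filled batch
    }
}
```

With a capacity of 100, writing 250 entries produces two full batches of 100 and a final batch of 50 written by `close()`. Flushing on `close()` matters for recovery: the last, partially filled batch must not be lost.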
[jira] [Created] (FLINK-8893) NPE when netty try to allocate directBuffer
aitozi created FLINK-8893: - Summary: NPE when netty try to allocate directBuffer Key: FLINK-8893 URL: https://issues.apache.org/jira/browse/FLINK-8893 Project: Flink Issue Type: Bug Components: Network Affects Versions: 1.3.2 Reporter: aitozi The job failed with this exception:
{code:java}
Caused by: java.lang.NullPointerException
	at io.netty.buffer.PoolChunk.initBufWithSubpage(PoolChunk.java:381)
	at io.netty.buffer.PoolChunk.initBufWithSubpage(PoolChunk.java:369)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:194)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
	at org.apache.flink.runtime.io.network.netty.NettyBufferPool.directBuffer(NettyBufferPool.java:278)
	at org.apache.flink.runtime.io.network.netty.NettyMessage.allocateBuffer(NettyMessage.java:72)
	at org.apache.flink.runtime.io.network.netty.NettyMessage.access$000(NettyMessage.java:50)
	at org.apache.flink.runtime.io.network.netty.NettyMessage$BufferResponse.write(NettyMessage.java:227)
	... 26 more
{code}
After some research, this may be caused by allocating from multiple threads concurrently. See https://github.com/netty/netty/issues/4198 and https://github.com/netty/netty/pull/4388
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389832#comment-16389832 ] Stephan Ewen commented on FLINK-8828: - [~fhueske] and [~twalthr] Please have a look at [~jelmer]'s comment here; I think he has a very good point. Concerning the method name, could we use a method {{collectStream}} or {{collectElements}} or something like that? Alternatively, could we overload the {{collect}} method and rename the current {{collect()}} method eventually? > Add collect method to DataStream / DataSet scala api > > > Key: FLINK-8828 > URL: https://issues.apache.org/jira/browse/FLINK-8828 > Project: Flink > Issue Type: Improvement > Components: Core, DataSet API, DataStream API, Scala API >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > > A collect function is a method that takes a partial function as its parameter > and applies it to every element of the collection for which the function is defined, collecting the results into a new > collection. > It can be found on all [core scala collection > classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html] > as well as on Spark's [rdd > interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD] > To understand its utility, imagine the following scenario: > Given a DataStream that produces events of type _Purchase_ and _View_, > transform this stream into a stream of purchase amounts over 1000 euros.
> Currently an implementation might look like this:
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount){noformat}
> Alternatively, you could do this:
> {noformat}
> dataStream.flatMap(_ match {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }){noformat}
> But with collect implemented, it could look like this:
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }{noformat}
>
> which is a lot nicer to both read and write.
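Java has no partial functions, but the semantics described in the ticket can be sketched by modelling "defined at this element" as a function that returns `Optional`. The sketch works on plain `java.util.List`s, not on Flink's `DataStream` or `DataSet`; `CollectSketch`, `Event`, `Purchase`, and `View` are hypothetical names mirroring the Purchase/View scenario. In an actual Flink job, the same effect comes from the `flatMap` variant shown in the ticket, which emits zero or one element per input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class CollectSketch {

    // Hypothetical event types mirroring the Purchase/View scenario.
    interface Event {}

    static final class Purchase implements Event {
        final double amount;

        Purchase(double amount) {
            this.amount = amount;
        }
    }

    static final class View implements Event {}

    /**
     * "collect": applies a function that may or may not be defined for an
     * element (modelled as Optional) and keeps only the defined results.
     */
    static <T, R> List<R> collect(List<T> input, Function<T, Optional<R>> pf) {
        List<R> out = new ArrayList<>();
        for (T element : input) {
            pf.apply(element).ifPresent(out::add); // skip elements where pf is undefined
        }
        return out;
    }

    /** Defined only for purchases above 1000, like the partial function in the ticket. */
    static Optional<Double> largePurchaseAmount(Event event) {
        if (event instanceof Purchase && ((Purchase) event).amount > 1000) {
            return Optional.of(((Purchase) event).amount);
        }
        return Optional.empty();
    }
}
```

This folds the filter/cast/filter/map chain of the first variant into a single pass. The naming question raised in the comment remains: the existing `DataSet.collect()` already means "fetch the results to the client".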
[GitHub] flink pull request #5645: FLINK-8876 Improve concurrent access handling in s...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5645#discussion_r172922843

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -163,8 +224,9 @@ public T copy(T from, T reuse) {
 @Override
 public void copy(DataInputView source, DataOutputView target) throws IOException {
- T value = deserialize(source);
- serialize(value, target);
+ // we do not have concurrency checks here, because serialize() and
+ // deserialize() do the checks and the current mechanism does not handle
--- End diff --

looks like the end of the comment is missing
---
[jira] [Commented] (FLINK-8876) Improve concurrent access handling in stateful serializers
[ https://issues.apache.org/jira/browse/FLINK-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389855#comment-16389855 ] ASF GitHub Bot commented on FLINK-8876: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5645#discussion_r172922843 looks like the end of the comment is missing > Improve concurrent access handling in stateful serializers > -- > > Key: FLINK-8876 > URL: https://issues.apache.org/jira/browse/FLINK-8876 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > Some stateful serializers produce incorrect results when accidentally > accessed by multiple threads concurrently. > To better catch these cases, I suggest adding concurrency checks that are > active only when debug logging is enabled, and during test runs. > This is inspired by Kryo's checks for concurrent access.
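The kind of check proposed in FLINK-8876 can be sketched with an `AtomicReference` that records which thread is currently inside the serializer. This is a minimal sketch under stated assumptions, not Flink's implementation: `ConcurrentAccessGuard` is a hypothetical class, the `enabled` flag stands in for "debug logging is enabled or tests are running", and the guard is deliberately not re-entrant. That is consistent with the quoted diff leaving the check out of `copy()`, whose body delegates to `serialize()` and `deserialize()`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical guard that detects two threads using the same stateful
 * serializer at the same time. Not re-entrant: the same thread must not
 * enter twice without exiting in between.
 */
final class ConcurrentAccessGuard {

    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final boolean enabled; // e.g. debug logging on, or running tests

    ConcurrentAccessGuard(boolean enabled) {
        this.enabled = enabled;
    }

    void enter() {
        if (!enabled) {
            return; // checks cost nothing in production mode
        }
        Thread current = Thread.currentThread();
        if (!owner.compareAndSet(null, current)) {
            Thread other = owner.get();
            throw new IllegalStateException("Concurrent access to a stateful serializer: thread '"
                + current.getName() + "' entered while thread '"
                + (other == null ? "?" : other.getName()) + "' was still using it");
        }
    }

    void exit() {
        if (enabled) {
            owner.set(null);
        }
    }

    /** Wraps a single serializer operation, e.g. serialize() or deserialize(). */
    <T> T guarded(Supplier<T> action) {
        enter(); // if this throws, we never release another thread's ownership
        try {
            return action.get();
        } finally {
            exit();
        }
    }
}
```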
[GitHub] flink pull request #5658: [FLINK-8856] [TaskManager] Move all cancellation i...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5658 [FLINK-8856] [TaskManager] Move all cancellation interrupt calls to TaskCanceller thread ## What is the purpose of the change This cleans up the code and guards against a JVM bug where `interrupt()` calls block/deadlock if the thread is engaged in certain I/O operations. In addition, this makes sure that the process really goes away when the cancellation timeout expires, rather than relying on the TaskManager to be able to properly handle the fatal error notification. Some minor robustness enhancements related to this change are included in this PR. ## Verifying this change The change is motivated by an occasional JVM bug that I could not purposefully trigger in tests to guard against rollback to the prior state. All tests were passing prior to this change and are passing after this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink fix_task_interrupt Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5658.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5658 commit 36726fc5c277c649dc360975a34bf7be0afd7a0e Author: Stephan Ewen Date: 2018-03-06T14:54:13Z [hotfix] [taskmanager] Fix checkstyle in Task and TaskTest commit 385b2032bcaa397d21d28e90efa89a44e12ebe99 Author: Stephan Ewen Date: 2018-03-06T14:18:33Z [FLINK-8856] [TaskManager] Move all cancellation interrupt calls to TaskCanceller thread This cleans up the code and guards against a JVM bug where 'interrupt()' calls block/deadlock if the thread is engaged in certain I/O operations. In addition, this makes sure that the process really goes away when the cancellation timeout expires, rather than relying on the TaskManager to be able to properly handle the fatal error notification. commit 3b18d7d9eccc936dc53b05b787f3fd4c19171d4f Author: Stephan Ewen Date: 2018-03-06T15:36:13Z [FLINK-8883] [core] Make ThreadDeath a fatal error in ExceptionUtils commit f3884088b210a061dba4d83323884bece1d31864 Author: Stephan Ewen Date: 2018-03-06T16:14:54Z [FLINK-8885] [TaskManager] DispatcherThreadFactory registers a fatal error exception handler In case dispatcher threads let an exception bubble out (do not handle it), the exception handler terminates the process, to ensure we don't leave broken processes. commit 5236bb73a2482ccdf016d1b9bea5cd0f17f2f620 Author: Stephan Ewen Date: 2018-03-06T16:18:38Z [hotfix] [runtime] Harden FatalExitExceptionHandler In case the logging framework throws an exception when handling the exception, we still kill the process, as intended. ---
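The last two commits describe two related safeguards: a thread factory that installs a fatal-error handler on every thread it creates, and a handler that terminates the process even if logging the error fails. The following is a minimal sketch of both under assumptions: `FatalExitHandlerSketch` and `threadFactory` are hypothetical names, a `Consumer<String>` stands in for the logging framework, and the exit action is injected so the example can run without killing the JVM (in production it would be something like `Runtime.getRuntime().halt(code)`). The exit code `-1` is an arbitrary choice for the sketch.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

final class FatalExitHandlerSketch implements Thread.UncaughtExceptionHandler {

    // Arbitrary exit code chosen for this sketch.
    static final int EXIT_CODE = -1;

    private final Consumer<String> log;   // stand-in for the logging framework
    private final IntConsumer exitAction; // e.g. code -> Runtime.getRuntime().halt(code)

    FatalExitHandlerSketch(Consumer<String> log, IntConsumer exitAction) {
        this.log = log;
        this.exitAction = exitAction;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        try {
            log.accept("FATAL: thread '" + thread.getName()
                + "' produced an uncaught exception: " + error);
        } finally {
            // Runs even if logging itself throws, so the process is still terminated.
            exitAction.accept(EXIT_CODE);
        }
    }

    /** Thread factory that registers the given handler on every thread it creates. */
    static ThreadFactory threadFactory(String namePrefix, Thread.UncaughtExceptionHandler handler) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler(handler);
            return thread;
        };
    }
}
```

The `finally` block carries the hardening: with a real `halt()` the method never returns, so a broken logger cannot keep a damaged process alive.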
[jira] [Commented] (FLINK-8856) Move all interrupt() calls to TaskCanceler
[ https://issues.apache.org/jira/browse/FLINK-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389860#comment-16389860 ] ASF GitHub Bot commented on FLINK-8856: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5658 [FLINK-8856] [TaskManager] Move all cancellation interrupt calls to TaskCanceller thread
> Move all interrupt() calls to TaskCanceler > -- > > Key: FLINK-8856 > URL: https://issues.apache.org/jira/browse/FLINK-8856 > Project: Flink > Issue Type: Bug > Components: TaskManager >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > We need this to work around the following JVM bug: > https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8138622 > To circumvent this problem, the {{TaskCancelerWatchDog}} must not call > {{interrupt()}} at all, but only join on the executing thread (with timeout) > and cause a hard exit once cancellation takes too long. > A user affected by this problem reported this in FLINK-8834 > Personal note: The Thread.join(...) method
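The approach described in FLINK-8856 (join the executing thread with a timeout instead of interrupting it, and exit hard if it does not finish) can be sketched as follows. `CancelWatchdogSketch` is a hypothetical class, not Flink's `TaskCancelerWatchDog`; the fatal-exit action is injected (in production it might call `Runtime.getRuntime().halt(...)`), and treating an interrupt of the watchdog itself as "no longer needed" is an assumption of this sketch.

```java
/**
 * Hypothetical watchdog: waits (with a timeout) for the task thread to finish
 * after cancellation and triggers a fatal exit if it does not. It never calls
 * interrupt() on the task thread.
 */
final class CancelWatchdogSketch implements Runnable {

    private final Thread executingThread;
    private final long timeoutMillis;
    private final Runnable fatalExit; // e.g. () -> Runtime.getRuntime().halt(...)

    CancelWatchdogSketch(Thread executingThread, long timeoutMillis, Runnable fatalExit) {
        // join(0) would wait forever, so a positive timeout is required.
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        this.executingThread = executingThread;
        this.timeoutMillis = timeoutMillis;
        this.fatalExit = fatalExit;
    }

    @Override
    public void run() {
        try {
            // Only wait; do not touch the task thread's interrupt status.
            executingThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            // Assumption: interrupting the watchdog means it is no longer needed.
            Thread.currentThread().interrupt();
            return;
        }
        if (executingThread.isAlive()) {
            fatalExit.run(); // cancellation took too long
        }
    }
}
```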
[jira] [Updated] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-8845: -- Summary: Use WriteBatch to improve performance for recovery in RocksDB backend (was: Introduce `parallel recovery` mode for full checkpoint (savepoint)) > Use WriteBatch to improve performance for recovery in RocksDB backend > - > > Key: FLINK-8845 > URL: https://issues.apache.org/jira/browse/FLINK-8845 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.6.0 > > > Based on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, > we can restore from a full checkpoint (savepoint) in parallel. This could also > be extended to incremental checkpoints easily, but for the sake of simplicity, we > do this in two separate tasks.
[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5652 yup. But one profile is already scratching the 50m limit as is :/ ---
[GitHub] flink pull request #5648: [FLINK-8887][flip-6] ClusterClient.getJobStatus ca...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5648#discussion_r172775779

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -404,16 +404,25 @@ public void start() throws Exception {
 final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
 if (jobManagerRunner != null) {
- return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
- } else {
- final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
- if (jobDetails != null) {
- return CompletableFuture.completedFuture(jobDetails.getStatus());
- } else {
- return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+ try {
+ return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
--- End diff --

This is an asynchronous call that doesn't throw the exception; you have to add a handler to the returned `CompletableFuture`. It also properly resolves only one of the exceptions, and IMO it shouldn't catch `Exception` but only the specific exceptions we want the workaround to cover, so as not to hide other issues.

In any case, I'm not sure if adding workarounds to the Dispatcher is the right way to go. These issues revealed that some scenarios are not properly handled, and I would prefer waiting for @tillrohrmann to really fix this in the Dispatcher and related components. We can temporarily handle both exceptions in the `MiniClusterClient` by adding a *single* retry (with a short sleep) if a **specific** exception occurs.
---
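The two points in this review can be sketched with the JDK alone: failures of an asynchronous call surface only through the returned `CompletableFuture` (so a surrounding `try`/`catch` never sees them), and a temporary workaround should retry once, after a short sleep, only for one specific exception type. `AsyncRetrySketch`, `retryOnce`, `unwrap`, and `failed` are hypothetical helpers, not Flink's `FutureUtils` or `MiniClusterClient` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

final class AsyncRetrySketch {

    /** Dependent stages often wrap failures in CompletionException; look through it. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    /** Java 8 compatible replacement for CompletableFuture.failedFuture. */
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    /**
     * Invokes the call; if the returned future fails with an instance of
     * retryOn, sleeps briefly and tries exactly once more. Any other failure
     * is passed through unchanged.
     */
    static <T> CompletableFuture<T> retryOnce(
            Supplier<CompletableFuture<T>> call,
            Class<? extends Throwable> retryOn,
            long sleepMillis) {
        // A try/catch around call.get() would NOT see asynchronous failures:
        // they only become visible when the returned future completes.
        return call.get()
            .<CompletableFuture<T>>handle((value, failure) -> {
                if (failure == null) {
                    return CompletableFuture.completedFuture(value);
                }
                Throwable cause = unwrap(failure);
                if (!retryOn.isInstance(cause)) {
                    return failed(cause); // only the specific exception is retried
                }
                try {
                    Thread.sleep(sleepMillis); // short pause before the single retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return failed(e);
                }
                return call.get(); // second and last attempt
            })
            .thenCompose(Function.identity());
    }
}
```

Sleeping inside `handle` blocks whichever thread completes the first future. That is tolerable for a one-off, client-side workaround like the one suggested, but a scheduled executor would be the better choice elsewhere.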
[jira] [Created] (FLINK-8889) Do not override config values in TestBaseUtils#startCluster
Chesnay Schepler created FLINK-8889: --- Summary: Do not override config values in TestBaseUtils#startCluster Key: FLINK-8889 URL: https://issues.apache.org/jira/browse/FLINK-8889 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.5.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.5.0 {{TestBaseUtils#startCluster}} is a utility method to easily start a cluster. This method receives a Configuration to be used for the cluster; however, it overrides several values regardless of whether they were set or not. It also sets various options to their default values, which shouldn't be necessary.
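The intended behaviour, filling in only the options the caller has not set, can be sketched with a plain `Map<String, String>` standing in for Flink's `Configuration` so the example stays self-contained. `ClusterConfigSketch` and `withTestDefaults` are hypothetical names. In the spirit of the ticket, the defaults map should hold only options the test harness genuinely needs, not every option's default value.

```java
import java.util.HashMap;
import java.util.Map;

final class ClusterConfigSketch {

    /**
     * Returns a copy of the caller's configuration in which test defaults are
     * applied only to keys the caller did not set; user values always win.
     */
    static Map<String, String> withTestDefaults(
            Map<String, String> userConfig, Map<String, String> testDefaults) {
        Map<String, String> effective = new HashMap<>(userConfig); // never mutate the input
        testDefaults.forEach(effective::putIfAbsent);
        return effective;
    }
}
```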
[jira] [Updated] (FLINK-8890) Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()
[ https://issues.apache.org/jira/browse/FLINK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8890: Summary: Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch() (was: Compare checkpoints in order in CompletedCheckpoint.checkpointsMatch()) > Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch() > > > Key: FLINK-8890 > URL: https://issues.apache.org/jira/browse/FLINK-8890 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.3.3, 1.5.0, 1.4.2 > > > This method is used, among other things, to check if a list of restored > checkpoints is stable after several restore attempts in the ZooKeeper > checkpoint store. The order of checkpoints is somewhat important because we > want the latest checkpoint to stay the latest checkpoint.
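The difference between comparing restored checkpoints with and without regard to order can be shown on lists of checkpoint IDs. This is only a sketch: `CheckpointMatchSketch` is hypothetical, and the real `checkpointsMatch()` operates on completed checkpoints rather than bare `Long` IDs.

```java
import java.util.HashSet;
import java.util.List;

final class CheckpointMatchSketch {

    /** Order-sensitive: the same checkpoint IDs must appear at the same positions. */
    static boolean matchesInOrder(List<Long> first, List<Long> second) {
        if (first.size() != second.size()) {
            return false;
        }
        for (int i = 0; i < first.size(); i++) {
            if (!first.get(i).equals(second.get(i))) {
                return false; // same IDs in a different order do not match
            }
        }
        return true;
    }

    /** Order-insensitive comparison, shown only for contrast. */
    static boolean matchesIgnoringOrder(List<Long> first, List<Long> second) {
        return first.size() == second.size()
            && new HashSet<>(first).equals(new HashSet<>(second));
    }
}
```

With `[1, 2, 3]` and `[1, 3, 2]`, only the order-insensitive check reports a match, which is exactly the case where a different checkpoint would silently end up in the "latest" position.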
[jira] [Commented] (FLINK-8843) Decouple bind REST address from advertised address
[ https://issues.apache.org/jira/browse/FLINK-8843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389228#comment-16389228 ] ASF GitHub Bot commented on FLINK-8843: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5632 hi @zentol, it seems @tillrohrmann hasn't had time recently. Would you please review this? > Decouple bind REST address from advertised address > -- > > Key: FLINK-8843 > URL: https://issues.apache.org/jira/browse/FLINK-8843 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: flip-6 > Fix For: 1.5.0, 1.6.0 > > > The {{RestServerEndpoint}} is currently bound to the same address which is > also advertised to the client, namely {{RestOptions#REST_ADDRESS}}. It would > be better to start the {{RestServerEndpoint}} listening on all addresses by > binding to {{0.0.0.0}}.
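Decoupling the bind address from the advertised address means the server listens on the wildcard address `0.0.0.0` while clients are told a separately configured host. This is a minimal JDK-only sketch, not Flink's `RestServerEndpoint` code; `BindVsAdvertiseSketch` and the host name `jobmanager.example.com` are hypothetical, and port `0` simply lets the operating system choose a free port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class BindVsAdvertiseSketch {

    /** Listens on all interfaces: 0.0.0.0 is the IPv4 wildcard address. */
    static ServerSocket bindOnAllInterfaces(int port) throws IOException {
        ServerSocket socket = new ServerSocket();
        socket.bind(new InetSocketAddress("0.0.0.0", port));
        return socket;
    }

    /**
     * The address handed to clients is configured separately, e.g. a host name
     * that is reachable from outside, combined with the actually bound port.
     */
    static String advertisedUrl(String advertisedHost, ServerSocket boundSocket) {
        return "http://" + advertisedHost + ":" + boundSocket.getLocalPort();
    }
}
```

A wildcard address cannot be used to connect to the server, which is why the advertised host has to be a separate setting.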
[GitHub] flink issue #5652: [hotfix][tests] Do not use singleActorSystem in LocalFlin...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5652 All legacy tests going through the `MiniClusterResource` will take longer. I don't know by how much, but we now have to start multiple actor systems and the JM<->TM communication is no longer local. ---
[GitHub] flink issue #5632: [FLINK-8843][REST] Decouple bind REST address from advert...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5632 hi @zentol, it seems @tillrohrmann hasn't had time recently. Would you please review this? ---