[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r46264109 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +/** + * {@link StateBackend} for storing checkpoints in JDBC supporting databases. + * Key-Value state is stored out-of-core and is lazily fetched using the + * {@link LazyDbKvState} implementation. 
A different backend can also be + * provided in the constructor to store the non-partitioned states. A common use + * case would be to store the key-value states in the database and store larger + * non-partitioned states on a distributed file system. + * + * This backend implementation also allows the sharding of the checkpointed + * states among multiple database instances, which can be enabled by passing + * multiple database urls to the {@link DbBackendConfig} instance. + * + * By default there are multiple tables created in the given databases: 1 table + * for non-partitioned checkpoints and 1 table for each key-value state in the + * streaming program. + * + * To control table creation, insert/lookup operations and to provide + * compatibility for different SQL implementations, a custom + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}. + * + */ +public class DbStateBackend extends StateBackend { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class); + + private Random rnd; + + // -- + + private Environment env; --- End diff -- `env` is not serializable --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r46264128 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +/** + * {@link StateBackend} for storing checkpoints in JDBC supporting databases. + * Key-Value state is stored out-of-core and is lazily fetched using the + * {@link LazyDbKvState} implementation. 
A different backend can also be + * provided in the constructor to store the non-partitioned states. A common use + * case would be to store the key-value states in the database and store larger + * non-partitioned states on a distributed file system. + * + * This backend implementation also allows the sharding of the checkpointed + * states among multiple database instances, which can be enabled by passing + * multiple database urls to the {@link DbBackendConfig} instance. + * + * By default there are multiple tables created in the given databases: 1 table + * for non-partitioned checkpoints and 1 table for each key-value state in the + * streaming program. + * + * To control table creation, insert/lookup operations and to provide + * compatibility for different SQL implementations, a custom + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}. + * + */ +public class DbStateBackend extends StateBackend { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class); + + private Random rnd; + + // -- + + private Environment env; + + // -- + + private final DbBackendConfig dbConfig; + private final DbAdapter dbAdapter; + + private ShardedConnection connections; + + private final int numSqlRetries; + private final int sqlRetrySleep; + + private PreparedStatement insertStatement; --- End diff -- `PreparedStatement` is not serializable.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r46264229 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +/** + * {@link StateBackend} for storing checkpoints in JDBC supporting databases. + * Key-Value state is stored out-of-core and is lazily fetched using the + * {@link LazyDbKvState} implementation. 
A different backend can also be + * provided in the constructor to store the non-partitioned states. A common use + * case would be to store the key-value states in the database and store larger + * non-partitioned states on a distributed file system. + * + * This backend implementation also allows the sharding of the checkpointed + * states among multiple database instances, which can be enabled by passing + * multiple database urls to the {@link DbBackendConfig} instance. + * + * By default there are multiple tables created in the given databases: 1 table + * for non-partitioned checkpoints and 1 table for each key-value state in the + * streaming program. + * + * To control table creation, insert/lookup operations and to provide + * compatibility for different SQL implementations, a custom + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}. + * + */ +public class DbStateBackend extends StateBackend { --- End diff -- `StateBackend` implements the `Serializable` interface. Does this mean that `DbStateBackend` must also be `Serializable`? If this is the case, then this condition is violated because `env` and `insertStatement` are not serializable.
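These review comments all point at the same problem: `StateBackend` is `Serializable`, but the `env` (`Environment`) and `insertStatement` (`PreparedStatement`) fields are not. One common way to resolve this, assuming the backend is shipped using standard Java serialization and only needs these resources after it has been deployed, is to mark the runtime-only fields `transient` and re-create them in an initialization step on the worker. This is a minimal sketch of that pattern, not the fix the PR adopted; `SketchBackend`, `NonSerializableResource`, and `initRuntimeResources` are hypothetical stand-ins, not Flink or PR classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    /** Hypothetical stand-in for a runtime-only resource (like Environment or PreparedStatement). */
    static final class NonSerializableResource {
        final String description;
        NonSerializableResource(String description) { this.description = description; }
    }

    /** Hypothetical serializable backend that keeps runtime resources out of its serialized form. */
    static class SketchBackend implements Serializable {
        private static final long serialVersionUID = 1L;

        // Configuration is serializable and travels with the job.
        private final String dbUrl;

        // Runtime-only resources are transient: serialization skips them,
        // and they come back as null after deserialization.
        private transient NonSerializableResource env;
        private transient NonSerializableResource insertStatement;

        SketchBackend(String dbUrl) { this.dbUrl = dbUrl; }

        /** Re-creates the runtime resources where the backend actually runs. */
        void initRuntimeResources(String jobName) {
            this.env = new NonSerializableResource("env for " + jobName);
            this.insertStatement = new NonSerializableResource("insert into " + dbUrl);
        }

        boolean isInitialized() { return env != null && insertStatement != null; }

        String getDbUrl() { return dbUrl; }
    }

    /** Serializes and deserializes an object, as happens when a job is shipped to workers. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchBackend backend = new SketchBackend("jdbc:mysql://localhost:3306/flinkdb");
        backend.initRuntimeResources("job-1");

        // Succeeds only because the non-serializable fields are transient.
        SketchBackend copy = roundTrip(backend);
        System.out.println(copy.getDbUrl());      // the configuration survives
        System.out.println(copy.isInitialized()); // false: resources must be re-created
        copy.initRuntimeResources("job-1");
        System.out.println(copy.isInitialized()); // true
    }
}
```

Without the `transient` modifiers, `writeObject` would throw `java.io.NotSerializableException` for `NonSerializableResource`, which is the failure these comments anticipate.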
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1305
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-159205217 I think you can go ahead. It's in contrib and you guys are battle-testing it anyways... :wink:
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-159048039 If there are no objections, I would like to merge this :)
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-158799081 @StephanEwen, @rmetzger: I addressed the comments regarding the logs and the state id. I also added two final improvements:
- Compaction is now executed in a background thread using a single-threaded executor.
- On empty checkpoints, a keepalive call is executed against the connections to avoid connection drops.

These changes are in the last 2 commits, so if you guys +1 these last modifications, I will merge it. I guess the compaction part is the most interesting here. Thanks
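Running compaction off the checkpointing thread can be sketched with a plain `ExecutorService` from `Executors.newSingleThreadExecutor`, assuming compaction only needs to run in the background and never concurrently with itself (a single worker thread runs tasks one at a time, in submission order). The compaction body here is a hypothetical placeholder that only records the checkpoint id; the PR's actual compaction SQL is not shown in this thread. The keepalive part is omitted; in plain JDBC, `Connection.isValid(int timeoutSeconds)` is one standard way to issue such a check, though the PR may do it differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BackgroundCompactionSketch implements AutoCloseable {

    // One worker thread: compactions never overlap and run in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kv-state-compaction");
        t.setDaemon(true); // don't keep the JVM alive just for compaction
        return t;
    });

    private final List<Long> compactedCheckpoints = Collections.synchronizedList(new ArrayList<>());

    /** Schedules compaction up to the given checkpoint and returns immediately. */
    public Future<?> compactAsync(long checkpointId) {
        return executor.submit(() -> compact(checkpointId));
    }

    // Hypothetical compaction body: a real backend would remove outdated
    // key-value versions older than checkpointId from each shard.
    private void compact(long checkpointId) {
        compactedCheckpoints.add(checkpointId);
    }

    public List<Long> getCompactedCheckpoints() {
        synchronized (compactedCheckpoints) {
            return new ArrayList<>(compactedCheckpoints);
        }
    }

    @Override
    public void close() throws InterruptedException {
        executor.shutdown(); // already-submitted compactions still finish
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (BackgroundCompactionSketch compactor = new BackgroundCompactionSketch()) {
            compactor.compactAsync(1L);
            // Waiting on the second task also guarantees the first one ran (FIFO, one thread).
            compactor.compactAsync(2L).get();
            System.out.println(compactor.getCompactedCheckpoints()); // [1, 2]
        }
    }
}
```

A single thread keeps compaction from competing with itself for database connections, at the cost of compactions queueing up if one is slow.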
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45544375 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.Callable; + +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.eclipse.jetty.util.log.Log; + +/** + * State handle implementation for storing checkpoints as byte arrays in + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}. 
+ * + */ +public class DbStateHandle implements Serializable, StateHandle { + + private static final long serialVersionUID = 1L; + + private final String jobId; + private final DbBackendConfig dbConfig; + + private final long checkpointId; + private final long checkpointTs; + + private final long handleId; + + public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) { + this.checkpointId = checkpointId; + this.handleId = handleId; + this.jobId = jobId; + this.dbConfig = dbConfig; + this.checkpointTs = checkpointTs; + } + + protected byte[] getBytes() throws IOException { + return retry(new Callable<byte[]>() { + public byte[] call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } + + @Override + public void discardState() { + try { + retry(new Callable<Boolean>() { + public Boolean call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + return true; + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } catch (IOException e) { + // We don't want to fail the job here, but log the error. + if (Log.isDebugEnabled()) { --- End diff -- We could add a checkstyle rule for that, but I would like to solve the problem in a different way: I recently opened a JIRA issue for adding a check that verifies whether a Flink module only uses dependencies it has explicitly declared (i.e., forbidding reliance on transitive dependencies). With that check, we would also catch cases like this one.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45474858 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -64,33 +64,35 @@ /** * Closes the state backend, releasing all internal resources, but does not delete any persistent * checkpoint data. -* +* * @throws Exception Exceptions can be forwarded and will be logged by the system */ public abstract void close() throws Exception; - + // // key/value state // /** * Creates a key/value state backed by this state backend. -* +* +* @param operatorId Unique id for the operator creating the state +* @param stateName Name of the created state * @param keySerializer The serializer for the key. * @param valueSerializer The serializer for the value. * @param defaultValue The value that is returned when no other value has been associated with a key, yet. * @param The type of the key. * @param The type of the value. -* +* * @return A new key/value state backed by this backend. -* +* * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ - public abstractKvState createKvState( + public abstract KvState createKvState(int operatorId, String stateName, --- End diff -- Thanks for the description of the sharding. The issue is that you need a deterministic table name that each KeyValueState can create independently.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-158423218 Had an offline chat with @gyfora with the following outcome:
- A deterministic state identifier is needed.
- Small change: pass that identifier as a single ID String, initially constructed internally from the state name + operator ID (as in this implementation).
- That way, the streaming runtime can change its handling of state names and operator IDs without breaking state backend implementations.

With these changes, looks good to merge. +1 from my side
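The agreed-upon identifier can be derived purely from values that every parallel subtask already knows, so all subtasks of an operator arrive at the same table name without any coordination. This is a minimal sketch assuming the `<stateName>_<operatorId>` ID and the `kvstate_<stateId>_<jobId>` table pattern from the chained map -> filter example in this thread; `stateId` and `tableName` are hypothetical helper names, not the PR's API. Because the operator id is an integer placed after the last underscore, different (state name, operator id) pairs cannot produce the same ID.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class StateIdSketch {

    /** Hypothetical helper: single ID string built from state name + operator id. */
    static String stateId(String stateName, int operatorId) {
        return stateName + "_" + operatorId;
    }

    /** Hypothetical helper: table name used on every shard for this state in this job. */
    static String tableName(String stateId, String jobId) {
        return "kvstate_" + stateId + "_" + jobId;
    }

    public static void main(String[] args) {
        String jobId = "jobId";
        Set<String> tables = new LinkedHashSet<>();
        // p = 2 subtasks each of the mapper (operator id 1) and the filter (operator id 2),
        // both holding a state named "count". The subtask index never enters the name.
        for (int subtask = 0; subtask < 2; subtask++) {
            tables.add(tableName(stateId("count", 1), jobId));
            tables.add(tableName(stateId("count", 2), jobId));
        }
        System.out.println(tables); // [kvstate_count_1_jobId, kvstate_count_2_jobId]
    }
}
```

Passing the backend a single pre-built ID string, rather than the operator ID and state name separately, leaves the runtime free to change how it builds that string later.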
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45479951 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.Callable; + +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.eclipse.jetty.util.log.Log; + +/** + * State handle implementation for storing checkpoints as byte arrays in + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}. 
+ * + */ +public class DbStateHandle implements Serializable, StateHandle { + + private static final long serialVersionUID = 1L; + + private final String jobId; + private final DbBackendConfig dbConfig; + + private final long checkpointId; + private final long checkpointTs; + + private final long handleId; + + public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) { + this.checkpointId = checkpointId; + this.handleId = handleId; + this.jobId = jobId; + this.dbConfig = dbConfig; + this.checkpointTs = checkpointTs; + } + + protected byte[] getBytes() throws IOException { + return retry(new Callable<byte[]>() { + public byte[] call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } + + @Override + public void discardState() { + try { + retry(new Callable<Boolean>() { + public Boolean call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + return true; + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } catch (IOException e) { + // We don't want to fail the job here, but log the error. + if (Log.isDebugEnabled()) { --- End diff -- Good catch, thanks Robert :)
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45479621 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.Callable; + +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.eclipse.jetty.util.log.Log; + +/** + * State handle implementation for storing checkpoints as byte arrays in + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}. 
+ * + */ +public class DbStateHandle implements Serializable, StateHandle { + + private static final long serialVersionUID = 1L; + + private final String jobId; + private final DbBackendConfig dbConfig; + + private final long checkpointId; + private final long checkpointTs; + + private final long handleId; + + public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) { + this.checkpointId = checkpointId; + this.handleId = handleId; + this.jobId = jobId; + this.dbConfig = dbConfig; + this.checkpointTs = checkpointTs; + } + + protected byte[] getBytes() throws IOException { + return retry(new Callable<byte[]>() { + public byte[] call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } + + @Override + public void discardState() { + try { + retry(new Callable<Boolean>() { + public Boolean call() throws Exception { + try (ShardedConnection con = dbConfig.createShardedConnection()) { + dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId); + } + return true; + } + }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries()); + } catch (IOException e) { + // We don't want to fail the job here, but log the error. + if (Log.isDebugEnabled()) { --- End diff -- I think you accidentally used Jetty's logging here (see `import org.eclipse.jetty.util.log.Log`).
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45377370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -64,33 +64,35 @@ /** * Closes the state backend, releasing all internal resources, but does not delete any persistent * checkpoint data. -* +* * @throws Exception Exceptions can be forwarded and will be logged by the system */ public abstract void close() throws Exception; - + // // key/value state // /** * Creates a key/value state backed by this state backend. -* +* +* @param operatorId Unique id for the operator creating the state +* @param stateName Name of the created state * @param keySerializer The serializer for the key. * @param valueSerializer The serializer for the value. * @param defaultValue The value that is returned when no other value has been associated with a key, yet. * @param The type of the key. * @param The type of the value. -* +* * @return A new key/value state backed by this backend. -* +* * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ - public abstractKvState createKvState( + public abstract KvState createKvState(int operatorId, String stateName, --- End diff -- Let me first describe how sharding works, then I will give a concrete example.

Key-value pairs are sharded by key, not by subtask. Each parallel subtask maintains a connection to all the shards and partitions the states before writing them to the appropriate shards, according to the user-defined partitioner (set in the backend config). This is much better than sharding by subtask: we can later change the parallelism of the job without affecting the state, and it also lets us define a more elaborate sharding strategy through the partitioner. Consequently, when a kv state is created, we create a table for that kv state in each shard.

If we did it according to your suggestion, we would need to create numShards tables for each parallel instance of each kv state (p * ns tables in total). Furthermore, this would make the fancy sharding useless, because we could no longer change the job parallelism. So we need to make sure that the parallel subtasks of a given operator write to the same state tables (so we only have ns tables regardless of the parallelism). To do this, we need something that uniquely identifies a given state in the streaming program, and all parallel instances must get the same id. The information required to create such a unique state id is an identifier of the operator that owns the state plus the name of the state. (The information obtained from the environment is not enough: chained operators share the same environment, so if they have conflicting state names, the id would not be unique.) The only thing that identifies an operator in the logical streaming program is the operator id assigned by the job graph builder (that's the whole point of having it).

An example job with p = 2 and numShards = 3: a chained map -> filter, where both the mapper and the filter have a state named "count"; let's assume the mapper has operator id 1 and the filter has operator id 2. In this case the mapper would create 3 DB tables (1 on each shard), all with the same name, kvstate_count_1_jobId. The filter would also create 3 tables, named kvstate_count_2_jobId. All mapper instances would write to all three database shards, and the same goes for all the filter instances. I hope this makes clear what I am trying to say.
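The key-based sharding described in this comment can be sketched as follows, assuming a simple hash partitioner. `ShardPartitioner`, `HASH`, and `partitionByShard` are hypothetical: the backend config in this PR refers to a `ShardedConnection.Partitioner`, but its interface is not shown in this thread. The sketch illustrates the central point: the shard index depends only on the key, never on the subtask index, so each kv state needs `numShards` tables regardless of the parallelism `p`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyShardingSketch {

    /** Hypothetical partitioner: maps a key to a shard index in [0, numShards). */
    interface ShardPartitioner {
        int getShardIndex(Object key, int numShards);
    }

    /** Hash-based choice; floorMod keeps the index non-negative for negative hash codes. */
    static final ShardPartitioner HASH = (key, numShards) -> Math.floorMod(key.hashCode(), numShards);

    /**
     * Groups one subtask's pending updates by target shard so that each shard gets one batch.
     * The grouping depends only on the keys, not on which subtask calls this method.
     */
    static Map<Integer, List<String>> partitionByShard(List<String> keys, int numShards, ShardPartitioner partitioner) {
        Map<Integer, List<String>> batches = new HashMap<>();
        for (String key : keys) {
            int shard = partitioner.getShardIndex(key, numShards);
            batches.computeIfAbsent(shard, s -> new ArrayList<>()).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        int numShards = 3;
        int p = 2;

        // Pending updates of one subtask, grouped into one batch per shard.
        Map<Integer, List<String>> batches =
                partitionByShard(Arrays.asList("alice", "bob", "carol", "dave"), numShards, HASH);
        System.out.println(batches);

        // "bob" maps to the same shard whether subtask 0 or subtask 1 writes it,
        // e.g. before and after the job is rescaled.
        System.out.println("bob -> shard " + HASH.getShardIndex("bob", numShards));

        // Tables per kv state: one per shard, versus one per shard per subtask.
        System.out.println("key-sharded: " + numShards + " tables, per-subtask: " + (p * numShards) + " tables");
    }
}
```

A plain hash-mod scheme keeps the key-to-shard mapping stable when `p` changes, but not when the number of shards changes. That is one reason a user-supplied partitioner, as the comment mentions, can be useful.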
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45351298 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java --- @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import java.io.Serializable; +import java.sql.SQLException; +import java.util.List; + +import org.apache.flink.contrib.streaming.state.ShardedConnection.Partitioner; + +import com.google.common.collect.Lists; + +/** + * + * Configuration object for {@link DbStateBackend}, containing information to + * shard and connect to the databases that will store the state checkpoints. 
+ * + */ +public class DbBackendConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + // Database connection properties + private final String userName; + private final String userPassword; + private final List shardUrls; + + // JDBC Driver + DbAdapter information + private DbAdapter dbAdapter = new MySqlAdapter(); + private String JDBCDriver = null; + + private int maxNumberOfSqlRetries = 5; + private int sleepBetweenSqlRetries = 100; + + // KvState properties + private int kvStateCacheSize = 1; + private int maxKvInsertBatchSize = 1000; + private float maxKvEvictFraction = 0.1f; + private int kvStateCompactionFreq = -1; + + private Partitioner shardPartitioner; + + /** +* Creates a new sharded database state backend configuration with the given +* parameters and default {@link MySqlAdapter}. +* +* @param dbUserName +*The username used to connect to the database at the given url. +* @param dbUserPassword +*The password used to connect to the database at the given url +*and username. +* @param dbShardUrls +*The list of JDBC urls of the databases that will be used as +*shards for the state backend. Sharding of the state will +*happen based on the subtask index of the given task. +*/ + public DbBackendConfig(String dbUserName, String dbUserPassword, List dbShardUrls) { + this.userName = dbUserName; + this.userPassword = dbUserPassword; + this.shardUrls = dbShardUrls; + } + + /** +* Creates a new database state backend configuration with the given +* parameters and default {@link MySqlAdapter}. +* +* @param dbUserName +*The username used to connect to the database at the given url. +* @param dbUserPassword +*The password used to connect to the database at the given url +*and username. +* @param dbUrl +*The JDBC url of the database for example +*"jdbc:mysql://localhost:3306/flinkdb". 
+*/ + public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) { + this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl)); + } + + /** +* The username used to connect to the database at the given urls. +*/ + public String getUserName() { + return userName; + } + + /** +* The password used to connect to the database at the given url and +* username. +*/ + public String getUserPassword() { + return userPassword; + } + + /** +* Number of database shards defined. +*/ + public int getNumberOfShards() { + return shardUrls.size(); + } + + /** +* Database shard urls as provided in the constructor. +* +*/ + public List getShardUrls() { + return shardUrls; +
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45351896 --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java --- @@ -0,0 +1,406 @@
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45362408 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- @@ -64,33 +64,35 @@ /** * Closes the state backend, releasing all internal resources, but does not delete any persistent * checkpoint data. -* +* * @throws Exception Exceptions can be forwarded and will be logged by the system */ public abstract void close() throws Exception; - + // // key/value state // /** * Creates a key/value state backed by this state backend. -* +* +* @param operatorId Unique id for the operator creating the state +* @param stateName Name of the created state * @param keySerializer The serializer for the key. * @param valueSerializer The serializer for the value. * @param defaultValue The value that is returned when no other value has been associated with a key, yet. * @param The type of the key. * @param The type of the value. -* +* * @return A new key/value state backed by this backend. -* +* * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ - public abstractKvState createKvState( + public abstract KvState createKvState(int operatorId, String stateName, --- End diff -- I would like to get rid of this change and simply let the state backend create a UID for the state name. This method is called once per proper creation of a state (so it should not need deterministic state naming). Recovery happens from the state handle, which can store all required info.
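A hypothetical Java sketch of the alternative in the comment above: the backend picks a random table name when a state is freshly created, and the snapshot handle carries that name so recovery never has to re-derive it. `SnapshotHandle` and `SelfNamingKvState` are illustrative types, not Flink's `StateHandle`/`KvState` API; stripping the dashes from the UUID is an assumption made so the name is a valid unquoted SQL identifier.

```java
import java.io.Serializable;
import java.util.UUID;

// Hypothetical sketch: the backend names the table itself and records the name in the handle.
final class SelfNamingKvState {

    // What a snapshot handle would need to carry for restore (hypothetical type).
    static final class SnapshotHandle implements Serializable {
        private static final long serialVersionUID = 1L;
        final String tableName;
        final long checkpointTimestamp;

        SnapshotHandle(String tableName, long checkpointTimestamp) {
            this.tableName = tableName;
            this.checkpointTimestamp = checkpointTimestamp;
        }
    }

    final String tableName;

    // Fresh creation: no operator id or state name is passed in.
    SelfNamingKvState() {
        // Dashes are stripped so the name is a valid unquoted SQL identifier (40 chars).
        this("kvstate_" + UUID.randomUUID().toString().replace("-", ""));
    }

    // Restore: the table name comes from the handle, not from a deterministic scheme.
    SelfNamingKvState(SnapshotHandle handle) {
        this(handle.tableName);
    }

    private SelfNamingKvState(String tableName) {
        this.tableName = tableName;
    }

    SnapshotHandle snapshot(long checkpointTimestamp) {
        return new SnapshotHandle(tableName, checkpointTimestamp);
    }
}
```

Under this scheme every call to the creation path yields a new table, which is exactly what the follow-up discussion about parallel instances and sharding turns on.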
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-158107515 I have a final comment inline. Otherwise, I think this is good to merge.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-158074782 Looking through this again...
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45371097 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- I am not completely sure what you mean here. Multiple different states can have the same name in different tasks; as far as I know, we don't assume unique state names. This gets worse when chained tasks have states with the same name, because then they actually go to the same backend as well. I don't see how to get around this without an operator id. Could you please clarify your idea?
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45371919 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- I suggest not letting the operator supply an ID and name, and simply leaving the naming of the state to the state backend. The SqlStateBackend could just use `UUID.randomUUID().toString()` instead of `operatorId+stateName`.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45372303 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- The point is that all parallel instances write to the same set of tables. This way, sharding is handled transparently, and the job parallelism can actually change without affecting the state (no need to repartition it).
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45372569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- Otherwise you will have to create p*numShards tables, and you won't even know which state they contain from looking at the table names.
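To make the p*numShards point concrete, here is a hypothetical count under the reading that a per-instance UUID gives every parallel instance its own table on every shard. The class and the `shard/table` string encoding are illustrative assumptions; no database is involved. The shared-name scheme keeps the same ns tables when parallelism goes from 2 to 4, while the per-instance scheme grows to p*ns tables with fresh, unrelated names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical comparison of the two naming schemes, counting (shard, table) pairs.
final class TableCountComparison {

    // One random name per parallel instance, created on every shard: p * ns tables.
    static Set<String> perInstanceTables(int parallelism, int numShards) {
        Set<String> tables = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            String name = "kvstate_" + UUID.randomUUID().toString().replace("-", "");
            for (int shard = 0; shard < numShards; shard++) {
                tables.add(shard + "/" + name);
            }
        }
        return tables;
    }

    // One shared name (e.g. state name + operator id + job id) for all instances: ns tables.
    static Set<String> sharedTables(int parallelism, int numShards, String sharedName) {
        Set<String> tables = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int shard = 0; shard < numShards; shard++) {
                tables.add(shard + "/" + sharedName);
            }
        }
        return tables;
    }
}
```

The design trade-off is visible in the counts: the shared name ties the tables to the logical state rather than to a physical subtask, which is what lets the parallelism change.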
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45372620 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- What you mention depends on the parallel subtask ID (which is already given in the initialize() method). The operatorId and name are the same for all parallel instances anyway.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1305#discussion_r45373479 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java --- End diff -- The "name" (as a string) of the state is a very API-specific thing that no other part of the runtime is concerned with. The operator ID is something specific to the StreamGraphBuilder and not to the streaming tasks at all. I think we are tying things together here that should not be tied together. I still do not understand how this affects sharding. Does the shard assignment depend on the state name (rather than the parallel subtask / JobVertexId)? I only see that the table names will have the task name instead of the name of the state.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156964684 I'm looking at it again.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156809400 I have updated the description and ran some more cluster tests without any issues. It would be good if you could all do a second round of reviews, please.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156034877 I would like to push this soon if there are no objections.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156038439 I agree that this will be an important backend and good to have in. :) But do we need to push this right now? I think we should wait a little and make sure that it fits well with the other ongoing changes that I think Stephan and Aljoscha are working on. At least let's wait for someone to review it again.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156039726 Well, I don't know what they are working on... It would be easier not to have to rebase the state backend API changes.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-156064645 I totally understand your point, but I think it's OK that changes of this scope take longer to review and get in (my HA PR took over a month or so to get in). At the end of the day, it matters more that we get this right (because it covers a very important use case) than getting it in a few days earlier.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-155765000 Should we do a final iteration over this and merge it into contrib? The description got slightly out of date when I changed this back so that it stores the state by timestamp (but it's basically Ctrl+F, replace id with timestamp).
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153872782 I updated the sharding logic to hash the keys modulo the number of shards by default, and the user can also add a custom Partitioner to implement a custom sharding strategy.
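A sketch of the key-based sharding described above. The PR's `ShardedConnection.Partitioner` is imported in `DbBackendConfig`, but its signature is not shown in this thread, so `ShardPartitioner` below is a hypothetical stand-in, as is the example custom strategy. Using `Math.floorMod` is a choice made here so that negative hash codes still map into `[0, numShards)`.

```java
// Hypothetical interface standing in for the PR's ShardedConnection.Partitioner.
interface ShardPartitioner {
    int shardFor(Object key, int numShards);
}

// Default strategy: the key's hash modulo the number of shards.
final class ModHashPartitioner implements ShardPartitioner {
    @Override
    public int shardFor(Object key, int numShards) {
        // floorMod keeps the index in [0, numShards) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numShards);
    }
}

// Example user-supplied strategy (hypothetical): pin one "hot" key to shard 0 and
// spread all other keys over the remaining shards.
final class HotKeyPartitioner implements ShardPartitioner {
    private final Object hotKey;

    HotKeyPartitioner(Object hotKey) {
        this.hotKey = hotKey;
    }

    @Override
    public int shardFor(Object key, int numShards) {
        if (numShards == 1 || key.equals(hotKey)) {
            return 0;
        }
        return 1 + Math.floorMod(key.hashCode(), numShards - 1);
    }
}
```

Neither method takes a subtask index, so the shard of a key does not depend on the job parallelism.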
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153696208 I also removed the sharding logic now, as I think it was pretty weak and not very useful (it maintained one connection per subtask, which would break if we changed the parallelism).
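Why one connection per subtask breaks on rescaling can be shown with a small hypothetical model. Key-to-subtask routing is simplified here to `hash % parallelism`, which is an assumption and not necessarily Flink's actual key grouping. With per-subtask shards, the shard holding a key's state changes when the parallelism changes; with key-based shards it does not.

```java
// Hypothetical model; key routing is simplified to hash % parallelism.
final class RescaleShardCheck {

    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Old scheme: each subtask uses one connection chosen from its own index.
    static int shardBySubtask(Object key, int parallelism, int numShards) {
        return subtaskFor(key, parallelism) % numShards;
    }

    // Key-based scheme: the shard is a function of the key alone.
    static int shardByKey(Object key, int numShards) {
        return Math.floorMod(key.hashCode(), numShards);
    }

    public static void main(String[] args) {
        Integer key = 4;
        // p = 2 -> subtask 0 -> shard 0; p = 3 -> subtask 1 -> shard 1: prints "0 vs 1"
        System.out.println(shardBySubtask(key, 2, 3) + " vs " + shardBySubtask(key, 3, 3));
        // Key-based: prints 1 regardless of parallelism.
        System.out.println(shardByKey(key, 3));
    }
}
```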
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153695943 I updated this PR with the reworked logic; it has several advantages over the previous timestamp-based solution (including the elimination of transactions from the logic). The only problem I see is that Derby batch inserts currently happen row by row, as Derby does not have insert-or-update semantics. This rework will also make it easier to write connectors to non-transactional stores like Cassandra.
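A hypothetical JDBC sketch of the difference mentioned above, assuming a table `(k VARCHAR, ts BIGINT, v VARCHAR)` with primary key `(k, ts)`; the schema and method names are illustrative, not the PR's `DbAdapter` code. MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` lets a whole batch of edits go out with one `executeBatch()`, while a store without insert-or-update falls back to one `UPDATE` per row plus an `INSERT` when no row matched.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

// Hypothetical sketch; schema (k, ts, v) with primary key (k, ts) is assumed.
final class UpsertSketch {

    static String mySqlUpsertSql(String table) {
        return "INSERT INTO " + table + " (k, ts, v) VALUES (?, ?, ?)"
                + " ON DUPLICATE KEY UPDATE v = VALUES(v)";
    }

    static String updateSql(String table) {
        return "UPDATE " + table + " SET v = ? WHERE k = ? AND ts = ?";
    }

    static String insertSql(String table) {
        return "INSERT INTO " + table + " (k, ts, v) VALUES (?, ?, ?)";
    }

    // MySQL: the whole batch of edits is sent with a single executeBatch() call.
    static void upsertBatch(Connection con, String table, long ts, Map<String, String> edits)
            throws SQLException {
        try (PreparedStatement ps = con.prepareStatement(mySqlUpsertSql(table))) {
            for (Map.Entry<String, String> e : edits.entrySet()) {
                ps.setString(1, e.getKey());
                ps.setLong(2, ts);
                ps.setString(3, e.getValue());
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }

    // Without insert-or-update: one UPDATE per row, plus an INSERT when nothing matched.
    static void upsertRowByRow(Connection con, String table, long ts, Map<String, String> edits)
            throws SQLException {
        try (PreparedStatement update = con.prepareStatement(updateSql(table));
             PreparedStatement insert = con.prepareStatement(insertSql(table))) {
            for (Map.Entry<String, String> e : edits.entrySet()) {
                update.setString(1, e.getValue());
                update.setString(2, e.getKey());
                update.setLong(3, ts);
                if (update.executeUpdate() == 0) {
                    insert.setString(1, e.getKey());
                    insert.setLong(2, ts);
                    insert.setString(3, e.getValue());
                    insert.executeUpdate();
                }
            }
        }
    }
}
```

The JDBC methods need a live connection, so only the generated SQL is easy to check in isolation.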
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153289042

Cool stuff, really! This is very much in line with what I had in mind for a SQL backend. Let me check if I understood everything correctly (and see where my understanding is wrong), because I think we should be able to make an "exactly once" version of this based on that mechanism. I am basically rephrasing what you describe in a different model.

### Basic Mode

What this is effectively doing is a batched and asynchronous version of distributed 2-phase commit transactions. The phases look basically like this:

- **Adding data**: Pipe all modifications into the database, but do not commit the transaction. They are tagged with the timestamp of the upcoming checkpoint (or any coordinated, increasing version counter). This can happen in the background thread, for as long as the in-operator cache holds all edits that are not in the database yet.
- **Pre-commit**: This is when the checkpoint is triggered. All pending edits are written into the database and then the transaction is committed. The state handle only includes the timestamp used on the elements. In classical 2-phase transactions, after a task acks the pre-commit, it has to be able to recover to that state, which is given here. The checkpoint is not immediately valid for recovery, though, which means that recovery has to either apply a filter or issue a query that deletes all records with timestamps larger than the version given during recovery. After the pre-commit, the timestamp is locally incremented and work can continue.
- **Full commit**: This happens implicitly when the checkpoint coordinator marks the checkpoint as complete.
- **Recovery**: The timestamp (or version counter) of the last successful checkpoint is restored, the records that were committed (but whose checkpoint did not succeed as a whole) are deleted, and then records are lazily fetched.

So far, this should give exactly-once guarantees, or am I overlooking something?

### Compacting

Whenever the "checkpoint complete" notification comes (or every so many changes), you trigger a clean-up query in the background. Given that the SQL database has a not completely terrible query planner, this SQL statement should be reasonably efficient (a single semi-join):

```
DELETE FROM "table name" t1
WHERE EXISTS (SELECT * FROM "table name" t2
              WHERE t2.handle_id = t1.handle_id
                AND t2.timestamp > t1.timestamp     -- a newer version exists for the same handle
                AND t2.timestamp <= GLOBAL_VERSION  -- and the newer version is globally committed
             )
```

The good thing is that, by virtue of having the incrementing global versions, we can set the isolation level for the query to "read uncommitted", which means that it will not lock anything and thus not compete with any other ongoing modifications.
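The Basic Mode phases and the compaction query can be modelled in memory to check the invariants. This is a hypothetical sketch with no database: rows are tagged with the timestamp of the checkpoint they belong to, the checkpoint "handle" is just that timestamp, recovery deletes rows newer than the restored timestamp, and compaction mirrors the `EXISTS` query (drop a row when a newer, globally committed version of the same key exists).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the versioned scheme; not the PR's code.
final class VersionedKvModel {

    static final class Row {
        final String key;
        final long ts;
        final String value;

        Row(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    private final List<Row> table = new ArrayList<>();

    // Adding data: insert-or-update the (key, ts) version for the upcoming checkpoint.
    void put(String key, long ts, String value) {
        table.removeIf(r -> r.key.equals(key) && r.ts == ts);
        table.add(new Row(key, ts, value));
    }

    // Lazy fetch: the newest version at or below the given timestamp, or null.
    String get(String key, long maxTs) {
        Row best = null;
        for (Row r : table) {
            if (r.key.equals(key) && r.ts <= maxTs && (best == null || r.ts > best.ts)) {
                best = r;
            }
        }
        return best == null ? null : best.value;
    }

    // Recovery: delete everything written after the restored checkpoint's timestamp.
    void rollbackTo(long checkpointTs) {
        table.removeIf(r -> r.ts > checkpointTs);
    }

    // Compaction: delete a row if a newer, globally committed version of the same key exists.
    void compact(long globalVersion) {
        List<Row> snapshot = new ArrayList<>(table);
        table.removeIf(t1 -> snapshot.stream().anyMatch(t2 ->
                t2.key.equals(t1.key) && t2.ts > t1.ts && t2.ts <= globalVersion));
    }

    int size() {
        return table.size();
    }
}
```

For example, writing versions 1, 2 and 3 of a key, failing checkpoint 3 and restoring checkpoint 2 leaves versions 1 and 2; compacting with global version 2 then keeps only version 2.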
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153351332 @gyfora That's what I meant: basically, the timestamps could subsume the role of the checkpoint IDs, i.e., the checkpoint IDs would have the semantics of the timestamps and the timestamps would not be required. (Or the checkpoint ID would be removed and the timestamps would remain, depending on how you look at it... :smile: )
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153339094
@StephanEwen Thanks for the comments. You are right, the main idea is exactly as you described. Exactly-once is violated in some corner cases because the pre-commit phase of the previous checkpoint may still be failing (and retrying) during recovery. If we assume that the previous job is completely killed off, so that nothing writes to the database after that, then we can properly clean up during recovery. Unfortunately, this does not seem to hold if you set the retry wait time very low (like 0 ms in the snapshot): the failed job is then still writing the failed snapshot to the database after you have recovered and cleaned up.

As for compaction, I came up with something very similar, but here is the funny thing and my problem: the query you wrote runs properly on Derby but is invalid on MySQL (you cannot use a subquery on the same table you are modifying). In MySQL you need to use an inner join instead, but that does not work in Derby :P

In any case, I have made a prototype of this at: https://github.com/gyfora/flink/tree/compaction The user can define the compaction frequency (compact every so many checkpoints), and it also makes sure that compaction and cleanup are only executed on one subtask to avoid duplicate work.
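The Derby/MySQL incompatibility can be handled by choosing the statement per database. A minimal sketch, assuming a hypothetical `SqlDialect` enum and the `handle_id`/`timestamp` columns from the compaction query above (depending on the database's reserved words, these names may need quoting): MySQL's multi-table `DELETE` with a self-join expresses the same semantics as the correlated `EXISTS` form that Derby accepts.

```java
// Hypothetical dialect enum; how the backend detects the database is left open.
enum SqlDialect { DERBY, MYSQL }

final class CompactionSql {
    /** Returns a compaction statement with one parameter: the global version. */
    static String compactionStatement(SqlDialect dialect, String table) {
        switch (dialect) {
            case MYSQL:
                // MySQL rejects a subquery that reads the table being deleted from,
                // but accepts a multi-table DELETE that self-joins the table.
                return "DELETE t1 FROM " + table + " t1"
                        + " INNER JOIN " + table + " t2"
                        + " ON t2.handle_id = t1.handle_id"
                        + " AND t2.timestamp > t1.timestamp"
                        + " AND t2.timestamp <= ?";
            case DERBY:
                // Derby accepts the correlated EXISTS form from the comment above.
                return "DELETE FROM " + table + " t1 WHERE EXISTS ("
                        + "SELECT * FROM " + table + " t2"
                        + " WHERE t2.handle_id = t1.handle_id"
                        + " AND t2.timestamp > t1.timestamp"
                        + " AND t2.timestamp <= ?)";
            default:
                throw new IllegalArgumentException("Unsupported dialect: " + dialect);
        }
    }
}
```

Either string would be passed to `Connection.prepareStatement`, with the global version bound via `setLong(1, globalVersion)` before `executeUpdate()`.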
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153307495
@aljoscha
1. I was initially using the MockEnvironments, but I added the DummyEnvironment for several reasons: I wanted control over the JobId and the number of subtasks, for which I would have had to change the MockEnvironment. I also wanted to avoid having to clean up the MemoryManager and other resources, as I really don't need them.
2. I don't really understand what you mean here; the recovery timestamp is only used for cleanup on restore.
3. Imagine a scenario where two tasks are restoring. One restores quickly and starts writing new timestamps. If we then call cleanup on the other task, it will remove the new states unless we bound the cleanup by the recovery timestamp. This can happen easily.

I don't know about the allOrNothingState :/
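Point 3 can be illustrated with a minimal in-memory sketch. The `RecoveryCleanup` name and the exact (exclusive) bounds are assumptions for illustration: the cleanup on restore removes only versions written after the last completed checkpoint and before the restoring task's recovery timestamp, so versions that a faster-restoring task has already written survive.

```java
import java.util.ArrayList;
import java.util.List;

final class RecoveryCleanup {
    /**
     * Keeps every version except those written after the last completed checkpoint
     * and before this task's recovery started, mirroring a hypothetical
     * DELETE ... WHERE timestamp > ? AND timestamp < ? statement.
     */
    static List<Long> cleanup(List<Long> timestamps, long lastCheckpointTs, long recoveryTs) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            // Written by the failed attempt: newer than the checkpoint, older than recovery.
            boolean failedWrite = ts > lastCheckpointTs && ts < recoveryTs;
            if (!failedWrite) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

For example, with versions at timestamps 5, 12, 15 and 25, a last checkpoint at 10 and a recovery timestamp of 20, the bounded cleanup keeps 5 and 25. An unbounded cleanup (`recoveryTs = Long.MAX_VALUE`) would also delete 25, the fresh write from the task that restored first.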
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153305943
Just some remarks:
- `DummyEnvironment` seems unnecessary; we already have `StreamMockEnvironment`, which I think could be reused.
- In the first version you had both the timestamp and the checkpoint ID, and recovery/key lookup took both into account. The recent version uses just the timestamp for lookup. Both versions introduce the new restore timestamp in the restore methods.
- The cleanup of failed checkpoints took into account the checkpoint ID and the recovery timestamp, but I think the recovery timestamp was always redundant, since the corresponding condition in the SQL statement would always hold.

=> I think the timestamp is not needed. Can't everything be implemented using just the (monotonically increasing) checkpoint IDs?

Also, this is unrelated, but maybe @StephanEwen or @gyfora know: why do we have the `allOrNothingState` in `CheckpointCoordinator.restoreLatestCheckpointedState`?
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-15538
2. Ah, I meant the lookupTimestamp. In an earlier version you used both the checkpointId and the lookupTimestamp to perform key lookups.
3. I see, so in this implementation of state the timestamp has basically assumed the role of the checkpointId, and the checkpointId is (I think) completely ignored. Correct? Couldn't we then change the semantics of the checkpointId to work like the timestamps (they are somewhat logical, not physical, timestamps anyway)?
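The key lookups discussed here boil down to one rule: for a key, return the newest version whose logical timestamp is at or below the lookup timestamp. A minimal in-memory sketch, using a hypothetical `VersionedKvStore` class rather than the PR's `LazyDbKvState`, shows that any monotonically increasing counter, such as a checkpoint ID, can play the role of that timestamp:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class VersionedKvStore<K, V> {
    // key -> (logical timestamp -> value); timestamps only need to increase monotonically.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, long timestamp, V value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the newest value written at or before lookupTimestamp, or null if none. */
    V lookup(K key, long lookupTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        // floorEntry finds the greatest timestamp <= lookupTimestamp.
        Map.Entry<Long, V> entry = history.floorEntry(lookupTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

For instance, after writing `v1` at 1 and `v3` at 3, a lookup at 2 returns `v1`, a lookup at 3 returns `v3`, and a lookup at 0 returns `null`.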
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153376152
My last commit introduces automatic compaction with a user-specified frequency. It also allows the KvStates to implement the CheckpointNotifier interface, in which case they also get notified.
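The behavior described in this commit can be sketched as follows, assuming a hypothetical `CompactingBackend` class and a local `CheckpointCompleteListener` interface standing in for Flink's `CheckpointNotifier`: the backend forwards each checkpoint-complete notification to the KvStates that implement the interface, and only subtask 0 compacts, every `compactEvery` checkpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the CheckpointNotifier interface mentioned in the comment.
interface CheckpointCompleteListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

final class CompactingBackend implements CheckpointCompleteListener {
    private final int subtaskIndex;
    private final int compactEvery; // user-specified: compact every N checkpoints
    private final List<Object> kvStates = new ArrayList<>();
    final List<Long> compactedAt = new ArrayList<>(); // records when compaction ran

    CompactingBackend(int subtaskIndex, int compactEvery) {
        if (compactEvery <= 0) {
            throw new IllegalArgumentException("compactEvery must be positive");
        }
        this.subtaskIndex = subtaskIndex;
        this.compactEvery = compactEvery;
    }

    void register(Object kvState) {
        kvStates.add(kvState);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Forward the notification to every KvState that opts in.
        for (Object state : kvStates) {
            if (state instanceof CheckpointCompleteListener) {
                ((CheckpointCompleteListener) state).notifyCheckpointComplete(checkpointId);
            }
        }
        // Only subtask 0 compacts, so parallel instances do not duplicate the work.
        if (subtaskIndex == 0 && checkpointId % compactEvery == 0) {
            compactedAt.add(checkpointId); // a real backend would run the compaction query here
        }
    }
}
```

Recording the checkpoint ID in `compactedAt` stands in for running the compaction statement; restricting the work to a single subtask avoids issuing the same `DELETE` from every parallel instance.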
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-152948336
Thanks for the great write-up!
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-152944197
Good stuff! Will need a day more to look through this, but this is a cool way of doing stateful stream computation :-)
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-152767697
My last commit (which was meant to solve the problems with failed tasks writing to the DB) introduced some issues with the exactly-once guarantees. I will look into it tomorrow.
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-151884357
Wow, a lot of stuff. I will look into it once the release is out. :smiley:
[GitHub] flink pull request: Out-of-core state backend for JDBC databases
GitHub user gyfora opened a pull request: https://github.com/apache/flink/pull/1305
Out-of-core state backend for JDBC databases
Detailed description incoming...
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/gyfora/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1305.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1305
commit b793bca20b79c1fe38ed7a31deca485e7d109060
Author: Gyula Fora
Date: 2015-10-26T08:58:49Z
[FLINK-2916] [streaming] Expose operator and task information to StateBackend
commit c35949f5e765f377799730a973b374eeea9c3921
Author: Gyula Fora
Date: 2015-10-27T17:31:04Z
[FLINK-2924] [streaming] Out-of-core state backend for JDBC databases