[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r46264109
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+/**
+ * {@link StateBackend} for storing checkpoints in JDBC supporting databases.
+ * Key-Value state is stored out-of-core and is lazily fetched using the
+ * {@link LazyDbKvState} implementation. A different backend can also be
+ * provided in the constructor to store the non-partitioned states. A common use
+ * case would be to store the key-value states in the database and store larger
+ * non-partitioned states on a distributed file system.
+ * 
+ * This backend implementation also allows the sharding of the checkpointed
+ * states among multiple database instances, which can be enabled by passing
+ * multiple database urls to the {@link DbBackendConfig} instance.
+ * 
+ * By default there are multiple tables created in the given databases: 1 table
+ * for non-partitioned checkpoints and 1 table for each key-value state in the
+ * streaming program.
+ * 
+ * To control table creation, insert/lookup operations and to provide
+ * compatibility for different SQL implementations, a custom
+ * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
+ *
+ */
+public class DbStateBackend extends StateBackend {
+
+   private static final long serialVersionUID = 1L;
+   private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
+
+   private Random rnd;
+
+   // --
+
+   private Environment env;
--- End diff --

`env` is not serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r46264128
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 ---
@@ -0,0 +1,248 @@
+   private Environment env;
+
+   // --
+
+   private final DbBackendConfig dbConfig;
+   private final DbAdapter dbAdapter;
+
+   private ShardedConnection connections;
+
+   private final int numSqlRetries;
+   private final int sqlRetrySleep;
+
+   private PreparedStatement insertStatement;
--- End diff --

`PreparedStatement` is not serializable.




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r46264229
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 ---
@@ -0,0 +1,248 @@
+public class DbStateBackend extends StateBackend {
--- End diff --

`StateBackend` implements the `Serializable` interface. Does this mean that 
`DbStateBackend` must also be `Serializable`? If this is the case, then this 
condition is violated because `env` and `insertStatement` are not serializable.
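If the backend does have to be `Serializable`, one common pattern is to mark runtime resources such as the environment and prepared statements `transient` and to re-create them on the task side after deserialization. The following is a hypothetical, self-contained sketch of that pattern, not the code from this PR: `Backend`, `Config`, and `initialize(...)` are invented names, and a plain `Object` stands in for Flink's `Environment`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.sql.PreparedStatement;

public class TransientFieldsSketch {

    // Hypothetical stand-in for a serializable configuration such as DbBackendConfig.
    public static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String url;

        public Config(String url) {
            this.url = url;
        }
    }

    // Hypothetical backend: only the config is shipped; runtime resources are transient.
    public static class Backend implements Serializable {
        private static final long serialVersionUID = 1L;

        private final Config config;
        private transient Object env;                        // stand-in for Environment
        private transient PreparedStatement insertStatement; // never serialized

        public Backend(Config config) {
            this.config = config;
        }

        // Hypothetical hook, called on the task side after deserialization.
        public void initialize(Object env) {
            this.env = env;
            // A real backend would open its connections and prepare statements here.
        }

        public boolean isInitialized() {
            return env != null;
        }

        public Config getConfig() {
            return config;
        }

        public PreparedStatement getInsertStatement() {
            return insertStatement;
        }
    }

    // Same idea without 'transient': serialization fails on the non-serializable field.
    public static class NonTransientBackend implements Serializable {
        private static final long serialVersionUID = 1L;
        private Object env = new Object(); // java.lang.Object is not Serializable
    }

    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

After a round trip the configuration survives while the transient fields come back as `null`, which is why the sketch needs an explicit initialization step on the receiving side.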




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1305




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-159205217
  
I think you can go ahead. It's in contrib and you guys are battle-testing 
it anyways... :wink: 




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-23 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-159048039
  
If no objections I would like to merge this :)




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-22 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-158799081
  
@StephanEwen, @rmetzger:
I addressed the comments regarding the logs and the state id.

I also added a final improvement:

- Compaction is now executed in a background thread using a SingleThreadedExecutor
- On empty checkpoints, a keepalive call is executed against the connections to avoid connection drops

These changes are in the last 2 commits, so if you guys +1 these last 
modifications I will merge it. I guess the compaction part is the most 
interesting here.
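The background compaction described above can be sketched with a single-threaded executor from `java.util.concurrent`. This is an illustrative sketch under assumptions, not the PR's code: `onCheckpointComplete` and `compact` are hypothetical names, and the actual database cleanup is replaced by a counter. The keepalive on empty checkpoints is not shown; JDBC's `Connection.isValid(int)` is one standard call that could serve that purpose.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackgroundCompactionSketch implements AutoCloseable {

    private static final String THREAD_NAME = "kv-state-compaction";

    // A single worker thread: compactions run off the checkpointing thread and never overlap.
    private final ExecutorService compactionExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, THREAD_NAME);
        t.setDaemon(true); // do not keep the JVM alive only for compaction
        return t;
    });

    private final AtomicInteger compactionsRun = new AtomicInteger();
    private volatile long lastCompactedCheckpoint = -1;
    private volatile String lastThreadName;

    // Hypothetical hook: schedule compaction and return to the caller immediately.
    public Future<?> onCheckpointComplete(long checkpointId) {
        return compactionExecutor.submit(() -> compact(checkpointId));
    }

    private void compact(long upToCheckpointId) {
        // A real backend would delete outdated rows from the kv-state tables here.
        lastThreadName = Thread.currentThread().getName();
        lastCompactedCheckpoint = upToCheckpointId;
        compactionsRun.incrementAndGet();
    }

    public int getCompactionsRun() {
        return compactionsRun.get();
    }

    public long getLastCompactedCheckpoint() {
        return lastCompactedCheckpoint;
    }

    public String getLastThreadName() {
        return lastThreadName;
    }

    @Override
    public void close() throws InterruptedException {
        compactionExecutor.shutdown();
        compactionExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Using one thread rather than a pool serializes compactions, so two cleanups of the same tables can never run concurrently.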

thanks




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-21 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45544375
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
 ---
@@ -0,0 +1,88 @@
+package org.apache.flink.contrib.streaming.state;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.eclipse.jetty.util.log.Log;
+
+/**
+ * State handle implementation for storing checkpoints as byte arrays in
+ * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}.
+ * 
+ */
+public class DbStateHandle implements Serializable, StateHandle {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String jobId;
+   private final DbBackendConfig dbConfig;
+
+   private final long checkpointId;
+   private final long checkpointTs;
+
+   private final long handleId;
+
+   public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
+   this.checkpointId = checkpointId;
+   this.handleId = handleId;
+   this.jobId = jobId;
+   this.dbConfig = dbConfig;
+   this.checkpointTs = checkpointTs;
+   }
+
+   protected byte[] getBytes() throws IOException {
+   return retry(new Callable<byte[]>() {
+   public byte[] call() throws Exception {
+   try (ShardedConnection con = dbConfig.createShardedConnection()) {
+   return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
+   }
+   }
+   }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
+   }
+
+   @Override
+   public void discardState() {
+   try {
+   retry(new Callable<Boolean>() {
+   public Boolean call() throws Exception {
+   try (ShardedConnection con = dbConfig.createShardedConnection()) {
+   dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
+   }
+   return true;
+   }
+   }, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
+   } catch (IOException e) {
+   // We don't want to fail the job here, but log the error.
--- End diff --

We could add a checkstyle rule for that, but I would like to solve the
problem in a different way: I recently opened a JIRA for checking whether a
Flink module only uses dependencies it has explicitly declared (forbidding
reliance on transitive dependencies). With that check, we would also identify
cases like this one.




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-20 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45474858
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -64,33 +64,35 @@
/**
 * Closes the state backend, releasing all internal resources, but does not delete any persistent
 * checkpoint data.
-* 
+*
 * @throws Exception Exceptions can be forwarded and will be logged by the system
 */
public abstract void close() throws Exception;
-   
+
// 

//  key/value state
// 

 
/**
 * Creates a key/value state backed by this state backend.
-* 
+*
+* @param operatorId Unique id for the operator creating the state
+* @param stateName Name of the created state
 * @param keySerializer The serializer for the key.
 * @param valueSerializer The serializer for the value.
 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
 * @param <K> The type of the key.
 * @param <V> The type of the value.
-* 
+*
 * @return A new key/value state backed by this backend.
-* 
+*
 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 */
-   public abstract  KvState createKvState(
+   public abstract  KvState createKvState(int operatorId, String stateName,
--- End diff --

Thanks for the description of the sharding. The issue is that you need a 
deterministic table name that each KeyValueState can create independently.






[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-20 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-158423218
  
Had an offline chat with @gyfora with the following outcome:
  - A deterministic state identifier is needed
  - Small change to pass that identifier as a single ID String, initially 
internally constructed by state name + operator ID (as in this implementation)
  - That way, the streaming runtime can change handling of state names and 
operator IDs without breaking state backend implementations
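A minimal sketch of such an identifier, assuming it is a plain string built from the state name and the operator ID. `stateId` and `tableName` are hypothetical helpers, and the `kvstate_<name>_<operatorId>_<jobId>` table naming is taken from @gyfora's example in this thread rather than from the merged code. Because every parallel subtask computes the same string, each KeyValueState can derive its table name independently, while the backend only ever sees an opaque ID.

```java
import java.util.Objects;

public final class StateIdSketch {

    private StateIdSketch() {}

    // Hypothetical: the single ID string handed to the backend. Only this method needs to
    // change if the runtime later changes how state names and operator IDs are handled.
    public static String stateId(String stateName, int operatorId) {
        Objects.requireNonNull(stateName, "stateName");
        // operatorId is an int (digits and an optional '-'), so the last '_' is unambiguous.
        return stateName + "_" + operatorId;
    }

    // Hypothetical: deterministic table name derived from the opaque ID and the job ID.
    public static String tableName(String stateId, String jobId) {
        return "kvstate_" + stateId + "_" + jobId;
    }
}
```

Two chained operators that both declare a state named `count` (operator IDs 1 and 2) get the distinct IDs `count_1` and `count_2`, even though they share the same environment.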

With these changes, looks good to merge.

+1 from my side




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-20 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45479951
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
 ---
@@ -0,0 +1,88 @@
+   if (Log.isDebugEnabled()) {
--- End diff --

Good catch, thanks Robert :)




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-20 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45479621
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
 ---
@@ -0,0 +1,88 @@
+   if (Log.isDebugEnabled()) {
--- End diff --

I think you accidentally used Jetty's logging here (see `import 
org.eclipse.jetty.util.log.Log`)
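A sketch of the same discard-and-log pattern without the Jetty import. Everything here is an assumption for illustration: `retry` only mimics the shape of the PR's `SQLRetrier.retry` (its real signature may differ), and `java.util.logging` is used solely to keep the sketch dependency-free. In Flink code the class would instead declare an SLF4J `Logger` via `LoggerFactory.getLogger(...)`, as `DbStateBackend` already does.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RetryAndLogSketch {

    // Dependency-free stand-in; Flink code would use org.slf4j.Logger instead.
    private static final Logger LOG = Logger.getLogger(RetryAndLogSketch.class.getName());

    // Hypothetical retry helper: up to (maxRetries + 1) attempts, sleeping between them.
    public static <T> T retry(Callable<T> action, int maxRetries, long sleepMillis) throws IOException {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while retrying", ie);
                    }
                }
            }
        }
        throw new IOException("Giving up after " + (maxRetries + 1) + " attempts", last);
    }

    // Mirrors the discardState() intent: a failed delete is logged, never rethrown.
    public static boolean discardQuietly(Callable<Boolean> delete, int maxRetries, long sleepMillis) {
        try {
            return retry(delete, maxRetries, sleepMillis);
        } catch (IOException e) {
            // Level.FINE corresponds to the debug level used in the original code.
            LOG.log(Level.FINE, "Could not discard checkpoint state", e);
            return false;
        }
    }
}
```

Returning `false` instead of throwing keeps a failed cleanup from failing the job, which matches the comment in the quoted `discardState()`.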




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45377370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -64,33 +64,35 @@
-   public abstract  KvState createKvState(
+   public abstract  KvState createKvState(int operatorId, String stateName,
--- End diff --

Let me first describe how sharding works, then I will give a concrete example.
Key-value pairs are sharded by key, not by subtask. This means that each
parallel subtask maintains a connection to all the shards and partitions the
states before writing them to the appropriate shards according to the
user-defined partitioner (in the backend config). This is much better than
sharding by subtask, because we can later change the parallelism of the job
without affecting the state, and it also lets us define a more elaborate
sharding strategy through the partitioner.
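Key-based sharding can be illustrated with a hash partitioner whose result depends only on the key and the number of shards. The `Partitioner` interface below is hypothetical, merely modeled on the idea behind the PR's `ShardedConnection.Partitioner` (its actual signature is not shown in this thread), and hashing with `Math.floorMod` is just one possible default strategy.

```java
import java.io.Serializable;

public class KeyShardingSketch {

    // Hypothetical interface; the PR's ShardedConnection.Partitioner may look different.
    public interface Partitioner extends Serializable {
        int getShardIndex(Object key, int numShards);
    }

    // Default strategy: hash the key. The subtask index and the job parallelism are not
    // inputs, so changing the parallelism never moves a key to a different shard.
    public static class HashPartitioner implements Partitioner {
        private static final long serialVersionUID = 1L;

        @Override
        public int getShardIndex(Object key, int numShards) {
            // floorMod keeps the index in [0, numShards) even for negative hash codes.
            return Math.floorMod(key.hashCode(), numShards);
        }
    }
}
```

Since every subtask applies the same function, any subtask that sees a given key writes it to the same shard, which is what allows the parallelism to change later.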

This means that when a kv state is created, we create a table for that kv
state in each shard. If we did it according to your suggestion, we would need
to create numShards tables for each parallel instance (a total of p*ns) for
each kv state. Furthermore, this would make the fancy sharding useless, because
we could not change the job parallelism. So we need to make sure that the
parallel subtasks of a given operator write to the same state tables (so we
only have ns tables regardless of the parallelism).

In order to do this, we need something that uniquely identifies a given state
in the streaming program (and parallel instances should have the same id).

The information required to create such a unique state id is an identifier
for the operator that has the state plus the name of the state. (The
information obtained from the environment is not enough, because chained
operators share the same environment; therefore, if they have conflicting state
names, the id is not unique.) The only thing that identifies an operator in the
logical streaming program is the operator id assigned by the job graph builder
(that's the whole point of having it).

An example job with p=2 and numShards = 3:

A chained map -> filter, where both the mapper and the filter have a state
named "count"; let's assume that the mapper has operator id 1 and the filter 2.

In this case the mapper would create 3 db tables (1 on each shard) with the
same name, kvstate_count_1_jobId. The filter would also create 3 tables, named
kvstate_count_2_jobId.

All mapper instances would write to all three database shards, and the same
goes for all the filters.
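The table counts in this example can be checked with a little arithmetic. The helper below is a hypothetical sketch that reuses the `kvstate_<stateName>_<operatorId>_<jobId>` naming from the example: with 2 kv states and ns = 3 shards, sharding by key needs 2 * 3 = 6 tables regardless of p, whereas per-subtask tables would need 2 * 3 * p, i.e. 12 for p = 2.

```java
import java.util.ArrayList;
import java.util.List;

public class TableLayoutSketch {

    // Sharding by key: one table per (kv state, shard), independent of parallelism.
    public static int tablesShardedByKey(int numStates, int numShards) {
        return numStates * numShards;
    }

    // Alternative discussed above: every parallel subtask gets its own table on every shard.
    public static int tablesPerSubtask(int numStates, int numShards, int parallelism) {
        return numStates * numShards * parallelism;
    }

    // Hypothetical listing of "shardN:tableName" entries for the key-sharded layout.
    public static List<String> tables(String jobId, int numShards, String[] stateNames, int[] operatorIds) {
        if (stateNames.length != operatorIds.length) {
            throw new IllegalArgumentException("one operator id per state name required");
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < stateNames.length; i++) {
            String table = "kvstate_" + stateNames[i] + "_" + operatorIds[i] + "_" + jobId;
            for (int shard = 0; shard < numShards; shard++) {
                result.add("shard" + shard + ":" + table);
            }
        }
        return result;
    }
}
```

Note that `parallelism` does not appear in `tables(...)` at all: the layout is fully determined by the states and the shards.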

I hope you get what I am trying to say. 




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45351298
  
--- Diff: 
flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
 ---
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.flink.contrib.streaming.state.ShardedConnection.Partitioner;
+
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ * Configuration object for {@link DbStateBackend}, containing information to
+ * shard and connect to the databases that will store the state checkpoints.
+ *
+ */
+public class DbBackendConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   // Database connection properties
+   private final String userName;
+   private final String userPassword;
+   private final List<String> shardUrls;
+
+   // JDBC Driver + DbAdapter information
+   private DbAdapter dbAdapter = new MySqlAdapter();
+   private String JDBCDriver = null;
+
+   private int maxNumberOfSqlRetries = 5;
+   private int sleepBetweenSqlRetries = 100;
+
+   // KvState properties
+   private int kvStateCacheSize = 1;
+   private int maxKvInsertBatchSize = 1000;
+   private float maxKvEvictFraction = 0.1f;
+   private int kvStateCompactionFreq = -1;
+
+   private Partitioner shardPartitioner;
+
+   /**
+    * Creates a new sharded database state backend configuration with the given
+    * parameters and default {@link MySqlAdapter}.
+    *
+    * @param dbUserName
+    *            The username used to connect to the database at the given url.
+    * @param dbUserPassword
+    *            The password used to connect to the database at the given url
+    *            and username.
+    * @param dbShardUrls
+    *            The list of JDBC urls of the databases that will be used as
+    *            shards for the state backend. Sharding of the state will
+    *            happen based on the subtask index of the given task.
+    */
+   public DbBackendConfig(String dbUserName, String dbUserPassword, List<String> dbShardUrls) {
+       this.userName = dbUserName;
+       this.userPassword = dbUserPassword;
+       this.shardUrls = dbShardUrls;
+   }
+
+   /**
+    * Creates a new database state backend configuration with the given
+    * parameters and default {@link MySqlAdapter}.
+    *
+    * @param dbUserName
+    *            The username used to connect to the database at the given url.
+    * @param dbUserPassword
+    *            The password used to connect to the database at the given url
+    *            and username.
+    * @param dbUrl
+    *            The JDBC url of the database, for example
+    *            "jdbc:mysql://localhost:3306/flinkdb".
+    */
+   public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) {
+       this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl));
+   }
+
+   /**
+    * The username used to connect to the database at the given urls.
+    */
+   public String getUserName() {
+       return userName;
+   }
+
+   /**
+    * The password used to connect to the database at the given url and
+    * username.
+    */
+   public String getUserPassword() {
+       return userPassword;
+   }
+
+   /**
+    * Number of database shards defined.
+    */
+   public int getNumberOfShards() {
+       return shardUrls.size();
+   }
+
+   /**
+    * Database shard urls as provided in the constructor.
+    *
+    */
+   public List<String> getShardUrls() {
+       return shardUrls;
+ 
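The config defaults `maxNumberOfSqlRetries = 5` and `sleepBetweenSqlRetries = 100` suggest a retry loop around each SQL call, and `DbStateBackend` statically imports `SQLRetrier.retry`. The helper below is a hypothetical sketch of such a loop, not the PR's `SQLRetrier`: its name and signature are assumed, it reads the retry setting as a cap on total attempts, and it treats the sleep value as milliseconds.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper; not the PR's SQLRetrier. */
public class SqlRetrySketch {

    /**
     * Runs action, retrying on SQLException. maxAttempts caps the total number of
     * calls and sleepMillis is the pause between them (both interpretations assumed).
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long sleepMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (SQLException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last SQL error
                }
                Thread.sleep(sleepMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a transient error, then succeeds on the third call.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("transient");
            }
            return "ok";
        }, 5, 100);
        System.out.println(result + " after " + calls[0] + " calls"); // ok after 3 calls
    }
}
```

Only `SQLException` triggers a retry here; any other exception propagates immediately, on the assumption that non-SQL failures are not transient.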

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45351896
  
 

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45362408
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
@@ -64,33 +64,35 @@
/**
 * Closes the state backend, releasing all internal resources, but does not delete any persistent
 * checkpoint data.
-* 
+*
 * @throws Exception Exceptions can be forwarded and will be logged by the system
 */
public abstract void close() throws Exception;
-   
+
// 

//  key/value state
// 

 
/**
 * Creates a key/value state backed by this state backend.
-* 
+*
+* @param operatorId Unique id for the operator creating the state
+* @param stateName Name of the created state
 * @param keySerializer The serializer for the key.
 * @param valueSerializer The serializer for the value.
 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
 * @param <K> The type of the key.
 * @param <V> The type of the value.
-* 
+*
 * @return A new key/value state backed by this backend.
-* 
+*
 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 */
-   public abstract  KvState createKvState(
+   public abstract  KvState createKvState(int operatorId, String stateName,
--- End diff --

I would like to get rid of this change and simply let the state backend 
create a UID for the state name.

This method is called once per actual creation of a state (so it should not 
need deterministic state naming). Recovery happens from the state handle, which 
can store all required info.
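Read as code, the suggestion might look like the following minimal sketch. Everything in it is an assumption for illustration: `KvStateHandleSketch`, the `kvstate_` prefix and both methods are hypothetical, not Flink or PR APIs. The backend invents the name on fresh creation and only ever reads it back from the handle on recovery.

```java
import java.util.UUID;

/** Hypothetical sketch: the backend, not the operator, chooses the state's table name. */
public class BackendNamedState {

    /** Hypothetical checkpoint handle carrying everything needed to find the state again. */
    static final class KvStateHandleSketch {
        final String tableName;
        final long checkpointTimestamp;

        KvStateHandleSketch(String tableName, long checkpointTimestamp) {
            this.tableName = tableName;
            this.checkpointTimestamp = checkpointTimestamp;
        }
    }

    /** Fresh creation: invent a unique name without any operator id or state name. */
    static String createTableName() {
        // UUID dashes are not valid in unquoted SQL identifiers, so strip them
        return "kvstate_" + UUID.randomUUID().toString().replace("-", "");
    }

    /** Recovery: the name is read back from the handle, never regenerated. */
    static String restoreTableName(KvStateHandleSketch handle) {
        return handle.tableName;
    }

    public static void main(String[] args) {
        String name = createTableName();
        KvStateHandleSketch handle = new KvStateHandleSketch(name, 42L);
        System.out.println(restoreTableName(handle).equals(name)); // true
    }
}
```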


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-158107515
  
I have a final comment inline. Otherwise, I think this is good to merge.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-158074782
  
Looking through this again...


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45371097
  

I am not completely sure what you mean here.

Multiple different states can have the same name in different tasks. As far 
as I know, we don't assume unique state names. This gets worse if chained 
tasks have states with the same name, because then they actually go to the same 
backend as well.

I don't see how to get around this without an operator id. Could you please 
clarify your idea?


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45371919
  

I suggest not letting the operator supply an ID and name, but simply leaving 
the naming of the state to the state backend. The SqlStateBackend could just 
use `UUID.randomUUID().toString()` instead of `operatorId+stateName`.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45372303
  

The point is that all parallel instances write to the same set of tables. 
This way, sharding is handled transparently and the job parallelism can 
actually change without affecting the state (no need to repartition it).


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45372569
  

Otherwise you will have to create p*numShards tables, and you won't even know 
which state is in which table from looking at the table names.
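The table-count argument can be checked with a small simulation. It is a sketch under stated assumptions: every subtask creates its state table on every shard, a table is identified by (shard, name), and `TableCountSketch`/`countTables` are hypothetical names. With p = 2 and 3 shards, a name shared by all subtasks yields 3 tables, while a per-instance random name yields p * numShards = 6.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.IntFunction;

/** Hypothetical simulation of how many state tables one kv state needs. */
public class TableCountSketch {

    /**
     * Every subtask creates its state table on every shard; a table is identified
     * by (shard, name), so identical names on the same shard are one table.
     */
    static int countTables(int parallelism, int numShards, IntFunction<String> nameForSubtask) {
        Set<String> tables = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            String name = nameForSubtask.apply(subtask);
            for (int shard = 0; shard < numShards; shard++) {
                tables.add(shard + "/" + name);
            }
        }
        return tables.size();
    }

    public static void main(String[] args) {
        int p = 2;
        int numShards = 3;
        // Name from operator id + state name: identical in every subtask.
        System.out.println(countTables(p, numShards, subtask -> "kvstate_count_1_jobId")); // 3
        // Random name per instance: each subtask draws its own UUID.
        System.out.println(countTables(p, numShards, subtask -> "kvstate_" + UUID.randomUUID())); // 6
    }
}
```

Whether a backend-generated name really has to differ per subtask is what the thread goes on to debate; the sketch only shows the arithmetic for each case.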


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45372620
  

What you mention depends on the parallel subtask ID (which is already given 
in the initialize() method). The operatorId and name are the same for all 
parallel instances anyway.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1305#discussion_r45373479
  

The "name" (as a string) of the state is a very API specific thing that no 
other part of the runtime is concerned with. The operator ID is something 
specific to the StreamGraphBuilder and not to the streaming tasks at all. I 
think we are tying things together here that should not be tied together.

I still do not understand how this affects sharding. Does the shard 
assignment depend on the state name (rather than the parallel subtask / 
JobVertexId)?

I only see that the table names will have the task name instead of the name 
of the state.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-16 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156964684
  
I'm looking at it again. 


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-15 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156809400
  
I have updated the description and run some more cluster tests without any 
issues.

It would be good if you all could do a second round of reviews please.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-12 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156034877
  
I would like to push this soon if there are no objections.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-12 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156038439
  
I agree that this will be an important backend and good to have in. :) But 
do we need to push this right now? I think we should wait a little and make 
sure that it fits well with the other ongoing changes that I think Stephan and 
Aljoscha are working on. At least let's wait for someone to review it again. 


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-12 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156039726
  
Well, I don't know what they are working on... It would be easier not to 
have to rebase the state backend API changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-12 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-156064645
  
I totally understand your point, but I think it's OK that changes of this 
scope take longer to review and get in (my HA PR took over a month or so to get 
in). At the end of the day, it matters more that we get this right (because it 
covers a very important use case) than getting it in a few days earlier.



[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-11 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-155765000
  
Should we do a final iteration over this and merge this to contrib?

The description got slightly out of date when I changed this back so that 
it stores the state by timestamp (but it's basically a Ctrl+F replace of "id" 
with "timestamp").


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-04 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153872782
  
I updated the sharding logic to do mod hashing on the keys over the number of 
shards by default, and the user can also supply a custom Partitioner to 
implement a custom sharding strategy.
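The default described here, mod hashing of the key over the shard count with an optional user-supplied partitioner, could look roughly like this. The `Partitioner` interface below is a hypothetical stand-in: the PR imports `ShardedConnection.Partitioner`, but its actual signature is not shown in this thread. `Math.floorMod` is used instead of `%` so that negative hash codes still map to a valid shard index.

```java
/** Hypothetical sketch of key-based shard selection. */
public class ShardingSketch {

    /** Assumed shape of a pluggable sharding strategy (not the PR's actual interface). */
    interface Partitioner {
        int getShardIndex(Object key, int numShards);
    }

    /** Default strategy: hash the key and take it modulo the number of shards. */
    static final class ModHashPartitioner implements Partitioner {
        @Override
        public int getShardIndex(Object key, int numShards) {
            // floorMod keeps the result in [0, numShards) even for negative hash codes
            return Math.floorMod(key.hashCode(), numShards);
        }
    }

    public static void main(String[] args) {
        Partitioner byHash = new ModHashPartitioner();
        // A custom strategy, e.g. pinning every key to the first shard.
        Partitioner pinned = (key, numShards) -> 0;
        int numShards = 3;
        for (Object key : new Object[] {7, -7, "user-42"}) {
            System.out.println(key + " -> shard " + byHash.getShardIndex(key, numShards)
                    + " (custom: " + pinned.getShardIndex(key, numShards) + ")");
        }
    }
}
```

Because the shard depends only on the key and the shard count, a key is routed to the same shard no matter which parallel subtask processes it.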


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-04 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153696208
  
I also removed the sharding logic now, as I think it was pretty weak and not 
very useful (it maintained 1 connection per subtask, which would break if we 
changed the parallelism).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-04 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153695943
  
I updated this PR with the reworked logic; it has several advantages over 
the previous timestamp-based solution (including the elimination of 
transactions from the logic).

The only problem I see is that Derby batch inserts currently happen row by 
row, as Derby does not have insert-or-update semantics.

This rework will also make it easier to write connectors to non-transactional 
stores like Cassandra.


[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153289042
  
Cool stuff, really! This is very much in line with what I had in mind for a 
SQL backend.

Let me check if I understood everything correctly (and see where my 
understanding is wrong), because I think we should be able to make an "exactly 
once" version of this based on that mechanism. I am basically rephrasing what you 
describe in a different model.

### Basic Mode

What this is effectively doing is a batched and asynchronous version of 
distributed 2-phase commit transactions. The phases look basically like this:

  - **Adding data**: Pipe all modifications into the database, but do not 
commit the transaction. They are tagged with the timestamp of the upcoming 
checkpoint (or any coordinated, increasing version counter). This can happen in 
the background thread, as long as the in-operator cache holds all edits that 
are not in the database yet.

  - **Pre-commit**: This is when the checkpoint is triggered. All pending 
edits are written into the database and then the transaction is committed. The 
state handle only includes the timestamp used on the elements. In classical 
2-phase commit, after a task acks the pre-commit, it has to be able to recover 
to that state, which is given here. The checkpoint is not immediately valid for 
recovery, though, which means that recovery has to either apply a filter or 
issue a query that deletes all records with timestamps larger than the version 
given during recovery. After the pre-commit, the timestamp is locally 
incremented and work can continue.

  - **Full commit**: This happens implicitly when the checkpoint 
coordinator marks the checkpoint as complete.

  - **Recovery**: The timestamp (or version counter) of the last successful 
checkpoint is restored. Records that were committed, but whose checkpoint did 
not succeed as a whole, are deleted. Then records are lazily fetched.
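The phases above can be modeled with a small in-memory sketch. It is not the PR's code. All names are hypothetical, the "database" is a list of `(key, version, value)` rows, and, to keep it short, edits are only written at pre-commit time instead of being streamed in the background. Restoring to a completed version deletes rows written under later, never-completed versions. After that, lookups return the newest surviving value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of version-tagged writes with restore-time cleanup. */
final class VersionedKvModel {

    /** One row of the (simulated) state table. */
    static final class Row {
        final String key;
        final long version;
        final String value;

        Row(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    final List<Row> table = new ArrayList<>();                   // stands in for the database
    private final Map<String, String> pending = new HashMap<>(); // in-operator cache of edits
    private long currentVersion = 1;

    /** Adding data: buffer the edit under the upcoming version. */
    void put(String key, String value) {
        pending.put(key, value);
    }

    /** Pre-commit: write pending edits tagged with the current version; the handle is the version. */
    long snapshot() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            table.add(new Row(e.getKey(), currentVersion, e.getValue()));
        }
        pending.clear();
        return currentVersion++; // hand out this version, then continue with the next one
    }

    /** Recovery: drop rows newer than the last globally completed version. */
    void restore(long completedVersion) {
        pending.clear();
        table.removeIf(r -> r.version > completedVersion);
        currentVersion = completedVersion + 1;
    }

    /** Lazy fetch: pending edit if any, else the row with the highest version, else null. */
    String get(String key) {
        if (pending.containsKey(key)) {
            return pending.get(key);
        }
        Row best = null;
        for (Row r : table) {
            if (r.key.equals(key) && (best == null || r.version > best.version)) {
                best = r;
            }
        }
        return best == null ? null : best.value;
    }
}
```

For example, if version 2 was pre-committed but its checkpoint never completed, `restore(1)` brings a key back to the value written under version 1.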

So far, this should give exactly once guarantees, or am I overlooking 
something?

### Compacting

Whenever the "checkpoint complete" notification comes (or every so many 
changes), you trigger a clean-up query in the background. Provided the SQL 
database's query planner is not completely terrible, this SQL statement should 
be reasonably efficient (a single semi-join):
```
DELETE FROM "table name" t1
WHERE EXISTS
  (SELECT *
     FROM "table name" t2
    WHERE t2.handle_id = t1.handle_id
      AND t2.timestamp > t1.timestamp     -- a newer version exists for the same handle
      AND t2.timestamp <= GLOBAL_VERSION  -- and the newer version is globally committed
  )
```
The good thing is that by virtue of having the incrementing global 
versions, we can set the isolation level for the query to "read uncommitted", 
which means that it will not lock anything and thus not compete with any other 
ongoing modifications.
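The semantics of the compaction query can be checked with a small in-memory model. A row is removed if the same handle has a newer row whose timestamp is at most the global version. Equivalently, a row is removed when the newest globally committed timestamp for its handle is greater than the row's own timestamp. The class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the compaction DELETE. */
final class CompactionModel {

    static final class Row {
        final String handleId;
        final long timestamp;

        Row(String handleId, long timestamp) {
            this.handleId = handleId;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return handleId + "@" + timestamp;
        }
    }

    /** Returns the rows that survive compaction at the given global version. */
    static List<Row> compact(List<Row> rows, long globalVersion) {
        // Newest globally committed timestamp per handle
        Map<String, Long> newestCommitted = new HashMap<>();
        for (Row r : rows) {
            if (r.timestamp <= globalVersion) {
                newestCommitted.merge(r.handleId, r.timestamp, Long::max);
            }
        }
        List<Row> kept = new ArrayList<>();
        for (Row r : rows) {
            Long newest = newestCommitted.get(r.handleId);
            // EXISTS (t2.ts > t1.ts AND t2.ts <= GLOBAL_VERSION) <=> newest committed ts > t1.ts
            boolean superseded = newest != null && newest > r.timestamp;
            if (!superseded) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

With rows `k@1, k@2, k@3, j@1` and a global version of 2, only `k@1` is removed. `k@3` is newer but not yet globally committed, so `k@2` must be kept as the recovery fallback.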





[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153351332
  
@gyfora That's what I meant: basically, the timestamps could subsume the 
role of the checkpointIds, i.e., the checkpointIds would take on the semantics 
of the timestamps and the timestamps would no longer be required. (Or the 
checkpointId would be removed and the timestamps would remain, depending on how 
you look at it... :smile: )





[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153339094
  
@StephanEwen 
Thanks for the comments. You are right, the main idea is exactly as you 
described. 
Exactly-once is violated in some corner cases because the pre-commit phase 
of the previous checkpoint may still be running (and failing) during recovery. 

If we assume that the previous job is completely killed off, so that no 
writes to the database happen after that, then we can properly clean up during 
recovery. 

Unfortunately, this does not seem to hold if you set the retry wait time to 
a very low value (like 0 ms in the snapshot). In that case, the failed job is 
still writing the failed snapshot to the database after you have recovered and 
cleaned up.

As for compaction, I came up with something very similar, but here is the 
funny thing, and my problem: the query you wrote runs properly on Derby but is 
invalid on MySQL (you cannot use a subquery on the same table you are 
modifying). In MySQL you need to use an inner join instead, but that does not 
work in Derby :P 
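One way to live with the Derby/MySQL difference described above is to choose the statement text per dialect. The sketch below is a hypothetical helper, not the PR's code. It keeps the correlated `EXISTS` subquery for Derby and uses MySQL's multi-table `DELETE ... JOIN` form. Table and column names follow Stephan's query and may need quoting in practice. The single `?` parameter is the globally committed version.

```java
/** Hypothetical per-dialect builder for the compaction statement. */
final class CompactionSql {

    enum Dialect { DERBY, MYSQL }

    private CompactionSql() {}

    static String compactionStatement(Dialect dialect, String table) {
        switch (dialect) {
            case DERBY:
                // Correlated subquery on the table being modified
                return "DELETE FROM " + table + " t1 WHERE EXISTS (SELECT 1 FROM " + table + " t2"
                        + " WHERE t2.handle_id = t1.handle_id"
                        + " AND t2.timestamp > t1.timestamp"
                        + " AND t2.timestamp <= ?)";
            case MYSQL:
                // MySQL rejects a subquery that reads the DELETE target, so use a self-join
                return "DELETE t1 FROM " + table + " t1 INNER JOIN " + table + " t2"
                        + " ON t2.handle_id = t1.handle_id"
                        + " AND t2.timestamp > t1.timestamp"
                        + " AND t2.timestamp <= ?";
            default:
                throw new IllegalArgumentException("Unsupported dialect: " + dialect);
        }
    }
}
```

Both forms delete the same rows. Only the syntax for referencing the target table twice differs.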

In any case, I have made a prototype of this at: 
https://github.com/gyfora/flink/tree/compaction
The user can define the frequency of compaction (compact every so many 
checkpoints), and it also makes sure that compaction and cleanup are only 
executed on one subtask to avoid double work.
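A minimal sketch of such a trigger follows. It assumes subtask 0 is the designated compactor and that the policy is driven by checkpoint-complete notifications. Both assumptions and the class name are hypothetical and are not taken from the prototype branch.

```java
/**
 * Hypothetical policy: compact once every {@code frequency} completed
 * checkpoints, and only on one designated subtask (index 0 here).
 */
final class CompactionPolicy {
    private final int frequency;
    private final int subtaskIndex;
    private long completedSinceLastCompaction = 0;

    CompactionPolicy(int frequency, int subtaskIndex) {
        if (frequency <= 0) {
            throw new IllegalArgumentException("frequency must be positive");
        }
        this.frequency = frequency;
        this.subtaskIndex = subtaskIndex;
    }

    /** Call on each checkpoint-complete notification; true if this subtask should compact now. */
    boolean onCheckpointComplete() {
        if (subtaskIndex != 0) {
            return false; // other subtasks never compact, avoiding duplicate work
        }
        completedSinceLastCompaction++;
        if (completedSinceLastCompaction >= frequency) {
            completedSinceLastCompaction = 0;
            return true;
        }
        return false;
    }
}
```

This sketch counts notifications instead of testing `checkpointId % frequency`, so the cadence stays regular even if some checkpoint IDs never complete.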




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153307495
  
@aljoscha 
 1. I was initially using the MockEnvironments, but I added the 
DummyEnvironment for several reasons: I wanted control over the JobId and the 
number of subtasks, for which I would have had to change the MockEnvironment. I 
also wanted to avoid having to clean up the memory manager and other resources, 
as I really don't need them.
 2. I don't really understand what you mean here; the recovery timestamp is 
only used for cleanup on restore.
 3. Imagine a scenario where two tasks are restoring. One restores quickly and 
starts writing new timestamps. If we then call cleanup on the other task, it 
will remove the new states unless we bound it by the recovery timestamp. This 
can happen easily.
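Point 3 can be illustrated with a small in-memory sketch. It assumes that all tasks restoring from the same checkpoint share one recovery timestamp and only write with timestamps greater than it. Both assumptions and all names are hypothetical. Roughly, the bounded cleanup corresponds to `DELETE ... WHERE ts > ? AND ts <= ?`, and the unbounded one drops the upper bound.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of restore-time cleanup with and without an upper bound. */
final class BoundedCleanup {

    static final class Row {
        final long ts;
        final String key;

        Row(long ts, String key) {
            this.ts = ts;
            this.key = key;
        }

        @Override
        public String toString() {
            return key + "@" + ts;
        }
    }

    /** Removes rows from the failed attempt: newer than the checkpoint, not newer than recovery. */
    static List<Row> cleanup(List<Row> rows, long checkpointTs, long recoveryTs) {
        List<Row> kept = new ArrayList<>();
        for (Row r : rows) {
            boolean fromFailedAttempt = r.ts > checkpointTs && r.ts <= recoveryTs;
            if (!fromFailedAttempt) {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Cleanup bounded only by the checkpoint timestamp. */
    static List<Row> unboundedCleanup(List<Row> rows, long checkpointTs) {
        return cleanup(rows, checkpointTs, Long.MAX_VALUE);
    }
}
```

Consider a restore from checkpoint timestamp 10 with recovery timestamp 20, where a fast task has already written `b@21`. The bounded cleanup removes only the failed attempt's `a@12`. The unbounded one also deletes `b@21`.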

I don't know about the allOrNothingState :/




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153305943
  
Just some remarks:
 - `DummyEnvironment` seems unnecessary; we already have 
`StreamMockEnvironment`. I think it could be reused.

 - In the first version you had both the timestamp and the checkpoint ID, and 
recovery/key lookup took both into account. The recent version uses just the 
timestamp for lookup. Both introduce the new restore timestamp in the restore 
methods.

 - The cleanup of failed checkpoints took into account the checkpoint and 
the recovery timestamp, but I think the recovery timestamp was always redundant, 
since the condition in the SQL statement would always hold.

=> I think the timestamp is not needed. Couldn't everything be implemented 
using just the (monotonically increasing) checkpoint IDs?

Also, this is unrelated, but maybe @StephanEwen or @gyfora know: why do we 
have the `allOrNothingState` in 
`CheckpointCoordinator.restoreLatestCheckpointedState`?




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-15538
  
2. Ah, I meant the lookupTimestamp. In an earlier version you used both the 
checkpointId and the lookupTimestamp to perform key lookups.
3. I see. In this implementation of state, the timestamp has basically 
assumed the role of the checkpointId, and the checkpointId is (I think) 
completely ignored. Correct? Couldn't we then change the semantics of the 
checkpointId to work like the timestamps (they are somewhat logical, not 
physical, timestamps anyway)?




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-03 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-153376152
  
My last commit introduces automatic compaction with a user-specified 
frequency. It also allows the KvStates to implement the CheckpointNotifier 
interface, in which case they will also get notified.
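The opt-in notification described above can be sketched as a backend that forwards checkpoint-complete callbacks only to states implementing a listener interface. The interface below is a local stand-in named after the one in the comment. Its single-method signature, and the `NotifyingBackend` class, are assumptions for illustration, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in for the notifier interface; the signature is an assumption. */
interface CheckpointNotifier {
    void notifyCheckpointComplete(long checkpointId);
}

/** Hypothetical backend that forwards completion notifications to opted-in states. */
final class NotifyingBackend {
    private final List<Object> kvStates = new ArrayList<>();

    void register(Object kvState) {
        kvStates.add(kvState);
    }

    void notifyCheckpointComplete(long checkpointId) {
        for (Object state : kvStates) {
            // Only states that implement the interface receive the callback
            if (state instanceof CheckpointNotifier) {
                ((CheckpointNotifier) state).notifyCheckpointComplete(checkpointId);
            }
        }
    }
}
```

States that do not implement the interface are simply skipped, so existing KvState implementations need no changes.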




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-02 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-152948336
  
Thanks for the great write up!




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-11-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-152944197
  
Good stuff! Will need a day more to look through this, but this is a cool 
way of doing stateful stream computation :-)




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-10-31 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-152767697
  
My last commit (which was meant to solve the problems with failed tasks 
writing to the db) introduced some issues with the exactly-once guarantees. I 
will look into it tomorrow.




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-10-28 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1305#issuecomment-151884357
  
Wow, a lot of stuff. I will look into it once the release is out. :smiley: 




[GitHub] flink pull request: Out-of-core state backend for JDBC databases

2015-10-27 Thread gyfora
GitHub user gyfora opened a pull request:

https://github.com/apache/flink/pull/1305

Out-of-core state backend for JDBC databases

Detailed description incoming...

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gyfora/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1305


commit b793bca20b79c1fe38ed7a31deca485e7d109060
Author: Gyula Fora 
Date:   2015-10-26T08:58:49Z

[FLINK-2916] [streaming] Expose operator and task information to 
StateBackend

commit c35949f5e765f377799730a973b374eeea9c3921
Author: Gyula Fora 
Date:   2015-10-27T17:31:04Z

[FLINK-2924] [streaming] Out-of-core state backend for JDBC databases



