[GitHub] nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage

2018-05-07 Thread GitBox
nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage
URL: https://github.com/apache/incubator-heron/pull/2891#discussion_r186349909
 
 

 ##
 File path: heron/proto/ckptmgr.proto
 ##
 @@ -171,7 +175,25 @@ message CleanStatefulCheckpointResponse {
   repeated string cleaned_checkpoint_ids = 2;
 }
 
-// stmgr -> ckptmgr messages
+/*
+ * stmgr -> ckptmgr messages
+ */
+
+// This message encapsulates the info associated with
+// state of an instance/partition
+message InstanceStateCheckpoint {
+  required string checkpoint_id = 1;
+  required bytes state = 2;
+  // A version string that can be specified by user. It can be used to translate
+  // checkpoint data from older versions to the latest version when necessary.
+  optional string data_version = 3;
+}
+
+// This message encapsulates the info associated with
+// checkpoint metadata of a component
+message CheckpointComponentMetadata {
+  required int32 parallelism = 1;
 
 Review comment:
   @skanjila here is the doc: https://docs.google.com/document/d/1p443mcdD607Vf9aCnMq5ESVMFrEuneGlceXmtUSvVMQ/edit#heading=h.myl6mycadhfh
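   The `data_version` comment on `InstanceStateCheckpoint` says the version string can be used to translate checkpoint data from older versions to the latest one. A minimal sketch of how a restore path might do that, assuming the user registers one upgrade step per version. `StateVersionUpgrader`, `register` and `upgrade` are hypothetical names, not Heron APIs; the proto itself only carries the string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch, not part of Heron: upgrades the serialized `state` bytes
// of an InstanceStateCheckpoint from its `data_version` to the latest version
// by applying one user-registered step per version.
final class StateVersionUpgrader {
  private final String latestVersion;
  // version -> the version produced by its upgrade step
  private final Map<String, String> nextVersion = new HashMap<>();
  // version -> function converting state of that version to the next version
  private final Map<String, UnaryOperator<byte[]>> steps = new HashMap<>();

  StateVersionUpgrader(String latestVersion) {
    this.latestVersion = latestVersion;
  }

  /** Registers a step that converts state written as `from` into state for `to`. */
  StateVersionUpgrader register(String from, String to, UnaryOperator<byte[]> step) {
    nextVersion.put(from, to);
    steps.put(from, step);
    return this;
  }

  /** Applies registered steps until the state reaches the latest version. */
  byte[] upgrade(String dataVersion, byte[] state) {
    String version = dataVersion;
    byte[] current = state;
    int hops = 0;
    while (!latestVersion.equals(version)) {
      UnaryOperator<byte[]> step = steps.get(version);
      // Fail on a missing step, or on a loop in the registered chain.
      if (step == null || ++hops > steps.size()) {
        throw new IllegalStateException(
            "No upgrade path from data_version " + dataVersion + " to " + latestVersion);
      }
      current = step.apply(current);
      version = nextVersion.get(version);
    }
    return current;
  }
}
```

   Chaining single-version steps keeps each translation small, and a missing step fails loudly instead of restoring state in an unknown format.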


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage

2018-05-04 Thread GitBox
nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage
URL: https://github.com/apache/incubator-heron/pull/2891#discussion_r186226835
 
 

 ##
 File path: heron/spi/src/java/org/apache/heron/spi/statefulstorage/IStatefulStorage.java
 ##
 @@ -16,37 +16,65 @@
 
 import java.util.Map;
 
-import org.apache.heron.proto.system.PhysicalPlans;
-
+/**
+ * The interface of all storage classes for checkpoints.
+ * For each checkpoint, two types of data are stored:
+ * - Component Meta Data (one per component).
+ * - Instance Checkpoint Data (one per instance or partition).
+ * Each Stateful Storage implementation needs to handle them accordingly.
+ */
 public interface IStatefulStorage {
   /**
    * Initialize the Stateful Storage
-   *
+   * @param topologyName The name of the topology.
    * @param conf An unmodifiableMap containing basic configuration
-   * Attempts to modify the returned map,
-   * whether direct or via its collection views, result in an UnsupportedOperationException.
    */
-  void init(Map conf) throws StatefulStorageException;
+  void init(String topologyName, final Map conf)
+  throws StatefulStorageException;
 
   /**
    * Closes the Stateful Storage
    */
   void close();
 
-  // Store the checkpoint
-  void store(Checkpoint checkpoint) throws StatefulStorageException;
-
-  // Retrieve the checkpoint
-  Checkpoint restore(String topologyName, String checkpointId,
- PhysicalPlans.Instance instanceInfo) throws StatefulStorageException;
-
-  // TODO(mfu): We should refactor all interfaces in IStatefulStorage,
-  // TODO(mfu): instead providing Class Checkpoint, we should provide an Context class,
-  // TODO(mfu): It should:
-  // TODO(mfu): 1. Provide meta data access, like topologyName
-  // TODO(mfu): 2. Provide utils method to parse the protobuf object, like getTaskId()
-  // TODO(mfu): 3. Common methods, like getCheckpointDir()
-  // Dispose the checkpoint
-  void dispose(String topologyName, String oldestCheckpointId, boolean deleteAll)
-  throws StatefulStorageException;
+  /**
+   * Store instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param checkpoint The checkpoint data.
+   */
+  void storeCheckpoint(final CheckpointPartitionInfo info, final Checkpoint checkpoint)
+  throws StatefulStorageException;
+
+  /**
+   * Retrieve instance checkpoint.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @return The checkpoint data from the specified blob id.
+   */
+  Checkpoint restoreCheckpoint(final CheckpointPartitionInfo info) throws StatefulStorageException;
+
+  /**
+   * Store metadata for a component. Ideally this function should only be called once
+   * for each component.
+   * @param info The information (reference key) for the checkpoint partition.
+   * @param metadata The checkpoint metadata from a component.
+   */
+  void storeComponentMetaData(final CheckpointPartitionInfo info, final CheckpointMetadata metadata)
 
 Review comment:
   Good point!
   
   Then the storage implementation needs to make this decision.
   
   In distributed storage, instance 0 should be responsible for the metadata; in local storage, every instance needs to export the data, and the data needs to be stored in that instance's/partition's directory. The StatefulStorage implementation should know which case applies and handle the function accordingly.
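   A minimal sketch of that decision, assuming the implementation knows whether its backend is distributed or local and gets the instance index from something like `CheckpointPartitionInfo`. The enum, the method names and the directory layout are hypothetical, not part of this PR.

```java
// Hypothetical sketch: names and paths are illustrative, not Heron APIs.
final class ComponentMetadataPolicy {
  enum StorageKind { DISTRIBUTED, LOCAL }

  private ComponentMetadataPolicy() { }

  /**
   * Distributed storage is shared, so only instance 0 writes the component
   * metadata. With local storage every instance writes its own copy
   * (assumption: each instance only sees its own local storage).
   */
  static boolean shouldStoreComponentMetadata(StorageKind kind, int instanceIndex) {
    return kind == StorageKind.LOCAL || instanceIndex == 0;
  }

  /** One shared metadata file per component, or one per instance/partition directory. */
  static String metadataPath(StorageKind kind, String root, String checkpointId,
                             String component, int instanceIndex) {
    String componentDir = String.join("/", root, checkpointId, component);
    return kind == StorageKind.DISTRIBUTED
        ? componentDir + "/metadata"
        : componentDir + "/" + instanceIndex + "/metadata";
  }
}
```

   Keeping this inside the storage implementation means callers can always invoke `storeComponentMetaData` from every instance and let the backend decide whether the write is needed.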




[GitHub] nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage

2018-05-04 Thread GitBox
nwangtw commented on a change in pull request #2891: [WIP] Refactor StatefulStorage
URL: https://github.com/apache/incubator-heron/pull/2891#discussion_r186219947
 
 

 ##
 File path: heron/proto/ckptmgr.proto
 ##
 @@ -171,7 +175,25 @@ message CleanStatefulCheckpointResponse {
   repeated string cleaned_checkpoint_ids = 2;
 }
 
-// stmgr -> ckptmgr messages
+/*
+ * stmgr -> ckptmgr messages
+ */
+
+// This message encapsulates the info associated with
+// state of an instance/partition
+message InstanceStateCheckpoint {
+  required string checkpoint_id = 1;
+  required bytes state = 2;
+  // A version string that can be specified by user. It can be used to translate
+  // checkpoint data from older versions to the latest version when necessary.
+  optional string data_version = 3;
+}
+
+// This message encapsulates the info associated with
+// checkpoint metadata of a component
+message CheckpointComponentMetadata {
+  required int32 parallelism = 1;
 
 Review comment:
   kk. The component name could be useful when viewing the data. Will add.
   
   First, stateful data is organized around components (the schema should be the same for all instances/partitions of a component but can differ between components). Overall it is a 3-level tree: the topology is the root, components are the middle level, and instances/partitions are the leaves. We need to store component-level data in storage.
   
   Second, the parallelism value is needed to detect plan changes and handle them correctly. In the first stage, we can drop the stateful data when the parallelism doesn't match. In the future, we should handle repartitioning based on the component's old and new parallelism.
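   The first-stage rule (drop the state when the parallelism changed) can be sketched as below. `ParallelismCheck` and `stateToRestore` are hypothetical names, and repartitioning is deliberately left out, matching the plan above.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the first-stage policy: restore a component's state
// only when the checkpointed parallelism matches the current physical plan.
final class ParallelismCheck {
  private ParallelismCheck() { }

  /**
   * @param checkpointParallelism parallelism recorded in the component metadata
   * @param currentParallelism parallelism in the current physical plan
   * @param savedState partition id -> serialized state from the checkpoint
   * @return the state to restore, or an empty map when the plan changed
   */
  static Map<Integer, byte[]> stateToRestore(int checkpointParallelism,
                                             int currentParallelism,
                                             Map<Integer, byte[]> savedState) {
    if (checkpointParallelism != currentParallelism) {
      // Plan changed: drop the state; a later stage could repartition instead.
      return Collections.emptyMap();
    }
    return Collections.unmodifiableMap(savedState);
  }
}
```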
   

