[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-12 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r256144714
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -562,9 +566,14 @@ private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) {
 (long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
   }
 
+  // if the store has no changelogSSP, simply use null
+  SystemStreamPartition changelogSSP = null;
 
 Review comment:
   Can we short-circuit here as well and return false, instead of going through the offset-validation path?
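A minimal sketch of the suggested short-circuit, under simplifying assumptions: the class `LoggedStoreCheck`, the `changelogByStore` map (a changelog modeled as a plain `String` instead of a `SystemStreamPartition`), and the `"OFFSET"` file name are hypothetical, and the retention-time check from the real method is omitted.

```java
import java.io.File;
import java.util.Map;

// Hypothetical, simplified stand-in for the validity check under review.
// A changelog is modeled as a String name rather than a SystemStreamPartition,
// and the retention-based check of the real method is left out.
final class LoggedStoreCheck {
  // Assumed value of the offset file name constant.
  static final String OFFSET_FILE_NAME = "OFFSET";

  private LoggedStoreCheck() { }

  static boolean isLoggedStoreValid(String storeName, File loggedStoreDir,
      Map<String, String> changelogByStore) {
    String changelog = changelogByStore.get(storeName);
    if (changelog == null) {
      // Short-circuit as suggested: with no changelog there is nothing to
      // validate offsets against, so report the store as not valid.
      return false;
    }
    // Only stores with a changelog reach the offset-based check.
    return new File(loggedStoreDir, OFFSET_FILE_NAME).isFile();
  }
}
```

With the early return, a null changelog SSP never reaches the offset validation code.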


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-12 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r256143994
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##
 @@ -532,14 +531,19 @@ private void cleanBaseDirsAndReadOffsetFiles() {
 LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
 FileUtil.rm(loggedStorePartitionDir);
   } else {
-String offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, OFFSET_FILE_NAME);
+
+// if the store has no changelogSSP, simply use null
+SystemStreamPartition changelogSSP = null;
+if (changelogSystemStreams.containsKey(storeName))
 
 Review comment:
   Should we just wrap all the code below (until updating fileOffsets) inside this if block?
   That gets rid of the scenario where `changelogSSP` is null but we still try to fetch offsets for it, along with the potential NPEs.
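The restructuring could look roughly like this hypothetical sketch, where `FileOffsetsLoader`, `loadFileOffsets`, and the `String`-typed SSP keys are illustrative stand-ins rather than Samza types. Every step that needs the changelog SSP sits inside the `containsKey` guard, so a null SSP is never used as a lookup key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offset-loading loop. Store names and SSPs are plain
// Strings here; in Samza the SSP would be a SystemStreamPartition.
final class FileOffsetsLoader {
  private FileOffsetsLoader() { }

  /**
   * Builds changelog SSP -> offset for stores that have a changelog. Stores
   * without one are skipped entirely, so no null SSP is ever used as a key.
   */
  static Map<String, String> loadFileOffsets(Map<String, String> changelogSspByStore,
      Map<String, String> offsetFileContentsByStore) {
    Map<String, String> fileOffsets = new HashMap<>();
    for (Map.Entry<String, String> entry : offsetFileContentsByStore.entrySet()) {
      String storeName = entry.getKey();
      if (changelogSspByStore.containsKey(storeName)) {
        // All SSP-dependent work lives inside the guard.
        String changelogSsp = changelogSspByStore.get(storeName);
        String offset = entry.getValue();
        if (offset != null) {
          fileOffsets.put(changelogSsp, offset);
        }
      }
    }
    return fileOffsets;
  }
}
```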




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-12 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r256068006
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   If the contents are not proper JSON, wouldn't we get a JsonParseException? I was referring to the IOException block, since reaching it would mean either we couldn't read the file successfully or the file was not found (FileNotFoundException extends IOException).
   
   It is sufficient to have only two catch blocks: one for JSON parse failures and another to catch all other failure scenarios.
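A stdlib-only sketch of the two-catch-block shape, assuming a toy `ssp=offset` line format in place of JSON and a hypothetical `OffsetParseException` in place of Jackson's exceptions; `TwoCatchOffsetReader` is also illustrative, and a plain file read stands in for `FileUtil.readWithChecksum`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Stand-in for Jackson's JsonProcessingException (the common base of
// JsonParseException and JsonMappingException), which extends IOException.
class OffsetParseException extends IOException {
  OffsetParseException(String message) {
    super(message);
  }
}

final class TwoCatchOffsetReader {
  private TwoCatchOffsetReader() { }

  // Toy parser standing in for OBJECT_MAPPER.readValue(...): expects "ssp=offset" lines.
  static Map<String, String> parseOffsets(String contents) throws OffsetParseException {
    Map<String, String> offsets = new HashMap<>();
    for (String line : contents.split("\n")) {
      if (line.isEmpty()) {
        continue;
      }
      int eq = line.indexOf('=');
      if (eq <= 0) {
        throw new OffsetParseException("not an ssp=offset line: " + line);
      }
      offsets.put(line.substring(0, eq), line.substring(eq + 1));
    }
    return offsets;
  }

  static Map<String, String> readOffsetFile(File offsetFile, Set<String> storeSsps) {
    try {
      String fileContents = new String(Files.readAllBytes(offsetFile.toPath()), StandardCharsets.UTF_8);
      try {
        return parseOffsets(fileContents);
      } catch (OffsetParseException e) {
        // Catch block 1: not the new format, so fall back to the legacy
        // single-offset format (only unambiguous with exactly one SSP).
        return storeSsps.size() == 1
            ? Collections.singletonMap(storeSsps.iterator().next(), fileContents)
            : null;
      }
    } catch (IOException e) {
      // Catch block 2: every other failure, including a missing file
      // (NoSuchFileException is an IOException), means offsets could not be read.
      return null;
    }
  }
}
```

Because the parse exception is itself an `IOException`, its handler has to be the inner (or earlier) one; javac rejects a `catch` for a subclass placed after a `catch` for its superclass on the same `try`.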




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-12 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r256068006
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   If the contents are not proper JSON, wouldn't we get a JsonParseException? I was referring to the IOException block, since reaching it would mean either we couldn't read the file successfully or the file was not found (FileNotFoundException extends IOException).
   
   It is sufficient to have only two catch blocks: one for the parse exception and another to catch all other failure scenarios.




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-11 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r255824624
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -115,6 +129,36 @@ public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
 return hasValidOffsetFile;
   }
 
+  /**
+   * Write the given SSP-Offset map into the offsets file.
+   * @param storeBaseDir the base directory of the store
+   * @param storeName the store name to use
+   * @param taskName the task name which is referencing the store
+   * @param offsets The SSP-offset to write
+   * @return
+   * @throws IOException
+   */
+  public static File writeOffsetFile(File storeBaseDir, String storeName, TaskName taskName,
+  Map offsets) throws IOException {
+File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
+String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
+FileUtil.writeWithChecksum(offsetFile, fileContents);
+return offsetFile;
 
 Review comment:
   Why does this have to return the file?
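If no caller actually uses the returned `File`, the writer could return `void`, and callers that need the location can derive it from the same path helper. A hypothetical sketch: `OffsetFileWriter`, `offsetFile`, the `"OFFSET"` name, and the sorted `ssp=offset` text format are assumptions, whereas the code under review writes JSON through `FileUtil.writeWithChecksum`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: the writer returns nothing; the location comes from a
// separate helper that both the writer and any interested caller can use.
final class OffsetFileWriter {
  static final String OFFSET_FILE_NAME = "OFFSET"; // assumed name

  private OffsetFileWriter() { }

  static File offsetFile(File storePartitionDir) {
    return new File(storePartitionDir, OFFSET_FILE_NAME);
  }

  /** Writes SSP -> offset pairs; no return value is needed by callers. */
  static void writeOffsetFile(File storePartitionDir, Map<String, String> offsets) throws IOException {
    StringBuilder contents = new StringBuilder();
    // Sorted for deterministic output (toy format standing in for JSON plus checksum).
    for (Map.Entry<String, String> e : new TreeMap<>(offsets).entrySet()) {
      contents.append(e.getKey()).append('=').append(e.getValue()).append('\n');
    }
    Files.write(offsetFile(storePartitionDir).toPath(), contents.toString().getBytes(StandardCharsets.UTF_8));
  }
}
```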




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-11 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r255822722
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
 ##
 @@ -65,17 +61,12 @@
  */
 public class TaskSideInputStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
-  private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
   private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
-  private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
-  private static final TypeReference> OFFSETS_TYPE_REFERENCE =
-  new TypeReference>() { };
-  private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
 
   private final Clock clock;
   private final Map storeToProcessor;
   private final Map stores;
-  private final String storeBaseDir;
+  private final File storeBaseDir;
 
 Review comment:
   I am guessing you made this change to reuse the helpers between CSM (ContainerStorageManager) and TSSM (TaskSideInputStorageManager). I used a String to move away from the pattern of using a File to represent storeBaseDir, since we don't use storeBaseDir for anything except as a prefix to build the absolute offset-file path.
   
   I kept it a simple String because it also simplifies testing, where it didn't make sense to create a File handle to a directory just to construct the storage manager.
   
   Thoughts?
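To illustrate the String-based design the comment describes, here is a hypothetical sketch in which the base directory is held as a `String` and used only as a path prefix; `SideInputOffsetPaths` and the `<base>/<store>/<task>/<file>` layout are assumptions, not Samza's actual `getStorePartitionDir` logic.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: the base dir is only ever a path prefix, so a String suffices.
final class SideInputOffsetPaths {
  private final String storeBaseDir;

  SideInputOffsetPaths(String storeBaseDir) {
    // Plain assignment: no filesystem access happens here.
    this.storeBaseDir = storeBaseDir;
  }

  /** Path of a store's offset file, laid out as <base>/<store>/<task>/<fileName>. */
  Path offsetFilePath(String storeName, String taskName, String offsetFileName) {
    return Paths.get(storeBaseDir, storeName, taskName, offsetFileName);
  }
}
```

Constructing it touches no filesystem state, so a test can pass any string such as `"/tmp/base"`.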
   
   




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-11 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r255823615
 
 

 ##
 File path: 
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
 ##
 @@ -567,6 +566,22 @@ class TestTaskStorageManager extends MockitoSugar {
 testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
   }
 
+  @Test
+  def testReadOfOldOffsetFormat(): Unit = {
+// Create a file in old single-offset format, with a sample offset
+val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+val storeFile = new File(storeDirectory, "store.sst")
+val offsetFile = new File(storeDirectory, "OFFSET")
 
 Review comment:
   nit: make the offset file name (`OFFSET_FILE_NAME`) public in the utils (`StorageManagerUtil`) and reuse it here?
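The nit amounts to something like the following hypothetical sketch: a single public constant, referenced from tests instead of repeating the `"OFFSET"` literal. `StorageManagerUtilSketch` and `OffsetFileNameUsage` are illustrative names, and the constant's value is assumed from the literal used in the test above.

```java
import java.io.File;

// Hypothetical sketch: expose the offset file name once...
final class StorageManagerUtilSketch {
  /** Name of the per-store offset file (value assumed from the test's literal). */
  public static final String OFFSET_FILE_NAME = "OFFSET";

  private StorageManagerUtilSketch() { }
}

// ...and have tests refer to the constant instead of repeating the literal.
final class OffsetFileNameUsage {
  private OffsetFileNameUsage() { }

  static File offsetFileIn(File storeDirectory) {
    return new File(storeDirectory, StorageManagerUtilSketch.OFFSET_FILE_NAME);
  }
}
```

If the file name ever changes, the test then follows automatically instead of silently checking a stale path.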




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-11 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r255819677
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
+} catch (IOException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   This is not needed, right? When we get here, it can only mean we were unable to read the file. Does this also cover the scenario where the file does not exist?
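A small, self-contained probe (hypothetical, not Samza code) bears on the last question: opening a missing file with `FileReader` throws `FileNotFoundException`, and a `catch (IOException e)` handler receives it because it is a subclass. `ReadFailureProbe` and `failureTypeOf` are illustrative names. Note that the code under review already checks `offsetFileRef.exists()` before reading.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

// Illustrative only: reports which exception a generic IOException handler sees.
final class ReadFailureProbe {
  private ReadFailureProbe() { }

  /** Returns the simple name of the exception caught while reading, or null on success. */
  static String failureTypeOf(File file) {
    try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
      reader.readLine();
      return null;
    } catch (IOException e) {
      // A missing file arrives here as FileNotFoundException, a subclass of IOException.
      return e.getClass().getSimpleName();
    }
  }
}
```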




[GitHub] mynameborat commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

2019-02-11 Thread GitBox
URL: https://github.com/apache/samza/pull/915#discussion_r255819974
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
* @return the content of the offset file if it exists for the store, null otherwise.
*/
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-String offset = null;
-File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map readOffsetFile(File storagePartitionDir, Set storeSSPs) {
+Map offsets = null;
+String fileContents;
+File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
 String storePath = storagePartitionDir.getPath();
 
 if (offsetFileRef.exists()) {
   LOG.info("Found offset file in storage partition directory: {}", storePath);
   try {
-offset = FileUtil.readWithChecksum(offsetFileRef);
+fileContents = FileUtil.readWithChecksum(offsetFileRef);
+try {
+  offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+} catch (JsonParseException | JsonMappingException e) {
+  LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+  offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x -> x, y -> fileContents)) : null;
 
 Review comment:
   Maybe return an empty map instead of null?
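A hypothetical sketch of that suggestion for the legacy-format fallback: `LegacyOffsetFallback` and `fromLegacyContents` are illustrative names, and SSPs are a generic key type rather than `SystemStreamPartition`. Returning `Collections.emptyMap()` lets callers iterate or call `get` without a null check.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the legacy single-offset fallback returning an empty map rather than null.
final class LegacyOffsetFallback {
  private LegacyOffsetFallback() { }

  /**
   * Interprets raw file contents as one legacy offset. The mapping is only
   * unambiguous when the store has exactly one SSP; otherwise no offsets are returned.
   */
  static <K> Map<K, String> fromLegacyContents(Set<K> storeSsps, String fileContents) {
    if (storeSsps.size() == 1) {
      return Collections.singletonMap(storeSsps.iterator().next(), fileContents);
    }
    // Empty instead of null: callers can iterate or call get() without a null check.
    return Collections.emptyMap();
  }
}
```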

