Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2017-01-25 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review163035
---




samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 97)


"Got default storage ..."



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 137)


No space before ":" here and elsewhere, including method type annotations 
(e.g. line 169, 185).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 152)


We should log which directory we're using for the store here at INFO.


- Prateek Maheshwari


On Oct. 22, 2016, 3:06 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 22, 2016, 3:06 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed by a Kafka changelog topic. Because of log
> compaction, delete tombstones in the changelog topic have a TTL of
> delete.retention.ms. Replaying a changelog whose delete tombstones have
> already been removed would create an inconsistent local store (some delete
> events would be missing). During container startup, this patch deletes every
> local store for which the difference between the current time and the last
> modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-24 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153741
---


Ship it!




Looks good to me, thanks.


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)


s/CHANGE_LOG/CHANGELOG



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 138)


s/is greater than/is older than.


- Prateek Maheshwari





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-22 Thread Shanthoosh Venkataraman


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java, 
> > line 254
> > 
> >
> > Does jobConfig.getChangeLog...() (implicit conversion) not work?

No, the implicit conversion doesn't work here. The convenience method is part
of the StorageConfig class.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 130
> > 
> >
> > First %s should be store name. Add another %s at the end for 
> > loggedStoreDir.

This log message belongs to the task store, which would itself contain the
store name. Adding the store name here is unnecessary.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala, line 
> > 144
> > 
> >
> > implicit conversion should probably work.

No, implicit conversion doesn't work here; that is why the object is created
explicitly.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 186
> > 
> >
> > s/partition/logged storage partition to be consistent with next message.

Done.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 133
> > 
> >
> > Log both last modified time and delete retention ms values too.

Done.


> On Oct. 21, 2016, 12:45 a.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 
> > 57
> > 
> >
> > getChangeLogDeleteRetentionsInMs

Done.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
---





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-22 Thread Shanthoosh Venkataraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
---

(Updated Oct. 22, 2016, 10:06 p.m.)


Review request for samza.


Repository: samza


Description
---

Every local task store is backed by a Kafka changelog topic. Because of log
compaction, delete tombstones in the changelog topic have a TTL of
delete.retention.ms. Replaying a changelog whose delete tombstones have already
been removed would create an inconsistent local store (some delete events would
be missing). During container startup, this patch deletes every local store for
which the difference between the current time and the last modified time of the
offset file is greater than delete.retention.ms.
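
The staleness rule described above can be sketched as follows. This is only an
illustration, not the code in the patch: the object name, the method name, the
parameter names, and the choice to treat a missing offset file as "not stale"
are all assumptions made for the example.

```scala
import java.io.File

// Hypothetical helper, not taken from the patch under review.
object StaleStoreCheck {
  /**
   * Returns true when the offset file was last modified more than
   * deleteRetentionMs before nowMs. A missing file is reported as not stale
   * here (an assumption of this sketch) so that the regular missing-file
   * handling can deal with it.
   */
  def isStaleStore(offsetFile: File, deleteRetentionMs: Long, nowMs: Long): Boolean =
    offsetFile.exists() && (nowMs - offsetFile.lastModified()) > deleteRetentionMs
}
```

Passing the current time in as nowMs keeps the check free of hidden calls to
the system clock, which makes it straightforward to exercise in a unit test.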


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
---

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-22 Thread Shanthoosh Venkataraman


> On Oct. 20, 2016, 11:46 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 132
> > 
> >
> > nit. using System.currentTimeMillis directly makes it difficult to unit 
> > test.

Done. Injected SystemClock dependency as a parameter.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
---





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-20 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153478
---



Looks pretty good, a few final comments.


samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java (line 
253)


Does jobConfig.getChangeLog...() (implicit conversion) not work?



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 57)


getChangeLogDeleteRetentionsInMs



samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 58)


See if you can use the named operator instead of the symbolic operator.

I think you might be able to call .toMap on the list of pairs.
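
A minimal sketch of the .toMap suggestion follows. The code under review is
not shown in this thread, so the store names, the retention values, and every
identifier below are hypothetical; the sketch only shows a list of pairs being
converted to a Map.

```scala
// Hypothetical example; names and values are not from the patch.
object ChangelogRetentionExample {
  val storeNames: List[String] = List("store-a", "store-b")

  // Stand-in for looking up a per-store retention value in the config.
  def retentionMsFor(storeName: String): Long =
    if (storeName == "store-a") 60000L else 120000L

  // Build (name, value) pairs with map, then convert the list of pairs to a Map.
  val retentions: Map[String, Long] =
    storeNames.map(name => (name, retentionMsFor(name))).toMap
}
```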



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 104)


Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 106)


If loggedStoreDir isn't present, we put null into fileOffset. If that's the
expected behavior, let's log at INFO in isStateLoggedStore if
(!loggedStoreDir.exists()).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 108)


Misleading comment, could be the other condition too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 111)


It would be useful to log the offset read from the file at INFO here (or in
readOffsetFile).



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 112)


Indent by 4 (or whatever continuation indent is)



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 118)


Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 130)


First %s should be store name. Add another %s at the end for loggedStoreDir.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 133)


Log both last modified time and delete retention ms values too.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 182)


s/partition/logged storage partition to be consistent with next message.



samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala (line 144)


implicit conversion should probably work.


- Prateek Maheshwari



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-20 Thread Boris Shkolnik

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153468
---




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 44)


nit: can we call it storeName?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 132)


nit. using System.currentTimeMillis directly makes it difficult to unit 
test.



samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
(line 323)


Isn't it easier just to make the method package-private?


- Boris Shkolnik





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-20 Thread Boris Shkolnik

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review153476
---


Ship it!




- Boris Shkolnik





Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-19 Thread Shanthoosh Venkataraman


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 156
> > 
> >
> > Unrelated, but let's make this info.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala, line 
> > 33
> > 
> >
> > Usually more readable if you write this as a multiplication: 1 * 24 * 
> > 60 * 60 * 1000L

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, 
> > line 532
> > 
> >
> > Prefer passing the one config that we need explicitly instead of 
> > passing the config object.

Switched to passing the changeLogDeleteRetentions map.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 29
> > 
> >
> > Unrelated to RB but prefer explicit imports.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 26
> > 
> >
> > Delete or import explicitly.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 107
> > 
> >
> > Add method description.

Done.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 114
> > 
> >
> > Another case we ran into on Friday - if the oldest offset in the 
> > changelog topic is newer than the offset in the OFFSET file. Do you need to 
> > handle that here?
> > 
> > Nitpick: would isStaleStore be clearer?

Discussed offline. This is a regular scenario with compaction (and with message
expiration in topics generally). When the offset in the offset file is older
than the oldest offset in the changelog, it indicates that compaction has
happened. To avoid missing messages from the topic, users have to consume from
the oldest offset in the changelog, which is controlled by the config parameter
systems.name.consumer.auto.offset.reset.


> On Oct. 17, 2016, 11:08 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala,
> >  line 122
> > 
> >
> > Mention somewhere in the message that this means that the store is 
> > stale.

Done.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
---



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-19 Thread Shanthoosh Venkataraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
---

(Updated Oct. 19, 2016, 10:04 p.m.)


Review request for samza.


Repository: samza


Description
---

Every local task store is backed by a Kafka changelog topic. Because of log
compaction, delete tombstones in the changelog topic have a TTL of
delete.retention.ms. Replaying a changelog whose delete tombstones have already
been removed would create an inconsistent local store (some delete events would
be missing). During container startup, this patch deletes every local store for
which the difference between the current time and the last modified time of the
offset file is greater than delete.retention.ms.


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
---

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-17 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review152976
---




samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 33)


Usually more readable if you write this as a multiplication: 1 * 24 * 60 * 
60 * 1000L
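
For illustration, the product form of a one-day value in milliseconds might be
declared like this. The object and constant names are hypothetical and are not
the names used in StorageConfig.

```scala
// Hypothetical names; only the arithmetic comes from the comment above.
object RetentionDefaults {
  // days * hours * minutes * seconds * milliseconds. The trailing L makes
  // the final multiplication, and therefore the result, a Long.
  val DefaultDeleteRetentionMs: Long = 1 * 24 * 60 * 60 * 1000L
}
```

Writing the factors out keeps the unit conversion visible at a glance, which a
bare literal such as 86400000 does not.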



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 
532)


Prefer passing the one config that we need explicitly instead of passing 
the config object.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 26)


Delete or import explicitly.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 29)


Unrelated to RB but prefer explicit imports.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 71)


SystemClock exists so that you can pass a "Clock" to your method/class and 
mock it in tests. Let's either do that (preferred) or use 
System.currentTimeMillis() directly.
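
A rough sketch of the preferred option, passing a clock in so that tests can
substitute a fixed one. The Clock trait, WallClock, and StalenessChecker below
are hypothetical stand-ins written for this example; they are not Samza's
actual Clock or SystemClock definitions.

```scala
// Hypothetical stand-in for a clock abstraction.
trait Clock {
  def currentTimeMillis(): Long
}

// Default implementation backed by the real system clock.
object WallClock extends Clock {
  def currentTimeMillis(): Long = System.currentTimeMillis()
}

// Hypothetical class: the clock is a constructor parameter with a real-time
// default, so production code needs no change and tests can pass a fake.
class StalenessChecker(deleteRetentionMs: Long, clock: Clock = WallClock) {
  def isStale(lastModifiedMs: Long): Boolean =
    clock.currentTimeMillis() - lastModifiedMs > deleteRetentionMs
}
```

In a test, an anonymous Clock that always returns the same value makes the
staleness decision deterministic.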



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 98)


Looks like we've updated `fileOffset` in `#readOffsetFile` as a side effect 
even when the store is stale. Is that what we want here?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 100)


Add an INFO message here.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 106)


Add method description.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 113)


Another case we ran into on Friday - if the oldest offset in the changelog 
topic is newer than the offset in the OFFSET file. Do you need to handle that 
here?

Nitpick: would isStaleStore be clearer?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 118)


Looks like this is already logged at line 163?



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 119)


Don't use `return` in Scala code.
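
As a small illustration of the style point, a Scala method body is an
expression, so the value of its last expression is the result and no `return`
is needed. The object and method names here are hypothetical.

```scala
// Hypothetical helper used only to illustrate the style.
object NoReturnExample {
  // The if/else is the whole method body; its value is the method's result.
  def describeStore(isStale: Boolean): String =
    if (isStale) "stale" else "fresh"
}
```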



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 121)


Mention somewhere in the message that this means that the store is stale.



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 150)


I'd prefer to split this into two methods - existence check and file read. 
Would be even nicer if fileOffset was updated explicitly (after staleness 
checks etc.) and not as a side effect of reading the file.

If you don't, let's add a return type to the method signature.
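
The split suggested here could look roughly like the sketch below: one method
that only checks for existence, and one that reads the file and returns the
value without touching any field. All names are hypothetical, and returning
an Option rather than null is an assumption of this sketch, not something the
patch is known to do.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical helper, not taken from TaskStorageManager.
object OffsetFileReader {
  /** Existence check only; no side effects. */
  def offsetFileExists(offsetFile: File): Boolean = offsetFile.isFile

  /** Reads and returns the offset; the caller decides whether to keep it. */
  def readOffset(offsetFile: File): Option[String] =
    if (offsetFileExists(offsetFile)) {
      val content =
        new String(Files.readAllBytes(offsetFile.toPath), StandardCharsets.UTF_8).trim
      if (content.isEmpty) None else Some(content)
    } else {
      None
    }
}
```

With this shape, the caller can run the staleness check first and assign the
returned value to fileOffset explicitly only when the store is going to be
used.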



samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
(line 155)


Unrelated, but let's make this info.


- Prateek Maheshwari



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-17 Thread Shanthoosh Venkataraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
---

(Updated Oct. 17, 2016, 10:40 p.m.)


Review request for samza.


Repository: samza


Description
---

Every local task store is backed up by a kafka changelog topic. Due to log 
compaction, delete tombstones in the changelog topic have a ttl of 
delete.retention.ms. Replaying events from a changelog that is missing 
delete tombstones would result in an inconsistent local store (because 
some delete events are missing). During container startup, this patch deletes 
the local stores for which the difference between the current time and the 
last modified time of the offset file is greater than delete.retention.ms.
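
The staleness rule in the description can be sketched as a small predicate. This is an illustration only, written in Java; the class OffsetFileStaleness and its method names are hypothetical and do not come from the patch. It assumes all three values are in milliseconds and uses a strict "greater than" comparison, as the description states.

```java
import java.io.File;

// Hypothetical helper illustrating the rule in the description; not the patch's code.
final class OffsetFileStaleness {
    private OffsetFileStaleness() {}

    // A store is stale when the offset file is older than delete.retention.ms.
    static boolean isStale(long lastModifiedMs, long nowMs, long deleteRetentionMs) {
        return nowMs - lastModifiedMs > deleteRetentionMs;
    }

    // Convenience overload: a missing offset file is not treated as stale here,
    // because there is no timestamp to compare against.
    static boolean isStale(File offsetFile, long nowMs, long deleteRetentionMs) {
        return offsetFile.exists()
            && isStale(offsetFile.lastModified(), nowMs, deleteRetentionMs);
    }
}
```

For example, with Kafka's default delete.retention.ms of 86400000 (24 hours), an offset file last written 25 hours ago would be considered stale, while one written 23 hours ago would not.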


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
4d40f520e54beb643acd8410c772b75e2f6a9162 
  samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
973ab8cfb3d248bec7efe5e338f5e667f097556d 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
---

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-17 Thread Shanthoosh Venkataraman


> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, 
> > the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog 
> > property that you've defined (which also needs to be added to the config 
> > table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets 
> > automatically passed to kafka. This is convenient but wouldn't apply to 
> > other systems, which could be useful if those other systems have a delete 
> > retention policy.
> 
> Shanthoosh Venkataraman wrote:
> I think 1) is the only plausible way to accomplish this through job 
> config. The delete.retention.ms configuration is associated only with store 
> changelogs and is not applicable to topics in general, so making it a 
> topic-level property might not be a good idea. Enforcing the 
> delete.retention.ms property is harder to accomplish through config, since 
> kafka is an external system. Ideally, if there's a way to fetch kafka 
> metadata/config (delete.retention.ms) about a topic, we could fetch that 
> value during container startup rather than expecting users to specify it.
> 
> Jake Maes wrote:
> Please take a look at 
> org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties

Done.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
---


On Oct. 17, 2016, 10:40 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 17, 2016, 10:40 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-17 Thread Jake Maes


> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, 
> > the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog 
> > property that you've defined (which also needs to be added to the config 
> > table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets 
> > automatically passed to kafka. This is convenient but wouldn't apply to 
> > other systems, which could be useful if those other systems have a delete 
> > retention policy.
> 
> Shanthoosh Venkataraman wrote:
> I think 1) is the only plausible way to accomplish this through job 
> config. The delete.retention.ms configuration is associated only with store 
> changelogs and is not applicable to topics in general, so making it a 
> topic-level property might not be a good idea. Enforcing the 
> delete.retention.ms property is harder to accomplish through config, since 
> kafka is an external system. Ideally, if there's a way to fetch kafka 
> metadata/config (delete.retention.ms) about a topic, we could fetch that 
> value during container startup rather than expecting users to specify it.

Please take a look at 
org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties


- Jake


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
---


On Oct. 4, 2016, 11:33 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 4, 2016, 11:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-13 Thread Shanthoosh Venkataraman


> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing. 
> > 
> > In order to have agreement between a kafka changelog and the task storage, 
> > the changelog should be created with the same delete.retention.ms property. 
> > 
> > There are 2 ways to do this:
> > 1. (preferred) update the kafka system admin to read the samza changelog 
> > property that you've defined (which also needs to be added to the config 
> > table, btw) and create the topic with that value for delete.retention.ms
> > 2. Rename the property so it's one of the "topic-level-property" so it gets 
> > automatically passed to kafka. This is convenient but wouldn't apply to 
> > other systems, which could be useful if those other systems have a delete 
> > retention policy.

I think 1) is the only plausible way to accomplish this through job config. 
The delete.retention.ms configuration is associated only with store changelogs 
and is not applicable to topics in general, so making it a topic-level property 
might not be a good idea. Enforcing the delete.retention.ms property is harder 
to accomplish through config, since kafka is an external system. Ideally, if 
there's a way to fetch kafka metadata/config (delete.retention.ms) about a 
topic, we could fetch that value during container startup rather than 
expecting users to specify it.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
---


On Oct. 4, 2016, 11:33 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 4, 2016, 11:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-04 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
---



Looks better, but I think there's still one major part missing. 

In order to have agreement between a kafka changelog and the task storage, the 
changelog should be created with the same delete.retention.ms property. 

There are 2 ways to do this:
1. (preferred) update the kafka system admin to read the samza changelog 
property that you've defined (which also needs to be added to the config table, 
btw) and create the topic with that value for delete.retention.ms
2. Rename the property so it's one of the topic-level properties, which get 
automatically passed to kafka. This is convenient but wouldn't apply to other 
systems, and applying it there could be useful if those systems have a delete 
retention policy.
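
As a rough sketch of option 1, the same retention value that drives the store-staleness check could be written into the properties used to create the changelog topic. The class ChangelogTopicProperties and the method forChangelog below are hypothetical names for illustration and are not the Samza API; only the two property keys, cleanup.policy and delete.retention.ms, are real Kafka topic-level configs.

```java
import java.util.Properties;

// Hypothetical sketch of option 1; not the actual kafka system admin code.
final class ChangelogTopicProperties {
    private ChangelogTopicProperties() {}

    // Builds topic-level properties for a compacted changelog topic so that the
    // topic's tombstone retention matches the value used for the local store check.
    static Properties forChangelog(long deleteRetentionMs) {
        Properties props = new Properties();
        props.setProperty("cleanup.policy", "compact");
        props.setProperty("delete.retention.ms", Long.toString(deleteRetentionMs));
        return props;
    }
}
```

Deriving both the topic config and the local check from one configured value is what keeps the changelog and the task storage in agreement.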

- Jake Maes


On Oct. 4, 2016, 11:33 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 4, 2016, 11:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-04 Thread Shanthoosh Venkataraman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/
---

(Updated Oct. 4, 2016, 11:33 p.m.)


Review request for samza.


Repository: samza


Description
---

Every local task store is backed up by a kafka changelog topic. Due to log 
compaction, delete tombstones in the changelog topic have a ttl of 
delete.retention.ms. Replaying events from a changelog that is missing 
delete tombstones would result in an inconsistent local store (because 
some delete events are missing). During container startup, this patch deletes 
the local stores for which the difference between the current time and the 
last modified time of the offset file is greater than delete.retention.ms.


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
9329edf7d724f3a0d9235354bb77936f713e3b5f 
  samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
be3f1068f7921c9c8f8697577efea6580672813e 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
05a996c98075ea8ed3767af666b9beeb1933f2a6 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
0b7bcdda1639eea8239a69c31bdf42558e9077d2 
  
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
4d40f520e54beb643acd8410c772b75e2f6a9162 

Diff: https://reviews.apache.org/r/52476/diff/


Testing
---

Unit testing and manual testing have been done to verify the functionality.


Thanks,

Shanthoosh Venkataraman



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-04 Thread Shanthoosh Venkataraman


> On Oct. 4, 2016, 1:57 a.m., Jake Maes wrote:
> > samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala,
> >  line 293
> > 
> >
> > 2 nits:
> > 1. Can you swap this test with the next one in terms of position? The 
> > tests above and below this one are related, so this one breaks them up, 
> > which just adds cognitive load for the reader.
> > 2. I'm all for descriptive names, but this is almost un-tweet-able. :-) 
> > Could it be shortened to: 
> > testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()

Fixed.


- Shanthoosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
---


On Oct. 3, 2016, 5 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 3, 2016, 5 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>



Re: Review Request 52476: Do not load task store which are older than delete tombstones.

2016-10-03 Thread Jake Maes

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151273
---



Looks good. Just a couple things below.


samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala (line 32)


Typo: deletion.retention.ms is not a valid property; the Kafka topic-level 
property is delete.retention.ms. 

http://kafka.apache.org/documentation.html#configuration



samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 
(line 293)


2 nits:
1. Can you swap this test with the next one in terms of position? The tests 
above and below this one are related, so this one breaks them up, which just 
adds cognitive load for the reader.
2. I'm all for descriptive names, but this is almost un-tweet-able. :-) 
Could it be shortened to: 
testStoreDeletedWhenOffsetFileOlderThanDeleteRetention()
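
For readers who want to see what a test with the shortened name might exercise, here is a minimal, framework-free sketch in Java (the real test lives in a Scala test class). Everything in it is an assumption made for illustration: the class StaleStoreCleanupSketch, the helper deleteStoreIfStale, the offset file name "OFFSET", and the flat store directory layout.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch; not the test or production code from the patch.
final class StaleStoreCleanupSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    private StaleStoreCleanupSketch() {}

    // Stand-in for the startup cleanup: removes the store directory when its
    // offset file is older than the retention window.
    static boolean deleteStoreIfStale(File storeDir, long nowMs, long deleteRetentionMs) {
        File offsetFile = new File(storeDir, "OFFSET");
        boolean stale = offsetFile.exists()
            && nowMs - offsetFile.lastModified() > deleteRetentionMs;
        if (!stale) {
            return false;
        }
        File[] children = storeDir.listFiles();
        if (children != null) {
            for (File child : children) {
                child.delete(); // flat directory assumed in this sketch
            }
        }
        return storeDir.delete();
    }

    // Returns true when the store directory was removed, as the test name expects.
    static boolean testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() throws IOException {
        File storeDir = Files.createTempDirectory("store").toFile();
        File offsetFile = new File(storeDir, "OFFSET");
        Files.write(offsetFile.toPath(), "100".getBytes(StandardCharsets.UTF_8));
        long now = System.currentTimeMillis();
        // Backdate the offset file to two days ago; retention is one day.
        if (!offsetFile.setLastModified(now - 2 * DAY_MS)) {
            throw new IOException("could not backdate " + offsetFile);
        }
        return deleteStoreIfStale(storeDir, now, DAY_MS) && !storeDir.exists();
    }
}
```

Backdating by whole days avoids depending on the file system's timestamp granularity.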


- Jake Maes


On Oct. 3, 2016, 5 p.m., Shanthoosh Venkataraman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> ---
> 
> (Updated Oct. 3, 2016, 5 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> Every local task store is backed up by a kafka changelog topic. Due to log 
> compaction, delete tombstones in the changelog topic have a ttl of 
> delete.retention.ms. Replaying events from a changelog that is missing 
> delete tombstones would result in an inconsistent local store (because 
> some delete events are missing). During container startup, this patch deletes 
> the local stores for which the difference between the current time and the 
> last modified time of the offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
> 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
> be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   
> samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
>  4d40f520e54beb643acd8410c772b75e2f6a9162 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> ---
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>