Re: Update all values in RocksDB

2016-06-06 Thread David Yu
Hi, Yi,

Yes, the sessions are keyed by the sessionId.

In our case, iterating through all OPEN sessions is unavoidable, since that
is precisely where we evaluate each session (based on its timestamp) and
close it. In other words, the closed-session queue you suggested cannot be
constructed without going through all the sessions periodically.

Can you explain (at a high level) why iterating through the entries can be
slow?

Thanks,
David

On Mon, Jun 6, 2016 at 2:34 PM, Yi Pan  wrote:

> Hi, David,
>
> I would recommend keeping a separate table of closed sessions as a "queue",
> ordered by the time each session was closed. Then, in your window method,
> create an iterator over the "queue", only make progress toward the end of
> the "queue", and do a point deletion in the sessionStore, which I assume
> is keyed by the sessionId.
>
> The reason for that is:
> 1) RocksDB is a KV-store: it is highly efficient for reads and writes by
> key, but not for iteration.
> 2) If you must use an iterator, make sure it only advances toward the
> "tail", where all the meaningful work items are; that is key to fast and
> efficient operation. Please refer to this blog post from the RocksDB team:
>
> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>
> -Yi
>
> On Mon, Jun 6, 2016 at 2:25 PM, David Yu  wrote:
>
> > We use Samza RocksDB to keep track of our user event sessions. The task
> > periodically calls window() to update all sessions in the store and purge
> > all closed sessions.
> >
> > We do all of this in the same iterator loop.
> >
> > Here's how we are doing it:
> >
> >
> > public void window(MessageCollector collector, TaskCoordinator coordinator)
> >     throws Exception {
> >
> >   KeyValueIterator<String, Session> it = sessionStore.all();
> >
> >   while (it.hasNext()) {
> >     Entry<String, Session> entry = it.next();
> >     Session session = entry.getValue();
> >
> >     update(session);
> >
> >     if (session.getStatus() == Status.CLOSED) {
> >       sessionStore.delete(entry.getKey());
> >     } else {
> >       sessionStore.put(entry.getKey(), session);
> >     }
> >   }
> > }
> >
> >
> > The question is: is this the correct/efficient way to do a read+update
> > for RocksDB?
> >
> > Thanks,
> > David
> >
>


Re: Update all values in RocksDB

2016-06-06 Thread Yi Pan
Hi, David,

I would recommend keeping a separate table of closed sessions as a "queue",
ordered by the time each session was closed. Then, in your window method,
create an iterator over the "queue", only make progress toward the end of
the "queue", and do a point deletion in the sessionStore, which I assume
is keyed by the sessionId.

The reason for that is:
1) RocksDB is a KV-store: it is highly efficient for reads and writes by
key, but not for iteration.
2) If you must use an iterator, make sure it only advances toward the
"tail", where all the meaningful work items are; that is key to fast and
efficient operation. Please refer to this blog post from the RocksDB team:
https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
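
The queue pattern described above can be sketched in stand-alone Java. This is
a simplified stand-in, not Samza or RocksDB code: a TreeMap plays the role of
the ordered "queue" store, a HashMap plays the role of the sessionStore, and
all names are illustrative.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ClosedSessionQueueSketch {
    // Primary store: sessionId -> session payload (stands in for sessionStore).
    static final Map<String, String> sessionStore = new HashMap<>();
    // "Queue" store: zero-padded closeTimestamp|sessionId -> sessionId.
    // Lexicographic key order == close-time order.
    static final NavigableMap<String, String> closedQueue = new TreeMap<>();

    // Called when a session closes: enqueue a pointer ordered by close time.
    static void markClosed(long closeTs, String sessionId) {
        closedQueue.put(String.format("%020d|%s", closeTs, sessionId), sessionId);
    }

    // window(): walk only the head of the queue up to `now`, point-deleting
    // each closed session from the primary store.
    static void purgeClosed(long now) {
        String bound = String.format("%020d|", now + 1);
        Iterator<Map.Entry<String, String>> it =
            closedQueue.headMap(bound).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            sessionStore.remove(e.getValue()); // efficient point deletion by key
            it.remove();                       // consume the queue entry
        }
    }

    public static void main(String[] args) {
        sessionStore.put("a", "session-a");
        sessionStore.put("b", "session-b");
        markClosed(100L, "a");
        purgeClosed(150L);
        System.out.println(sessionStore.containsKey("a")); // false
        System.out.println(sessionStore.containsKey("b")); // true
    }
}
```

The point of the shape is that the purge only ever touches the head of the
queue (items whose close time has passed), never the full set of open sessions.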

-Yi

On Mon, Jun 6, 2016 at 2:25 PM, David Yu  wrote:

> We use Samza RocksDB to keep track of our user event sessions. The task
> periodically calls window() to update all sessions in the store and purge
> all closed sessions.
>
> We do all of this in the same iterator loop.
>
> Here's how we are doing it:
>
>
> public void window(MessageCollector collector, TaskCoordinator coordinator)
>     throws Exception {
>
>   KeyValueIterator<String, Session> it = sessionStore.all();
>
>   while (it.hasNext()) {
>     Entry<String, Session> entry = it.next();
>     Session session = entry.getValue();
>
>     update(session);
>
>     if (session.getStatus() == Status.CLOSED) {
>       sessionStore.delete(entry.getKey());
>     } else {
>       sessionStore.put(entry.getKey(), session);
>     }
>   }
> }
>
>
> The question is: is this the correct/efficient way to do a read+update for
> RocksDB?
>
> Thanks,
> David
>
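
One caveat about the loop quoted above: Samza's KeyValueIterator holds store
resources and should be closed when the scan is done. A minimal stand-alone
sketch of the same loop shape, with a plain HashMap standing in for the store
and a comment marking where the real iterator close would go (the names here
are illustrative, not the actual Samza API):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WindowSketch {
    enum Status { OPEN, CLOSED }

    static class Session {
        Status status = Status.OPEN;
        Status getStatus() { return status; }
    }

    // Stand-in for the RocksDB-backed sessionStore.
    static final Map<String, Session> sessionStore = new HashMap<>();

    // Stand-in update(): closes sessions whose id starts with "old".
    static void update(String key, Session s) {
        if (key.startsWith("old")) s.status = Status.CLOSED;
    }

    public static void window() {
        // Iterate a snapshot so mutating sessionStore inside the loop is safe;
        // in Samza, sessionStore.all() returns the iterator instead.
        Iterator<Map.Entry<String, Session>> it =
            new HashMap<>(sessionStore).entrySet().iterator();
        try {
            while (it.hasNext()) {
                Map.Entry<String, Session> entry = it.next();
                Session session = entry.getValue();
                update(entry.getKey(), session);
                if (session.getStatus() == Status.CLOSED) {
                    sessionStore.remove(entry.getKey()); // point deletion
                } else {
                    sessionStore.put(entry.getKey(), session);
                }
            }
        } finally {
            // With the real KeyValueIterator, call it.close() here so the
            // underlying RocksDB resources are released even on exceptions.
        }
    }

    public static void main(String[] args) {
        sessionStore.put("old-1", new Session());
        sessionStore.put("new-1", new Session());
        window();
        System.out.println(sessionStore.size()); // 1
    }
}
```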


Re: Review Request 48213: SAMZA-960: Make system producer thread safe

2016-06-06 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48213/#review136338
---


Fix it, then Ship it!





samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala 
(line 86)


I think you can drop this synchronized call. Otherwise the one inside the 
function is redundant.
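
The redundancy being pointed out can be shown with a small stand-alone example
(hypothetical names, not the actual HdfsSystemProducer code): when a
synchronized block calls a method that synchronizes on the same lock, the lock
is simply re-acquired reentrantly, so one of the two can be dropped.

```java
public class RedundantSyncSketch {
    private final Object lock = new Object();
    private int count = 0;

    private void incrementInternal() {
        synchronized (lock) { // inner lock already guards the mutation
            count++;
        }
    }

    // The outer synchronized adds nothing: the same (reentrant) lock is
    // taken again inside incrementInternal(), so either the outer block or
    // the inner one can be removed.
    public void increment() {
        synchronized (lock) {
            incrementInternal();
        }
    }

    public int get() {
        synchronized (lock) {
            return count;
        }
    }

    public static void main(String[] args) {
        RedundantSyncSketch s = new RedundantSyncSketch();
        s.increment();
        System.out.println(s.get()); // 1
    }
}
```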



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (lines 64 - 65)


Why not use null to indicate the created state? In particular, why not 
null out the producer when it is no longer needed? Leaving a reference around 
keeps in memory all of the objects reachable from the producer, whether they 
are needed or not. Hopefully close() on the producer covers that, but the 
null approach is safer with no apparent downside.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 (line 97)


I think I see why you were trying not to null out the producer above. 
However, this can still NPE if start has not been called. Instead, now that 
producer is volatile, why not grab its value at the beginning of this function? 
If it is null, throw an error; otherwise you can use it (assuming that the 
producer handles close correctly). Another benefit is that you reduce the 
number of volatile reads you need to make.
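
The pattern suggested above (a single volatile read into a local variable,
followed by a null check) can be sketched like this; the class and method
names are illustrative, not the actual KafkaSystemProducer:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCaptureSketch {
    static class Producer {
        final AtomicInteger sent = new AtomicInteger();
        void send(String msg) { sent.incrementAndGet(); }
    }

    // Written by start()/stop(), read by send(); null means "not started".
    private volatile Producer producer = null;

    void start() { producer = new Producer(); }

    // Nulling out the field lets everything reachable from the producer be GC'd.
    void stop() { producer = null; }

    void send(String msg) {
        // One volatile read; 'p' cannot change underneath us afterwards,
        // so there is no window for an NPE between the check and the call.
        Producer p = producer;
        if (p == null) {
            throw new IllegalStateException("send() called before start()");
        }
        p.send(msg);
    }

    public static void main(String[] args) {
        ProducerCaptureSketch s = new ProducerCaptureSketch();
        s.start();
        s.send("hello");
        System.out.println("sent ok"); // reached only if no exception was thrown
    }
}
```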


- Chris Pettitt


On June 3, 2016, 10:09 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48213/
> ---
> 
> (Updated June 3, 2016, 10:09 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the system producers need to be thread safe in order to be used in 
> multithreaded tasks. The following are the changes 
> (ElasticSearchSystemProducer is already thread safe, so no change is made there):
> 
> In KafkaSystemProducer, remove the buggy retry logic and treat any exception 
> as fatal.
> In HdfsSystemProducer, add a synchronization lock to all public methods.
> 
> 
> Diffs
> -
> 
>   
> samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
>  1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  3769e103616dc0f1fd869706cc086e24cd926c48 
>   
> samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java
>  04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  8e32bba6ced090f0fc8d4e5176fe0788df36981d 
> 
> Diff: https://reviews.apache.org/r/48213/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>



Re: Review Request 48182: SAMZA-958: Make store/cache thread safe

2016-06-06 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48182/#review136335
---


Fix it, then Ship it!





samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 412)


This might be sufficient, but I've seen some excessively long GC pauses and 
the like on Hudson. Something like 10s has worked well for me in the past and 
the join should take nowhere near that long in most cases.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(lines 490 - 491)


I would name these a little differently to improve readability. Something 
like runner3StartedLatch and runner1FinishedLatch; or startRunner2 and 
startRunner1 latch; or similar.



samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala 
(line 527)


How about actually capturing the test failure and rethrowing it from the main 
thread? It will give you a much more helpful error message.
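
The capture-and-rethrow pattern suggested above (combined with the generous
10s join mentioned earlier) can be sketched as follows; runWorker and the
class name are illustrative, not part of the test under review:

```java
import java.util.concurrent.atomic.AtomicReference;

public class RethrowSketch {
    // Runs the body on a worker thread, capturing any failure instead of
    // letting the thread die silently; returns the captured Throwable or null.
    static Throwable runWorker(Runnable body) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable t) {
                failure.set(t); // capture for the main thread to inspect
            }
        });
        worker.start();
        worker.join(10_000); // generous timeout guards against long CI GC pauses
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Rethrow from the main thread so the test runner reports the real cause.
        Throwable t = runWorker(() -> { /* worker-side assertions go here */ });
        if (t != null) {
            throw new AssertionError("worker thread failed", t);
        }
        System.out.println("worker passed");
    }
}
```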


- Chris Pettitt


On June 3, 2016, 9:30 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48182/
> ---
> 
> (Updated June 3, 2016, 9:30 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> All the existing stores/caches need to be thread safe in order to be used by 
> multithreaded tasks. The following changes are made to ensure the thread 
> safety of the stores:
> 
> For CachedStore, use a synchronized lock for each public function.
> For the current InMemoryKeyValueStore, use ConcurrentSkipListMap as the 
> underlying map for thread safety.
> For the store Iterator, do not support remove functionality (throw 
> UnsupportedOperationException, as RocksDB does today).
> 
> 
> Diffs
> -
> 
>   
> samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
>  72f25a354eaa98e8df379d07d9cc8613dfafd13a 
>   
> samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
>  f0965aec5f3ec2a214dc40c70832c58273623749 
>   
> samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
>  b7f1cdc4dbaeea2413cee2ad60d74528f3950513 
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
> c28f8db8cb59bd5415e78535877acc1e5bee0f67 
>   samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala 
> eee74473726cb2a36d0b75fe5c9df737440980bc 
>   
> samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
>  23f8a1a6bee8ef38e0640a4e90778e53d982deeb 
> 
> Diff: https://reviews.apache.org/r/48182/diff/
> 
> 
> Testing
> ---
> 
> Unit tests and local deployment.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
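
The ConcurrentSkipListMap approach described in the review above can be
sketched as follows. This is a simplified stand-in, not the actual Samza
InMemoryKeyValueStore; the method names merely mirror the KeyValueStore style,
and the iterator wrapper shows the UnsupportedOperationException-on-remove
behavior mentioned in the description.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class InMemoryStoreSketch {
    // Sorted and thread-safe: concurrent get/put plus ordered range scans.
    private final ConcurrentNavigableMap<String, String> map =
        new ConcurrentSkipListMap<>();

    public String get(String key) { return map.get(key); }
    public void put(String key, String value) { map.put(key, value); }
    public void delete(String key) { map.remove(key); }

    // Range scan over [from, to); remove() is disabled to mirror the
    // RocksDB iterator semantics.
    public Iterator<Map.Entry<String, String>> range(String from, String to) {
        Iterator<Map.Entry<String, String>> it =
            map.subMap(from, to).entrySet().iterator();
        return new Iterator<Map.Entry<String, String>>() {
            public boolean hasNext() { return it.hasNext(); }
            public Map.Entry<String, String> next() { return it.next(); }
            public void remove() { throw new UnsupportedOperationException(); }
        };
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        Iterator<Map.Entry<String, String>> it = store.range("a", "c");
        int count = 0;
        while (it.hasNext()) { it.next(); count++; }
        System.out.println(count); // 2 ("c" is excluded: the range is half-open)
    }
}
```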