Re: TP4 Processors now support both push- and pull-based semantics.

2019-04-26 Thread bryncooke



On 2019/04/24 12:19:54, Marko Rodriguez wrote: 
> Hello,
> 
> > I think it would be better to either expose Flowable on the API (or Flow if 
> > you don't want to be tied in to RxJava)
> 
> We definitely don’t want to expose anything “provider specific.” Especially 
> at the Processor interface level. I note your Flow API reference in 
> java.util.concurrent and have noticed that RxJava mimics many java.util.concurrent 
> classes (Subscriber, Subscription, etc.). I will dig deeper.
> 
> > 1. Using Consumer will break the Rx chain. This is undesirable as it will 
> > prevent backpressure and cancellation from working properly.
> 
> Understood about breaking the chain.
> 
> > 2. The Scheduler to run the traversal on can be set. For instance, in the 
> > case where only certain threads are allowed to perform IO once the user has 
> > the Flowable they can call subscribeOn before subscribe.
> > 3. Backpressure strategy can be set, such as dropping results on buffer 
> > overflow.
> > 4. Buffer size can be set.
> 
> Hm. Here are my thoughts on the matter.
> 
> RxJava is just one of many Processors that will interact with TP4. If we 
> start exposing backpressure strategies, buffer sizes, etc. at the Processor 
> API level, then we expect other providers to have those concepts. Does Spark 
> support backpressure? Does Hadoop? Does Pipes? ...
> 
> I believe such provider-specific parameterization should happen via 
> language-agnostic configuration. For instance:
> 
> g = g.withProcessor(RxJavaProcessor.class, Map.of("rxjava.backpressure", 
> "drop", "rxjava.bufferSize", 2000))
> g.V().out().blah()
> 
> Unlike TP3, TP4 users will never interact with our Java API. They will never 
> have a reference to a Processor instance. They only talk to the TP4 VM via 
> Bytecode. That said, systems that will integrate the TP4 VM 
> (e.g. database vendors, data server systems, etc.) will have to handle 
> Processor traverser results in some way (i.e. within Java). Thus, if they have 
> a reactive architecture, they will want to consume a Flow, but we need 
> to make sure that java.util.concurrent.Flow semantics don't go too far in 
> demanding “unreasonable” behaviors from other Processor implementations. (I 
> need to study the java.util.concurrent.Flow API.)
> 
> Thus, I see it like this:
> 
>   1. RxJava-specific configuration is not available at the Processor API 
> level (only via configuration).
>   2. Drop Consumer and expose java.util.concurrent.Flow in Processor so the 
> chain isn’t broken for systems integrating the TP4 VM.
>   - predicated on java.util.concurrent.Flow having reasonable 
> expectations of non-reactive sources (i.e. processors).
> 
> Does this make sense to you?
> 
> ———
> 
> Stephen said you made a comment about ParallelRxJava not being 
> necessary. If that is true, can you explain your thoughts on 
> ParallelRxJava? My assumptions regarding serial vs. parallel:
> 
>   1. For TP4 VM vendors in a highly concurrent, multi-user environment, 
> multi-threading individual queries is bad.
>   2. For TP4 VM vendors in a low-concurrency, limited-user environment, 
> multi-threading a single query is good.
>   - this also depends on the workload, e.g. ParallelRxJava for an AI 
> system where one query at a time runs over lots of data.
> 
> Thank you for your feedback,
> Marko.
> 
> http://rredux.com 
> 
> 
> 
> 
> > On Apr 24, 2019, at 3:41 AM, brynco...@gmail.com wrote:
> > 
> > 
> > 
> > On 2019/04/23 13:07:09, Marko Rodriguez wrote: 
> >> Hi,
> >> 
> >> Stephen and Bryn were looking over my RxJava implementation the other day 
> >> and Bryn, with his British accent, was like [I paraphrase]:
> >> 
> >>“Whoa dawg! Bro should like totally not be blocking to fill an 
> >> iterator. Gnar gnar for surezies.”
> >> 
> >> Prior to now, Processor implemented Iterator. For RxJava, when you 
> >> called next()/hasNext() and there were no results in the queue while the 
> >> flowable was still running, the iterator while()-blocked waiting for a 
> >> result or for the flowable to terminate.
> >> 
> >> This morning I decided to redo the Processor interface (and respective 
> >> implementations) and it is much nicer now. We have two “execute” methods:
> >> 
> >> Iterator Processor.iterator(Iterator starts)
> >> void Processor.subscribe(Iterator starts, Consumer consumer)
> >> 
> >> A processor can only be executed using one of the methods above. Thus, 
> >> depending on context and the underlying processor, the VM determines 
> >> whether to use pull-based or push-based semantics. Pretty neat, eh?
> >> 
> >>
> >> https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
> >>  
> >> 

Re: TP4 Processors now support both push- and pull-based semantics.

2019-04-24 Thread Marko Rodriguez
Hello,

> I think it would be better to either expose Flowable on the API (or Flow if 
> you don't want to be tied in to RxJava)

We definitely don’t want to expose anything “provider specific.” Especially at 
the Processor interface level. I note your Flow API reference in 
java.util.concurrent and have noticed that RxJava mimics many java.util.concurrent 
classes (Subscriber, Subscription, etc.). I will dig deeper.

> 1. Using Consumer will break the Rx chain. This is undesirable as it will 
> prevent backpressure and cancellation from working properly.

Understood about breaking the chain.

> 2. The Scheduler to run the traversal on can be set. For instance, in the 
> case where only certain threads are allowed to perform IO once the user has 
> the Flowable they can call subscribeOn before subscribe.
> 3. Backpressure strategy can be set, such as dropping results on buffer 
> overflow.
> 4. Buffer size can be set.

Hm. Here are my thoughts on the matter.

RxJava is just one of many Processors that will interact with TP4. If we start 
exposing backpressure strategies, buffer sizes, etc. at the Processor API 
level, then we expect other providers to have those concepts. Does Spark 
support backpressure? Does Hadoop? Does Pipes? ...

I believe such provider-specific parameterization should happen via 
language-agnostic configuration. For instance:

g = g.withProcessor(RxJavaProcessor.class, Map.of("rxjava.backpressure", 
"drop", "rxjava.bufferSize", 2000))
g.V().out().blah()
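To make the configuration idea concrete, here is a minimal sketch of how an RxJava-backed processor might read its own settings from a plain Map, so that the generic Processor API never mentions backpressure or buffer sizes. The keys come from the example above; the class name `RxJavaSettings`, the local `Backpressure` enum, and the defaults ("buffer", 128) are hypothetical and are not TP4 or RxJava APIs.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: provider-specific knobs are read from a plain,
// language-agnostic Map instead of being exposed on the Processor API.
// Keys follow the email's example; the defaults are illustrative assumptions.
final class RxJavaSettings {
    // Local enum for illustration only (not RxJava's own type).
    enum Backpressure { BUFFER, DROP, LATEST, ERROR }

    final Backpressure backpressure;
    final int bufferSize;

    RxJavaSettings(Map<String, Object> config) {
        // Unknown keys are simply ignored; missing keys fall back to defaults.
        Object strategy = config.getOrDefault("rxjava.backpressure", "buffer");
        this.backpressure = Backpressure.valueOf(strategy.toString().toUpperCase(Locale.ROOT));
        Object size = config.getOrDefault("rxjava.bufferSize", 128);
        if (!(size instanceof Number)) {
            throw new IllegalArgumentException("rxjava.bufferSize must be a number: " + size);
        }
        this.bufferSize = ((Number) size).intValue();
    }
}
```

With this shape, a provider without a notion of backpressure (the Spark or Hadoop case raised above) never has to parse these keys at all; only the RxJava processor interprets them.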

Unlike TP3, TP4 users will never interact with our Java API. They will never 
have a reference to a Processor instance. They only talk to the TP4 VM via 
Bytecode. That said, systems that will integrate the TP4 VM (e.g. 
database vendors, data server systems, etc.) will have to handle Processor 
traverser results in some way (i.e. within Java). Thus, if they have a reactive 
architecture, they will want to consume a Flow, but we need to make sure 
that java.util.concurrent.Flow semantics don't go too far in demanding 
“unreasonable” behaviors from other Processor implementations. (I need to study 
the java.util.concurrent.Flow API.)

Thus, I see it like this:

1. RxJava-specific configuration is not available at the Processor API 
level (only via configuration).
2. Drop Consumer and expose java.util.concurrent.Flow in Processor so the 
chain isn’t broken for systems integrating the TP4 VM.
- predicated on java.util.concurrent.Flow having reasonable 
expectations of non-reactive sources (i.e. processors).

Does this make sense to you?

———

Stephen said you made a comment about ParallelRxJava not being 
necessary. If that is true, can you explain your thoughts on 
ParallelRxJava? My assumptions regarding serial vs. parallel:

1. For TP4 VM vendors in a highly concurrent, multi-user environment, 
multi-threading individual queries is bad.
2. For TP4 VM vendors in a low-concurrency, limited-user environment, 
multi-threading a single query is good.
- this also depends on the workload, e.g. ParallelRxJava for an AI 
system where one query at a time runs over lots of data.

Thank you for your feedback,
Marko.

http://rredux.com 




> On Apr 24, 2019, at 3:41 AM, brynco...@gmail.com wrote:
> 
> 
> 
> On 2019/04/23 13:07:09, Marko Rodriguez wrote: 
>> Hi,
>> 
>> Stephen and Bryn were looking over my RxJava implementation the other day 
>> and Bryn, with his British accent, was like [I paraphrase]:
>> 
>>  “Whoa dawg! Bro should like totally not be blocking to fill an 
>> iterator. Gnar gnar for surezies.”
>> 
>> Prior to now, Processor implemented Iterator. For RxJava, when you 
>> called next()/hasNext() and there were no results in the queue while the 
>> flowable was still running, the iterator while()-blocked waiting for a 
>> result or for the flowable to terminate.
>> 
>> This morning I decided to redo the Processor interface (and respective 
>> implementations) and it is much nicer now. We have two “execute” methods:
>> 
>> Iterator Processor.iterator(Iterator starts)
>> void Processor.subscribe(Iterator starts, Consumer consumer)
>> 
>> A processor can only be executed using one of the methods above. Thus, 
>> depending on context and the underlying processor, the VM determines whether 
>> to use pull-based or push-based semantics. Pretty neat, eh?
>> 
>>  
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
>>  
>> 
>> 

Re: TP4 Processors now support both push- and pull-based semantics.

2019-04-24 Thread bryncooke



On 2019/04/23 13:07:09, Marko Rodriguez wrote: 
> Hi,
> 
> Stephen and Bryn were looking over my RxJava implementation the other day and 
> Bryn, with his British accent, was like [I paraphrase]:
> 
>   “Whoa dawg! Bro should like totally not be blocking to fill an 
> iterator. Gnar gnar for surezies.”
> 
> Prior to now, Processor implemented Iterator. For RxJava, when you 
> called next()/hasNext() and there were no results in the queue while the 
> flowable was still running, the iterator while()-blocked waiting for a 
> result or for the flowable to terminate.
> 
> This morning I decided to redo the Processor interface (and respective 
> implementations) and it is much nicer now. We have two “execute” methods:
> 
> Iterator Processor.iterator(Iterator starts)
> void Processor.subscribe(Iterator starts, Consumer consumer)
> 
> A processor can only be executed using one of the methods above. Thus, 
> depending on context and the underlying processor, the VM determines whether 
> to use pull-based or push-based semantics. Pretty neat, eh?
> 
>   
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
>  
> 
> 
> Check out how I do Pipes:
> 
>   
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
>  
> 
> 
> Pipes is inherently pull-based. However, to simulate push-based semantics, I 
> start a new Thread() that loops over iterator.hasNext()/next() and passes each 
> result to consumer.accept(). Thus, as desired, subscribe() returns immediately.
> 
> Next, here is my RxJava implementation.
> 
>   
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
>  
> 
>   
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
>  
> 
> 
> You can see how I turn a push-based subscription into a pull-based iteration 
> using the good ol’ while()-block :).
> 
>   
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
>  
> 
> 
> ——
> 
> What I need to do next is to redo the RxJava execution planner such that 
> nested traversals (e.g. map(out())) are subscription-based with the parent 
> flowable. I don’t quite know how I will do it, but I believe I will have to 
> write custom Publisher/Subscriber objects for use with Flowable.compose() 
> such that onNext() and onComplete() will be called accordingly within the 
> consumer.accept(). It will be tricky as I’m not too good with low-level 
> RxJava, but them’s the breaks.
> 
> Please note that my push-based conceptual skills are not the sharpest, so if 
> anyone has any recommendations, please advise.
> 
> Take care,
> Marko.
> 
> http://rredux.com 
> 
> 
> 
> 
> 

Hi Marko,

I think it would be better to either expose Flowable on the API (or Flow if you 
don't want to be tied in to RxJava)

void Processor.subscribe(Iterator starts, Consumer consumer)
would change to:
Flowable Processor.flowable(Iterator starts)

There are a few reasons to do this:
1. Using Consumer will break the Rx chain. This is undesirable as it will 
prevent backpressure and cancellation from working properly.
2. The Scheduler to run the traversal on can be set. For instance, in the case 
where only certain threads are allowed to perform IO once the user has the 
Flowable they can call subscribeOn before subscribe.
3. Backpressure strategy can be set, such as dropping results on buffer 
overflow.
4. Buffer size can be set.
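Point 1 is the crux: a Consumer gives the producer no way to learn how much the downstream can absorb and no way to be told to stop. The JDK's java.util.concurrent.Flow (Java 9+) carries both signals through Flow.Subscription.request(n) and cancel(), without tying the API to RxJava. Below is a minimal sketch, assuming results are already available as an Iterator and that request() is not called concurrently from several threads. `IteratorPublisher` is a hypothetical name; this is neither a TP4 class nor how RxJava implements Flowable.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: exposes an Iterator of results as a Flow.Publisher that
// emits only as many items as the subscriber has requested (backpressure) and
// stops once the subscription is cancelled. Items are emitted synchronously on
// the thread that calls request(); concurrent request() calls are not handled.
final class IteratorPublisher<E> implements Flow.Publisher<E> {
    private final Iterator<E> results;

    IteratorPublisher(Iterator<E> results) {
        this.results = results;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super E> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private final AtomicBoolean cancelled = new AtomicBoolean();
            private boolean emitting; // true while drain() is running
            private boolean done;     // true once onComplete() has been signalled

            @Override
            public void request(long n) {
                if (n <= 0) {
                    if (cancelled.compareAndSet(false, true)) {
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                    }
                    return;
                }
                // Add to outstanding demand, capping at Long.MAX_VALUE on overflow.
                demand.getAndAccumulate(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                drain();
            }

            @Override
            public void cancel() {
                cancelled.set(true);
            }

            private void drain() {
                // A request() made from inside onNext() only raises demand;
                // the loop already running below picks it up.
                if (emitting) {
                    return;
                }
                emitting = true;
                try {
                    while (!cancelled.get() && !done) {
                        if (!results.hasNext()) {
                            done = true;
                            subscriber.onComplete();
                        } else if (demand.get() > 0) {
                            demand.decrementAndGet();
                            subscriber.onNext(results.next());
                        } else {
                            break; // no outstanding demand: wait for the next request()
                        }
                    }
                } finally {
                    emitting = false;
                }
            }
        });
    }
}
```

A subscriber that calls request(2) and then cancel() receives exactly two items and the iterator is not drained further, which is the behavior a bare Consumer cannot express. For sources that push from their own threads, the JDK's SubmissionPublisher is an off-the-shelf alternative to hand-rolling a Subscription.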

Hope this helps,

Bryn


TP4 Processors now support both push- and pull-based semantics.

2019-04-23 Thread Marko Rodriguez
Hi,

Stephen and Bryn were looking over my RxJava implementation the other day and 
Bryn, with his British accent, was like [I paraphrase]:

“Whoa dawg! Bro should like totally not be blocking to fill an 
iterator. Gnar gnar for surezies.”

Prior to now, Processor implemented Iterator. For RxJava, when you called 
next()/hasNext() and there were no results in the queue while the flowable was 
still running, the iterator while()-blocked waiting for a result or for the 
flowable to terminate.

This morning I decided to redo the Processor interface (and respective 
implementations) and it is much nicer now. We have two “execute” methods:

Iterator Processor.iterator(Iterator starts)
void Processor.subscribe(Iterator starts, Consumer consumer)

A processor can only be executed using one of the methods above. Thus, 
depending on context and the underlying processor, the VM determines whether to 
use pull-based or push-based semantics. Pretty neat, eh?
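The two execution styles can be illustrated with a deliberately simplified interface. This is a sketch under stated assumptions: the archived signatures above lost their generic parameters, so the type variables S and E, the names `SimpleProcessor` and `MapProcessor`, and the single Function step are all hypothetical; the real TP4 Processor works on traversers and compiled bytecode.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the TP4 Processor interface:
// one pull-based and one push-based way to execute the same work.
// Each call consumes the given starts, so a given set of starts is
// executed through only one of the two methods.
interface SimpleProcessor<S, E> {
    // Pull-based: the caller drives execution via hasNext()/next().
    Iterator<E> iterator(Iterator<S> starts);

    // Push-based: results are handed to the consumer as they are produced.
    void subscribe(Iterator<S> starts, Consumer<E> consumer);
}

// A trivial processor that applies one function to every start.
final class MapProcessor<S, E> implements SimpleProcessor<S, E> {
    private final Function<S, E> function;

    MapProcessor(Function<S, E> function) {
        this.function = function;
    }

    @Override
    public Iterator<E> iterator(Iterator<S> starts) {
        // Lazily maps each start only when the caller asks for it.
        return new Iterator<E>() {
            @Override
            public boolean hasNext() {
                return starts.hasNext();
            }

            @Override
            public E next() {
                return function.apply(starts.next());
            }
        };
    }

    @Override
    public void subscribe(Iterator<S> starts, Consumer<E> consumer) {
        // Synchronous push for simplicity; a real processor may push from other threads.
        starts.forEachRemaining(s -> consumer.accept(function.apply(s)));
    }
}
```

For a processor like this both styles are trivial; the interesting cases in this thread are a pull-based engine asked to push (Pipes) and a push-based engine asked to be pulled (RxJava).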


https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
 


Check out how I do Pipes:


https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
 


Pipes is inherently pull-based. However, to simulate push-based semantics, I 
start a new Thread() that loops over iterator.hasNext()/next() and passes each 
result to consumer.accept(). Thus, as desired, subscribe() returns immediately.
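The thread-based push described above can be sketched generically as follows. This is an illustration of the idea, not the code in Pipes.java; `PushAdapter` is a hypothetical name, and returning the worker Thread is an assumption made only so a caller can join() it.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical helper: drains a pull-based iterator on a new thread and
// pushes each result to the consumer, so subscribe() returns immediately.
final class PushAdapter {
    private PushAdapter() {}

    // Returns the started thread only so callers (e.g. tests) can join() it.
    static <E> Thread subscribe(Iterator<E> results, Consumer<E> consumer) {
        Thread worker = new Thread(() -> {
            while (results.hasNext()) {
                consumer.accept(results.next());
            }
        });
        worker.start();
        return worker;
    }
}
```

Note what this shape cannot do: the consumer has no way to slow the worker down, stop it early, or learn that it finished or failed, which is the gap raised in the reply about breaking the Rx chain.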

Next, here is my RxJava implementation.


https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
 


https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
 


You can see how I turn a push-based subscription into a pull-based iteration 
using the good ol’ while()-block :).
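A common way to express the same push-to-pull bridge without spinning is a BlockingQueue plus an end-of-stream marker. The sketch below swaps in that technique; it is not the AbstractRxJava code, and `BlockingIterator` and its `complete()` method are hypothetical names. The push side calls accept() and complete(); the pull side calls hasNext()/next().

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical bridge: a producer pushes results via accept()/complete(),
// while a consumer pulls them via hasNext()/next(), blocking when empty.
// Null results are not supported (LinkedBlockingQueue rejects nulls).
final class BlockingIterator<E> implements Iterator<E>, Consumer<E> {
    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object nextItem; // look-ahead slot filled by hasNext()

    @Override
    public void accept(E item) { // called by the push side
        queue.add(item);
    }

    void complete() { // called by the push side when it terminates
        queue.add(END);
    }

    @Override
    public boolean hasNext() {
        if (nextItem == null) {
            try {
                nextItem = queue.take(); // blocks until a result or END arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return nextItem != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        E item = (E) nextItem;
        nextItem = null;
        return item;
    }
}
```

This avoids burning CPU in a busy loop, but the pulling thread is still parked while it waits, so it does not remove the blocking cost that prompted the redesign; it only makes the wait cheaper.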


https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
 


——

What I need to do next is to redo the RxJava execution planner such that nested 
traversals (e.g. map(out())) are subscription-based with the parent flowable. 
I don’t quite know how I will do it, but I believe I will have to write custom 
Publisher/Subscriber objects for use with Flowable.compose() such that onNext() 
and onComplete() will be called accordingly within the consumer.accept(). It 
will be tricky as I’m not too good with low-level RxJava, but them’s the breaks.

Please note that my push-based conceptual skills are not the sharpest, so if 
anyone has any recommendations, please advise.

Take care,
Marko.

http://rredux.com