Hi Amit-

*Spring Data Geode* does not offer any additional help for streaming
Function results OOTB, particularly since, as Udo says...

"*The one caveat here is that you have to deal with failure and possible
duplicates when the function is marked as HA and it might retry/restart
upon detection of failure.*"

Typically, handling these types of concerns appropriately varies greatly
from use case to use case, given different application requirements and SLAs.

However, as Udo mentioned, and as the Javadoc
<http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html>
[1] for o.a.g.cache.execute.ResultCollector states, specifically this...

"*Results arrive as they are sent using the ResultSender.sendResult(Object)
<http://gemfire-91-javadocs.docs.pivotal.io/org/apache/geode/cache/execute/ResultSender.html#sendResult-T->
and
can be used as they arrive.*"

... it is possible to implement a "custom", stream-handling
o.a.g.cache.execute.ResultCollector.

For example, see here
<https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java>
 [2].
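
Independent of the tests in [2], the general idea can be sketched roughly as
follows. This is only a minimal illustration, not the code from [2]. So that
it compiles without Geode on the classpath, SketchResultCollector is a
hypothetical stand-in that mirrors the main methods of
o.a.g.cache.execute.ResultCollector (the timed getResult(..) variant is
omitted and Object replaces DistributedMember). StreamingCollector and its
Consumer callback are illustrative names as well.

```java
import java.util.function.Consumer;

// ASSUMPTION: simplified stand-in for o.a.g.cache.execute.ResultCollector<T, S>.
// Object is used in place of Geode's DistributedMember.
interface SketchResultCollector<T, S> {
    void addResult(Object memberId, T resultOfSingleExecution);
    void endResults();
    S getResult();
    void clearResults();
}

// Hands every result to a callback as soon as it arrives instead of buffering it.
// getResult() only reports how many results were processed.
class StreamingCollector<T> implements SketchResultCollector<T, Long> {

    private final Consumer<T> handler;
    private long count;

    StreamingCollector(Consumer<T> handler) {
        this.handler = handler;
    }

    @Override
    public synchronized void addResult(Object memberId, T result) {
        handler.accept(result); // process the chunk immediately
        count++;
    }

    @Override
    public void endResults() {
        // nothing is buffered, so there is nothing to flush
    }

    @Override
    public synchronized Long getResult() {
        return count;
    }

    @Override
    public synchronized void clearResults() {
        count = 0;
    }
}
```

With Geode itself, such a class would implement ResultCollector directly and
be registered with Execution.withCollector(..) before calling execute(..).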

In fact, I have written a few tests (e.g.
streamingFunctionWithStreamingResultCollector [3]) to illustrate *Function*
result streaming and basic stream handling using SDG with a "custom"
o.a.g.cache.execute.ResultCollector
<https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257>
[4].
The *Function* is implemented as a simple POJO method
<https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318>
[5]
using SDG's @GemfireFunction annotation and Geode's
o.a.g.cache.execute.ResultSender.
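
On the sending side, the chunking itself is simple. The sketch below is not
the method from [5]; it only shows the pattern of calling sendResult(..) for
each full chunk and finishing with lastResult(..), which is how Geode's
ResultSender signals the end of the results. SketchResultSender is a
hypothetical stand-in for the two o.a.g.cache.execute.ResultSender methods
used here, and ChunkingFunction/sendInChunks are made-up names. In SDG, the
equivalent logic would live in the @GemfireFunction-annotated POJO method that
receives the ResultSender.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// ASSUMPTION: stand-in for the sendResult/lastResult methods of
// o.a.g.cache.execute.ResultSender<T>, so the sketch compiles without Geode.
interface SketchResultSender<T> {
    void sendResult(T oneResult);
    void lastResult(T lastResult);
}

class ChunkingFunction {

    // Sends the source in chunks of at most chunkSize elements.
    // The final (possibly empty) chunk always goes out through lastResult(..),
    // which is called exactly once.
    static <E> void sendInChunks(Iterator<E> source, int chunkSize,
            SketchResultSender<List<E>> sender) {

        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }

        List<E> chunk = new ArrayList<>(chunkSize);

        while (source.hasNext()) {
            chunk.add(source.next());
            // Only flush a full chunk if more data follows; otherwise keep it
            // so it can be sent as the last result.
            if (chunk.size() == chunkSize && source.hasNext()) {
                sender.sendResult(chunk);
                chunk = new ArrayList<>(chunkSize);
            }
        }

        sender.lastResult(chunk);
    }
}
```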


There are a few nuances with Geode's API that users should be aware of...


1. First of all, and most importantly, all
o.a.g.cache.execute.Execution.execute(..) methods [6] block!

This is unfortunate.  I see no obvious reason why execute(..) methods need
to block, particularly since o.a.g.cache.execute.ResultCollector
implementations could be made to block if/when necessary.

Minimally, it would have been convenient if the Geode Execution API [6]
provided 1 or more executeAsync(..) methods.
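
In the meantime, a caller can approximate this by pushing the blocking call
onto another Thread. The following is only a sketch under that assumption:
AsyncExecution and its executeAsync(..) helper are hypothetical names, not
part of Geode or SDG, and the Supplier stands in for whatever blocking call
is being wrapped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

class AsyncExecution {

    // Runs a blocking call on the given executor and returns immediately.
    // ASSUMPTION: with Geode, the supplier would be something like
    // () -> execution.execute(functionId), yielding the ResultCollector.
    static <R> CompletableFuture<R> executeAsync(Supplier<R> blockingExecute,
            ExecutorService executor) {
        return CompletableFuture.supplyAsync(blockingExecute, executor);
    }
}
```

The calling Thread is then free to do other work, or to consume results that a
custom ResultCollector hands off, while execute(..) is still blocked on the
worker Thread.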


2. It is actually not apparent from the ResultCollector API that a developer
needs to implement addResult(..) in order to process intermediate results as
they arrive.

addResult(..) seems more like an internal API Geode uses to "collect"
results on the receiver's end, especially since the DistributedMember
parameter does not technically serve much purpose.

As such, a developer might expect that s/he can call
Execution.execute(..) followed
immediately by using the ResultCollector.getResult() method to process
results until the result set (or stream) is exhausted (e.g.
Iterable/Iterator style).  After all, the Javadoc for the
ResultCollector.getResult() method states...

"*It returns the result of function execution, potentially blocking
until all the results are available
<http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html#endResults-->
has
been called.*"

"*Potentially*" is not absolute, yet, it is (!) since Execution.execute(..)
blocks unless a developer wraps the call in a threaded
java.util.concurrent.Executor.  The only real indication that
ResultCollector.getResult() is reachable only upon completion is the phrase
"*returns the result of function execution*", which implies that the
ResultCollector.getResult() method will not return until the *Function* is
complete.  That is true, but the calling Thread cannot even reach
getResult() until the Execution.execute(..) method returns, provided the
same Thread calls both Execution.execute(..) and ResultCollector.getResult().

I would also add that a quick review of *Geode's User Guide* on *Function
Execution
<http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html>*
[10]
shows that it is less than clear on this matter as well.

This is certainly an area where SDG can provide added value, given the core
*Spring Framework's* Async features and Reactive support.  I will consider
this; see DATAGEODE-36 <https://jira.spring.io/browse/DATAGEODE-36> - "*Add
Async, Reactive and Streaming support to the Function Annotation support.*"
[7]


3. Finally, you are correct in that Geode's
o.a.g.internal.cache.execute.DefaultResultCollector [8] is a rather naive
implementation, even for small result sets actually.  It is especially not
suited for streaming.

By way of example, I offer up only a slightly better implementation here
<https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233>
[9].
Of course, it is subject to the same constraints described in #1 & #2
above, unless you are multi-threading the execution and result handling.
Note: [9] is Thread-safe.
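
For the multi-threaded case, one possible shape (a sketch of my own for this
mail, not the implementation in [9]) is to back the collector with a
java.util.concurrent.BlockingQueue so that getResult() can hand back an
Iterable that is consumed while results are still arriving. The class name
QueueBackedCollector is hypothetical, Object again stands in for
DistributedMember, and the methods are merely named after those of
ResultCollector; a real version would implement that interface. It only helps
if Execution.execute(..) runs on a different Thread than the consumer.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueBackedCollector<T> {

    // Marker enqueued by endResults() to tell the consumer the stream is done.
    private static final Object END = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Called by the producing side for every result (null results are not supported).
    public void addResult(Object memberId, T result) {
        queue.add(result);
    }

    public void endResults() {
        queue.add(END);
    }

    // Single-consumer view: iterate once, blocking until the next result
    // or the end marker is available.
    public Iterable<T> getResult() {
        return () -> new Iterator<T>() {

            private Object pending;

            @Override
            public boolean hasNext() {
                if (pending == null) {
                    try {
                        pending = queue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        pending = END; // treat interruption as end of stream
                    }
                }
                return pending != END;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T value = (T) pending;
                pending = null;
                return value;
            }
        };
    }
}
```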


Anyway, hope this helps and gives you some guidance for your own
implementation, which I am certain will be quite a bit more involved.

Regards,
John

[1]
http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html
[2]
https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java
[3]
https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L113-L130
[4]
https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257
[5]
https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318
[6]
http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/Execution.html
[7] https://jira.spring.io/browse/DATAGEODE-36
[8]
https://github.com/apache/geode/blob/rel/v1.2.0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
[9]
https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233
[10]
http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html


On Mon, Aug 14, 2017 at 9:14 AM, Amit Pandey <[email protected]>
wrote:

> Hey Udo,
>
> I can do that. However I was using @GemfireFunction and didn't understand
> how I can do it. Anyways I understand how to do it from the server.
> Is there any example code for the Streaming result collector?
>
> John,
>
> Does Spring Data Geode have any helpers for this ?
>
> Regards
>
> On Mon, Aug 14, 2017 at 9:40 PM, Udo Kohlmeyer <[email protected]> wrote:
>
>> Hi there Amit.
>>
>> Have you looked at the ResultSender.sendResult() method on the function?
>> You can use sendResult() as often as you like to send chunks of 1000
>> results. You just have to ensure that you "close" the resultSender by
>> calling sendLast().
>>
>> As for the streaming result collector... Geode does not have a streaming
>> interface, but you can implement a custom result collector. In this custom
>> result collector you can embed your processing of chunks in the
>> "addResult". This way you can process data as soon as the collector
>> receives it.
>>
>> The one caveat here is that you have to deal with failure and possible
>> duplicates when the function is marked as HA and it might retry/restart
>> upon detection of failure.
>>
>> --Udo
>>
>> On 8/14/17 00:14, Amit Pandey wrote:
>>
>> Also in Spring Data Geode is it possible to send data as soon as I have a
>> chunk of say 1000/ I know I can specify batch size but I don't see how I
>> can do it like streaming
>>
>> On Sun, Aug 13, 2017 at 3:08 PM, Amit Pandey <[email protected]>
>> wrote:
>>
>>> Hi All,
>>>
>>> I have a function which can potentially return a very large data sets.
>>>
>>> I want to stream data via the functions. Now the default result
>>> collector of  Geode collects all the data in one large chunk, This might
>>> result in very slow operation times. How can I use a streaming result
>>> collector? Is there any example of it given?
>>>
>>> I am using spring-data-geode so if there is something available there
>>> that will be great too.
>>>
>>> Regards
>>>
>>
>>
>>
>


-- 
-John
john.blum10101 (skype)
