Hi Amit - *Spring Data Geode* does not offer any additional help for streaming *Function* results OOTB, particularly since, as Udo says...
"*The one caveat here is that you have to deal with failure and possible duplicates when the function is marked as HA and it might retry/restart upon detection of failure.*"

Typically, handling these types of concerns appropriately varies greatly from UC to UC, given different application requirements and SLAs.

However, as Udo mentioned, and as the Javadoc <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html> [1] for o.a.g.cache.execute.ResultCollector states, specifically this...

"*Results arrive as they are sent using the ResultSender.sendResult(Object) <http://gemfire-91-javadocs.docs.pivotal.io/org/apache/geode/cache/execute/ResultSender.html#sendResult-T-> and can be used as they arrive.*"

... it is possible to implement a "custom", stream-handling o.a.g.cache.execute.ResultCollector. For example, see here <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java> [2]. In fact, I have written a few tests (e.g. streamingFunctionWithStreamingResultCollector [3]) to illustrate *Function* result streaming and basic stream handling using SDG with a "custom" o.a.g.cache.execute.ResultCollector <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257> [4]. The *Function* is implemented as a simple POJO method <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318> [5] using SDG's @GemfireFunction annotation and Geode's o.a.g.cache.execute.ResultSender.

There are a few nuances with Geode's API that users should be aware of...

1. First of all, and most importantly, all o.a.g.cache.execute.Execution.execute(..) methods [6] block! This is unfortunate.
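To make the workaround for this blocking behavior concrete, here is a minimal, self-contained sketch of the usual approach: submit the blocking call to a java.util.concurrent.ExecutorService so the calling Thread is free to consume results as they arrive. Since the Geode types are not available here, the blocking Execution.execute(..) / ResultSender pair is *simulated* with a plain BlockingQueue; all names in this sketch are illustrative, not Geode API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncExecutionSketch {

    // Stand-in for a streaming ResultCollector's buffer; results are
    // handed off here as they "arrive" from the (simulated) Function.
    static final BlockingQueue<String> results = new LinkedBlockingQueue<>();

    // Sentinel marking end-of-stream, analogous to ResultSender.sendLast(..).
    static final String END_OF_RESULTS = "__END__";

    // Simulates a blocking Execution.execute(..): sends three results,
    // then the end-of-results marker, and only then returns.
    static void blockingExecute() throws InterruptedException {
        for (int i = 1; i <= 3; i++) {
            results.put("result-" + i); // analogous to ResultSender.sendResult(..)
        }
        results.put(END_OF_RESULTS);    // analogous to ResultSender.sendLast(..)
    }

    // Wraps the blocking call in an Executor so this Thread can consume
    // results while the "execution" is still in flight.
    static int run() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<?> execution = executor.submit(() -> {
            try {
                blockingExecute();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        int count = 0;
        for (String result = results.take(); !END_OF_RESULTS.equals(result);
                result = results.take()) {
            count++; // process each result as it arrives
        }

        execution.get(); // surface any failure from the execution Thread
        executor.shutdown();
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("processed " + run() + " results");
    }
}
```

The same shape applies with the real API: hand Execution.execute(..) to an Executor, and drain your custom ResultCollector on the calling Thread.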
I see no obvious reason why execute(..) methods need to block, particularly since o.a.g.cache.execute.ResultCollector implementations could be made to block if/when necessary. Minimally, it would have been convenient if the Geode Execution API [6] provided 1 or more executeAsync(..) methods.

2. It is actually not apparent from the ResultCollector API that a developer needs to implement addResult(..) in order to process intermediate results as they arrive. addResult(..) seems more like an internal API Geode uses to "collect" results on the receiver's end, especially since the DistributedMember parameter does not technically serve much purpose. As such, a developer might expect that s/he can call Execution.execute(..) followed immediately by the ResultCollector.getResult() method to process results until the result set (or stream) is exhausted (e.g. Iterable/Iterator style). After all, the Javadoc for the ResultCollector.getResult() method states...

"*It returns the result of function execution, potentially blocking until all the results are available <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html#endResults--> has been called.*"

"*Potentially*" is not absolute, yet here it is (!), since Execution.execute(..) blocks unless a developer wraps the call with a threaded java.util.concurrent.Executor. The only real indication that ResultCollector.getResult(..) is only reachable upon completion is... "*returns the result of function execution*", which implies that the ResultCollector.getResult() method will not return until the *Function* is complete, which is true, and also not until the Execution.execute(..) method returns, provided the same Thread calls Execution.execute(..) along with ResultCollector.getResult().
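To make the addResult(..) contract concrete, below is a minimal, thread-safe, queue-backed collector in the spirit of [9]. Note the interface here is a *simplified stand-in* for o.a.g.cache.execute.ResultCollector (the real addResult(..) also takes the sending DistributedMember, and the real interface declares clearResults() and a timed getResult(..) overload); it is defined locally only so the example compiles without Geode on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for o.a.g.cache.execute.ResultCollector.
interface SimpleResultCollector<T> {
    void addResult(T result);  // called as each intermediate result arrives
    List<T> getResult() throws InterruptedException; // blocks until endResults()
    void endResults();         // signals no more results (cf. sendLast())
}

public class StreamingResultCollector implements SimpleResultCollector<Object> {

    // End-of-stream sentinel; compared by identity so it cannot collide
    // with an application result.
    private static final Object END = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Thread-safe: addResult(..) may be called from the receiver's threads
    // while another Thread concurrently drains the queue.
    @Override
    public void addResult(Object result) {
        queue.offer(result);
    }

    // Drains results as they arrive; returns once endResults() was called.
    // A true streaming consumer would process each result here (or expose
    // the queue) instead of accumulating a List.
    @Override
    public List<Object> getResult() throws InterruptedException {
        List<Object> results = new ArrayList<>();
        for (Object result = queue.take(); result != END; result = queue.take()) {
            results.add(result);
        }
        return results;
    }

    @Override
    public void endResults() {
        queue.offer(END);
    }
}
```

With the real API, you would register such a collector via Execution.withCollector(..) before calling execute(..); per #1 above, execute(..) still needs its own Thread if the same Thread is meant to consume results concurrently.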
I would also add that a quick review of *Geode's User Guide* on *Function Execution <http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html>* [10] is less than clear on this matter as well. This is certainly an area where SDG can provide added value, given the core *Spring Framework's* Async features and Reactive support; I will consider this. See DATAGEODE-36 <https://jira.spring.io/browse/DATAGEODE-36> - "*Add Async, Reactive and Streaming support to the Function Annotation support.*" [7]

3. Finally, you are correct that Geode's o.a.g.internal.cache.execute.DefaultResultCollector [8] is a rather naive implementation, even for small result sets. It is especially not suited for streaming. By way of example, I offer up an only slightly better implementation here <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233> [9]. Of course, it is subject to the same constraints described in #1 & #2 above, unless you are multi-threading the execution and result handling. Note: [9] is Thread-safe.

Anyway, hope this helps and gives you some guidance for your own implementation, which I am certain will be quite a bit more involved.
Regards,
John

[1] http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html
[2] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java
[3] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L113-L130
[4] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257
[5] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318
[6] http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/Execution.html
[7] https://jira.spring.io/browse/DATAGEODE-36
[8] https://github.com/apache/geode/blob/rel/v1.2.0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
[9] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233
[10] http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html

On Mon, Aug 14, 2017 at 9:14 AM, Amit Pandey <[email protected]> wrote:

> Hey Udo,
>
> I can do that. However I was using @GemfireFunction and didn't understand
> how I can do it. Anyways I understand how to do it from the server.
> Is there any example code for the Streaming result collector?
>
> John,
>
> Does Spring Data Geode have any helpers for this?
>
> Regards
>
> On Mon, Aug 14, 2017 at 9:40 PM, Udo Kohlmeyer <[email protected]> wrote:
>
>> Hi there Amit.
>>
>> Have you looked at the ResultSender.sendResult() method on the function?
>> You can use sendResult() as often as you like to send chunks of 1000
>> results. You just have to ensure that you "close" the resultSender by
>> calling sendLast().
>>
>> As for the streaming result collector... Geode does not have a streaming
>> interface, but you can implement a custom result collector. In this custom
>> result collector you can embed your processing of chunks in the
>> "addResult". This way you can process data as soon as the collector
>> receives it.
>>
>> The one caveat here is that you have to deal with failure and possible
>> duplicates when the function is marked as HA and it might retry/restart
>> upon detection of failure.
>>
>> --Udo
>>
>> On 8/14/17 00:14, Amit Pandey wrote:
>>
>> Also in Spring Data Geode is it possible to send data as soon as I have a
>> chunk of say 1000? I know I can specify batch size, but I don't see how I
>> can do it like streaming.
>>
>> On Sun, Aug 13, 2017 at 3:08 PM, Amit Pandey <[email protected]>
>> wrote:
>>
>>> Hi All,
>>>
>>> I have a function which can potentially return very large data sets.
>>>
>>> I want to stream data via the functions. Now, the default result
>>> collector of Geode collects all the data in one large chunk, which might
>>> result in very slow operation times. How can I use a streaming result
>>> collector? Is there any example of it given?
>>>
>>> I am using spring-data-geode, so if there is something available there
>>> that will be great too.
>>>
>>> Regards

-- 
-John
john.blum10101 (skype)
