Hmmm, I see. You could probably output the delta using flatMapGroupsWithState <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->.
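A minimal sketch of that suggestion follows. It assumes Spark 2.2's Scala API. `Event`, `merge`, `BatchDelta`, `deltaForKey` and `perBatchDelta` are hypothetical names for illustration, with `merge` standing in for the thread's `merge(a, b)`. On a streaming Dataset with `GroupStateTimeout.NoTimeout()`, the function passed to `flatMapGroupsWithState` is invoked only for keys that have new rows in a trigger, and receives only those rows. Emitting their merge without ever updating the `GroupState` should therefore yield one delta row per key per micro-batch:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical event type: a grouping key plus a metric to be summed.
case class Event(key: String, value: Long)

object BatchDelta {
  // Hypothetical stand-in for the thread's merge(a, b).
  def merge(a: Event, b: Event): Event = Event(a.key, a.value + b.value)

  // Called per key per trigger with only that trigger's rows for the key.
  // The state is never read or updated, so nothing is carried across batches.
  def deltaForKey(key: String, events: Iterator[Event], state: GroupState[Long]): Iterator[Event] =
    events.reduceOption(merge).iterator

  // Wires the per-key function into a streaming Dataset; S = Long (unused state),
  // U = Event (one merged delta row per key per micro-batch).
  def perBatchDelta(spark: SparkSession, events: Dataset[Event]): Dataset[Event] = {
    import spark.implicits._
    events
      .groupByKey(_.key)
      .flatMapGroupsWithState[Long, Event](OutputMode.Append(), GroupStateTimeout.NoTimeout())(deltaForKey)
  }
}
```

To run it, start the query with `.writeStream.outputMode("append")` plus a `checkpointLocation` option, since the query's output mode should match the operator's append mode. As far as I can tell, the plan still contains a `FlatMapGroupsWithState` operator backed by the state store. Because the state is never set, no per-key state should accumulate across batches. Like `mapGroups`, this performs no map-side combine: every row for a key is shuffled before `deltaForKey` runs.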
On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <piyush.muk...@gmail.com> wrote:

> Thanks, Michael.
> I have explored Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
> with update mode. The problem is that it gives the overall aggregated value
> for the changed groups, while I only want the delta change in each group,
> since we are doing the aggregation at the sink level too.
>
> Below is the plan generated with a count Aggregator:
>
> *HashAggregate
> StateStoreSave
> *HashAggregate
> StateStoreRestore
> *HashAggregate
> Exchange
> *HashAggregate
> *Project
> StreamingRelation
>
> We are looking for some aggregation that avoids state store interactions.
>
> Also, is anyone aware of a design doc or an example of how we can add a
> new operation on Dataset and the corresponding physical plan?
>
> On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> - dev
>>
>> I think you should be able to write an Aggregator
>> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
>> You probably want to run in update mode if you are looking for it to output
>> any group that has changed in the batch.
>>
>> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> we are migrating some jobs from DStream to Structured Streaming.
>>>
>>> Currently, to handle aggregations we call map and reduceByKey on each RDD,
>>> like:
>>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>>
>>> The final output of each RDD is merged into the sink, which supports
>>> aggregation (like a co-processor in HBase).
>>>
>>> In the new Dataset API, I am not finding any suitable API to aggregate
>>> over just the micro-batch. Most of the aggregation APIs use the state
>>> store and provide global aggregations (with append mode they do not give
>>> the change in existing buckets).
>>> The problem we suspect is:
>>> 1) The state store is tightly linked to the job definition, while in our
>>> case we may want to edit the job while keeping the previously calculated
>>> aggregates as they are.
>>>
>>> The desired result can be achieved with the Dataset API below:
>>> dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
>>> but the physical plan shows that it does not do any merge before the sort
>>> (i.e. no map-side combine, unlike reduceByKey).
>>>
>>> Is anyone aware of an API or another workaround to get the desired result?
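To make the delta-versus-total distinction concrete, here is a plain-Scala sketch with no Spark involved. The `(key, count)` batches, the `DeltaVsCumulative` object and the additive sink are hypothetical. It contrasts two things a sink that aggregates on write (like the HBase co-processor above) might receive:

* what a DStream-style per-batch `reduceByKey` sends to it;
* what an update-mode running aggregate emits for the keys that changed in each batch.

```scala
object DeltaVsCumulative {
  // Hypothetical micro-batch of (key, count) events.
  type Batch = Seq[(String, Long)]

  // Per-batch reduceByKey: the delta the DStream job sends to the sink.
  def batchDelta(batch: Batch): Map[String, Long] =
    batch.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }

  // Adds every value in `delta` into the running map `acc`.
  def addInto(acc: Map[String, Long], delta: Map[String, Long]): Map[String, Long] =
    delta.foldLeft(acc) { case (a, (k, v)) => a.updated(k, a.getOrElse(k, 0L) + v) }

  // Update-mode style output: after each batch, the running total of every
  // key that changed in that batch (unchanged keys are not emitted).
  def updateModeOutputs(batches: Seq[Batch]): Seq[Map[String, Long]] = {
    val deltas = batches.map(batchDelta)
    val totalsAfter = deltas.scanLeft(Map.empty[String, Long])(addInto).tail
    deltas.zip(totalsAfter).map { case (delta, totals) =>
      delta.keys.map(k => k -> totals(k)).toMap
    }
  }

  // A sink that aggregates on write: adds each incoming value to what it holds.
  def additiveSink(outputs: Seq[Map[String, Long]]): Map[String, Long] =
    outputs.foldLeft(Map.empty[String, Long])(addInto)
}
```

With the batches `Seq(Seq(("a", 1L), ("a", 2L), ("b", 5L)), Seq(("a", 4L)))`, summing the per-batch deltas at the sink gives `a -> 7, b -> 5`. Summing the update-mode outputs gives `a -> 10`, because the first batch's `3` is counted again inside the second batch's total of `7`.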