I think that should work with an aggregate() instead of reduce().
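
Something like this, for example — a rough, untested sketch that just reuses the merge
logic from your reducer. Note that an aggregate function applied on a window cannot be a
RichFunction either, so the serDe is created on the fly instead of in open(); the imports
assume the org.apache.datasketches packages (adjust them if you are on an older
com.yahoo.sketches release):

import org.apache.datasketches.ArrayOfItemsSerDe;
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class SketchesStringAggregator implements
    AggregateFunction<Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>> {

  private static final long serialVersionUID = 1L;

  @Override
  public Tuple2<byte[], byte[]> createAccumulator() {
    // null marks "no element seen yet"; the first add() adopts the element,
    // so the null itself is never stored in state
    return null;
  }

  @Override
  public Tuple2<byte[], byte[]> add(Tuple2<byte[], byte[]> value, Tuple2<byte[], byte[]> acc) {
    return acc == null ? value : merge(acc, value);
  }

  @Override
  public Tuple2<byte[], byte[]> getResult(Tuple2<byte[], byte[]> acc) {
    return acc;
  }

  @Override
  public Tuple2<byte[], byte[]> merge(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2) {
    if (t1 == null) {
      return t2;
    }
    if (t2 == null) {
      return t1;
    }
    // created on the fly instead of in open(), so no RichFunction is needed
    final ArrayOfItemsSerDe<String> serDe = new ArrayOfStringsSerDe();

    // merge the HLL sketches
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    // merge the frequent-items sketches
    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

It could then be wired in roughly as
input.countWindowAll(n).aggregate(new SketchesStringAggregator()).map(new SketchesStringToStatsPojo(colIndex, topK)).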

Best,
Aljoscha

On 24.07.20 17:02, Flavio Pompermaier wrote:
In my reduce function I want to compute some aggregations on the sub-results of a
mapPartition, which I tried to migrate from DataSet to DataStream without success.
The original code was something like:

  input.mapPartition(new RowToStringSketches(sketchMapSize)) //
         .reduce(new SketchesStringReducer()) //
         .map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about simulating the mapPartition function in the streaming environment in
another thread on the mailing list [1] because I was not able to test it: it seems
that the program was exiting before being able to process anything.
So I have given up on replacing the DataSet API with the DataStream API for the
moment; it seems there are still too many things to migrate.
Btw, this is the reduce function:
Btw, this is the reduce function:

public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
  private static final long serialVersionUID = 1L;

  private transient ArrayOfItemsSerDe<String> serDe;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.serDe = new ArrayOfStringsSerDe();
  }

  @Override
  public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
      throws Exception {
    // merge HLL
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    // merge ItemsSketch
    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767

On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

What are you trying to do in the ReduceFunction? Without knowing the
code, maybe an aggregate(AggregateFunction) is the solution.

Best,
Aljoscha

On 20.07.20 18:03, Flavio Pompermaier wrote:
Thanks Aljoscha for the reply. So what can I do in my reduce function that
contains transient variables (i.e., not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.
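
Schematically (simplified; inputSerializer stands in for the serializer of the
window's input type), the window operator does something like:

// the ReduceFunction becomes part of a state descriptor, which is why it
// cannot carry RichFunction lifecycle methods
ReducingStateDescriptor<T> stateDesc =
    new ReducingStateDescriptor<>("window-contents", reduceFunction, inputSerializer);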

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:


https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300
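
The reduce(ReduceFunction, WindowFunction) variant that the error message suggests is
the way around this: the ReduceFunction stays a plain function, while the window
function that post-processes the pre-reduced result may be rich, so open() and the
serDe can live there. A rough sketch — stream, windowSize and PlainSketchesReducer are
placeholders, and the String output is only there to keep the example self-contained:

stream
    .countWindowAll(windowSize)
    .reduce(
        new PlainSketchesReducer(), // a plain ReduceFunction, no open()/RuntimeContext
        new ProcessAllWindowFunction<Tuple2<byte[], byte[]>, String, GlobalWindow>() {

          private transient ArrayOfItemsSerDe<String> serDe;

          @Override
          public void open(Configuration parameters) {
            serDe = new ArrayOfStringsSerDe(); // rich lifecycle methods are allowed here
          }

          @Override
          public void process(Context ctx, Iterable<Tuple2<byte[], byte[]>> merged,
              Collector<String> out) {
            // the iterable holds the single pre-reduced element of the window
            Tuple2<byte[], byte[]> t = merged.iterator().next();
            double estimate = HllSketch.heapify(Memory.wrap(t.f0)).getEstimate();
            ItemsSketch<String> items = ItemsSketch.getInstance(Memory.wrap(t.f1), serDe);
            out.collect("distinct ~" + estimate + ", tracked items: " + items.getNumActiveItems());
          }
        });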

Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:
Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll, but Flink says:
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio
