Hi Ahmad,

A few comments from my side:

    1. FoldFunction is deprecated because of several problems, e.g. it is
       not possible to merge the contents of windows. Therefore you should
       at least use an AggregateFunction.

    2. I am not sure you need to store this in RocksDB at all. Do you
       expect 24 million products per tenant in a single window?

    3. I think what you could do is first compute stats for the composite
       key <tenant, product> and then aggregate them in a subsequent
       operation (if you need to). This way you could distribute the
       workload across more parallel instances.
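Point 3 can be illustrated without Flink at all. The sketch below is a plain-Java simulation, not Flink code: ordinary maps stand in for keyed state, and `Event`, `ProductStats` and `TwoStageRollup` are hypothetical names. Stage 1 keeps one small accumulator per <tenant, product> key; stage 2, only if it is needed, re-keys those results by tenant.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the two-stage idea; no Flink classes are involved.
// Event, ProductStats and TwoStageRollup are hypothetical names for illustration.
class Event {
    final String tenant;
    final String productId;
    final double price;

    Event(String tenant, String productId, double price) {
        this.tenant = tenant;
        this.productId = productId;
        this.price = price;
    }
}

// Small per-product accumulator: this is all the state one composite key needs.
class ProductStats {
    long count;
    double sum;

    void add(double price) {
        count++;
        sum += price;
    }
}

class TwoStageRollup {
    // Stage 1: group by the composite key <tenant, product>.
    // In a Flink job this would be a keyBy on both fields followed by an aggregate.
    static Map<List<String>, ProductStats> perProduct(List<Event> events) {
        Map<List<String>, ProductStats> stats = new HashMap<>();
        for (Event e : events) {
            List<String> key = Arrays.asList(e.tenant, e.productId);
            stats.computeIfAbsent(key, k -> new ProductStats()).add(e.price);
        }
        return stats;
    }

    // Stage 2 (only if needed): re-key the stage-1 results by tenant.
    // Here it simply counts distinct products per tenant.
    static Map<String, Integer> productsPerTenant(Map<List<String>, ProductStats> perProduct) {
        Map<String, Integer> result = new HashMap<>();
        for (List<String> key : perProduct.keySet()) {
            result.merge(key.get(0), 1, Integer::sum);
        }
        return result;
    }
}
```

The number of distinct <tenant, product> keys is far larger than the number of tenants, so keyBy can spread stage 1 over many more parallel instances, and no single key has to hold a map of millions of products.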

Best,

Dawid


On 11/10/18 11:33, Ahmad Hassan wrote:
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24 hours that slide every 5 minutes.
>
> inStream
>     .filter(Objects::nonNull)
>     .keyBy("tenant")
>     .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
>     .fold(new DefaultVector(), new CalculationFold(), new MetricCalculationApply());
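A note on the window definition above: a 24-hour window that slides every 5 minutes means each event belongs to 1440 / 5 = 288 overlapping windows, and, as far as we understand Flink's window operator, per-window state is updated for each of them. The plain-Java sketch below counts those windows. It is modeled on the start-time arithmetic of Flink's sliding window assigners, assuming a zero offset and a non-negative timestamp; `SlidingWindowMath` is a hypothetical helper, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of how a sliding window assigner maps one timestamp to windows.
// Modeled on, but not taken from, Flink's sliding assigners (offset assumed to be 0).
class SlidingWindowMath {
    static final long MINUTE = 60_000L;

    // Start of the most recent window that contains `timestamp` (timestamp >= 0).
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - (timestamp + slide) % slide;
    }

    // Returns the [start, end) bounds of every window the timestamp falls into.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 1440 * MINUTE; // 24 hours
        long slide = 5 * MINUTE;
        List<long[]> windows = assign(1_539_170_000_000L, size, slide);
        System.out.println(windows.size()); // prints 288, i.e. size / slide
    }
}
```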
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
>     private transient MapState<String, DefaultProductMetricVector> products;
>     private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;
>
>     @Override
>     public DefaultVector fold(DefaultVector stats, Event event) throws Exception
>     {
>         // Look up the running metrics for this product, creating them on first sight.
>         DefaultProductMetricVector product = products.get(event.getProductId());
>         if (product == null)
>         {
>             product = new DefaultProductMetricVector();
>         }
>         product.updatePrice(event.getPrice());
>         products.put(event.getProductId(), product);
>         return stats;
>     }
>
>     // FoldFunction does not allow an open() method or this.getRuntimeContext(),
>     // so the state above can never be initialised:
>     //public void open(Configuration parameters) throws Exception
>     //{
>     //    descr = new MapStateDescriptor<>("product", String.class,
>     //            DefaultProductMetricVector.class);
>     //    products = this.getRuntimeContext().getMapState(descr);
>     //}
> }
>
>
> We expect millions of unique products in a 24-hour window, which is
> why we want to keep the state of each product (a
> DefaultProductMetricVector instance) in RocksDB. Otherwise, my
> understanding is that if I instantiate a Java HashMap of products
> within the DefaultVector fold accumulator, then for each incoming event
> the full set of products will be deserialised and held on the heap,
> which will eventually cause an out-of-memory error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
>     Yes, it would be good to post your code.
>     Are you using a FoldFunction in a window (if so, which window) or
>     as a running aggregate?
>
>     In general, collecting state in a FoldFunction is not something
>     you should do. Did you consider using an AggregateFunction?
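As a sketch of what switching to an AggregateFunction involves: the function itself creates, updates, merges and finalises its accumulator, so the runtime can keep that accumulator in whatever state backend is configured. To stay self-contained, the example declares a local interface with the same four methods as Flink's `AggregateFunction<IN, ACC, OUT>` instead of importing Flink; `AggregateShape`, `PriceAcc` and `AveragePrice` are hypothetical names.

```java
// Local interface with the same four methods as Flink's
// org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>,
// declared here only so the sketch compiles without Flink on the classpath.
interface AggregateShape<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical accumulator for the average price of one <tenant, product> key.
class PriceAcc {
    long count;
    double sum;
}

class AveragePrice implements AggregateShape<Double, PriceAcc, Double> {
    @Override
    public PriceAcc createAccumulator() {
        return new PriceAcc();
    }

    @Override
    public PriceAcc add(Double price, PriceAcc acc) {
        acc.count++;
        acc.sum += price;
        return acc;
    }

    @Override
    public Double getResult(PriceAcc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    // merge() is what FoldFunction lacks: two partial accumulators can be
    // combined, e.g. when the runtime merges the contents of two windows.
    @Override
    public PriceAcc merge(PriceAcc a, PriceAcc b) {
        PriceAcc merged = new PriceAcc();
        merged.count = a.count + b.count;
        merged.sum = a.sum + b.sum;
        return merged;
    }
}
```

In a real job the class would implement Flink's AggregateFunction directly (taking the event type as IN and reading the price from it) and be passed to `.aggregate(...)` on the windowed stream, optionally together with a ProcessWindowFunction.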
>
>     Fabian
>
>     On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler
>     <ches...@apache.org> wrote:
>
>         In which method are you calling getRuntimeContext()? This
>         method can only be used after open() has been called.
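This point explains the IllegalStateException quoted in the original question: the runtime injects the context into a rich function after the object has been constructed, and only then calls open(). The plain-Java model below imitates that order; `RuntimeContextModel`, `RichFunctionModel`, `ProductTracker` and `Lifecycle` are hypothetical names, not Flink classes.

```java
// Plain-Java model of the rich-function lifecycle. The class names are hypothetical
// and only imitate the behaviour reported in the stack trace in this thread.
class RuntimeContextModel {
    final String taskName;

    RuntimeContextModel(String taskName) {
        this.taskName = taskName;
    }
}

abstract class RichFunctionModel {
    private RuntimeContextModel context; // set by the "runtime", not by the constructor

    final void setRuntimeContext(RuntimeContextModel ctx) {
        this.context = ctx;
    }

    final RuntimeContextModel getRuntimeContext() {
        if (context == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return context;
    }

    // Called after setRuntimeContext(); the right place to create state handles.
    void open() {}
}

class ProductTracker extends RichFunctionModel {
    String stateOwner; // stands in for a MapState handle

    @Override
    void open() {
        stateOwner = getRuntimeContext().taskName; // safe: the context is already set
    }
}

class Lifecycle {
    // Mimics the order in which the runtime drives a function instance.
    static ProductTracker deploy(String taskName) {
        ProductTracker f = new ProductTracker(); // constructor and field initializers run here
        f.setRuntimeContext(new RuntimeContextModel(taskName));
        f.open();
        return f;
    }
}
```

The same rule applies in Flink itself: create MapState handles in open() of a rich function running on a keyed stream (for example a KeyedProcessFunction), never in a field initializer or constructor.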
>
>         On 09.10.2018 17:09, Ahmad Hassan wrote:
>>         Hi,
>>
>>         We want to use MapState inside a fold function to keep a map of
>>         all products that we see in a 24-hour window, so that the huge
>>         state is stored in RocksDB rather than overflowing the heap.
>>         However, I can't seem to initialise MapState within a
>>         FoldFunction or in any class that extends RichMapFunction:
>>
>>         private transient MapStateDescriptor<String, String> descr =
>>         new MapStateDescriptor<>("mymap", String.class, String.class);
>>         this.getRuntimeContext().getMapState(descr);
>>
>>         I get the error:
>>
>>         java.lang.IllegalStateException: The runtime context has not been initialized.
>>             at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>         Any clues on how to get the runtime context, please?
>>
>>         Thanks.
>>
>>         Best regards
>
>
