Hi Ahmad,

A few comments from my side:
1. FoldFunction is deprecated because of several problems, e.g. it offers no way to merge the contents of windows. You should therefore use at least an AggregateFunction.
2. I am not sure you need to store this in RocksDB. Do you expect 24 million products per tenant in a single window?
3. I think you could first compute stats for the composite key <tenant, product> and then aggregate them in a subsequent operation (if you need to). This way you could distribute the workload across more parallel instances.

Best,

Dawid

On 11/10/18 11:33, Ahmad Hassan wrote:
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>     .filter(Objects::nonNull)
>     .keyBy("tenant")
>     .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
>         Time.minutes(5)))
>     .fold(new DefaultVector(), new CalculationFold(), new
>         MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
>     private final MapState<String, DefaultProductMetricVector> products;
>     private transient MapStateDescriptor<String,
>         DefaultProductMetricVector> descr;
>
>     @Override
>     public DefaultVector fold(DefaultVector stats, Event event)
>     {
>         if (products.contains(event.getProductId))
>         {
>             DefaultProductMetricVector product = products.get(event.getProductId);
>             product.updatePrice(event.getPrice);
>             products.put(event.getProductId, product);
>         }
>         else
>         {
>             DefaultProductMetricVector product = new DefaultProductMetricVector();
>             product.updatePrice(event.getPrice);
>             products.put(event.getProductId, product);
>         }
>         return stats;
>     }
>
>     // Fold function do not allow the open method and
>     // this.getRuntimeContext
>     //public void open(Configuration parameters) throws Exception
>     //{
>     //    descr = new MapStateDescriptor<>("product", String.class,
>     //        DefaultProductMetricVector.class);
>     //    products = this.getRuntimeContext().getMapState(descr);
>     //}
> }
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is
> that if I instantiate a java hashmap of products within
> DefaultVector fold accumulator then for each incoming event the full
> set of products will be deserialised and stored on heap which will
> eventually cause heap overflow error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fhue...@gmail.com
> <mailto:fhue...@gmail.com>> wrote:
>
>     Yes, it would be good to post your code.
>     Are you using a FoldFunction in a window (if yes, what window) or
>     as a running aggregate?
>
>     In general, collecting state in a FoldFunction is usually not
>     something that you should do. Did you consider using an
>     AggregateFunction?
>
>     Fabian
>
>     On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler
>     <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>
>         In which method are you calling getRuntimeContext()? This
>         method can only be used after open() has been called.
>
>         On 09.10.2018 17:09, Ahmad Hassan wrote:
>>         Hi,
>>
>>         We want to use MapState inside fold function to keep the map
>>         of all products that we see in 24 hour window to store huge
>>         state in rocksdb rather than overflowing heap. However, I
>>         don't seem to be able to initialise mapstate within
>>         foldfunction or any class that is extending RichMapFunction
>>
>>         private transient MapStateDescriptor<String, String> descr =
>>         new MapStateDescriptor<>("mymap", String.class, String.class);
>>         this.getRuntimeContext().getMapState(descr);
>>
>>         I get error
>>
>>         java.lang.IllegalStateException: The runtime context has not
>>         been initialized.
>>         at
>>         org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>         Any clues how to get the runtime context please?
>>
>>         Thanks.
>>
>>         Best regards
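
To make points 1 and 3 of the reply at the top more concrete, here is a minimal sketch in plain Java. It is not code from the thread and it does not run on Flink: the nested `Aggregate` interface is a local stand-in that only mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`), and `Event`, `PriceStats`, `PriceStatsAggregate` and the two helper methods are hypothetical names chosen for illustration. The idea it shows is that keying by <tenant, product> keeps each accumulator small and fixed-size, so no per-tenant map of all products is needed, and that `merge` lets partial results be combined in a second step.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch; the types below are illustrative assumptions, not Flink classes. */
final class ProductStatsSketch {

    /** Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>. */
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical event shape, based on the fields used in the quoted code. */
    static final class Event {
        final String tenant;
        final String productId;
        final double price;

        Event(String tenant, String productId, double price) {
            this.tenant = tenant;
            this.productId = productId;
            this.price = price;
        }
    }

    /** Hypothetical per-product accumulator: two fields, regardless of event count. */
    static final class PriceStats {
        long count;
        double sum;

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Incremental aggregation; merge() is what FoldFunction cannot offer. */
    static final class PriceStatsAggregate implements Aggregate<Event, PriceStats, PriceStats> {
        @Override
        public PriceStats createAccumulator() {
            return new PriceStats();
        }

        @Override
        public PriceStats add(Event event, PriceStats acc) {
            acc.count++;
            acc.sum += event.price;
            return acc;
        }

        @Override
        public PriceStats getResult(PriceStats acc) {
            return acc;
        }

        @Override
        public PriceStats merge(PriceStats a, PriceStats b) {
            PriceStats merged = new PriceStats();
            merged.count = a.count + b.count;
            merged.sum = a.sum + b.sum;
            return merged;
        }
    }

    /** Stage 1: simulates keying by <tenant, product>, one accumulator per composite key. */
    static Map<String, PriceStats> aggregateByTenantAndProduct(Iterable<Event> events) {
        PriceStatsAggregate agg = new PriceStatsAggregate();
        Map<String, PriceStats> perKey = new HashMap<>();
        for (Event event : events) {
            String key = event.tenant + "|" + event.productId;
            PriceStats acc = perKey.get(key);
            if (acc == null) {
                acc = agg.createAccumulator();
            }
            perKey.put(key, agg.add(event, acc));
        }
        return perKey;
    }

    /** Stage 2 (optional): rolls the per-product results up to one result per tenant. */
    static Map<String, PriceStats> rollUpByTenant(Map<String, PriceStats> perKey) {
        PriceStatsAggregate agg = new PriceStatsAggregate();
        Map<String, PriceStats> perTenant = new HashMap<>();
        for (Map.Entry<String, PriceStats> entry : perKey.entrySet()) {
            String tenant = entry.getKey().substring(0, entry.getKey().indexOf('|'));
            PriceStats existing = perTenant.get(tenant);
            if (existing == null) {
                existing = agg.createAccumulator();
            }
            perTenant.put(tenant, agg.merge(existing, entry.getValue()));
        }
        return perTenant;
    }
}
```

In an actual Flink job the first stage would presumably be a stream keyed by tenant and product, with an `AggregateFunction` passed to the window's `aggregate(...)` call, and the second stage a further keyed operation on the emitted results. Whether the second stage is needed at all depends on what `MetricCalculationApply` has to produce, which the thread does not show.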