Jörn,

Thanks for your response.
I am pasting below the snippet of code that shows the Drools integration. When facts/events are picked up by reading through a file (FileReader -> readLine()), it works as expected, and I have tested it for a wide range of record counts in the file. The same code does not work when I run it against streaming data generated out of the same file. I have tried batch durations ranging from 1 to 50 seconds, and every execution shows that the rules did not fire on some valid facts/events. I also suspected a multi-threading issue, but I have verified that this is not the case.

Hope this provides all the relevant information.

Regards,
Ajit

/*
 * Copyright (c) 2015. Capiot Software India Pvt Ltd.
 * Author: a...@capiot.com
 */
package com.capiot.analytics.spark.file;

import com.capiot.analytics.spark.Person;
import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
import org.apache.spark.api.java.function.VoidFunction;
import org.drools.runtime.StatefulKnowledgeSession;

import java.io.BufferedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RuleExceutionFunction implements VoidFunction<Person> {

    static StatefulKnowledgeSession knowledgeSession;
    static List<Person> customersWithOffers = Collections.synchronizedList(new ArrayList());
    //static Map<Integer, String> map = Collections.synchronizedMap(new HashMap());
    static TrackingAgendaEventListener agendaEventListener = new TrackingAgendaEventListener();
    static AtomicInteger count = new AtomicInteger(0);

    //private static final File f = new File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
    private static PrintWriter pw = null;
    private static PrintWriter pwp = null;
    private static final long serialVersionUID = 2370;

    public RuleExceutionFunction() throws Exception {
        if (knowledgeSession == null) {
            knowledgeSession = KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
            knowledgeSession.addEventListener(agendaEventListener);
            {
                pw = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                        "C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")), true);
                pwp = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                        "C:\\Users\\bajit\\Documents\\processed_customers_file_5k" + ".csv")), true);
            }
        }
    }

    @Override
    public void call(Person person) throws Exception {
        //List<Person> facts = rdd.collect();
        //Apply rules on facts here
        //synchronized (this)
        {
            knowledgeSession.insert(person);
            int i = knowledgeSession.fireAllRules();
        }
        //System.out.println("++++++ '" + agendaEventListener.activationsToString());
        if (person.hasOffer()) {
            customersWithOffers.add(person);
            //Files.append(person.toString() + System.getProperty("line.separator"), f, Charset.defaultCharset());
            pw.println(person.toString());
        }
        pwp.println(person.toString());
        count.getAndIncrement();
    }

    public StatefulKnowledgeSession getSession() {
        return knowledgeSession;
    }

    public List<Person> getCustomersWithOffers() {
        return customersWithOffers;
    }
}
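For comparison, a per-partition variant that builds its own session inside each task, rather than sharing the static one above, would look roughly like the sketch below. This is only a sketch, not the code I am running; it reuses the KnowledgeBaseHelperUtil, Person and offers.drl names from the class above, and the class name PartitionRuleFunction is just illustrative.

import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.drools.runtime.StatefulKnowledgeSession;

import com.capiot.analytics.spark.Person;
import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;

// Sketch only: one Drools session per Spark partition, nothing shared between tasks.
public class PartitionRuleFunction implements VoidFunction<Iterator<Person>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Iterator<Person> persons) throws Exception {
        // Create the session on the executor, inside the task, not in a static field.
        StatefulKnowledgeSession session =
                KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
        try {
            while (persons.hasNext()) {
                session.insert(persons.next());
            }
            session.fireAllRules();
        } finally {
            session.dispose();
        }
    }
}

In the streaming job it would be attached with something like stream.foreachRDD(...), calling rdd.foreachPartition(new PartitionRuleFunction()) on each batch RDD.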
On Mon, Jul 6, 2015 at 10:21 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Can you provide the result set you are using and specify how you
> integrated the drools engine?
> Drools basically is based on a large shared memory. Hence, if you have
> several tasks in Shark they end up using different shared memory areas.
> A full integration of drools requires some sophisticated design and
> probably rewriting of the rules evaluation algorithm, so you probably have
> to rewrite that engine from scratch.
>
> On Sun, Jul 5, 2015 at 17:42, Ajit Bhingarkar <a...@capiot.com> wrote:
>
>> Hi,
>>
>> I am trying to integrate the Drools rules API with Spark so that the solution
>> could solve a few CEP-centric use cases.
>>
>> When I read data from a local file (simple FileReader -> readLine()), I
>> see that all my rules are reliably fired, and every time I get the results
>> I expect. I have tested with file record sizes from 5K to 5M; results are
>> as expected, every time.
>>
>> However, when I try to receive the same data through a stream (I created a
>> simple ServerSocket and am reading the file and writing to the socket line
>> by line) using a custom socket receiver, even though I see that data is
>> received at the custom receiver's end, perhaps the store() API has an issue
>> and data is not reliably persisted (I am
>> using StorageLevel.MEMORY_AND_DISK_SER_2() as recommended).
>>
>> The result is that my rules don't fire reliably, and every time I get a
>> different result. It could also be internal data loss within the Spark engine.
>>
>> I am using a single Windows-based server, and the latest 1.4.0.
>>
>> I have attached the code for the custom receiver, and my socket server which
>> streams the file data as text.
>>
>> Can someone please shed more light on this issue? I have read in the
>> documentation that a reliable receiver needs to implement
>> *store(multi-records)*, but couldn't find any example.
>>
>> Many thanks in advance for any inputs or suggestions.
>>
>> Regards,
>> Ajit
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>
>
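PS: On the store(multi-records) point from my original mail, below is a minimal sketch of what such a receiver might look like: it buffers lines read from the socket and hands each buffer to Spark through the multi-record store(Iterator) overload. The class name, the host/port handling and the batch size of 100 are illustrative only, not taken from my attached code.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Sketch only: buffers lines and stores them in blocks via store(Iterator).
public class BatchingSocketReceiver extends Receiver<String> {

    private static final long serialVersionUID = 1L;
    private final String host;
    private final int port;

    public BatchingSocketReceiver(String host, int port) {
        super(StorageLevel.MEMORY_AND_DISK_SER_2());
        this.host = host;
        this.port = port;
    }

    @Override
    public void onStart() {
        // Do the blocking socket reads on a separate thread so onStart() returns quickly.
        new Thread("batching-socket-receiver") {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    @Override
    public void onStop() {
        // Nothing to do here; receive() checks isStopped() and lets the thread finish.
    }

    private void receive() {
        try {
            Socket socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), "UTF-8"));
            List<String> batch = new ArrayList<String>();
            String line;
            while (!isStopped() && (line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() >= 100) {
                    // Multi-record store: returns after the whole block is handed to Spark.
                    store(batch.iterator());
                    batch = new ArrayList<String>();
                }
            }
            if (!batch.isEmpty()) {
                store(batch.iterator());
            }
            reader.close();
            socket.close();
            restart("Socket closed, trying to reconnect");
        } catch (Exception e) {
            restart("Error receiving data", e);
        }
    }
}

As far as I understand the custom receiver guide, the multi-record store() call returns only after the block has been stored inside Spark, while the single-record store(line) call may be buffered internally first; that difference is what the documentation means by a reliable receiver.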