Jorn,

Thanks for your response.

I am pasting below a snippet of code which shows drools integration when
facts/events are picked up after reading through a File
(FileReader->readLine()), it works as expected and I have tested it for
wide range of record data in a File.

Same code doesn't work when I try to do same thing on a streaming incoming
data generated out of same File. I have used several batch durations, from
1 to 50 seconds. Every execution shows that rules did not fire on some
valid facts/events.

I also thought of it being an issue with multi-threading; but that is not
the case as well. I have verified.

Hope this provides with with all the relevant information.

Regards,
Ajit


/*
 * Copyright (c) 2015. Capiot Software India Pvt Ltd.
 * Author: a...@capiot.com
 */

package com.capiot.analytics.spark.file;

import com.capiot.analytics.spark.Person;
import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
import org.apache.spark.api.java.function.VoidFunction;
import org.drools.runtime.StatefulKnowledgeSession;

import java.io.BufferedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RuleExceutionFunction implements VoidFunction <Person>
{
    static StatefulKnowledgeSession knowledgeSession;
    static List<Person> customersWithOffers =
Collections.synchronizedList(new ArrayList());
    //static Map<Integer, String> map = Collections.synchronizedMap(new
HashMap());
    static TrackingAgendaEventListener agendaEventListener = new
TrackingAgendaEventListener();

    static AtomicInteger count = new AtomicInteger(0);

    //private static final File f = new
File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
    private static PrintWriter pw = null;
    private static PrintWriter pwp = null;

    private static final long serialVersionUID = 2370;

    public RuleExceutionFunction()  throws Exception
    {
        if (knowledgeSession == null)
        {
            knowledgeSession =
KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
            knowledgeSession.addEventListener(agendaEventListener);

            {
                pw = new PrintWriter(new BufferedWriter(new
java.io.FileWriter

("C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")
                ), true);

                pwp = new PrintWriter(new BufferedWriter(new
java.io.FileWriter

 ("C:\\Users\\bajit\\Documents\\processed_customers_file_5k" +
                                                          ".csv")
                ), true);
            }
        }
    }


    @Override
    public void call(Person person) throws Exception
    {
        //List<Person> facts = rdd.collect();
        //Apply rules on facts here
        //synchronized (this)
        {
            knowledgeSession.insert(person);
            int i = knowledgeSession.fireAllRules();
        }

        //System.out.println("++++++ '"+
agendaEventListener.activationsToString());

        if (person.hasOffer())
        {
            customersWithOffers.add(person);
            //Files.append(person.toString() +
System.getProperty("line.separator"), f, Charset.defaultCharset());
            pw.println(person.toString());
        }

        pwp.println(person.toString());

        count.getAndIncrement();
    }

    public StatefulKnowledgeSession getSession()
    {
        return knowledgeSession;
    }

    public List<Person> getCustomersWithOffers()
    {
        return customersWithOffers;
    }
}


On Mon, Jul 6, 2015 at 10:21 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Can you provide the result set you are using and specify how you
> integrated the drools engine?
> Drools basically is based on a large shared memory. Hence, if you have
> several tasks in Shark they end up using different shared memory areas.
> A full integration of drools requires some sophisticated design and
> probably rewriting of the rules evaluation algorithm, so you probably have
> to rewrite that engine from scratch.
>
> Le dim. 5 juil. 2015 à 17:42, Ajit Bhingarkar <a...@capiot.com> a écrit :
>
>> Hi,
>>
>> I am trying to integrate Drools rules API with Spark so that the solution
>> could solve few CEP centric use cases.
>>
>> When I read data from a local file (simple FileWriter -> readLine()), I
>> see that all my rules are reliably fired and everytime I get the results as
>> expected. I have tested with file record sizes from 5K to 5M; results are
>> as expected, every time.
>>
>> However when I try to receive same data through a stream (I created a
>> simple ServerSocket, and am reading the file and writing to the socket line
>> by line) using a custom socket receiver; even though I see that data is
>> received at the custom receiver's end; perhaps store() API has an issue,
>> and data is not reliably persisted, (I am
>> using StorageLevel.MEMORY_AND_DISK_SER_2() as recommended).
>>
>> Result is that my rules don't get fired reliably, and everytime I get a
>> different result. It also could be internal data loss within Spark engine.
>>
>> I am using a a single Windows based server, and latest 1.4.0.
>>
>> I have attached code for custom receiver, and my socket server which
>> streams file data as text.
>>
>> Can someone pls shed more light on this issue? I have read in the
>> documentation that a reliable receiver needs to implement
>> *store(multi-records)*, but couldn't find any example.
>>
>> Many thanks in advance for any inputs or suggestions for trying out.
>>
>> Regards,
>> Ajit
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to