Author: bago
Date: Mon Apr 30 09:46:46 2007
New Revision: 533796

URL: http://svn.apache.org/viewvc?view=rev&rev=533796
Log:
Changed StagedMultipleSPFExecutor to use a static int counter for lookup 
identifiers. Also refactored it to limit the pending lookup queue to 50 entries: 
when the queue is full, new lookup submissions (calls to execute) block until 
there is room again.
Added a flag to DNSServiceAsynchSimulator to make it simulate asynchronous 
behaviour by using multiple threads.
Refactored DNSServiceXBillImpl to export a static method that converts a dnsjava 
Record[] into our DNSService List<String> result.

Modified:
    
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
    
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
    
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
    
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java

Modified: 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
URL: 
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
--- 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
 (original)
+++ 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
 Mon Apr 30 09:46:46 2007
@@ -33,7 +33,7 @@
      * @param id the identification key for the response.
      * @param responsePool the queue where the response will be appended.
      */
-    public void getRecordsAsynch(DNSRequest request, Object id,
+    public void getRecordsAsynch(DNSRequest request, int id,
             final IResponseQueue responsePool);
 
 }

Modified: 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
URL: 
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
--- 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
 (original)
+++ 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
 Mon Apr 30 09:46:46 2007
@@ -62,10 +62,18 @@
 
     }
 
+    private static int id = 1;
+    
+    // TODO: sooner or later we have to avoid overflow and restart.
+    private synchronized int nextId() {
+        return id++;
+    }
+    
     private Logger log;
     private DNSAsynchLookupService dnsProbe;
     private Thread worker;
     private Map sessions;
+    private Map results;
     private ResponseQueueImpl responseQueue;
 
     public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService 
service) {
@@ -75,6 +83,7 @@
         this.responseQueue = new ResponseQueueImpl();
 
         this.sessions = Collections.synchronizedMap(new HashMap());
+        this.results = Collections.synchronizedMap(new HashMap());
 
         this.worker = new Thread(this);
         this.worker.setDaemon(true);
@@ -83,10 +92,17 @@
     }
 
     /**
+     * Execute the non-blocking part of the processing and returns.
+     * If the working queue is full (50 pending responses) this method will 
not return
+     * until the queue is again not full.
+     * 
      * @see 
org.apache.james.jspf.core.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession,
 org.apache.james.jspf.FutureSPFResult)
      */
     public void execute(SPFSession session, FutureSPFResult result) {
+        execute(session, result, true);
+    }
         
+    public void execute(SPFSession session, FutureSPFResult result, boolean 
throttle) {
         SPFChecker checker;
         while ((checker = session.popChecker()) != null) {
             // only execute checkers we added (better recursivity)
@@ -95,12 +111,8 @@
                 DNSLookupContinuation cont = checker.checkSPF(session);
                 // if the checker returns a continuation we return it
                 if (cont != null) {
-                    sessions.put(session, result);
-                    
session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
-                    dnsProbe.getRecordsAsynch(cont.getRequest(), session, 
responseQueue);
+                    invokeAsynchService(session, result, cont, throttle);
                     return;
-                } else {
-                    sessions.remove(sessions);
                 }
             } catch (Exception e) {
                 while (e != null) {
@@ -118,14 +130,36 @@
         result.setSPFResult(session);
     }
 
+    /**
+     * throttle should be true only when the caller thread is the client and 
not the worker thread.
+     * We could even remove the throttle parameter and check the currentThread.
+     * This way the worker is never "blocked" while outside callers will be 
blocked if our
+     * queue is too big (so this is not fully "asynchronous").
+     */
+    private synchronized void invokeAsynchService(SPFSession session,
+            FutureSPFResult result, DNSLookupContinuation cont, boolean 
throttle) {
+        while (throttle && results.size() > 50) {
+            try {
+                this.wait(100);
+            } catch (InterruptedException e) {
+            }
+        }
+        int nextId = nextId();
+        sessions.put(new Integer(nextId), session);
+        results.put(new Integer(nextId), result);
+        session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
+        dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
+    }
+
     public void run() {
 
         while (true) {
             
             IResponse resp = responseQueue.removeResponse();
             
-            SPFSession session = (SPFSession) resp.getId();
-            FutureSPFResult result = (FutureSPFResult) 
sessions.remove(session);
+            Integer respId = (Integer) resp.getId();
+            SPFSession session = (SPFSession) sessions.remove(respId);
+            FutureSPFResult result = (FutureSPFResult) results.remove(respId);
             
             DNSLookupContinuation cont = (DNSLookupContinuation) 
session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
             
@@ -141,11 +175,9 @@
                 cont = cont.getListener().onDNSResponse(response, session);
                 
                 if (cont != null) {
-                    dnsProbe.getRecordsAsynch(cont.getRequest(), session, 
responseQueue);
-                    
session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
-                    sessions.put(session, result);
+                    invokeAsynchService(session, result, cont, false);
                 } else {
-                    execute(session, result);
+                    execute(session, result, false);
                 }
 
             } catch (Exception e) {
@@ -159,7 +191,7 @@
                         e = ex;
                     }
                 }
-                execute(session, result);
+                execute(session, result, false);
             }
         }
     }

Modified: 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
URL: 
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
--- 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
 (original)
+++ 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
 Mon Apr 30 09:46:46 2007
@@ -38,6 +38,7 @@
     private Thread worker;
     private LinkedList queue;
     private int waitingThreads = 0;
+    private boolean multiThread;
     
     public static final class Request {
         private final DNSRequest value;
@@ -60,8 +61,9 @@
         
     }
 
-    public DNSServiceAsynchSimulator(DNSService service) {
+    public DNSServiceAsynchSimulator(DNSService service, boolean multiThread) {
         this.dnsService = service;
+        this.multiThread = multiThread;
 
         this.queue = new LinkedList();
         this.worker = new Thread(this);
@@ -74,11 +76,11 @@
     /**
      * @see 
org.apache.james.jspf.core.DNSService#getRecordsAsynch(java.lang.String, int, 
java.lang.Object, org.apache.james.jspf.core.IResponseQueue)
      */
-    public void getRecordsAsynch(DNSRequest request, Object id,
+    public void getRecordsAsynch(DNSRequest request, int id,
             final IResponseQueue responsePool) {
         
         synchronized (queue) {
-            queue.addLast(new Request(request, id, responsePool));
+            queue.addLast(new Request(request, new Integer(id), responsePool));
             queue.notify();
         }
         
@@ -102,14 +104,33 @@
                 req = (Request) queue.removeFirst();
             }
             
-            IResponseImpl response;
-            try {
-                response = new IResponseImpl(req.getId(), 
dnsService.getRecords(req.getValue()));
-            } catch (TimeoutException e) {
-                response = new IResponseImpl(req.getId(), e);
-            }
+            Runnable runnable = new Runnable() {
+
+                private Request req;
+
+                public void run() {
+                    IResponseImpl response;
+                    try {
+                        response = new IResponseImpl(req.getId(), 
dnsService.getRecords(req.getValue()));
+                    } catch (TimeoutException e) {
+                        response = new IResponseImpl(req.getId(), e);
+                    }
 
-            req.getResponseQueue().insertResponse(response);
+                    req.getResponseQueue().insertResponse(response);
+                }
+
+                public Runnable setRequest(Request req) {
+                    this.req = req;
+                    return this;
+                }
+                
+            }.setRequest(req);
+            
+            if (multiThread) {
+                new Thread(runnable).start();
+            } else {
+                runnable.run();
+            }
         }
     }
 

Modified: 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
URL: 
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
--- 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
 (original)
+++ 
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
 Mon Apr 30 09:46:46 2007
@@ -28,6 +28,7 @@
 import org.xbill.DNS.Lookup;
 import org.xbill.DNS.MXRecord;
 import org.xbill.DNS.PTRRecord;
+import org.xbill.DNS.RRset;
 import org.xbill.DNS.Record;
 import org.xbill.DNS.SPFRecord;
 import org.xbill.DNS.TXTRecord;
@@ -116,7 +117,6 @@
             throws TimeoutException {
         String recordTypeDescription;
         int dnsJavaType;
-        int recordCount = 0;
         switch (request.getRecordType()) {
             case DNSRequest.A: recordTypeDescription = "A"; dnsJavaType = 
Type.A; break;
             case DNSRequest.AAAA: recordTypeDescription = "AAAA"; dnsJavaType 
= Type.AAAA; break;
@@ -127,7 +127,6 @@
             default: // TODO fail!
                 return null;
         }
-        List records;
         try {
 
             log.debug("Start "+recordTypeDescription+"-Record lookup for : " + 
request.getHostname());
@@ -137,54 +136,59 @@
 
             Record[] rr = query.run();
             int queryResult = query.getResult();
+            
 
             if (queryResult == Lookup.TRY_AGAIN) {
                 throw new TimeoutException();
             }
             
-            if (rr != null && rr.length > 0) {
-                records = new ArrayList();
-                for (int i = 0; i < rr.length; i++) {
-                    String res;
-                    switch (request.getRecordType()) {
-                        case DNSRequest.A:
-                            ARecord a = (ARecord) rr[i];
-                            res = a.getAddress().getHostAddress();
-                            break;
-                        case DNSRequest.AAAA:
-                            AAAARecord aaaa = (AAAARecord) rr[i];
-                            res = aaaa.getAddress().getHostAddress();
-                            break;
-                        case DNSRequest.MX:
-                            MXRecord mx = (MXRecord) rr[i];
-                            res = mx.getTarget().toString();
-                            break;
-                        case DNSRequest.PTR:
-                            PTRRecord ptr = (PTRRecord) rr[i];
-                            res = IPAddr.stripDot(ptr.getTarget().toString());
-                            break;
-                        case DNSRequest.TXT:
-                            TXTRecord txt = (TXTRecord) rr[i];
-                            res = txt.rdataToString();
-                            break;
-                        case DNSRequest.SPF:
-                            SPFRecord spf = (SPFRecord) rr[i];
-                            res = spf.rdataToString();
-                            break;
-                        default:
-                            return null;
-                    }
-                    records.add(res);
-                }
-                recordCount = rr.length;
-            } else {
-                records = null;
-            }
+            List records;
+            records = convertRecordsToList(rr);
             
-            log.debug("Found " + recordCount + " 
"+recordTypeDescription+"-Records");
+            log.debug("Found " + (rr != null ? rr.length : 0) + " 
"+recordTypeDescription+"-Records");
+            return records;
         } catch (TextParseException e) {
             // i think this is the best we could do
             log.debug("No "+recordTypeDescription+" Record found for host: " + 
request.getHostname());
+            return null;
+        }
+    }
+
+    public static List convertRecordsToList(Record[] rr) {
+        List records;
+        if (rr != null && rr.length > 0) {
+            records = new ArrayList();
+            for (int i = 0; i < rr.length; i++) {
+                switch (rr[i].getType()) {
+                    case Type.A:
+                        ARecord a = (ARecord) rr[i];
+                        records.add(a.getAddress().getHostAddress());
+                        break;
+                    case Type.AAAA:
+                        AAAARecord aaaa = (AAAARecord) rr[i];
+                        records.add(aaaa.getAddress().getHostAddress());
+                        break;
+                    case Type.MX:
+                        MXRecord mx = (MXRecord) rr[i];
+                        records.add(mx.getTarget().toString());
+                        break;
+                    case Type.PTR:
+                        PTRRecord ptr = (PTRRecord) rr[i];
+                        
records.add(IPAddr.stripDot(ptr.getTarget().toString()));
+                        break;
+                    case Type.TXT:
+                        TXTRecord txt = (TXTRecord) rr[i];
+                        records.add(txt.rdataToString());
+                        break;
+                    case Type.SPF:
+                        SPFRecord spf = (SPFRecord) rr[i];
+                        records.add(spf.rdataToString());
+                        break;
+                    default:
+                        return null;
+                }
+            }
+        } else {
             records = null;
         }
         return records;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to