Author: bago
Date: Mon Apr 30 09:46:46 2007
New Revision: 533796
URL: http://svn.apache.org/viewvc?view=rev&rev=533796
Log:
Changed StagedMultipleSPFExecutor to use a static int counter for lookup identifiers.
Also refactored it to limit the pending lookup queue to 50 entries and to block
new lookup submissions (in execute) while the queue is full.
Added a flag to DNSServiceAsynchSimulator to make it simulate asynchronous
behaviour by using multiple threads.
Refactored DNSServiceXBillImpl to export a static method (convertRecordsToList)
that converts a dnsjava Record[] into our DNSService List<String> result.
Modified:
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
Modified:
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
URL:
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
---
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
(original)
+++
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/DNSAsynchLookupService.java
Mon Apr 30 09:46:46 2007
@@ -33,7 +33,7 @@
* @param id the identification key for the response.
* @param responsePool the queue where the response will be appended.
*/
- public void getRecordsAsynch(DNSRequest request, Object id,
+ public void getRecordsAsynch(DNSRequest request, int id,
final IResponseQueue responsePool);
}
Modified:
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
URL:
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
---
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
(original)
+++
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/core/StagedMultipleSPFExecutor.java
Mon Apr 30 09:46:46 2007
@@ -62,10 +62,18 @@
}
+ private static int id = 1;
+
+ // TODO: sooner or later we have to avoid overflow and restart.
+ private synchronized int nextId() {
+ return id++;
+ }
+
private Logger log;
private DNSAsynchLookupService dnsProbe;
private Thread worker;
private Map sessions;
+ private Map results;
private ResponseQueueImpl responseQueue;
public StagedMultipleSPFExecutor(Logger log, DNSAsynchLookupService
service) {
@@ -75,6 +83,7 @@
this.responseQueue = new ResponseQueueImpl();
this.sessions = Collections.synchronizedMap(new HashMap());
+ this.results = Collections.synchronizedMap(new HashMap());
this.worker = new Thread(this);
this.worker.setDaemon(true);
@@ -83,10 +92,17 @@
}
/**
+ * Execute the non-blocking part of the processing and returns.
+ * If the working queue is full (50 pending responses) this method will
not return
+ * until the queue is again not full.
+ *
* @see
org.apache.james.jspf.core.SPFExecutor#execute(org.apache.james.jspf.core.SPFSession,
org.apache.james.jspf.FutureSPFResult)
*/
public void execute(SPFSession session, FutureSPFResult result) {
+ execute(session, result, true);
+ }
+ public void execute(SPFSession session, FutureSPFResult result, boolean
throttle) {
SPFChecker checker;
while ((checker = session.popChecker()) != null) {
// only execute checkers we added (better recursivity)
@@ -95,12 +111,8 @@
DNSLookupContinuation cont = checker.checkSPF(session);
// if the checker returns a continuation we return it
if (cont != null) {
- sessions.put(session, result);
-
session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
- dnsProbe.getRecordsAsynch(cont.getRequest(), session,
responseQueue);
+ invokeAsynchService(session, result, cont, throttle);
return;
- } else {
- sessions.remove(sessions);
}
} catch (Exception e) {
while (e != null) {
@@ -118,14 +130,36 @@
result.setSPFResult(session);
}
+ /**
+ * throttle should be true only when the caller thread is the client and
not the worker thread.
+ * We could even remove the throttle parameter and check the currentThread.
+ * This way the worker is never "blocked" while outside callers will be
blocked if our
+ * queue is too big (so this is not fully "asynchronous").
+ */
+ private synchronized void invokeAsynchService(SPFSession session,
+ FutureSPFResult result, DNSLookupContinuation cont, boolean
throttle) {
+ while (throttle && results.size() > 50) {
+ try {
+ this.wait(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ int nextId = nextId();
+ sessions.put(new Integer(nextId), session);
+ results.put(new Integer(nextId), result);
+ session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
+ dnsProbe.getRecordsAsynch(cont.getRequest(), nextId, responseQueue);
+ }
+
public void run() {
while (true) {
IResponse resp = responseQueue.removeResponse();
- SPFSession session = (SPFSession) resp.getId();
- FutureSPFResult result = (FutureSPFResult)
sessions.remove(session);
+ Integer respId = (Integer) resp.getId();
+ SPFSession session = (SPFSession) sessions.remove(respId);
+ FutureSPFResult result = (FutureSPFResult) results.remove(respId);
DNSLookupContinuation cont = (DNSLookupContinuation)
session.getAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION);
@@ -141,11 +175,9 @@
cont = cont.getListener().onDNSResponse(response, session);
if (cont != null) {
- dnsProbe.getRecordsAsynch(cont.getRequest(), session,
responseQueue);
-
session.setAttribute(ATTRIBUTE_STAGED_EXECUTOR_CONTINUATION, cont);
- sessions.put(session, result);
+ invokeAsynchService(session, result, cont, false);
} else {
- execute(session, result);
+ execute(session, result, false);
}
} catch (Exception e) {
@@ -159,7 +191,7 @@
e = ex;
}
}
- execute(session, result);
+ execute(session, result, false);
}
}
}
Modified:
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
URL:
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
---
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
(original)
+++
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceAsynchSimulator.java
Mon Apr 30 09:46:46 2007
@@ -38,6 +38,7 @@
private Thread worker;
private LinkedList queue;
private int waitingThreads = 0;
+ private boolean multiThread;
public static final class Request {
private final DNSRequest value;
@@ -60,8 +61,9 @@
}
- public DNSServiceAsynchSimulator(DNSService service) {
+ public DNSServiceAsynchSimulator(DNSService service, boolean multiThread) {
this.dnsService = service;
+ this.multiThread = multiThread;
this.queue = new LinkedList();
this.worker = new Thread(this);
@@ -74,11 +76,11 @@
/**
* @see
org.apache.james.jspf.core.DNSService#getRecordsAsynch(java.lang.String, int,
java.lang.Object, org.apache.james.jspf.core.IResponseQueue)
*/
- public void getRecordsAsynch(DNSRequest request, Object id,
+ public void getRecordsAsynch(DNSRequest request, int id,
final IResponseQueue responsePool) {
synchronized (queue) {
- queue.addLast(new Request(request, id, responsePool));
+ queue.addLast(new Request(request, new Integer(id), responsePool));
queue.notify();
}
@@ -102,14 +104,33 @@
req = (Request) queue.removeFirst();
}
- IResponseImpl response;
- try {
- response = new IResponseImpl(req.getId(),
dnsService.getRecords(req.getValue()));
- } catch (TimeoutException e) {
- response = new IResponseImpl(req.getId(), e);
- }
+ Runnable runnable = new Runnable() {
+
+ private Request req;
+
+ public void run() {
+ IResponseImpl response;
+ try {
+ response = new IResponseImpl(req.getId(),
dnsService.getRecords(req.getValue()));
+ } catch (TimeoutException e) {
+ response = new IResponseImpl(req.getId(), e);
+ }
- req.getResponseQueue().insertResponse(response);
+ req.getResponseQueue().insertResponse(response);
+ }
+
+ public Runnable setRequest(Request req) {
+ this.req = req;
+ return this;
+ }
+
+ }.setRequest(req);
+
+ if (multiThread) {
+ new Thread(runnable).start();
+ } else {
+ runnable.run();
+ }
}
}
Modified:
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
URL:
http://svn.apache.org/viewvc/james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java?view=diff&rev=533796&r1=533795&r2=533796
==============================================================================
---
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
(original)
+++
james/jspf/branches/asynch-jspf/src/main/java/org/apache/james/jspf/impl/DNSServiceXBillImpl.java
Mon Apr 30 09:46:46 2007
@@ -28,6 +28,7 @@
import org.xbill.DNS.Lookup;
import org.xbill.DNS.MXRecord;
import org.xbill.DNS.PTRRecord;
+import org.xbill.DNS.RRset;
import org.xbill.DNS.Record;
import org.xbill.DNS.SPFRecord;
import org.xbill.DNS.TXTRecord;
@@ -116,7 +117,6 @@
throws TimeoutException {
String recordTypeDescription;
int dnsJavaType;
- int recordCount = 0;
switch (request.getRecordType()) {
case DNSRequest.A: recordTypeDescription = "A"; dnsJavaType =
Type.A; break;
case DNSRequest.AAAA: recordTypeDescription = "AAAA"; dnsJavaType
= Type.AAAA; break;
@@ -127,7 +127,6 @@
default: // TODO fail!
return null;
}
- List records;
try {
log.debug("Start "+recordTypeDescription+"-Record lookup for : " +
request.getHostname());
@@ -137,54 +136,59 @@
Record[] rr = query.run();
int queryResult = query.getResult();
+
if (queryResult == Lookup.TRY_AGAIN) {
throw new TimeoutException();
}
- if (rr != null && rr.length > 0) {
- records = new ArrayList();
- for (int i = 0; i < rr.length; i++) {
- String res;
- switch (request.getRecordType()) {
- case DNSRequest.A:
- ARecord a = (ARecord) rr[i];
- res = a.getAddress().getHostAddress();
- break;
- case DNSRequest.AAAA:
- AAAARecord aaaa = (AAAARecord) rr[i];
- res = aaaa.getAddress().getHostAddress();
- break;
- case DNSRequest.MX:
- MXRecord mx = (MXRecord) rr[i];
- res = mx.getTarget().toString();
- break;
- case DNSRequest.PTR:
- PTRRecord ptr = (PTRRecord) rr[i];
- res = IPAddr.stripDot(ptr.getTarget().toString());
- break;
- case DNSRequest.TXT:
- TXTRecord txt = (TXTRecord) rr[i];
- res = txt.rdataToString();
- break;
- case DNSRequest.SPF:
- SPFRecord spf = (SPFRecord) rr[i];
- res = spf.rdataToString();
- break;
- default:
- return null;
- }
- records.add(res);
- }
- recordCount = rr.length;
- } else {
- records = null;
- }
+ List records;
+ records = convertRecordsToList(rr);
- log.debug("Found " + recordCount + "
"+recordTypeDescription+"-Records");
+ log.debug("Found " + (rr != null ? rr.length : 0) + "
"+recordTypeDescription+"-Records");
+ return records;
} catch (TextParseException e) {
// i think this is the best we could do
log.debug("No "+recordTypeDescription+" Record found for host: " +
request.getHostname());
+ return null;
+ }
+ }
+
+ public static List convertRecordsToList(Record[] rr) {
+ List records;
+ if (rr != null && rr.length > 0) {
+ records = new ArrayList();
+ for (int i = 0; i < rr.length; i++) {
+ switch (rr[i].getType()) {
+ case Type.A:
+ ARecord a = (ARecord) rr[i];
+ records.add(a.getAddress().getHostAddress());
+ break;
+ case Type.AAAA:
+ AAAARecord aaaa = (AAAARecord) rr[i];
+ records.add(aaaa.getAddress().getHostAddress());
+ break;
+ case Type.MX:
+ MXRecord mx = (MXRecord) rr[i];
+ records.add(mx.getTarget().toString());
+ break;
+ case Type.PTR:
+ PTRRecord ptr = (PTRRecord) rr[i];
+
records.add(IPAddr.stripDot(ptr.getTarget().toString()));
+ break;
+ case Type.TXT:
+ TXTRecord txt = (TXTRecord) rr[i];
+ records.add(txt.rdataToString());
+ break;
+ case Type.SPF:
+ SPFRecord spf = (SPFRecord) rr[i];
+ records.add(spf.rdataToString());
+ break;
+ default:
+ return null;
+ }
+ }
+ } else {
records = null;
}
return records;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]