Title: [1009] trunk/core: Auditor Query Implementation using Lucene.

Diff

Modified: trunk/core/pom.xml (1008 => 1009)

--- trunk/core/pom.xml	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/pom.xml	2005-12-05 15:41:37 UTC (rev 1009)
@@ -194,7 +194,12 @@
       <version>1.2_Java1.3</version>
       <scope>test</scope>
     </dependency>
-
+    
+    <dependency>
+      <groupId>lucene</groupId>
+      <artifactId>lucene</artifactId>
+      <version>1.4.3</version>
+    </dependency>
   </dependencies>
 
   <build>

Modified: trunk/core/project.xml (1008 => 1009)

--- trunk/core/project.xml	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/project.xml	2005-12-05 15:41:37 UTC (rev 1009)
@@ -54,6 +54,11 @@
         </dependency>
 
         <dependency>
+            <id>lucene</id>
+            <version>${lucene_version}</version>
+        </dependency>
+
+        <dependency>
             <id>concurrent</id>
             <version>${concurrent_version}</version>
         </dependency>

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,44 @@
+/** 
+ * <a href="" The open source ESB</a> 
+ * 
+ * Copyright 2005 Datasul B2B Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.audit;
+
+import javax.jbi.messaging.ExchangeStatus;
+
+/**
+ * Main interface for ServiceMix auditor query.
+ * This interface may be used to query upon exchanges.
+ * 
+ * @author George Gastaldi (gastaldi)
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public interface AuditorQueryMBean extends AuditorMBean {
+	String[] findExchangesIDsByStatus(ExchangeStatus status) throws AuditorException;	
+	String[] findExchangesIDsByMessageContent(String type, String content) throws AuditorException;
+	String[] findExchangesIDsByMessageProperty(String type, String property, String value) throws AuditorException;
+
+	/**
+	 * Searches for Exchanges IDs using the supplied key-field and the expected content of the field 
+	 * @param field
+	 * @param fieldValue
+	 * @return exchange ids
+	 * @throws AuditorException if an error occurs
+	 */
+	String[] getExchangeIds(String field, String fieldValue) throws AuditorException;
+}
\ No newline at end of file

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,62 @@
+/** 
+ * <a href="" The open source ESB</a> 
+ * 
+ * Copyright 2005 Datasul B2B Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.queryParser.QueryParser;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+
+/**
+ * Default Lucene Callback implementation. Used on LuceneAuditor
+ * @author George Gastaldi (gastaldi)
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public class DefaultLuceneCallback implements LuceneCallback {
+	private String field;
+	private String query;
+	
+	public DefaultLuceneCallback(String field, String query) {
+		this.field = field;
+		this.query = query;
+	}
+	
+	public Object doCallback(IndexSearcher is) throws IOException {
+		try {
+			Query queryObj = QueryParser.parse(query, field, new StandardAnalyzer());
+			Hits hits = is.search(queryObj);
+			int total = hits.length();
+			String[] ids = new String[total];
+			for (int i = 0; i < total; i++) {
+				Document d = hits.doc(i);
+				ids[i] = d.get("org.servicemix.exchangeid");
+			}
+			return ids;
+		} catch (ParseException pe) {
+			return new String[0];
+		}
+	}
+
+}

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,205 @@
+/** 
+ * <a href="" The open source ESB</a> 
+ * 
+ * Copyright 2005 Datasul B2B Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.servicemix.jbi.audit.AbstractAuditor;
+import org.servicemix.jbi.audit.AuditorException;
+import org.servicemix.jbi.audit.AuditorMBean;
+import org.servicemix.jbi.audit.AuditorQueryMBean;
+import org.servicemix.jbi.jaxp.SourceTransformer;
+
+/**
+ * Lucene AuditorQuery implementation. 
+ * It uses Lucene as the indexing mechanism for searching Exchanges 
+ * and needs a delegated AuditorMBean to persist Exchanges. 
+ * 
+ * The Content of messages are stored as:
+ * 	- org.servicemix.in.contents
+ * 	- org.servicemix.out.contents, if exists
+ * 	- org.servicemix.fault.contents, if exists
+ * 
+ * Properties for IN Messages are stored as:
+ * 	- org.servicemix.in.propertyname
+ * 	- org.servicemix.out.propertyname, if exists
+ * 	- org.servicemix.fault.propertyname, if exists
+ * 
+ * @author George Gastaldi
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public class LuceneAuditor extends AbstractAuditor implements AuditorQueryMBean {
+	private AuditorMBean delegatedAuditor;
+	private LuceneIndexer luceneIndexer = new LuceneIndexer();
+
+	protected void doStart() throws JBIException {
+		super.doStart();
+		if (delegatedAuditor == null) {
+			throw new JBIException("A delegated auditor must be provided");
+		}
+	}
+	
+	/**
+	 * @return Returns the luceneIndexer.
+	 */
+	public LuceneIndexer getLuceneIndexer() {
+		return luceneIndexer;
+	}
+
+	/**
+	 * @param luceneIndexer The luceneIndexer to set.
+	 */
+	public void setLuceneIndexer(LuceneIndexer luceneIndexer) {
+		this.luceneIndexer = luceneIndexer;
+	}
+
+	/**
+	 * @return Returns the delegatedAuditor.
+	 */
+	public AuditorMBean getDelegatedAuditor() {
+		return delegatedAuditor;
+	}
+
+	/**
+	 * @param delegatedAuditor The delegatedAuditor to set.
+	 */
+	public void setDelegatedAuditor(AuditorMBean delegatedAuditor) {
+		this.delegatedAuditor = delegatedAuditor;
+		if (delegatedAuditor instanceof AbstractAuditor) {
+			((AbstractAuditor)delegatedAuditor).setAsContainerListener(false);
+		}
+	}
+
+	public int getExchangeCount() throws AuditorException {
+		return this.delegatedAuditor.getExchangeCount();
+	}
+
+	public String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException {
+		return this.delegatedAuditor.getExchangeIds(fromIndex,toIndex);
+	}
+
+	public MessageExchange[] getExchanges(String[] ids) throws AuditorException {
+		return this.delegatedAuditor.getExchanges(ids);
+	}
+
+	public int deleteExchanges(int fromIndex, int toIndex) throws AuditorException {
+		//TODO: Remove ids from Lucene Index
+		return this.delegatedAuditor.deleteExchanges(fromIndex,toIndex);
+	}
+
+	public int deleteExchanges(String[] ids) throws AuditorException {
+		try {
+			this.luceneIndexer.remove(ids);
+		} catch (IOException io) {
+			throw new AuditorException(io);
+		}
+		return this.delegatedAuditor.deleteExchanges(ids);
+	}
+
+	public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+		try {
+			Document doc = createDocument(exchange);
+			this.luceneIndexer.add(doc,exchange.getExchangeId());
+		} catch (IOException e) {
+			throw new MessagingException("Error while adding to lucene",e);
+		}
+		if (delegatedAuditor instanceof AbstractAuditor) {
+			((AbstractAuditor)delegatedAuditor).onMessageExchange(exchange);
+		}
+	}
+
+	public String getDescription() {
+		return "Lucene Auditor";
+	}
+
+	public String[] findExchangesIDsByStatus(ExchangeStatus status) throws AuditorException {
+		String field = "org.servicemix.exchangestatus";
+		return getExchangeIds(field,String.valueOf(status));
+	}
+
+	public String[] findExchangesIDsByMessageContent(String type, String content) throws AuditorException {
+		String field = "org.servicemix."+type+".contents";
+		return getExchangeIds(field,content);
+	}
+
+	public String[] findExchangesIDsByMessageProperty(String type, String property, String value) throws AuditorException {
+		if (property != null && !property.startsWith("org.servicemix")) {
+			property = "org.servicemix."+type+"."+property;
+		}
+		return getExchangeIds(property,value);
+	}
+	
+	protected Document createDocument(MessageExchange me) throws MessagingException {
+		try {
+			//This could be in a separated class (a LuceneDocumentProvider)
+			SourceTransformer st = new SourceTransformer();
+			Document d = new Document();
+			d.add(Field.Keyword("org.servicemix.exchangeid",me.getExchangeId()));
+			d.add(Field.Keyword("org.servicemix.exchangestatus",String.valueOf(me.getStatus())));
+			
+			String[] types = {"in","out","fault"};
+			for (int i=0;i<types.length;i++) {
+				String type = types[i];
+				NormalizedMessage nm = me.getMessage(type);
+				if (nm != null) {
+					d.add(Field.UnStored("org.servicemix."+type+".contents",st.contentToString(nm)));
+					addMessagePropertiesToDocument(nm,d,type);
+				}
+			}
+			return d;
+		} catch (MessagingException mse) {
+			throw mse;
+		} catch (Exception ex) {
+			throw new MessagingException("Error while creating Lucene Document",ex);
+		}
+	}
+	
+	protected void addMessagePropertiesToDocument(NormalizedMessage nm, 
+			Document document, String type) throws MessagingException {
+		Set propertyNames = nm.getPropertyNames();
+		for (Iterator iter = propertyNames.iterator(); iter.hasNext();) {
+			String propertyName = (String) iter.next();
+			Object value = nm.getProperty(propertyName);
+			if (value instanceof String) 
+				//org.servicemix.out.myproperty
+				document.add(Field.Keyword("org.servicemix."+type+"."+propertyName,String.valueOf(value)));
+		}
+	}
+	
+	public String[] getExchangeIds(String queryContent, String field) throws AuditorException {
+		DefaultLuceneCallback dfc = new DefaultLuceneCallback(queryContent,field);
+		try {
+			return (String[])luceneIndexer.search(dfc);
+		} catch (IOException e) {
+			throw new AuditorException("Error while getting Exchange IDs",e);
+		}
+	}
+}

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,40 @@
+/** 
+ * <a href="" The open source ESB</a> 
+ * 
+ * Copyright 2005 Datasul B2B Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.search.IndexSearcher;
+
+/**
+ * Lucene Callback Interface. 
+ * Used on searching to be executed on synchronized blocks. 
+ * @author George Gastaldi
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public interface LuceneCallback {
+	/**
+	 * Called by the LuceneIndexer 
+	 * @param is IndexSearcher provided by the indexer
+	 * @return an object from the query
+	 * @throws IOException if an error occurs during opening/searching of the index
+	 */	
+	Object doCallback(IndexSearcher is) throws IOException;
+}

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,125 @@
+/** 
+ * <a href="" The open source ESB</a> 
+ * 
+ * Copyright 2005 Datasul B2B Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.FSDirectory;
+/**
+ * Utility class for Lucene API.
+ * @author george
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+
+public class LuceneIndexer {
+    protected FSDirectory directory;
+    private File segmentFile;
+    
+    public LuceneIndexer() {
+    }
+    
+    public FSDirectory getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(FSDirectory directory) {
+        this.directory = directory;
+    }
+
+    public void setDirectoryName(File directoryName) throws IOException  {
+    	this.segmentFile = new File(directoryName,"segments");
+    	this.directory = FSDirectory.getDirectory(directoryName.toString(),!this.segmentFile.exists());
+    }
+    
+    /**
+     * Drop object from Lucene index
+     */
+    protected void remove(String id) throws IOException {
+        synchronized (directory) {
+            IndexReader ir = IndexReader.open(directory); 
+            try{
+                ir.delete(new Term("org.servicemix.exchangeid", id));
+            }
+            finally{
+                ir.close();    
+            }
+        }
+    }
+    
+    protected void remove(String[] ids) throws IOException {
+    	if (ids != null && ids.length > 0) {
+	        synchronized (directory) {
+	            IndexReader ir = IndexReader.open(directory); 
+	            try{
+	            	for (int i=0;i<ids.length;i++)
+	            		ir.delete(new Term("org.servicemix.exchangeid", ids[i]));
+	            }
+	            finally{
+	                ir.close();    
+	            }
+	        }
+    	}
+    }
+    
+    /**
+     * Add object to Lucene index
+     */
+    public void add(Document lucDoc, String id) throws IOException {
+        synchronized (directory) {
+            IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), !segmentFile.exists());
+            try{
+                writer.addDocument(lucDoc);
+            }
+            finally{
+                writer.close();    
+            }
+        }
+    }
+    
+    /**
+     * called when an existing document is updated.
+     */
+    public void update(Document lucDoc, String id) throws IOException {
+        remove(id);
+        add(lucDoc,id);
+    }
+    
+    
+    public Object search (LuceneCallback lc) throws IOException {
+        synchronized (directory) {
+            IndexReader ir = IndexReader.open(directory);
+            IndexSearcher is = new IndexSearcher(ir);
+            try{
+                return lc.doCallback(is);
+            }
+            finally{
+            	is.close();
+                ir.close();    
+            }
+        }
+    }
+}
\ No newline at end of file

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html (1008 => 1009)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html	2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html	2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+Lucene auditor query implementation.
+
+</body>
+</html>

Reply via email to