Diff
Modified: trunk/core/pom.xml (1008 => 1009)
--- trunk/core/pom.xml 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/pom.xml 2005-12-05 15:41:37 UTC (rev 1009)
@@ -194,7 +194,12 @@
<version>1.2_Java1.3</version>
<scope>test</scope>
</dependency>
-
+
+ <dependency>
+ <groupId>lucene</groupId>
+ <artifactId>lucene</artifactId>
+ <version>1.4.3</version>
+ </dependency>
</dependencies>
<build>
Modified: trunk/core/project.xml (1008 => 1009)
--- trunk/core/project.xml 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/project.xml 2005-12-05 15:41:37 UTC (rev 1009)
@@ -54,6 +54,11 @@
</dependency>
<dependency>
+ <id>lucene</id>
+ <version>${lucene_version}</version>
+ </dependency>
+
+ <dependency>
<id>concurrent</id>
<version>${concurrent_version}</version>
</dependency>
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorQueryMBean.java 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,44 @@
+/**
+ * <a href="" The open source ESB</a>
+ *
+ * Copyright 2005 Datasul B2B Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.audit;
+
+import javax.jbi.messaging.ExchangeStatus;
+
+/**
+ * Main interface for ServiceMix auditor query.
+ * This interface may be used to query upon exchanges.
+ *
+ * @author George Gastaldi (gastaldi)
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public interface AuditorQueryMBean extends AuditorMBean {
+ String[] findExchangesIDsByStatus(ExchangeStatus status) throws AuditorException;
+ String[] findExchangesIDsByMessageContent(String type, String content) throws AuditorException;
+ String[] findExchangesIDsByMessageProperty(String type, String property, String value) throws AuditorException;
+
+ /**
+ * Searches for Exchanges IDs using the supplied key-field and the expected content of the field
+ * @param field
+ * @param fieldValue
+ * @return exchange ids
+ * @throws AuditorException if an error occurs
+ */
+ String[] getExchangeIds(String field, String fieldValue) throws AuditorException;
+}
\ No newline at end of file
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/DefaultLuceneCallback.java 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,62 @@
+/**
+ * <a href="" The open source ESB</a>
+ *
+ * Copyright 2005 Datasul B2B Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.lucene.queryParser.QueryParser;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+
+/**
+ * Default Lucene Callback implementation. Used on LuceneAuditor
+ * @author George Gastaldi (gastaldi)
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public class DefaultLuceneCallback implements LuceneCallback {
+ private String field;
+ private String query;
+
+ public DefaultLuceneCallback(String field, String query) {
+ this.field = field;
+ this.query = query;
+ }
+
+ public Object doCallback(IndexSearcher is) throws IOException {
+ try {
+ Query queryObj = QueryParser.parse(query, field, new StandardAnalyzer());
+ Hits hits = is.search(queryObj);
+ int total = hits.length();
+ String[] ids = new String[total];
+ for (int i = 0; i < total; i++) {
+ Document d = hits.doc(i);
+ ids[i] = d.get("org.servicemix.exchangeid");
+ }
+ return ids;
+ } catch (ParseException pe) {
+ return new String[0];
+ }
+ }
+
+}
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneAuditor.java 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,205 @@
+/**
+ * <a href="" The open source ESB</a>
+ *
+ * Copyright 2005 Datasul B2B Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.servicemix.jbi.audit.AbstractAuditor;
+import org.servicemix.jbi.audit.AuditorException;
+import org.servicemix.jbi.audit.AuditorMBean;
+import org.servicemix.jbi.audit.AuditorQueryMBean;
+import org.servicemix.jbi.jaxp.SourceTransformer;
+
+/**
+ * Lucene AuditorQuery implementation.
+ * It uses Lucene as the indexing mechanism for searching Exchanges
+ * and needs a delegated AuditorMBean to persist Exchanges.
+ *
+ * The Content of messages are stored as:
+ * - org.servicemix.in.contents
+ * - org.servicemix.out.contents, if exists
+ * - org.servicemix.fault.contents, if exists
+ *
+ * Properties for IN Messages are stored as:
+ * - org.servicemix.in.propertyname
+ * - org.servicemix.out.propertyname, if exists
+ * - org.servicemix.fault.propertyname, if exists
+ *
+ * @author George Gastaldi
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public class LuceneAuditor extends AbstractAuditor implements AuditorQueryMBean {
+ private AuditorMBean delegatedAuditor;
+ private LuceneIndexer luceneIndexer = new LuceneIndexer();
+
+ protected void doStart() throws JBIException {
+ super.doStart();
+ if (delegatedAuditor == null) {
+ throw new JBIException("A delegated auditor must be provided");
+ }
+ }
+
+ /**
+ * @return Returns the luceneIndexer.
+ */
+ public LuceneIndexer getLuceneIndexer() {
+ return luceneIndexer;
+ }
+
+ /**
+ * @param luceneIndexer The luceneIndexer to set.
+ */
+ public void setLuceneIndexer(LuceneIndexer luceneIndexer) {
+ this.luceneIndexer = luceneIndexer;
+ }
+
+ /**
+ * @return Returns the delegatedAuditor.
+ */
+ public AuditorMBean getDelegatedAuditor() {
+ return delegatedAuditor;
+ }
+
+ /**
+ * @param delegatedAuditor The delegatedAuditor to set.
+ */
+ public void setDelegatedAuditor(AuditorMBean delegatedAuditor) {
+ this.delegatedAuditor = delegatedAuditor;
+ if (delegatedAuditor instanceof AbstractAuditor) {
+ ((AbstractAuditor)delegatedAuditor).setAsContainerListener(false);
+ }
+ }
+
+ public int getExchangeCount() throws AuditorException {
+ return this.delegatedAuditor.getExchangeCount();
+ }
+
+ public String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException {
+ return this.delegatedAuditor.getExchangeIds(fromIndex,toIndex);
+ }
+
+ public MessageExchange[] getExchanges(String[] ids) throws AuditorException {
+ return this.delegatedAuditor.getExchanges(ids);
+ }
+
+ public int deleteExchanges(int fromIndex, int toIndex) throws AuditorException {
+ //TODO: Remove ids from Lucene Index
+ return this.delegatedAuditor.deleteExchanges(fromIndex,toIndex);
+ }
+
+ public int deleteExchanges(String[] ids) throws AuditorException {
+ try {
+ this.luceneIndexer.remove(ids);
+ } catch (IOException io) {
+ throw new AuditorException(io);
+ }
+ return this.delegatedAuditor.deleteExchanges(ids);
+ }
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ try {
+ Document doc = createDocument(exchange);
+ this.luceneIndexer.add(doc,exchange.getExchangeId());
+ } catch (IOException e) {
+ throw new MessagingException("Error while adding to lucene",e);
+ }
+ if (delegatedAuditor instanceof AbstractAuditor) {
+ ((AbstractAuditor)delegatedAuditor).onMessageExchange(exchange);
+ }
+ }
+
+ public String getDescription() {
+ return "Lucene Auditor";
+ }
+
+ public String[] findExchangesIDsByStatus(ExchangeStatus status) throws AuditorException {
+ String field = "org.servicemix.exchangestatus";
+ return getExchangeIds(field,String.valueOf(status));
+ }
+
+ public String[] findExchangesIDsByMessageContent(String type, String content) throws AuditorException {
+ String field = "org.servicemix."+type+".contents";
+ return getExchangeIds(field,content);
+ }
+
+ public String[] findExchangesIDsByMessageProperty(String type, String property, String value) throws AuditorException {
+ if (property != null && !property.startsWith("org.servicemix")) {
+ property = "org.servicemix."+type+"."+property;
+ }
+ return getExchangeIds(property,value);
+ }
+
+ protected Document createDocument(MessageExchange me) throws MessagingException {
+ try {
+ //This could be in a separated class (a LuceneDocumentProvider)
+ SourceTransformer st = new SourceTransformer();
+ Document d = new Document();
+ d.add(Field.Keyword("org.servicemix.exchangeid",me.getExchangeId()));
+ d.add(Field.Keyword("org.servicemix.exchangestatus",String.valueOf(me.getStatus())));
+
+ String[] types = {"in","out","fault"};
+ for (int i=0;i<types.length;i++) {
+ String type = types[i];
+ NormalizedMessage nm = me.getMessage(type);
+ if (nm != null) {
+ d.add(Field.UnStored("org.servicemix."+type+".contents",st.contentToString(nm)));
+ addMessagePropertiesToDocument(nm,d,type);
+ }
+ }
+ return d;
+ } catch (MessagingException mse) {
+ throw mse;
+ } catch (Exception ex) {
+ throw new MessagingException("Error while creating Lucene Document",ex);
+ }
+ }
+
+ protected void addMessagePropertiesToDocument(NormalizedMessage nm,
+ Document document, String type) throws MessagingException {
+ Set propertyNames = nm.getPropertyNames();
+ for (Iterator iter = propertyNames.iterator(); iter.hasNext();) {
+ String propertyName = (String) iter.next();
+ Object value = nm.getProperty(propertyName);
+ if (value instanceof String)
+ //org.servicemix.out.myproperty
+ document.add(Field.Keyword("org.servicemix."+type+"."+propertyName,String.valueOf(value)));
+ }
+ }
+
+ public String[] getExchangeIds(String queryContent, String field) throws AuditorException {
+ DefaultLuceneCallback dfc = new DefaultLuceneCallback(queryContent,field);
+ try {
+ return (String[])luceneIndexer.search(dfc);
+ } catch (IOException e) {
+ throw new AuditorException("Error while getting Exchange IDs",e);
+ }
+ }
+}
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneCallback.java 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,40 @@
+/**
+ * <a href="" The open source ESB</a>
+ *
+ * Copyright 2005 Datasul B2B Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.IOException;
+
+import org.apache.lucene.search.IndexSearcher;
+
+/**
+ * Lucene Callback Interface.
+ * Used on searching to be executed on synchronized blocks.
+ * @author George Gastaldi
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+public interface LuceneCallback {
+ /**
+ * Called by the LuceneIndexer
+ * @param is IndexSearcher provided by the indexer
+ * @return an object from the query
+ * @throws IOException if an error occurs during opening/searching of the index
+ */
+ Object doCallback(IndexSearcher is) throws IOException;
+}
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/LuceneIndexer.java 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,125 @@
+/**
+ * <a href="" The open source ESB</a>
+ *
+ * Copyright 2005 Datasul B2B Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.audit.lucene;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.SimpleAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.FSDirectory;
+/**
+ * Utility class for Lucene API.
+ * @author george
+ * @since 2.1
+ * @version $Revision: 001 $
+ */
+
+public class LuceneIndexer {
+ protected FSDirectory directory;
+ private File segmentFile;
+
+ public LuceneIndexer() {
+ }
+
+ public FSDirectory getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(FSDirectory directory) {
+ this.directory = directory;
+ }
+
+ public void setDirectoryName(File directoryName) throws IOException {
+ this.segmentFile = new File(directoryName,"segments");
+ this.directory = FSDirectory.getDirectory(directoryName.toString(),!this.segmentFile.exists());
+ }
+
+ /**
+ * Drop object from Lucene index
+ */
+ protected void remove(String id) throws IOException {
+ synchronized (directory) {
+ IndexReader ir = IndexReader.open(directory);
+ try{
+ ir.delete(new Term("org.servicemix.exchangeid", id));
+ }
+ finally{
+ ir.close();
+ }
+ }
+ }
+
+ protected void remove(String[] ids) throws IOException {
+ if (ids != null && ids.length > 0) {
+ synchronized (directory) {
+ IndexReader ir = IndexReader.open(directory);
+ try{
+ for (int i=0;i<ids.length;i++)
+ ir.delete(new Term("org.servicemix.exchangeid", ids[i]));
+ }
+ finally{
+ ir.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Add object to Lucene index
+ */
+ public void add(Document lucDoc, String id) throws IOException {
+ synchronized (directory) {
+ IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), !segmentFile.exists());
+ try{
+ writer.addDocument(lucDoc);
+ }
+ finally{
+ writer.close();
+ }
+ }
+ }
+
+ /**
+ * called when an existing document is updated.
+ */
+ public void update(Document lucDoc, String id) throws IOException {
+ remove(id);
+ add(lucDoc,id);
+ }
+
+
+ public Object search (LuceneCallback lc) throws IOException {
+ synchronized (directory) {
+ IndexReader ir = IndexReader.open(directory);
+ IndexSearcher is = new IndexSearcher(ir);
+ try{
+ return lc.doCallback(is);
+ }
+ finally{
+ is.close();
+ ir.close();
+ }
+ }
+ }
+}
\ No newline at end of file
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html (1008 => 1009)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html 2005-12-05 07:23:27 UTC (rev 1008)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/lucene/package.html 2005-12-05 15:41:37 UTC (rev 1009)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+Lucene auditor query implementation.
+
+</body>
+</html>