Author: btellier
Date: Sat Nov 28 13:09:44 2015
New Revision: 1716966

URL: http://svn.apache.org/viewvc?rev=1716966&view=rev
Log:
MAILBOX-211 Adding a Kafka publishing system

Added:
    james/project/trunk/mailbox/kafka/
    james/project/trunk/mailbox/kafka/pom.xml
    james/project/trunk/mailbox/kafka/src/
    james/project/trunk/mailbox/kafka/src/main/
    james/project/trunk/mailbox/kafka/src/main/java/
    james/project/trunk/mailbox/kafka/src/main/java/org/
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/
    
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaMessageConsumer.java
    james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaPublisher.java
Modified:
    james/project/trunk/mailbox/pom.xml
    james/project/trunk/server/app/pom.xml
    james/project/trunk/server/pom.xml

Added: james/project/trunk/mailbox/kafka/pom.xml
URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/kafka/pom.xml?rev=1716966&view=auto
==============================================================================
--- james/project/trunk/mailbox/kafka/pom.xml (added)
+++ james/project/trunk/mailbox/kafka/pom.xml Sat Nov 28 13:09:44 2015
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apache-james-mailbox</artifactId>
+        <groupId>org.apache.james</groupId>
+        <version>0.6-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <name>Apache James :: Mailbox :: Kafka</name>
+
+    <artifactId>apache-james-mailbox-kafka</artifactId>
+
+    <profiles>
+        <!-- Active on JDKs older than 1.8 (range "(,1.8)"). Every execution listed
+             below is bound to phase "none", which unbinds the jar, compiler,
+             surefire, source, install, resources and site executions named here. -->
+        <profile>
+            <id>disable-build-for-older-jdk</id>
+            <activation>
+                <jdk>(,1.8)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-jar-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-jar</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>jar</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>test-jar</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-compile</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>default-testCompile</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-test</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-source-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>attach-sources</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-install-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-install</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-resources-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>default-resources</id>
+                                <phase>none</phase>
+                            </execution>
+                            <execution>
+                                <id>default-testResources</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-site-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>attach-descriptor</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <!-- Active on JDK 1.8 and later (range "[1.8,)"). Declares the module's
+             dependencies, configures maven-assembly-plugin with the
+             jar-with-dependencies descriptor and compiles with source/target 1.8. -->
+        <profile>
+            <id>build-for-jdk-8</id>
+            <activation>
+                <jdk>[1.8,)</jdk>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>${project.groupId}</groupId>
+                    <artifactId>apache-james-mailbox-api</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>${project.groupId}</groupId>
+                    <artifactId>apache-james-mailbox-api</artifactId>
+                    <scope>test</scope>
+                    <type>test-jar</type>
+                </dependency>
+                <dependency>
+                    <groupId>${project.groupId}</groupId>
+                    <artifactId>apache-james-mailbox-store</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>${project.groupId}</groupId>
+                    <artifactId>apache-james-mailbox-store</artifactId>
+                    <scope>test</scope>
+                    <type>test-jar</type>
+                </dependency>
+
+                <dependency>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                    <scope>test</scope>
+                </dependency>
+                <!-- The only dependency with an explicit version in this module;
+                     the others presumably inherit theirs from the parent POM
+                     (TODO confirm). -->
+                <dependency>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka_2.10</artifactId>
+                    <version>0.8.2.2</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.assertj</groupId>
+                    <artifactId>assertj-core</artifactId>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.mockito</groupId>
+                    <artifactId>mockito-core</artifactId>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <archive>
+                                <manifest>
+                                    <!-- NOTE(review): looks like a placeholder value;
+                                         confirm the intended main class. -->
+                                    <mainClass>fully.qualified.MainClass</mainClass>
+                                </manifest>
+                            </archive>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-compiler-plugin</artifactId>
+                        <configuration>
+                            <source>1.8</source>
+                            <target>1.8</target>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <!-- Active on JDK 1.6 and later (range "[1.6,)"). Binds the
+             animal-sniffer "check_java_6" execution to phase "none" so that it
+             does not run for this module. -->
+        <profile>
+            <id>disable-animal-sniffer</id>
+            <activation>
+                <jdk>[1.6,)</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>animal-sniffer-maven-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>check_java_6</id>
+                                <phase>none</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <!-- Profile-independent build section: configures maven-assembly-plugin with
+         the jar-with-dependencies descriptor. No execution is declared in this
+         block. NOTE(review): "fully.qualified.MainClass" looks like a placeholder;
+         confirm the intended main class. -->
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>fully.qualified.MainClass</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

Added: james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaMessageConsumer.java
URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaMessageConsumer.java?rev=1716966&view=auto
==============================================================================
--- james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaMessageConsumer.java (added)
+++ james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaMessageConsumer.java Sat Nov 28 13:09:44 2015
@@ -0,0 +1,130 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.kafka;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import org.apache.james.mailbox.store.publisher.MessageConsumer;
+import org.apache.james.mailbox.store.publisher.MessageReceiver;
+import org.apache.james.mailbox.store.publisher.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class KafkaMessageConsumer implements MessageConsumer {
+
+    private class Consumer implements Runnable {
+
+        private KafkaStream m_stream;
+
+        public Consumer(KafkaStream a_stream) {
+            m_stream = a_stream;
+        }
+
+        @SuppressWarnings("unchecked")
+        public void run() {
+            for (MessageAndMetadata<byte[], byte[]> aM_stream : 
(Iterable<MessageAndMetadata<byte[], byte[]>>) m_stream) {
+                messageReceiver.receiveSerializedEvent(aM_stream.message());
+            }
+        }
+    }
+
+    private static final String ZK_SESSION_TIMEOUT = "400";
+    private static final String ZK_SYNC_TIME = "200";
+    private static final String AUTO_COMMIT8INTERVAL_MS ="1000";
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMessageConsumer.class);
+
+    private final ConsumerConnector consumer;
+    private final int numberOfTread;
+    private MessageReceiver messageReceiver;
+    private ExecutorService executor;
+    private boolean isInitialized;
+
+
+    public KafkaMessageConsumer(String zookeeperConnectionString,
+                                String groupId,
+                                int numberOfThread) {
+        this.consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeperConnectionString,
 groupId));
+        this.numberOfTread = numberOfThread;
+        this.isInitialized = false;
+    }
+
+    @Override
+    public void setMessageReceiver(MessageReceiver messageReceiver) {
+        if (!isInitialized) {
+            this.messageReceiver = messageReceiver;
+        } else {
+            throw new RuntimeException("Can not change the MessageReceiver of 
a running KafkaMessageConsumer");
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (consumer != null) consumer.shutdown();
+        if (executor != null) executor.shutdown();
+        this.isInitialized = false;
+    }
+
+    @PostConstruct
+    public void init(Topic topic) {
+        if(!isInitialized) {
+            this.isInitialized = true;
+            List<KafkaStream<byte[], byte[]>> streams = 
getKafkaStreams(topic.getValue());
+            executor = Executors.newFixedThreadPool(numberOfTread);
+            startConsuming(streams);
+        } else {
+            LOG.warn("This Kafka Message Receiver was already launched.");
+        }
+    }
+
+    private List<KafkaStream<byte[], byte[]>> getKafkaStreams(String topic) {
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(topic, numberOfTread);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
+        return consumerMap.get(topic);
+    }
+
+    private void startConsuming(List<KafkaStream<byte[], byte[]>> streams) {
+        for (final KafkaStream stream : streams) {
+            executor.submit(new Consumer(stream));
+        }
+    }
+
+    private ConsumerConfig createConsumerConfig(String 
zookeeperConnectionString, String groupId) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", zookeeperConnectionString);
+        props.put("group.id", groupId);
+        props.put("zookeeper.session.timeout.ms", ZK_SESSION_TIMEOUT);
+        props.put("zookeeper.sync.time.ms", ZK_SYNC_TIME);
+        props.put("auto.commit.interval.ms", AUTO_COMMIT8INTERVAL_MS);
+        return new ConsumerConfig(props);
+    }
+
+}
\ No newline at end of file

Added: james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaPublisher.java
URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaPublisher.java?rev=1716966&view=auto
==============================================================================
--- james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaPublisher.java (added)
+++ james/project/trunk/mailbox/kafka/src/main/java/org/apache/james/mailbox/kafka/KafkaPublisher.java Sat Nov 28 13:09:44 2015
@@ -0,0 +1,77 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.james.mailbox.store.publisher.Publisher;
+import org.apache.james.mailbox.store.publisher.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+
+/**
+ * {@link Publisher} implementation sending messages to a Kafka broker.
+ *
+ * The producer is created by {@link #init()} and released by {@link #close()};
+ * {@link #publish(Topic, byte[])} is only valid between these two calls.
+ */
+public class KafkaPublisher implements Publisher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaPublisher.class);
+
+    private final String kafkaHost;
+    private final int kafkaPort;
+    private Producer<String, byte[]> producer;
+    private boolean producerLaunched;
+
+    /**
+     * @param kafkaHostIpString host part of the <code>metadata.broker.list</code> property
+     * @param kafka_port port part of the <code>metadata.broker.list</code> property
+     */
+    public KafkaPublisher(String kafkaHostIpString, int kafka_port) {
+        this.kafkaHost = kafkaHostIpString;
+        this.kafkaPort = kafka_port;
+        this.producerLaunched = false;
+    }
+
+    /**
+     * Creates the Kafka producer. Calling it again while the producer is launched
+     * only logs a warning.
+     */
+    @PostConstruct
+    @Override
+    public void init() {
+        if (producerLaunched) {
+            LOG.warn("Kafka producer was already instantiated");
+            return;
+        }
+        Properties props = new Properties();
+        props.put("metadata.broker.list", kafkaHost + ":" + kafkaPort);
+        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
+        props.put("request.required.acks", "1");
+        producer = new Producer<>(new ProducerConfig(props));
+        producerLaunched = true;
+    }
+
+
+    /**
+     * Sends the message, without key, to the Kafka topic named after <code>topic.getValue()</code>.
+     *
+     * @throws IllegalStateException when the producer is not launched
+     */
+    @Override
+    public void publish(Topic topic, byte[] message) {
+        if (!producerLaunched) {
+            // Explicit failure instead of a NullPointerException on the missing producer
+            throw new IllegalStateException("KafkaPublisher must be initialized before publishing");
+        }
+        producer.send(new KeyedMessage<>(topic.getValue(), message));
+    }
+
+    /**
+     * Closes the producer if one was created. Safe to call when {@link #init()} was never
+     * called; afterwards {@link #init()} creates a new producer.
+     */
+    @PreDestroy
+    @Override
+    public void close() {
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
+        producerLaunched = false;
+    }
+
+}

Modified: james/project/trunk/mailbox/pom.xml
URL: http://svn.apache.org/viewvc/james/project/trunk/mailbox/pom.xml?rev=1716966&r1=1716965&r2=1716966&view=diff
==============================================================================
--- james/project/trunk/mailbox/pom.xml (original)
+++ james/project/trunk/mailbox/pom.xml Sat Nov 28 13:09:44 2015
@@ -63,6 +63,7 @@
         <module>tika</module>
         <module>tool</module>
         <module>zoo-seq-provider</module>
+        <module>kafka</module>
     </modules>
 
     <scm>

Modified: james/project/trunk/server/app/pom.xml
URL: http://svn.apache.org/viewvc/james/project/trunk/server/app/pom.xml?rev=1716966&r1=1716965&r2=1716966&view=diff
==============================================================================
--- james/project/trunk/server/app/pom.xml (original)
+++ james/project/trunk/server/app/pom.xml Sat Nov 28 13:09:44 2015
@@ -417,6 +417,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.james</groupId>
+            <artifactId>apache-james-mailbox-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.james</groupId>
             <artifactId>apache-james-mailbox-spring</artifactId>
             <scope>runtime</scope>
             <exclusions>

Modified: james/project/trunk/server/pom.xml
URL: http://svn.apache.org/viewvc/james/project/trunk/server/pom.xml?rev=1716966&r1=1716965&r2=1716966&view=diff
==============================================================================
--- james/project/trunk/server/pom.xml (original)
+++ james/project/trunk/server/pom.xml Sat Nov 28 13:09:44 2015
@@ -550,6 +550,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.james</groupId>
+                <artifactId>apache-james-mailbox-kafka</artifactId>
+                <version>${mailbox.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.james</groupId>
                 <artifactId>apache-james-mailbox-api</artifactId>
                 <version>${mailbox.version}</version>
             </dependency>



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to