Author: norman
Date: Thu Mar 24 18:51:30 2011
New Revision: 1085069
URL: http://svn.apache.org/viewvc?rev=1085069&view=rev
Log:
Use Netty-specific optimizations to make sure we don't OOM because of slow
clients even if using NIO. This fix should work with NIO and Old-IO. See IMAP-265
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChunkFetchProcessor.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/FetchChunkedInput.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapProcessorFactory.java
Modified:
james/server/trunk/imapserver/pom.xml
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelWritableByteChannel.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
Modified: james/server/trunk/imapserver/pom.xml
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/pom.xml?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
--- james/server/trunk/imapserver/pom.xml (original)
+++ james/server/trunk/imapserver/pom.xml Thu Mar 24 18:51:30 2011
@@ -41,6 +41,10 @@
<artifactId>apache-james-imap-message</artifactId>
</dependency>
<dependency>
+<groupId>org.apache.james</groupId>
+<artifactId>apache-james-imap-processor</artifactId>
+</dependency>
+<dependency>
<groupId>org.apache.james.protocols</groupId>
<artifactId>protocols-impl</artifactId>
</dependency>
Modified:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
(original)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
Thu Mar 24 18:51:30 2011
@@ -27,6 +27,7 @@ import org.apache.james.imap.main.Abstra
import org.apache.james.imap.message.response.Literal;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
/**
* {@link AbstractImapResponseWriter} implementation which writes the data to
a {@link Channel}
@@ -47,7 +48,11 @@ public class ChannelImapResponseWriter e
* @see
org.apache.james.imap.main.AbstractImapResponseWriter#write(java.nio.ByteBuffer)
*/
protected void write(ByteBuffer buffer) throws IOException {
- channel.write(ChannelBuffers.wrappedBuffer(buffer));
+ ChannelFuture f =
channel.write(ChannelBuffers.wrappedBuffer(buffer)).awaitUninterruptibly();
+ Throwable t = f.getCause();
+ if (t != null) {
+ throw new IOException(t);
+ }
}
/*
Modified:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelWritableByteChannel.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelWritableByteChannel.java?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelWritableByteChannel.java
(original)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelWritableByteChannel.java
Thu Mar 24 18:51:30 2011
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteCha
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
/**
* Some adapter class which allows to write to {@link Channel} via a {@link
WritableByteChannel} interface
@@ -55,7 +56,11 @@ public class ChannelWritableByteChannel
byte data[] = new byte[src.remaining()];
src.get(data);
- channel.write(ChannelBuffers.wrappedBuffer(data));
+ ChannelFuture future =
channel.write(ChannelBuffers.wrappedBuffer(data)).awaitUninterruptibly();
+ Throwable t = future.getCause();
+ if (t != null) {
+ throw new IOException(t);
+ }
return data.length;
}
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChunkFetchProcessor.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChunkFetchProcessor.java?rev=1085069&view=auto
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChunkFetchProcessor.java
(added)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChunkFetchProcessor.java
Thu Mar 24 18:51:30 2011
@@ -0,0 +1,56 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.util.List;
+
+import org.apache.james.imap.api.message.FetchData;
+import org.apache.james.imap.api.message.response.StatusResponseFactory;
+import org.apache.james.imap.api.process.ImapProcessor;
+import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.processor.fetch.FetchProcessor;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageRange;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * {@link FetchProcessor} implementation which applies Netty-specific optimizations to
perform well with both NIO and old IO
+ *
+ *
+ */
+public class ChunkFetchProcessor extends FetchProcessor {
+
+ public ChunkFetchProcessor(ImapProcessor next, MailboxManager
mailboxManager, StatusResponseFactory factory, int batchSize) {
+ super(next, mailboxManager, factory, batchSize);
+ }
+
+ /**
+ * Wrap the given parameters in a {@link FetchChunkedInput} and write it
to the {@link Channel}
+ *
+ */
+ @Override
+ protected void processMessageRanges(ImapSession session, MessageManager mailbox,
List<MessageRange> range, FetchData fetch, boolean useUids, MailboxSession
mailboxSession, Responder responder) throws MailboxException {
+ Channel channel = ((NettyImapSession) session).getChannel();
+ channel.write(new FetchChunkedInput(session, mailbox, range, fetch,
getFetchGroup(fetch), useUids, mailboxSession, responder));
+ }
+
+}
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/FetchChunkedInput.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/FetchChunkedInput.java?rev=1085069&view=auto
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/FetchChunkedInput.java
(added)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/FetchChunkedInput.java
Thu Mar 24 18:51:30 2011
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.james.imap.api.message.FetchData;
+import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.api.process.ImapProcessor.Responder;
+import org.apache.james.imap.message.response.FetchResponse;
+import org.apache.james.imap.processor.fetch.EnvelopeBuilder;
+import org.apache.james.imap.processor.fetch.FetchResponseBuilder;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageRange;
+import org.apache.james.mailbox.MessageRangeException;
+import org.apache.james.mailbox.MessageResult;
+import org.apache.james.mailbox.MessageManager.MessageCallback;
+import org.apache.james.mailbox.MessageResult.FetchGroup;
+import org.apache.james.mime4j.field.address.parser.ParseException;
+import org.jboss.netty.handler.stream.ChunkedInput;
+
+/**
+ * {@link ChunkedInput} implementation which fetches {@link MessageRange}s in
batches and hands them over to the {@link Responder}
+ *
+ *
+ *
+ */
+public class FetchChunkedInput implements ChunkedInput {
+
+ private Iterator<MessageRange> ranges;
+ private MessageManager mailbox;
+ private ImapSession session;
+ private FetchData fetch;
+ private boolean useUids;
+ private MailboxSession mailboxSession;
+ private FetchGroup group;
+ private FetchResponseBuilder builder;
+ private Responder responder;
+
+ public FetchChunkedInput(final ImapSession session, final MessageManager
mailbox, final List<MessageRange> ranges, final FetchData fetch, FetchGroup
group, final boolean useUids, final MailboxSession mailboxSession, final Responder
responder) {
+ this.ranges = ranges.iterator();
+ this.mailbox = mailbox;
+ this.session = session;
+ this.fetch = fetch;
+ this.useUids = useUids;
+ this.mailboxSession = mailboxSession;
+ this.group = group;
+ builder = new FetchResponseBuilder(new
EnvelopeBuilder(session.getLog()));
+ this.responder = responder;
+
+ }
+
+ /**
+ * Do nothing
+ */
+ public void close() throws Exception {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jboss.netty.handler.stream.ChunkedInput#hasNextChunk()
+ */
+ public boolean hasNextChunk() throws Exception {
+ return ranges.hasNext();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jboss.netty.handler.stream.ChunkedInput#isEndOfInput()
+ */
+ public boolean isEndOfInput() throws Exception {
+ return !ranges.hasNext();
+ }
+
+ /**
+ * Fetch the next batch of messages and write the {@link FetchResponse} to
the {@link Responder}. After that is done this method will return null
+ *
+ * TODO: Maybe it would make sense to only write one FetchResponse per
{@link #nextChunk()} call. Need to test this
+ *
+ */
+ public Object nextChunk() throws Exception {
+ if (hasNextChunk()) {
+ mailbox.getMessages(ranges.next(), group, mailboxSession, new
MessageCallback() {
+
+ public void onMessages(Iterator<MessageResult> it) throws
MailboxException {
+ while (it.hasNext()) {
+ final MessageResult result = it.next();
+ try {
+ final FetchResponse response =
builder.build(fetch, result, mailbox, session, useUids);
+ responder.respond(response);
+ } catch (ParseException e) {
+ // we can't for whatever reason parse the
+ // message so just skip it and log it to debug
+ session.getLog().debug("Unable to parse message with
uid " + result.getUid(), e);
+ } catch (MessageRangeException e) {
+ // we can't for whatever reason find the message
+ // so just skip it and log it to debug
+ session.getLog().debug("Unable to find message with uid
" + result.getUid(), e);
+ }
+ }
+ }
+ });
+ }
+
+ return null;
+ }
+}
Modified:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
(original)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
Thu Mar 24 18:51:30 2011
@@ -40,6 +40,7 @@ import org.jboss.netty.handler.codec.fra
import org.jboss.netty.handler.connection.ConnectionLimitUpstreamHandler;
import org.jboss.netty.handler.connection.ConnectionPerIpLimitUpstreamHandler;
import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.HashedWheelTimer;
/**
@@ -145,6 +146,8 @@ public class IMAPServer extends Abstract
pipeline.addLast(CONNECTION_COUNT_HANDLER,
getConnectionCountHandler());
+ pipeline.addLast(CHUNK_WRITE_HANDLER, new
ChunkedWriteHandler());
+
if (isStartTLSSupported()) {
pipeline.addLast(CORE_HANDLER, new
ImapChannelUpstreamHandler(hello, processor, encoder, getLogger(), compress,
getSSLContext(), getEnabledCipherSuites()));
} else {
Modified:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
(original)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
Thu Mar 24 18:51:30 2011
@@ -35,4 +35,6 @@ public interface NettyConstants {
final static String CONNECTION_LIMIT_HANDLER = "connectionLimitHandler";
final static String CONNECTION_LIMIT_PER_IP_HANDLER =
"connectionPerIpLimitHandler";
final static String CONNECTION_COUNT_HANDLER= "connectionCountHandler";
+ final static String CHUNK_WRITE_HANDLER= "chunkWriteHandler";
+
}
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapProcessorFactory.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapProcessorFactory.java?rev=1085069&view=auto
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapProcessorFactory.java
(added)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapProcessorFactory.java
Thu Mar 24 18:51:30 2011
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import org.apache.james.imap.api.ImapConstants;
+import org.apache.james.imap.api.message.response.StatusResponseFactory;
+import org.apache.james.imap.api.process.ImapProcessor;
+import org.apache.james.imap.api.process.MailboxTyper;
+import org.apache.james.imap.message.response.UnpooledStatusResponseFactory;
+import org.apache.james.imap.processor.DefaultProcessorChain;
+import org.apache.james.imap.processor.base.ImapResponseMessageProcessor;
+import org.apache.james.imap.processor.base.UnknownRequestProcessor;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.SubscriptionManager;
+
+public class NettyImapProcessorFactory {
+
+ public static final ImapProcessor createDefaultProcessor(final
MailboxManager mailboxManager, final SubscriptionManager subscriptionManager) {
+ return createXListSupportingProcessor(mailboxManager,
subscriptionManager, null, ImapConstants.DEFAULT_BATCH_SIZE);
+ }
+
+ public static final ImapProcessor createDefaultProcessor(final
MailboxManager mailboxManager, final SubscriptionManager subscriptionManager,
int batchSize) {
+ return createXListSupportingProcessor(mailboxManager,
subscriptionManager, null, batchSize);
+ }
+
+ public static final ImapProcessor createXListSupportingProcessor(final
MailboxManager mailboxManager, final SubscriptionManager subscriptionManager,
MailboxTyper mailboxTyper) {
+ return createXListSupportingProcessor(mailboxManager,
subscriptionManager, mailboxTyper, ImapConstants.DEFAULT_BATCH_SIZE);
+ }
+
+ public static final ImapProcessor createXListSupportingProcessor(final
MailboxManager mailboxManager, final SubscriptionManager subscriptionManager,
MailboxTyper mailboxTyper, int batchSize) {
+ final StatusResponseFactory statusResponseFactory = new
UnpooledStatusResponseFactory();
+ final UnknownRequestProcessor unknownRequestImapProcessor = new
UnknownRequestProcessor(
+ statusResponseFactory);
+ final ImapProcessor imap4rev1Chain = DefaultProcessorChain
+ .createDefaultChain(unknownRequestImapProcessor,
+ mailboxManager, subscriptionManager,
statusResponseFactory, mailboxTyper, batchSize);
+ ChunkFetchProcessor fetchProcessor = new
ChunkFetchProcessor(imap4rev1Chain, mailboxManager, statusResponseFactory,
batchSize);
+ final ImapProcessor result = new ImapResponseMessageProcessor(
+ fetchProcessor);
+ return result;
+ }
+
+}
Modified:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java?rev=1085069&r1=1085068&r2=1085069&view=diff
==============================================================================
---
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
(original)
+++
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
Thu Mar 24 18:51:30 2011
@@ -28,6 +28,7 @@ import org.apache.james.imap.api.process
import org.apache.james.imap.api.process.ImapSession;
import org.apache.james.imap.api.process.SelectedMailbox;
import org.apache.james.protocols.impl.SessionLog;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
@@ -56,6 +57,15 @@ public class NettyImapSession implements
this.compress = compress;
}
+ /**
+ * Return the wrapped {@link Channel} which this {@link ImapSession} is
bound to
+ *
+ * @return channel
+ */
+ public Channel getChannel() {
+ return context.getChannel();
+ }
+
/*
* (non-Javadoc)
* @see org.apache.james.imap.api.process.ImapSession#logout()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]