Convert two source files from CRLF to LF line terminator

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/51db874b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/51db874b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/51db874b

Branch: refs/heads/master
Commit: 51db874bc4698c29b20008a87538287f65f2c943
Parents: 4f0b8cc
Author: Thibaut Sautereau <tsauter...@linagora.com>
Authored: Thu Nov 30 17:00:41 2017 +0700
Committer: Thibaut Sautereau <tsauter...@linagora.com>
Committed: Thu Nov 30 17:02:25 2017 +0700

----------------------------------------------------------------------
 .../store/RandomMailboxSessionIdGenerator.java  |  79 +-
 .../apache/james/queue/file/FileMailQueue.java  | 994 +++++++++----------
 2 files changed, 536 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/51db874b/mailbox/store/src/main/java/org/apache/james/mailbox/store/RandomMailboxSessionIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/RandomMailboxSessionIdGenerator.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/RandomMailboxSessionIdGenerator.java
index ab0bcf4..d71f535 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/RandomMailboxSessionIdGenerator.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/RandomMailboxSessionIdGenerator.java
@@ -1,40 +1,39 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.store;
-
-import java.security.SecureRandom;
-
-import org.apache.james.mailbox.MailboxSessionIdGenerator;
-
-
-/**
- * {@link MailboxSessionIdGenerator} which use a {@link SecureRandom} to 
generate the next Id to use
- * 
- *
- */
-public class RandomMailboxSessionIdGenerator extends 
AbstractMailboxSessionIdGenerator {
-    private final static SecureRandom RANDOM = new SecureRandom();
-
-    @Override
-    protected long generateNextId() {
-        return RANDOM.nextLong();
-    }
-
-}
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store;
+
+import java.security.SecureRandom;
+
+import org.apache.james.mailbox.MailboxSessionIdGenerator;
+
+
+/**
+ * {@link MailboxSessionIdGenerator} which use a {@link SecureRandom} to
+ * generate the next Id to use
+ */
+public class RandomMailboxSessionIdGenerator extends 
AbstractMailboxSessionIdGenerator {
+    private final static SecureRandom RANDOM = new SecureRandom();
+
+    @Override
+    protected long generateNextId() {
+        return RANDOM.nextLong();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/51db874b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
 
b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
index 0857c41..51813ee 100644
--- 
a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ 
b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
@@ -1,497 +1,497 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.file;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.security.SecureRandom;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.mail.MessagingException;
-import javax.mail.util.SharedFileInputStream;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.server.core.MimeMessageSource;
-import org.apache.james.lifecycle.api.Disposable;
-import org.apache.james.lifecycle.api.LifecycleUtil;
-import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
-import org.apache.james.queue.api.ManageableMailQueue;
-import org.apache.mailet.Mail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ManageableMailQueue} implementation which use the fs to store {@link 
Mail}'s
- * <p/>
- * On create of the {@link FileMailQueue} the {@link #init()} will get called. 
This takes care of
- * loading the needed meta-data into memory for fast access.
- */
-public class FileMailQueue implements ManageableMailQueue {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileMailQueue.class);
-
-    private final ConcurrentHashMap<String, FileItem> keyMappings = new 
ConcurrentHashMap<>();
-    private final BlockingQueue<String> inmemoryQueue = new 
LinkedBlockingQueue<>();
-    private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
-    private final static AtomicLong COUNTER = new AtomicLong();
-    private final String queueDirName;
-    private final File queueDir;
-
-    private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
-    private final boolean sync;
-    private final static String MSG_EXTENSION = ".msg";
-    private final static String OBJECT_EXTENSION = ".obj";
-    private final static String NEXT_DELIVERY = "FileQueueNextDelivery";
-    private final static int SPLITCOUNT = 10;
-    private final static SecureRandom RANDOM = new SecureRandom();
-
-    public FileMailQueue(MailQueueItemDecoratorFactory 
mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) 
throws IOException {
-        this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
-        this.sync = sync;
-        this.queueDir = new File(parentDir, queuename);
-        this.queueDirName = queueDir.getAbsolutePath();
-        init();
-    }
-
-    private void init() throws IOException {
-
-        for (int i = 1; i <= SPLITCOUNT; i++) {
-
-            File qDir = new File(queueDir, Integer.toString(i));
-            FileUtils.forceMkdir(qDir);
-
-            String[] files = qDir.list((dir, name) -> 
name.endsWith(OBJECT_EXTENSION));
-
-            for (String name : files) {
-
-                ObjectInputStream oin = null;
-
-                try {
-
-                    final String msgFileName = name.substring(0, name.length() 
- OBJECT_EXTENSION.length()) + MSG_EXTENSION;
-
-                    FileItem item = new FileItem(qDir.getAbsolutePath() + 
File.separator + name, qDir.getAbsolutePath() + File.separator + msgFileName);
-
-                    oin = new ObjectInputStream(new 
FileInputStream(item.getObjectFile()));
-                    Mail mail = (Mail) oin.readObject();
-                    Long next = (Long) mail.getAttribute(NEXT_DELIVERY);
-                    if (next == null) {
-                        next = 0L;
-                    }
-
-                    final String key = mail.getName();
-                    keyMappings.put(key, item);
-                    if (next <= System.currentTimeMillis()) {
-
-                        try {
-                            inmemoryQueue.put(key);
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            throw new RuntimeException("Unable to init", e);
-                        }
-                    } else {
-
-                        // Schedule a task which will put the mail in the queue
-                        // for processing after a given delay
-                        scheduler.schedule(() -> {
-                            try {
-                                inmemoryQueue.put(key);
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new RuntimeException("Unable to init", 
e);
-                            }
-                        }, next - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
-                    }
-                } catch (ClassNotFoundException | IOException e) {
-                    LOGGER.error("Unable to load Mail", e);
-                } finally {
-                    if (oin != null) {
-                        try {
-                            oin.close();
-                        } catch (Exception e) {
-                            // ignore on close
-                        }
-                    }
-                }
-
-            }
-        }
-    }
-
-    @Override
-    public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
-        final String key = mail.getName() + "-" + COUNTER.incrementAndGet();
-        FileOutputStream out = null;
-        FileOutputStream foout = null;
-        ObjectOutputStream oout = null;
-        try {
-            int i = RANDOM.nextInt(SPLITCOUNT) + 1;
-
-            String name = queueDirName + "/" + i + "/" + key;
-
-            final FileItem item = new FileItem(name + OBJECT_EXTENSION, name + 
MSG_EXTENSION);
-            if (delay > 0) {
-                mail.setAttribute(NEXT_DELIVERY, System.currentTimeMillis() + 
unit.toMillis(delay));
-            }
-            foout = new FileOutputStream(item.getObjectFile());
-            oout = new ObjectOutputStream(foout);
-            oout.writeObject(mail);
-            oout.flush();
-            if (sync) foout.getFD().sync();
-            out = new FileOutputStream(item.getMessageFile());
-
-            mail.getMessage().writeTo(out);
-            out.flush();
-            if (sync) out.getFD().sync();
-
-            keyMappings.put(key, item);
-
-            if (delay > 0) {
-                // The message should get delayed so schedule it for later
-                scheduler.schedule(() -> {
-                    try {
-                        inmemoryQueue.put(key);
-
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException("Unable to init", e);
-                    }
-                }, delay, unit);
-
-            } else {
-                inmemoryQueue.put(key);
-            }
-
-            //TODO: Think about exception handling in detail
-        } catch (IOException | MessagingException | InterruptedException e) {
-            throw new MailQueueException("Unable to enqueue mail", e);
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException e) {
-                    // ignore on close
-                }
-            }
-            if (oout != null) {
-                try {
-                    oout.close();
-                } catch (IOException e) {
-                    // ignore on close
-                }
-            }
-            if (foout != null) {
-                try {
-                    foout.close();
-                } catch (IOException e) {
-                    // ignore on close
-                }
-            }
-        }
-
-    }
-
-    @Override
-    public void enQueue(Mail mail) throws MailQueueException {
-        enQueue(mail, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public MailQueueItem deQueue() throws MailQueueException {
-        try {
-            FileItem item = null;
-            String k = null;
-            while (item == null) {
-                k = inmemoryQueue.take();
-
-                item = keyMappings.get(k);
-
-            }
-            final String key = k;
-            final FileItem fitem = item;
-            ObjectInputStream oin = null;
-            try {
-                final File objectFile = new File(fitem.getObjectFile());
-                final File msgFile = new File(fitem.getMessageFile());
-                oin = new ObjectInputStream(new FileInputStream(objectFile));
-                final Mail mail = (Mail) oin.readObject();
-                mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
FileMimeMessageSource(msgFile)));
-                MailQueueItem fileMailQueueItem = new MailQueueItem() {
-
-                    @Override
-                    public Mail getMail() {
-                        return mail;
-                    }
-
-                    @Override
-                    public void done(boolean success) throws 
MailQueueException {
-                        if (!success) {
-                            try {
-                                inmemoryQueue.put(key);
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new MailQueueException("Unable to 
rollback", e);
-                            }
-                        } else {
-                            fitem.delete();
-                            keyMappings.remove(key);
-                        }
-
-                        LifecycleUtil.dispose(mail);
-                    }
-                };
-                return 
mailQueueItemDecoratorFactory.decorate(fileMailQueueItem);
-
-                // TODO: Think about exception handling in detail
-            } catch (IOException | ClassNotFoundException | MessagingException 
e) {
-                throw new MailQueueException("Unable to dequeue", e);
-            } finally {
-                if (oin != null) {
-                    try {
-                        oin.close();
-                    } catch (IOException e) {
-                        // ignore on close
-                    }
-                }
-            }
-
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new MailQueueException("Unable to dequeue", e);
-        }
-    }
-
-    private final class FileMimeMessageSource extends MimeMessageSource 
implements Disposable {
-
-        private File file;
-        private final SharedFileInputStream in;
-
-        public FileMimeMessageSource(File file) throws IOException {
-            this.file = file;
-            this.in = new SharedFileInputStream(file);
-        }
-
-        @Override
-        public String getSourceId() {
-            return file.getAbsolutePath();
-        }
-
-        /**
-         * Get an input stream to retrieve the data stored in the temporary 
file
-         *
-         * @return a <code>BufferedInputStream</code> containing the data
-         */
-        @Override
-        public InputStream getInputStream() throws IOException {
-            return in.newStream(0, -1);
-        }
-
-        @Override
-        public long getMessageSize() throws IOException {
-            return file.length();
-        }
-
-        @Override
-        public void dispose() {
-            IOUtils.closeQuietly(in);
-            file = null;
-        }
-
-    }
-
-    /**
-     * Helper class which is used to reference the path to the object and msg 
file
-     */
-    private final class FileItem {
-        private final String objectfile;
-        private final String messagefile;
-
-        public FileItem(String objectfile, String messagefile) {
-            this.objectfile = objectfile;
-            this.messagefile = messagefile;
-        }
-
-        public String getObjectFile() {
-            return objectfile;
-        }
-
-        public String getMessageFile() {
-            return messagefile;
-        }
-
-        public void delete() throws MailQueueException {
-            try {
-                FileUtils.forceDelete(new File(getObjectFile()));
-            } catch (IOException e) {
-                throw new MailQueueException("Unable to delete mail");
-            }
-
-            try {
-                FileUtils.forceDelete(new File(getMessageFile()));
-            } catch (IOException e) {
-                LOGGER.debug("Remove of msg file for mail failed");
-            }
-        }
-    }
-
-    @Override
-    public long getSize() throws MailQueueException {
-        return keyMappings.size();
-    }
-
-    @Override
-    public long flush() throws MailQueueException {
-        Iterator<String> keys = keyMappings.keySet().iterator();
-        long i = 0;
-        while (keys.hasNext()) {
-            String key = keys.next();
-            if (!inmemoryQueue.contains(key)) {
-                inmemoryQueue.add(key);
-                i++;
-            }
-        }
-        return i;
-    }
-
-    @Override
-    public long clear() throws MailQueueException {
-        final Iterator<Entry<String, FileItem>> items = 
keyMappings.entrySet().iterator();
-        long count = 0;
-        while (items.hasNext()) {
-            Entry<String, FileItem> entry = items.next();
-            FileItem item = entry.getValue();
-            String key = entry.getKey();
-
-            item.delete();
-            keyMappings.remove(key);
-            count++;
-
-        }
-        return count;
-    }
-
-    /**
-     * TODO: implement me
-     *
-     * @see 
ManageableMailQueue#remove(org.apache.james.queue.api.ManageableMailQueue.Type, 
String)
-     */
-    @Override
-    public long remove(Type type, String value) throws MailQueueException {
-        switch (type) {
-            case Name:
-                FileItem item = keyMappings.remove(value);
-                if (item != null) {
-                    item.delete();
-                    return 1;
-                } else {
-                    return 0;
-                }
-
-            default:
-                break;
-        }
-        throw new MailQueueException("Not supported yet");
-
-    }
-
-    @Override
-    public MailQueueIterator browse() throws MailQueueException {
-        final Iterator<FileItem> items = keyMappings.values().iterator();
-        return new MailQueueIterator() {
-            private MailQueueItemView item = null;
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("Read-only");
-            }
-
-            @Override
-            public MailQueueItemView next() {
-                if (hasNext()) {
-                    MailQueueItemView vitem = item;
-                    item = null;
-                    return vitem;
-                } else {
-
-                    throw new NoSuchElementException();
-                }
-            }
-
-            @Override
-            public boolean hasNext() {
-                if (item == null) {
-                    while (items.hasNext()) {
-                        ObjectInputStream in = null;
-                        try {
-                            in = new ObjectInputStream(new 
FileInputStream(items.next().getObjectFile()));
-                            final Mail mail = (Mail) in.readObject();
-                            item = new MailQueueItemView() {
-
-                                @Override
-                                public long getNextDelivery() {
-                                    return (Long) 
mail.getAttribute(NEXT_DELIVERY);
-                                }
-
-                                @Override
-                                public Mail getMail() {
-                                    return mail;
-                                }
-                            };
-                            return true;
-                        } catch (IOException | ClassNotFoundException e) {
-                            LOGGER.info("Unable to load mail", e);
-                        } finally {
-                            if (in != null) {
-                                try {
-                                    in.close();
-                                } catch (IOException e) {
-                                    // ignore on close
-                                }
-                            }
-                        }
-                    }
-                    return false;
-                } else {
-                    return true;
-                }
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        };
-    }
-
-}
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.mail.MessagingException;
+import javax.mail.util.SharedFileInputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
+import org.apache.james.server.core.MimeMessageSource;
+import org.apache.james.lifecycle.api.Disposable;
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ManageableMailQueue} implementation which use the fs to store {@link 
Mail}'s
+ * <p/>
+ * On create of the {@link FileMailQueue} the {@link #init()} will get called. 
This takes care of
+ * loading the needed meta-data into memory for fast access.
+ */
+public class FileMailQueue implements ManageableMailQueue {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileMailQueue.class);
+
+    private final ConcurrentHashMap<String, FileItem> keyMappings = new 
ConcurrentHashMap<>();
+    private final BlockingQueue<String> inmemoryQueue = new 
LinkedBlockingQueue<>();
+    private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+    private final static AtomicLong COUNTER = new AtomicLong();
+    private final String queueDirName;
+    private final File queueDir;
+
+    private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+    private final boolean sync;
+    private final static String MSG_EXTENSION = ".msg";
+    private final static String OBJECT_EXTENSION = ".obj";
+    private final static String NEXT_DELIVERY = "FileQueueNextDelivery";
+    private final static int SPLITCOUNT = 10;
+    private final static SecureRandom RANDOM = new SecureRandom();
+
+    public FileMailQueue(MailQueueItemDecoratorFactory 
mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) 
throws IOException {
+        this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+        this.sync = sync;
+        this.queueDir = new File(parentDir, queuename);
+        this.queueDirName = queueDir.getAbsolutePath();
+        init();
+    }
+
+    private void init() throws IOException {
+
+        for (int i = 1; i <= SPLITCOUNT; i++) {
+
+            File qDir = new File(queueDir, Integer.toString(i));
+            FileUtils.forceMkdir(qDir);
+
+            String[] files = qDir.list((dir, name) -> 
name.endsWith(OBJECT_EXTENSION));
+
+            for (String name : files) {
+
+                ObjectInputStream oin = null;
+
+                try {
+
+                    final String msgFileName = name.substring(0, name.length() 
- OBJECT_EXTENSION.length()) + MSG_EXTENSION;
+
+                    FileItem item = new FileItem(qDir.getAbsolutePath() + 
File.separator + name, qDir.getAbsolutePath() + File.separator + msgFileName);
+
+                    oin = new ObjectInputStream(new 
FileInputStream(item.getObjectFile()));
+                    Mail mail = (Mail) oin.readObject();
+                    Long next = (Long) mail.getAttribute(NEXT_DELIVERY);
+                    if (next == null) {
+                        next = 0L;
+                    }
+
+                    final String key = mail.getName();
+                    keyMappings.put(key, item);
+                    if (next <= System.currentTimeMillis()) {
+
+                        try {
+                            inmemoryQueue.put(key);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException("Unable to init", e);
+                        }
+                    } else {
+
+                        // Schedule a task which will put the mail in the queue
+                        // for processing after a given delay
+                        scheduler.schedule(() -> {
+                            try {
+                                inmemoryQueue.put(key);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException("Unable to init", 
e);
+                            }
+                        }, next - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+                    }
+                } catch (ClassNotFoundException | IOException e) {
+                    LOGGER.error("Unable to load Mail", e);
+                } finally {
+                    if (oin != null) {
+                        try {
+                            oin.close();
+                        } catch (Exception e) {
+                            // ignore on close
+                        }
+                    }
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
+        final String key = mail.getName() + "-" + COUNTER.incrementAndGet();
+        FileOutputStream out = null;
+        FileOutputStream foout = null;
+        ObjectOutputStream oout = null;
+        try {
+            int i = RANDOM.nextInt(SPLITCOUNT) + 1;
+
+            String name = queueDirName + "/" + i + "/" + key;
+
+            final FileItem item = new FileItem(name + OBJECT_EXTENSION, name + 
MSG_EXTENSION);
+            if (delay > 0) {
+                mail.setAttribute(NEXT_DELIVERY, System.currentTimeMillis() + 
unit.toMillis(delay));
+            }
+            foout = new FileOutputStream(item.getObjectFile());
+            oout = new ObjectOutputStream(foout);
+            oout.writeObject(mail);
+            oout.flush();
+            if (sync) foout.getFD().sync();
+            out = new FileOutputStream(item.getMessageFile());
+
+            mail.getMessage().writeTo(out);
+            out.flush();
+            if (sync) out.getFD().sync();
+
+            keyMappings.put(key, item);
+
+            if (delay > 0) {
+                // The message should get delayed so schedule it for later
+                scheduler.schedule(() -> {
+                    try {
+                        inmemoryQueue.put(key);
+
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException("Unable to init", e);
+                    }
+                }, delay, unit);
+
+            } else {
+                inmemoryQueue.put(key);
+            }
+
+            //TODO: Think about exception handling in detail
+        } catch (IOException | MessagingException | InterruptedException e) {
+            throw new MailQueueException("Unable to enqueue mail", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    // ignore on close
+                }
+            }
+            if (oout != null) {
+                try {
+                    oout.close();
+                } catch (IOException e) {
+                    // ignore on close
+                }
+            }
+            if (foout != null) {
+                try {
+                    foout.close();
+                } catch (IOException e) {
+                    // ignore on close
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void enQueue(Mail mail) throws MailQueueException {
+        enQueue(mail, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public MailQueueItem deQueue() throws MailQueueException {
+        try {
+            FileItem item = null;
+            String k = null;
+            while (item == null) {
+                k = inmemoryQueue.take();
+
+                item = keyMappings.get(k);
+
+            }
+            final String key = k;
+            final FileItem fitem = item;
+            ObjectInputStream oin = null;
+            try {
+                final File objectFile = new File(fitem.getObjectFile());
+                final File msgFile = new File(fitem.getMessageFile());
+                oin = new ObjectInputStream(new FileInputStream(objectFile));
+                final Mail mail = (Mail) oin.readObject();
+                mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
FileMimeMessageSource(msgFile)));
+                MailQueueItem fileMailQueueItem = new MailQueueItem() {
+
+                    @Override
+                    public Mail getMail() {
+                        return mail;
+                    }
+
+                    @Override
+                    public void done(boolean success) throws 
MailQueueException {
+                        if (!success) {
+                            try {
+                                inmemoryQueue.put(key);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new MailQueueException("Unable to 
rollback", e);
+                            }
+                        } else {
+                            fitem.delete();
+                            keyMappings.remove(key);
+                        }
+
+                        LifecycleUtil.dispose(mail);
+                    }
+                };
+                return 
mailQueueItemDecoratorFactory.decorate(fileMailQueueItem);
+
+                // TODO: Think about exception handling in detail
+            } catch (IOException | ClassNotFoundException | MessagingException 
e) {
+                throw new MailQueueException("Unable to dequeue", e);
+            } finally {
+                if (oin != null) {
+                    try {
+                        oin.close();
+                    } catch (IOException e) {
+                        // ignore on close
+                    }
+                }
+            }
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new MailQueueException("Unable to dequeue", e);
+        }
+    }
+
+    private final class FileMimeMessageSource extends MimeMessageSource 
implements Disposable {
+
+        private File file;
+        private final SharedFileInputStream in;
+
+        public FileMimeMessageSource(File file) throws IOException {
+            this.file = file;
+            this.in = new SharedFileInputStream(file);
+        }
+
+        @Override
+        public String getSourceId() {
+            return file.getAbsolutePath();
+        }
+
+        /**
+         * Get an input stream to retrieve the data stored in the temporary 
file
+         *
+         * @return a <code>BufferedInputStream</code> containing the data
+         */
+        @Override
+        public InputStream getInputStream() throws IOException {
+            return in.newStream(0, -1);
+        }
+
+        @Override
+        public long getMessageSize() throws IOException {
+            return file.length();
+        }
+
+        @Override
+        public void dispose() {
+            IOUtils.closeQuietly(in);
+            file = null;
+        }
+
+    }
+
+    /**
+     * Helper class which is used to reference the path to the object and msg 
file
+     */
+    private final class FileItem {
+        private final String objectfile;
+        private final String messagefile;
+
+        public FileItem(String objectfile, String messagefile) {
+            this.objectfile = objectfile;
+            this.messagefile = messagefile;
+        }
+
+        public String getObjectFile() {
+            return objectfile;
+        }
+
+        public String getMessageFile() {
+            return messagefile;
+        }
+
+        public void delete() throws MailQueueException {
+            try {
+                FileUtils.forceDelete(new File(getObjectFile()));
+            } catch (IOException e) {
+                throw new MailQueueException("Unable to delete mail");
+            }
+
+            try {
+                FileUtils.forceDelete(new File(getMessageFile()));
+            } catch (IOException e) {
+                LOGGER.debug("Remove of msg file for mail failed");
+            }
+        }
+    }
+
+    @Override
+    public long getSize() throws MailQueueException {
+        return keyMappings.size();
+    }
+
+    @Override
+    public long flush() throws MailQueueException {
+        Iterator<String> keys = keyMappings.keySet().iterator();
+        long i = 0;
+        while (keys.hasNext()) {
+            String key = keys.next();
+            if (!inmemoryQueue.contains(key)) {
+                inmemoryQueue.add(key);
+                i++;
+            }
+        }
+        return i;
+    }
+
+    @Override
+    public long clear() throws MailQueueException {
+        final Iterator<Entry<String, FileItem>> items = 
keyMappings.entrySet().iterator();
+        long count = 0;
+        while (items.hasNext()) {
+            Entry<String, FileItem> entry = items.next();
+            FileItem item = entry.getValue();
+            String key = entry.getKey();
+
+            item.delete();
+            keyMappings.remove(key);
+            count++;
+
+        }
+        return count;
+    }
+
+    /**
+     * TODO: implement me
+     *
+     * @see 
ManageableMailQueue#remove(org.apache.james.queue.api.ManageableMailQueue.Type, 
String)
+     */
+    @Override
+    public long remove(Type type, String value) throws MailQueueException {
+        switch (type) {
+            case Name:
+                FileItem item = keyMappings.remove(value);
+                if (item != null) {
+                    item.delete();
+                    return 1;
+                } else {
+                    return 0;
+                }
+
+            default:
+                break;
+        }
+        throw new MailQueueException("Not supported yet");
+
+    }
+
+    @Override
+    public MailQueueIterator browse() throws MailQueueException {
+        final Iterator<FileItem> items = keyMappings.values().iterator();
+        return new MailQueueIterator() {
+            private MailQueueItemView item = null;
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Read-only");
+            }
+
+            @Override
+            public MailQueueItemView next() {
+                if (hasNext()) {
+                    MailQueueItemView vitem = item;
+                    item = null;
+                    return vitem;
+                } else {
+
+                    throw new NoSuchElementException();
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (item == null) {
+                    while (items.hasNext()) {
+                        ObjectInputStream in = null;
+                        try {
+                            in = new ObjectInputStream(new 
FileInputStream(items.next().getObjectFile()));
+                            final Mail mail = (Mail) in.readObject();
+                            item = new MailQueueItemView() {
+
+                                @Override
+                                public long getNextDelivery() {
+                                    return (Long) 
mail.getAttribute(NEXT_DELIVERY);
+                                }
+
+                                @Override
+                                public Mail getMail() {
+                                    return mail;
+                                }
+                            };
+                            return true;
+                        } catch (IOException | ClassNotFoundException e) {
+                            LOGGER.info("Unable to load mail", e);
+                        } finally {
+                            if (in != null) {
+                                try {
+                                    in.close();
+                                } catch (IOException e) {
+                                    // ignore on close
+                                }
+                            }
+                        }
+                    }
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+
+            @Override
+            public void close() {
+                // do nothing
+            }
+        };
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to