Author: rgreig
Date: Mon Apr 9 09:12:49 2007
New Revision: 526807
URL: http://svn.apache.org/viewvc?view=rev&rev=526807
Log:
Got rid of some uses of System.out that were being used in place of log4j logging.
Modified:
incubator/qpid/branches/M2/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
Modified:
incubator/qpid/branches/M2/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java?view=diff&rev=526807&r1=526806&r2=526807
==============================================================================
--- incubator/qpid/branches/M2/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java (original)
+++ incubator/qpid/branches/M2/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java Mon Apr 9 09:12:49 2007
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,33 +7,35 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.qpid.example.publisher;
-import org.apache.log4j.Logger;
-
import java.io.File;
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.example.shared.FileUtils;
import org.apache.qpid.example.shared.Statics;
-import javax.jms.JMSException;
-
/**
* Class that sends message files to the Publisher to distribute
* using files as input
* Must set properties for host in properties file or uses in vm broker
*/
-public class FileMessageDispatcher {
+public class FileMessageDispatcher
+{
protected static final Logger _logger =
Logger.getLogger(FileMessageDispatcher.class);
@@ -48,30 +51,30 @@
public static void main(String[] args)
{
- //Check command line args ok - must provide a path or file for us to dispatch
+ // Check command line args ok - must provide a path or file for us to dispatch
if (args.length == 0)
{
- System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
+ System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + "");
}
else
{
try
{
- //publish message(s) from file(s) to configured queue
+ // publish message(s) from file(s) to configured queue
publish(args[0]);
- //Move payload file(s) to archive location as no error
+ // Move payload file(s) to archive location as no error
FileUtils.moveFileToNewDir(args[0],
System.getProperties().getProperty(Statics.ARCHIVE_PATH));
}
- catch(Exception e)
+ catch (Exception e)
{
- //log error and exit
+ // log error and exit
_logger.error("Error trying to dispatch message: " + e);
System.exit(1);
}
finally
{
- //clean up before exiting
+ // clean up before exiting
if (getPublisher() != null)
{
getPublisher().cleanup();
@@ -98,10 +101,10 @@
File tempFile = new File(path);
if (tempFile.isDirectory())
{
- //while more files in dir publish them
+ // while more files in dir publish them
File[] files = tempFile.listFiles();
- if (files == null || files.length == 0)
+ if ((files == null) || (files.length == 0))
{
_logger.info("FileMessageDispatcher - No files to publish in
input directory: " + tempFile);
}
@@ -109,10 +112,10 @@
{
for (File file : files)
{
- //Create message factory passing in payload path
+ // Create message factory passing in payload path
FileMessageFactory factory = new
FileMessageFactory(getPublisher().getSession(), file.toString());
- //Send the message generated from the payload using the
_publisher
+ // Send the message generated from the payload using the
_publisher
getPublisher().sendMessage(factory.createEventMessage());
}
@@ -120,11 +123,11 @@
}
else
{
- //handle a single file
- //Create message factory passing in payload path
- FileMessageFactory factory = new
FileMessageFactory(getPublisher().getSession(),tempFile.toString());
+ // handle a single file
+ // Create message factory passing in payload path
+ FileMessageFactory factory = new
FileMessageFactory(getPublisher().getSession(), tempFile.toString());
- //Send the message generated from the payload using the _publisher
+ // Send the message generated from the payload using the _publisher
getPublisher().sendMessage(factory.createEventMessage());
}
}
@@ -145,15 +148,15 @@
*/
private static Publisher getPublisher()
{
- if (_publisher != null)
- {
- return _publisher;
- }
+ if (_publisher != null)
+ {
+ return _publisher;
+ }
- //Create a _publisher
- _publisher = new Publisher();
+ // Create a _publisher
+ _publisher = new Publisher();
- return _publisher;
+ return _publisher;
}
}
Modified:
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=526807&r1=526806&r2=526807
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Mon Apr 9 09:12:49 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.pool;
-import org.apache.qpid.pool.Event.CloseEvent;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.qpid.pool.Event.CloseEvent;
+
public class PoolingFilter extends IoFilterAdapter implements
Job.JobCompletionHandler
{
private static final Logger _logger =
Logger.getLogger(PoolingFilter.class);
@@ -49,12 +50,12 @@
void fireAsynchEvent(IoSession session, Event event)
{
Job job = getJobForSession(session);
- // job.acquire(); //prevents this job being removed from _jobs
+ // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
- //Additional checks on pool to check that it hasn't shutdown.
+ // Additional checks on pool to check that it hasn't shutdown.
// The alternative is to catch the RejectedExecutionException that
will result from executing on a shutdown pool
- if (job.activate() && _poolReference.getPool() != null &&
!_poolReference.getPool().isShutdown())
+ if (job.activate() && (_poolReference.getPool() != null) &&
!_poolReference.getPool().isShutdown())
{
_poolReference.getPool().execute(job);
}
@@ -70,16 +71,6 @@
private Job getJobForSession(IoSession session)
{
return (Job) session.getAttribute(_name);
-
-/* if(job == null)
- {
- System.err.println("Error in " + _name);
- Thread.dumpStack();
- }
-
-
- job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -89,35 +80,36 @@
private Job addJobForSession(IoSession session, Job job)
{
- //atomic so ensures all threads agree on the same job
+ // atomic so ensures all threads agree on the same job
Job existing = _jobs.putIfAbsent(session, job);
- return existing == null ? job : existing;
+
+ return (existing == null) ? job : existing;
}
- //Job.JobCompletionHandler
+ // Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
-// if (job.isComplete())
-// {
-// job.release();
-// if (!job.isReferenced())
-// {
-// _jobs.remove(session);
-// }
-// }
-// else
- if(!job.isComplete())
+ // if (job.isComplete())
+ // {
+ // job.release();
+ // if (!job.isReferenced())
+ // {
+ // _jobs.remove(session);
+ // }
+ // }
+ // else
+ if (!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional
checks here?
- // Can the pool be shutdown at this point?
- if (job.activate() && _poolReference.getPool() != null &&
!_poolReference.getPool().isShutdown())
+ // Can the pool be shutdown at this point?
+ if (job.activate() && (_poolReference.getPool() != null) &&
!_poolReference.getPool().isShutdown())
{
_poolReference.getPool().execute(job);
}
}
}
- //IoFilter methods that are processed by threads on the pool
+ // IoFilter methods that are processed by threads on the pool
public void sessionOpened(final NextFilter nextFilter, final IoSession
session) throws Exception
{
@@ -129,37 +121,33 @@
nextFilter.sessionClosed(session);
}
- public void sessionIdle(final NextFilter nextFilter, final IoSession
session,
- final IdleStatus status) throws Exception
+ public void sessionIdle(final NextFilter nextFilter, final IoSession
session, final IdleStatus status) throws Exception
{
nextFilter.sessionIdle(session, status);
}
- public void exceptionCaught(final NextFilter nextFilter, final IoSession
session,
- final Throwable cause) throws Exception
+ public void exceptionCaught(final NextFilter nextFilter, final IoSession
session, final Throwable cause) throws Exception
{
- nextFilter.exceptionCaught(session,cause);
+ nextFilter.exceptionCaught(session, cause);
}
- public void messageReceived(final NextFilter nextFilter, final IoSession
session,
- final Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final IoSession
session, final Object message) throws Exception
{
- nextFilter.messageReceived(session,message);
+ nextFilter.messageReceived(session, message);
}
- public void messageSent(final NextFilter nextFilter, final IoSession
session,
- final Object message) throws Exception
+ public void messageSent(final NextFilter nextFilter, final IoSession
session, final Object message) throws Exception
{
nextFilter.messageSent(session, message);
}
- public void filterWrite(final NextFilter nextFilter, final IoSession
session,
- final WriteRequest writeRequest) throws Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession
session, final WriteRequest writeRequest)
+ throws Exception
{
nextFilter.filterWrite(session, writeRequest);
}
- //IoFilter methods that are processed on current thread (NOT on pooled
thread)
+ // IoFilter methods that are processed on current thread (NOT on pooled
thread)
public void filterClose(NextFilter nextFilter, IoSession session) throws
Exception
{
@@ -201,8 +189,8 @@
super(refCountingPool, name);
}
- public void messageReceived(final NextFilter nextFilter, final
IoSession session,
- final Object message) throws Exception
+ public void messageReceived(final NextFilter nextFilter, final
IoSession session, final Object message)
+ throws Exception
{
fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter,
message));
@@ -223,9 +211,8 @@
super(refCountingPool, name);
}
-
- public void filterWrite(final NextFilter nextFilter, final IoSession
session,
- final WriteRequest writeRequest) throws
Exception
+ public void filterWrite(final NextFilter nextFilter, final IoSession
session, final WriteRequest writeRequest)
+ throws Exception
{
fireAsynchEvent(session, new Event.WriteEvent(nextFilter,
writeRequest));
}
@@ -234,21 +221,17 @@
{
fireAsynchEvent(session, new CloseEvent(nextFilter));
}
-
}
- public static PoolingFilter
createAynschReadPoolingFilter(ReferenceCountingExecutorService
refCountingPool,String name)
+ public static PoolingFilter
createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,
String name)
{
- return new AsynchReadPoolingFilter(refCountingPool,name);
+ return new AsynchReadPoolingFilter(refCountingPool, name);
}
-
- public static PoolingFilter
createAynschWritePoolingFilter(ReferenceCountingExecutorService
refCountingPool,String name)
+ public static PoolingFilter
createAynschWritePoolingFilter(ReferenceCountingExecutorService
refCountingPool, String name)
{
- return new AsynchWritePoolingFilter(refCountingPool,name);
+ return new AsynchWritePoolingFilter(refCountingPool, name);
}
}
-
-