When several threads make calls to different hosts (a different machine
for each thread, for example), and when a connection takes some time to
establish (host unreachable...), all the threads asking the
JConnectionMgr for a connection are blocked because the method
newCltConnection is fully synchronized.
I attach a modified class that allows simultaneous access when
destinations are different.
The main problem was to avoid duplicates of the same connection. This
is achieved by adding the connection to connections[] before it is
really created and then making a synchronized call to acquire() (outside
of the synchronized(this) block).
I also attach some simple code that shows the problem and tests the fix.
It can be launched with the small shell scripts in the test/ dir (but you
need a jonathan.jar in the jar/ dir):
'run_server' starts the server
'run_old_client <URServer> <RServer>' calls helloWorld method on
servers using your jonathan.jar
'run_new_client <URServer> <RServer>' calls helloWorld method on
servers but with the new
ConnectionMgr
<URServer> : unreachable server (this can be a down or disconnected
machine)
what is important is that a call to this machine should take a long
time.
<RServer> : reachable server (a machine on which the server is running)
note: I use jeremie stubs.
/***
* Jonathan: an Open Distributed Processing Environment
* Copyright (C) 1999 France Telecom R&D
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Release: 2.0
*
* Contact: [EMAIL PROTECTED]
*
* Author: Bruno Dumant
*
*/
package org.objectweb.jonathan.libs.resources.tcpip;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.objectweb.jonathan.apis.kernel.Kernel;
import org.objectweb.jonathan.apis.kernel.JonathanException;
import org.objectweb.jonathan.apis.contexts.ExportException;
import org.objectweb.jonathan.apis.protocols.ip.*;
import org.objectweb.jonathan.apis.resources.Chunk;
/**
* Default implementation of a connection manager and factory.
*/
public class JConnectionMgr implements TcpIpConnectionMgr {
/**
* Hash table of the managed client connections. Each slot holds a collision
* list chained through <code>Connection.next</code>; the table starts with
* 101 slots and is replaced by a larger one in <code>rehash</code>.
*/
Connection[] connections = new Connection[101];
/**
* Count of connections inserted into <code>connections</code>; compared with
* half the table length to decide when to rehash.
*/
int connects;
/**
* Head of the list (chained through <code>SrvConnectionFactory.next</code>)
* of the server connection factories created by this manager.
*/
SrvConnectionFactory server_connection_factories;
/** Head of the idle list; this is the connection withdrawn when too many are idle. */
Connection first_idle;
/** Tail of the idle list; connections that become idle are appended after it. */
Connection last_idle;
/** Counter of idle connections, compared with <code>max_idle</code> on release. */
int idle;
/**
* Underlying connection manager, bound from the kernel under the name
* <code>jonathan.tcpip.connection_factory</code>; host name resolution and
* the creation of connections and server factories are delegated to it.
*/
TcpIpConnectionMgr factory;
/**
* Maximum number of idle connections kept by this manager.
* This field (it is not final) is initialized in the constructor from the
* <code>jonathan.tcpip.max_idle</code>
* {@link org.objectweb.jonathan.apis.kernel.Kernel#getProperties()
* kernel property}; "22" is used when the property is absent.
*/
public int max_idle;
/**
 * Builds a connection manager configured from the properties of a kernel.
 * <p>
 * The idle limit is read from the <code>jonathan.tcpip.max_idle</code>
 * property (defaulting to "22") and the underlying connection manager is
 * obtained by binding <code>jonathan.tcpip.connection_factory</code>.
 *
 * @param kernel the kernel supplying the properties and the binding;
 * @exception JonathanException wrapping any exception raised while reading
 *            the property, parsing it, or binding the factory.
 */
public JConnectionMgr(Kernel kernel) throws JonathanException {
    connects = 0;
    final java.util.Properties settings = kernel.getProperties();
    try {
        max_idle = Integer.parseInt(
            settings.getProperty("jonathan.tcpip.max_idle", "22"));
        factory = (TcpIpConnectionMgr)
            kernel.bind("jonathan.tcpip.connection_factory");
    } catch (Exception failure) {
        // Any failure (bad number, failed binding, wrong type) is reported
        // uniformly as a JonathanException carrying the original cause.
        throw new JonathanException(failure);
    }
}
/**
 * Resolves a host name to its canonical form by asking the underlying
 * connection manager.
 *
 * @param hostname a host name
 * @return the canonical host name reported by the underlying manager.
 */
public String getCanonicalHostName(String hostname) {
    final String canonical = factory.getCanonicalHostName(hostname);
    return canonical;
}
/**
 * Returns a client connection for the given destination and session.
 * <p>
 * This method is called by a protocol.
 * The protocol provides a session (i.e. an object
 * representing an abstract communication channel) and expects a connection
 * (i.e. a communication resource). The returned connection must have been
 * built using the provided session, or be a connection associated with a
 * session having the same destination as the provided session.
 * <p>
 * Only the table lookup/insertion is done while holding the manager lock.
 * The connection is then acquired outside that lock, so that a slow or
 * unreachable destination does not block requests for other destinations.
 *
 * @param host the host name of the distant server;
 * @param port the port number of a server socket on that host;
 * @param session a TcpIp session
 * @return a connection for that session.
 * @exception JonathanException if an error occurs.
 */
public IpConnection newCltConnection(String host, int port,
                                     IpSession session)
        throws JonathanException {
    Connection connection;
    synchronized (this) {
        host = getCanonicalHostName(host);
        int hash = host.hashCode() + port;
        int len = connections.length;
        int index = (hash & 0x7FFFFFFF) % len;
        connection = connections[index];
        while (connection != null) {
            // Only a connection to the SAME destination may be reused; other
            // entries of the collision list must simply be skipped.
            if (connection.getPort() == port
                    && connection.getHostName().equals(host)) {
                IpSession attached = connection.getSession();
                if (attached == null) {
                    // Free connection to the right destination: adopt it.
                    connection.setSession(session);
                    break;
                } else if (attached.equals(session)) {
                    break;
                }
            }
            connection = connection.next;
        }
        if (connection == null) {
            // Register the connection before it is really established, so
            // that concurrent requests for the same destination share it
            // instead of creating duplicates.
            connection = new Connection(host, port, session);
            connection.next = connections[index];
            connections[index] = connection;
            connects++;
            if (connects > len / 2) {
                rehash(len);
            }
        }
    }
    // May block while the connection is being established: done outside the
    // manager lock on purpose.
    connection.acquire();
    return connection;
}
/**
 * Returns a server connection factory encapsulating a server socket on the
 * provided port. If port = 0, an anonymous server socket is opened.
 * <p>
 * A factory already created by this manager for the same port is reused;
 * otherwise a new one is obtained from the underlying manager and put at
 * the head of the list of known factories. The list is read and modified
 * while holding the manager lock, like its removal counterpart
 * <code>removeConnectionFactory</code>, which is invoked under that lock.
 *
 * @param port the expected port of the server socket;
 * @return a server connection factory.
 * @exception JonathanException if an error occurs.
 */
public TcpIpSrvConnectionFactory newSrvConnectionFactory(int port)
        throws JonathanException {
    synchronized (this) {
        SrvConnectionFactory fac = server_connection_factories;
        while (fac != null) {
            if (fac.getPort() == port) {
                return fac;
            }
            fac = fac.next;
        }
        // Build the wrapper first: if creation fails, the list is untouched.
        SrvConnectionFactory created =
            new SrvConnectionFactory(factory.newSrvConnectionFactory(port));
        created.next = server_connection_factories;
        server_connection_factories = created;
        return created;
    }
}
/**
 * Removes the connection from the set of managed connections.
 * <p>
 * The connection is looked up by identity in the collision list selected
 * by its hash code. Nothing happens if it is not found there.
 *
 * @param connection the connection to remove.
 */
void remove(Connection connection) {
    int hash = connection.hashCode();
    int len = connections.length;
    int index = (hash & 0x7FFFFFFF) % len;
    Connection current = connections[index];
    Connection prev = null;
    while (current != null) {
        if (connection == current) {
            if (prev == null) {
                connections[index] = current.next;
            } else {
                prev.next = current.next;
            }
            // Keep the entry count in step with the table contents so the
            // rehash threshold reflects live entries only.
            connects--;
            return;
        }
        // Remember the predecessor; without this, unlinking a non-head
        // entry would drop every entry that precedes it in the list.
        prev = current;
        current = current.next;
    }
}
/**
 * Inserts a connection at the head of the collision list selected by its
 * hash code, and enlarges the table once the number of inserted
 * connections exceeds half of its length.
 *
 * @param connection the connection to insert.
 */
void add(Connection connection) {
    final int capacity = connections.length;
    final int bucket = (connection.hashCode() & 0x7FFFFFFF) % capacity;
    connection.next = connections[bucket];
    connections[bucket] = connection;
    connects++;
    if (connects > capacity / 2) {
        rehash(capacity);
    }
}
/**
 * Unlinks a server connection factory from the list of factories known to
 * this manager. The factory is searched by identity; nothing happens if it
 * is not in the list.
 *
 * @param factory the server connection factory to unlink.
 */
void removeConnectionFactory(SrvConnectionFactory factory) {
    SrvConnectionFactory previous = null;
    SrvConnectionFactory cursor = server_connection_factories;
    while (cursor != null && cursor != factory) {
        previous = cursor;
        cursor = cursor.next;
    }
    if (cursor == null) {
        // Not found: leave the list unchanged.
        return;
    }
    if (previous == null) {
        server_connection_factories = cursor.next;
    } else {
        previous.next = cursor.next;
    }
}
/**
 * Moves every connection found in the first <code>len</code> slots of the
 * current table into a new table of <code>2 * len + 1</code> slots, then
 * makes that new table the current one.
 *
 * @param len the number of slots of the current table to transfer.
 */
void rehash(int len) {
    final int grownLen = 2 * len + 1;
    final Connection[] grown = new Connection[grownLen];
    for (int slot = 0; slot < len; slot++) {
        Connection cursor = connections[slot];
        while (cursor != null) {
            // Save the successor before relinking the entry.
            final Connection following = cursor.next;
            final int target = (cursor.hashCode() & 0x7FFFFFFF) % grownLen;
            cursor.next = grown[target];
            grown[target] = cursor;
            cursor = following;
        }
    }
    connections = grown;
}
/**
 * Implementation of IpConnection handed out by the enclosing manager.
 * <p>
 * A Connection records its destination (host, port) and session, and
 * forwards the actual work to a connection of the underlying manager (the
 * delegate). The delegate is created lazily by {@link #acquire()}, so a
 * Connection may be registered in the manager while its delegate is still
 * null.
 * <p>
 * Locking: the idle list links, the <code>acquired</code> count and the
 * manager's counters are only touched while holding the monitor of the
 * enclosing JConnectionMgr; creation of the delegate is serialized on the
 * monitor of the connection itself.
 */
public class Connection implements IpConnection {
    /** Next connection in the idle list (null if last, or if not idle). */
    Connection next_idle;
    /** Previous connection in the idle list (null if first, or if not idle). */
    Connection prev_idle;
    /** Number of objects using that connection. */
    int acquired;
    /** The next connection in a collision list */
    Connection next;
    /** Underlying connection; null until the first successful acquire(). */
    IpConnection delegate;
    /** Host name of the destination, as given to the constructor. */
    String host;
    /** Port number of the destination. */
    int port;
    /** Session currently attached to this connection (may be null). */
    IpSession session;

    /**
     * Builds a new, not yet established, connection and appends it to the
     * manager's idle list (it is counted as idle until first acquired).
     *
     * @param host the host name of the destination;
     * @param port the port number of the destination;
     * @param session the session to attach to the connection.
     * @exception JonathanException declared for callers; not thrown here.
     */
    protected Connection(String host, int port, IpSession session)
            throws JonathanException {
        this.host = host;
        this.port = port;
        this.session = session;
        this.delegate = null;
        acquired = 0;
        synchronized (JConnectionMgr.this) {
            linkAsLastIdle();
        }
    }

    /** Forwards to the delegate's available(). */
    public int available() throws IOException {
        return delegate.available();
    }

    /** Forwards to the delegate's receive(chunk, to_read). */
    public void receive(Chunk chunk, int to_read) throws IOException {
        delegate.receive(chunk, to_read);
    }

    /** Forwards to the delegate's emit(chunk). */
    public void emit(Chunk chunk) throws IOException {
        delegate.emit(chunk);
    }

    /**
     * Returns the port number of the destination of this connection.
     * @return the port number given at construction time.
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns the host name of the destination of this connection.
     * @return the host name given at construction time.
     */
    public String getHostName() {
        return host;
    }

    /**
     * Returns the session attached to this connection.
     * @return the session attached to this connection.
     */
    public IpSession getSession() {
        return session;
    }

    /**
     * Attaches a new session to this connection.
     * <p>
     * If the delegate does not exist yet, the session is only recorded; it
     * will be passed to the underlying manager when the delegate is created.
     * @param session the session to be attached to the target connection.
     */
    public void setSession(IpSession session) {
        this.session = session;
        if (delegate != null) {
            delegate.setSession(session);
        }
    }

    /**
     * Deletes this connection, removing it from the connection manager, and
     * deleting the delegate if there is one. This method should not be used
     * by a connection user unless a problem occurs on the connection, like
     * an exception when trying to read or to write data.
     */
    public void delete() {
        synchronized (JConnectionMgr.this) {
            if (delegate != null) {
                delegate.delete();
            }
            withdraw();
        }
    }

    /**
     * Returns when the connection is acquired. This information is taken
     * into account by the connection manager to avoid closing connections
     * still in use.
     * <p>
     * The use is recorded (and the connection taken off the idle list)
     * BEFORE the delegate is created, so that a connection being
     * established cannot be withdrawn as "idle" by a concurrent release().
     * The possibly slow creation of the delegate runs outside the manager
     * lock. If it fails, the recorded use is given back with release().
     *
     * @exception JonathanException if the delegate cannot be created.
     */
    public void acquire()
            throws JonathanException {
        synchronized (JConnectionMgr.this) {
            acquired++;
            if (acquired == 1 && isIdle()) {
                unlinkIdle();
            }
        }
        boolean established = false;
        try {
            synchronized (this) {
                if (delegate == null) {
                    delegate = factory.newCltConnection(host, port, session);
                }
            }
            established = true;
        } finally {
            if (!established) {
                // Undo the reservation made above.
                release();
            }
        }
    }

    /**
     * Releases this connection. This is to indicate to the connection manager
     * that the target connection is no longer used.
     * <p>
     * When the last user releases the connection, it is appended to the idle
     * list; if that makes the number of idle connections exceed
     * <code>max_idle</code>, the connection at the head of the idle list is
     * released on its delegate (if any) and withdrawn from the manager.
     */
    public void release() {
        synchronized (JConnectionMgr.this) {
            acquired--;
            if (acquired == 0) {
                linkAsLastIdle();
                if (idle > max_idle) {
                    Connection oldest = first_idle;
                    if (oldest.delegate != null) {
                        oldest.delegate.release();
                    }
                    oldest.withdraw();
                }
            }
        }
    }

    /**
     * Removes this connection from the manager's table and, if it is idle,
     * from the idle list. Must be called with the manager lock held.
     */
    void withdraw() {
        remove(this);
        if (isIdle()) {
            unlinkIdle();
        }
    }

    /**
     * Tells whether this connection is currently in the idle list.
     * Must be called with the manager lock held.
     */
    private boolean isIdle() {
        return prev_idle != null || next_idle != null || first_idle == this;
    }

    /**
     * Appends this connection to the idle list and counts it as idle.
     * Must be called with the manager lock held, on a connection that is
     * not already in the list.
     */
    private void linkAsLastIdle() {
        prev_idle = last_idle;
        next_idle = null;
        if (last_idle != null) {
            last_idle.next_idle = this;
        } else {
            first_idle = this;
        }
        last_idle = this;
        idle++;
    }

    /**
     * Takes this connection out of the idle list, fixing the list head and
     * tail as needed, and clears its links. Must be called with the manager
     * lock held, on a connection that is in the list.
     */
    private void unlinkIdle() {
        if (prev_idle != null) {
            prev_idle.next_idle = next_idle;
        } else {
            first_idle = next_idle;
        }
        if (next_idle != null) {
            next_idle.prev_idle = prev_idle;
        } else {
            last_idle = prev_idle;
        }
        prev_idle = next_idle = null;
        idle--;
    }

    public String toString() {
        return "JConnectionMgr.Connection" + System.identityHashCode(this)
            + "[" + delegate + "]";
    }

    /**
     * Hash code computed from this connection's own port and host name, the
     * same formula newCltConnection uses to select a slot of the table. It
     * does not use the delegate, which may not exist yet.
     */
    public int hashCode() {
        return port + host.hashCode();
    }
}
/**
 * Server connection factory handed out by the enclosing manager. Every
 * operation is forwarded to a wrapped factory; closing additionally
 * unregisters this factory from the manager.
 */
class SrvConnectionFactory implements TcpIpSrvConnectionFactory {
    /** The wrapped factory to which all calls are forwarded. */
    TcpIpSrvConnectionFactory delegate;
    /** Next factory in the manager's list of server connection factories. */
    SrvConnectionFactory next;

    /**
     * Wraps the provided factory.
     * @param delegate the factory to forward to.
     * @exception JonathanException declared for callers; not thrown here.
     */
    SrvConnectionFactory(TcpIpSrvConnectionFactory delegate)
            throws JonathanException {
        this.delegate = delegate;
    }

    /**
     * Forwards the request for a server connection to the wrapped factory.
     * @param session the session passed on to the wrapped factory;
     * @return the connection returned by the wrapped factory.
     * @exception JonathanException if the wrapped factory fails.
     */
    public IpConnection newSrvConnection(IpSession session)
            throws JonathanException {
        final IpConnection result = delegate.newSrvConnection(session);
        return result;
    }

    /** Returns the port reported by the wrapped factory. */
    public int getPort() {
        final int reported = delegate.getPort();
        return reported;
    }

    /** Returns the host name reported by the wrapped factory. */
    public String getHostName() {
        final String reported = delegate.getHostName();
        return reported;
    }

    /**
     * Closes the wrapped factory, then removes this factory from the
     * manager's list, both while holding the manager lock.
     */
    public void close() {
        final JConnectionMgr manager = JConnectionMgr.this;
        synchronized (manager) {
            delegate.close();
            manager.removeConnectionFactory(this);
        }
    }
}
}
threads&jeremie.tar.gz