Hi,
I tested your example and worked for both configurations
(MeetingDetectorTae with and without the <analysisEngine> part. The
main difference between your client and mine is the separation of the
initialize and the submit process in your example.
Here is my code (Main class and Worker Thread):
The main class:
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
/**
*
* A multithreaded testClient calling a worker.
*
*/
public class MultithreadedTestClient {
//a server timeout not used in the current example
public static final int LINGREP_SERVER_CONNECTION_TIMEOUT = 600000;
public static final int TEST_NUMBEROFTHREADS = 100; // 10 threads
public static Logger theLog =
Logger.getLogger(MultithreadedTestClient.class);
/**
* Main method for the test
*
* @param args no arguments are parsed
* @throws Exception some problem
*/
public static void main(java.lang.String args[]) throws Exception {
// specify logfile settings
if (System.getProperty("log4j.configuration")==null){
DOMConfigurator.configure("resources/config/log4j.xml");
}
for (int i = 0; i < TEST_NUMBEROFTHREADS; i++) {
Runnable r = new MinimalWorkerThread(i);
new Thread(r).start();
}
}
}
And the Thread Worker:
import java.util.HashMap;
import java.util.Map;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
/**
* a runnable initializing and calling the Webservice
*/
public class MinimalWorkerThread implements Runnable {
private int mId = 0;
/**
* Start time of the processing - used to compute elapsed time.
*/
private UimaAsynchronousEngine uimaEEEngine = null;
Map<String, Object> appCtx = new HashMap<String, Object>();
/**
* Constructor for the class. Parses command line arguments and
sets the values of fields in this
* instance. If command line is invalid prints a message and calls
System.exit().
*
* @param args
* command line arguments into the program - see class description
*/
public MinimalWorkerThread(int id) throws Exception {
mId = id;
//Initzialize the AppContext
uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
// Add Broker URI
appCtx.put(UimaAsynchronousEngine.ServerUri,
"tcp://143.205.174.93:61616");
// Add Endpoint
appCtx.put(UimaAsynchronousEngine.Endpoint, "MeetingDetectorTaeQueue");
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
}
public void run() {
try {
System.out.println("running " + mId);
//initialize the client
uimaEEEngine.initialize(appCtx);
String text = "Id " + mId +"This is a nice test sentence.
And a second. Including a third.";
// send an empty CAS
CAS cas = uimaEEEngine.getCAS();
//cas.setDocumentLanguage("en");
cas.setDocumentText(text);
uimaEEEngine.sendAndReceiveCAS(cas);
uimaEEEngine.collectionProcessingComplete();
System.out.println("Thread id " + mId + " returned " +
cas.getDocumentText().substring(0, 5));
cas.reset();
uimaEEEngine.stop();
} catch (Exception e) {
System.err.println("Exception during Processing!");
e.printStackTrace();
}
}
}
On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <[email protected]> wrote:
> For some reason attachment dont seem to work. Here is my code:
>
> /*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing,
> * software distributed under the License is distributed on an
> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> * KIND, either express or implied. See the License for the
> * specific language governing permissions and limitations
> * under the License.
> */
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.uima.aae.client.UimaAsynchronousEngine;
> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> import org.apache.uima.cas.CAS;
>
> /**
> * Example client application that can instantiate multiple UIMA AS clients
> each running in
> * a separate thread.
> * <p>
> * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
> * <p>
> * The application creates as many UIMA AS clients and threads as specified
> in the "scaleup"
> * argument. Each instance runs in its own thread and has is its own temp
> reply queue. It
> * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote
> service. For this
> * a CAS Pool containing a single CAS is sufficient.
> * <p>
> * Each client sends as many CASes to a remote service as specified in the
> "howManyCASesToSend"
> * argument.
> * <p>
> * The application initializes a CountDownLatch to the number of
> clients/threads which is than
> * used to await completion. When a worker thread completes its run, it
> sends a CPC and counts down the
> * latch reducing the number of busy threads. When all threads finish, the
> application is notified
> * and can proceed to cleanup and shutdown.
> *
> */
> public class MultithreadedClientApp {
> public CountDownLatch latch = null;
> public void initializeAndRun(String[] args) throws Exception {
> try {
> int howManyWorkers = Integer.parseInt(args[3]); // how many threads
> latch = new CountDownLatch(howManyWorkers); // each worker counts down when
> done
> // Create Worker threads
> ClientWorker[] workers = new ClientWorker[howManyWorkers];
> final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
> howManyWorkers);
> // Thread Pool Executor to manages threads
> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
> howManyWorkers,
> Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
> // Start all threads
> threadPool.prestartAllCoreThreads();
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i] = new ClientWorker();
> // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
> workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
> }
>
> for( int i=0; i < howManyWorkers; i++ ) {
> threadPool.submit(workers[i]); // start the workers
> }
> // Each worker counts down the latch after it is done sending CASes
> latch.await();
> // All worker threads completed, now stop the clients
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i].stop(); // stop UIMA AS clients
> }
>
> threadPool.shutdown(); // cleanup thread pool
> System.out.println("All UIMA AS Clients Finished Processing");
> } catch( Exception e ) {
> e.printStackTrace();
> }
> }
> public static void main(String[] args) {
> MultithreadedClientApp client = new MultithreadedClientApp();
> try {
> if ( args.length != 4 ) {
> System.out.println("Usage: ");
> }
> client.initializeAndRun(args);
> } catch( Exception e ) {
> e.printStackTrace();
> }
> }
> public class ClientWorker implements Runnable {
> private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
> private int howManyCASes = 0;
> public void initialize(String brokerUrl, String endpoint, int howManyCASes )
> throws Exception {
> uimaASClient = new BaseUIMAAsynchronousEngine_impl();
> Map<String, Object> appCtx = new HashMap<String, Object>();
> // set server URI and Endpoint
> // Add Broker URI
> appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
> // Add Queue Name
> appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
> // Add the Cas Pool Size and initial FS heap size
> appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>
> // initialize
> uimaASClient.initialize(appCtx);
> this.howManyCASes = howManyCASes;
> }
> public void stop() {
> uimaASClient.stop();
> }
> public void run() {
> try {
> int sentSoFar = 0;
> CAS cas = uimaASClient.getCAS();
> int count=1;
> while( sentSoFar < howManyCASes ) {
>
> cas.setDocumentText("Some Text");
>
> uimaASClient.sendAndReceiveCAS(cas);
> System.out.println("Thread:"+Thread.currentThread().getId()+":::
> Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
> cas.reset();
> sentSoFar++;
> }
> uimaASClient.collectionProcessingComplete();
> System.out.println("Thread::"+Thread.currentThread().getId()+" Sent
> CPC. Thread Done");
> latch.countDown();
> } catch( Exception e) {
> e.printStackTrace();
> }
> }
> }
> }
>
>
>
>
> On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <[email protected]> wrote:
>
>> Dietmar, I tried my example application with an Aggregate Service and see
>> no problem. Your previous email had no source attached.
>> Attached please find an example application code I use in my testing. To
>> run it"
>>
>> java -cp <classpath> MultithreadedClientApp
>> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads>
>>
>> The code adds a short text to each CAS before each call to
>> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>>
>> Jerry
>>
>> 2010/12/9 Dietmar Gräbner <[email protected]>
>>
>> Hi Eddie,
>>>
>>> wouldn't the client requests be serialized in the szenario you propose?
>>>
>>> Dietmar
>>>
>>>
>>>
>>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <[email protected]>
>>> wrote:
>>> > 2010/12/7 Dietmar Gräbner <[email protected]>:
>>> >> I wrote a test client creating multiple threads. Each thread
>>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>>> >
>>> > FWIW, a similar multithreaded client scenario that has been used with
>>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>>> > using the common API object.
>>> >
>>> > Eddie
>>> >
>>>
>>
>>
>