For some reason attachments don't seem to work. Here is my code:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
/**
 * Example client application that can instantiate multiple UIMA AS clients, each running in
 * a separate thread.
 * <p>
 * Arguments: {@code brokerUrl endpoint <howManyCASesToSend> <scaleup>}
 * <p>
 * The application creates as many UIMA AS clients and threads as specified in the "scaleup"
 * argument. Each instance runs in its own thread and has its own temp reply queue. It
 * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote service. For this
 * a CAS Pool containing a single CAS is sufficient.
 * <p>
 * Each client sends as many CASes to a remote service as specified in the "howManyCASesToSend"
 * argument.
 * <p>
 * The application initializes a CountDownLatch to the number of clients/threads, which is then
 * used to await completion. When a worker thread completes its run, it sends a CPC and counts
 * down the latch, reducing the number of busy threads. When all threads finish, the application
 * is notified and can proceed to cleanup and shutdown.
 */
public class MultithreadedClientApp {
public CountDownLatch latch = null;
public void initializeAndRun(String[] args) throws Exception {
try {
int howManyWorkers = Integer.parseInt(args[3]); // how many threads
latch = new CountDownLatch(howManyWorkers); // each worker counts down when
done
// Create Worker threads
ClientWorker[] workers = new ClientWorker[howManyWorkers];
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
howManyWorkers);
// Thread Pool Executor to manages threads
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
howManyWorkers,
Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
// Start all threads
threadPool.prestartAllCoreThreads();
for( int i=0; i < howManyWorkers; i++ ) {
workers[i] = new ClientWorker();
// 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
}
for( int i=0; i < howManyWorkers; i++ ) {
threadPool.submit(workers[i]); // start the workers
}
// Each worker counts down the latch after it is done sending CASes
latch.await();
// All worker threads completed, now stop the clients
for( int i=0; i < howManyWorkers; i++ ) {
workers[i].stop(); // stop UIMA AS clients
}
threadPool.shutdown(); // cleanup thread pool
System.out.println("All UIMA AS Clients Finished Processing");
} catch( Exception e ) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MultithreadedClientApp client = new MultithreadedClientApp();
try {
if ( args.length != 4 ) {
System.out.println("Usage: ");
}
client.initializeAndRun(args);
} catch( Exception e ) {
e.printStackTrace();
}
}
public class ClientWorker implements Runnable {
private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
private int howManyCASes = 0;
public void initialize(String brokerUrl, String endpoint, int howManyCASes )
throws Exception {
uimaASClient = new BaseUIMAAsynchronousEngine_impl();
Map<String, Object> appCtx = new HashMap<String, Object>();
// set server URI and Endpoint
// Add Broker URI
appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
// Add Queue Name
appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
// Add the Cas Pool Size and initial FS heap size
appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
// initialize
uimaASClient.initialize(appCtx);
this.howManyCASes = howManyCASes;
}
public void stop() {
uimaASClient.stop();
}
public void run() {
try {
int sentSoFar = 0;
CAS cas = uimaASClient.getCAS();
int count=1;
while( sentSoFar < howManyCASes ) {
cas.setDocumentText("Some Text");
uimaASClient.sendAndReceiveCAS(cas);
System.out.println("Thread:"+Thread.currentThread().getId()+":::
Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
cas.reset();
sentSoFar++;
}
uimaASClient.collectionProcessingComplete();
System.out.println("Thread::"+Thread.currentThread().getId()+" Sent
CPC. Thread Done");
latch.countDown();
} catch( Exception e) {
e.printStackTrace();
}
}
}
}
On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <[email protected]> wrote:
> Dietmar, I tried my example application with an Aggregate Service and see
> no problem. Your previous email had no source attached.
> Attached please find the example application code I use in my testing. To
> run it:
>
> java -cp <classpath> MultithreadedClientApp
> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads>
>
> The code adds a short text to each CAS before each call to
> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>
> Jerry
>
> 2010/12/9 Dietmar Gräbner <[email protected]>
>
> Hi Eddie,
>>
>> wouldn't the client requests be serialized in the scenario you propose?
>>
>> Dietmar
>>
>>
>>
>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <[email protected]>
>> wrote:
>> > 2010/12/7 Dietmar Gräbner <[email protected]>:
>> >> I wrote a test client creating multiple threads. Each thread
>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>> >
>> > FWIW, a similar multithreaded client scenario that has been used with
>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>> > using the common API object.
>> >
>> > Eddie
>> >
>>
>
>