thx for the example - I'll test it tomorrow. Best regards
On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <[email protected]> wrote: > For some reason attachment dont seem to work. Here is my code: > > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, > * software distributed under the License is distributed on an > * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > * KIND, either express or implied. See the License for the > * specific language governing permissions and limitations > * under the License. > */ > > > import java.util.HashMap; > import java.util.Map; > import java.util.concurrent.ArrayBlockingQueue; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ThreadPoolExecutor; > import java.util.concurrent.TimeUnit; > > import org.apache.uima.aae.client.UimaAsynchronousEngine; > import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl; > import org.apache.uima.cas.CAS; > > /** > * Example client application that can instantiate multiple UIMA AS clients > each running in > * a separate thread. > * <p> > * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup> > * <p> > * The application creates as many UIMA AS clients and threads as specified > in the "scaleup" > * argument. Each instance runs in its own thread and has is its own temp > reply queue. It > * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote > service. For this > * a CAS Pool containing a single CAS is sufficient. > * <p> > * Each client sends as many CASes to a remote service as specified in the > "howManyCASesToSend" > * argument. > * <p> > * The application initializes a CountDownLatch to the number of > clients/threads which is than > * used to await completion. When a worker thread completes its run, it > sends a CPC and counts down the > * latch reducing the number of busy threads. When all threads finish, the > application is notified > * and can proceed to cleanup and shutdown. > * > */ > public class MultithreadedClientApp { > public CountDownLatch latch = null; > public void initializeAndRun(String[] args) throws Exception { > try { > int howManyWorkers = Integer.parseInt(args[3]); // how many threads > latch = new CountDownLatch(howManyWorkers); // each worker counts down when > done > // Create Worker threads > ClientWorker[] workers = new ClientWorker[howManyWorkers]; > final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( > howManyWorkers); > // Thread Pool Executor to manages threads > ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers, > howManyWorkers, > Integer.MAX_VALUE, TimeUnit.SECONDS, queue); > // Start all threads > threadPool.prestartAllCoreThreads(); > for( int i=0; i < howManyWorkers; i++ ) { > workers[i] = new ClientWorker(); > // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send > workers[i].initialize(args[0],args[1],Integer.parseInt(args[2])); > } > > for( int i=0; i < howManyWorkers; i++ ) { > threadPool.submit(workers[i]); // start the workers > } > // Each worker counts down the latch after it is done sending CASes > latch.await(); > // All worker threads completed, now stop the clients > for( int i=0; i < howManyWorkers; i++ ) { > workers[i].stop(); // stop UIMA AS clients > } > > threadPool.shutdown(); // cleanup thread pool > System.out.println("All UIMA AS Clients Finished Processing"); > } catch( Exception e ) { > e.printStackTrace(); > } > } > public static void main(String[] args) { > MultithreadedClientApp client = new MultithreadedClientApp(); > try { > if ( args.length != 4 ) { > System.out.println("Usage: "); > } > client.initializeAndRun(args); > } catch( Exception e ) { > e.printStackTrace(); > } > } > public class ClientWorker implements Runnable { > private BaseUIMAAsynchronousEngine_impl uimaASClient = null; > private int howManyCASes = 0; > public void initialize(String brokerUrl, String endpoint, int howManyCASes ) > throws Exception { > uimaASClient = new BaseUIMAAsynchronousEngine_impl(); > Map<String, Object> appCtx = new HashMap<String, Object>(); > // set server URI and Endpoint > // Add Broker URI > appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl); > // Add Queue Name > appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint); > // Add the Cas Pool Size and initial FS heap size > appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1); > > // initialize > uimaASClient.initialize(appCtx); > this.howManyCASes = howManyCASes; > } > public void stop() { > uimaASClient.stop(); > } > public void run() { > try { > int sentSoFar = 0; > CAS cas = uimaASClient.getCAS(); > int count=1; > while( sentSoFar < howManyCASes ) { > > cas.setDocumentText("Some Text"); > > uimaASClient.sendAndReceiveCAS(cas); > System.out.println("Thread:"+Thread.currentThread().getId()+"::: > Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far"); > cas.reset(); > sentSoFar++; > } > uimaASClient.collectionProcessingComplete(); > System.out.println("Thread::"+Thread.currentThread().getId()+" Sent > CPC. Thread Done"); > latch.countDown(); > } catch( Exception e) { > e.printStackTrace(); > } > } > } > } > > > > > On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <[email protected]> wrote: > >> Dietmar, I tried my example application with an Aggregate Service and see >> no problem. Your previous email had no source attached. >> Attached please find an example application code I use in my testing. To >> run it" >> >> java -cp <classpath> MultithreadedClientApp >> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads> >> >> The code adds a short text to each CAS before each call to >> sendAndReceive(). There are no app listeners attached to UIMA AS client. >> >> Jerry >> >> 2010/12/9 Dietmar Gräbner <[email protected]> >> >> Hi Eddie, >>> >>> wouldn't the client requests be serialized in the szenario you propose? >>> >>> Dietmar >>> >>> >>> >>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <[email protected]> >>> wrote: >>> > 2010/12/7 Dietmar Gräbner <[email protected]>: >>> >> I wrote a test client creating multiple threads. Each thread >>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima >>> >> aggregate with the sendAndReceiveCAS() call. When running the program >>> >> with e.g. 100 Threads the client gets stuck after processing X calls. >>> > >>> > FWIW, a similar multithreaded client scenario that has been used with >>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl >>> > with big enough casPool and have each thread call sendAndReceiveCAS() >>> > using the common API object. >>> > >>> > Eddie >>> > >>> >> >> >
