Based on the test code below, coprocessor calls do not scale with concurrent
clients. A response that takes 10-20 msec for a single client degrades to as
much as 2000 msec for 32 clients on a cluster with 32 cores on EC2. This is with
a simple test endpoint that just returns a map with one (String, Double) entry,
without any computation (code is also below).
I would like to hear from anybody using coprocessors with good latency
(HBase version etc.; we are on 0.94.1). Maybe James @ Phoenix can respond
with how Phoenix handles concurrent connections. (Appreciate it, James.)
test/test10.log:Total search elapsed time: 430
test/test10.log:Total search elapsed time: 100
test/test10.log:Total search elapsed time: 274
test/test10.log:Total search elapsed time: 157
test/test10.log:Total search elapsed time: 182
test/test10.log:Total search elapsed time: 222
test/test10.log:Total search elapsed time: 415
test/test10.log:Total search elapsed time: 180
test/test10.log:Total search elapsed time: 58
test/test10.log:Total search elapsed time: 228
test/test10.log:Total search elapsed time: 248
test/test10.log:Total search elapsed time: 138
test/test10.log:Total search elapsed time: 144
test/test10.log:Total search elapsed time: 141
test/test11.log:Total search elapsed time: 2407
test/test11.log:Total search elapsed time: 240
test/test11.log:Total search elapsed time: 260
test/test11.log:Total search elapsed time: 304
test/test11.log:Total search elapsed time: 129
test/test11.log:Total search elapsed time: 300
test/test11.log:Total search elapsed time: 465
test/test11.log:Total search elapsed time: 282
test/test11.log:Total search elapsed time: 248
test/test11.log:Total search elapsed time: 218
test/test11.log:Total search elapsed time: 356
test/test11.log:Total search elapsed time: 268
test/test11.log:Total search elapsed time: 147
test/test11.log:Total search elapsed time: 227
test/test11.log:Total search elapsed time: 189
test/test11.log:Total search elapsed time: 97
test/test11.log:Total search elapsed time: 231
test/test11.log:Total search elapsed time: 231
test/test11.log:Total search elapsed time: 56
test/test11.log:Total search elapsed time: 298
test/test11.log:Total search elapsed time: 165
test/test11.log:Total search elapsed time: 285
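To compare runs like the ones logged above, it can help to reduce each log to a median and a maximum rather than reading the raw lines. The following is only a sketch: the class `ElapsedSummary` and its methods are hypothetical helpers, not part of the test code in this thread, and the percentile uses the simple nearest-rank definition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for summarising "Total search elapsed time" log lines. */
final class ElapsedSummary {
    private static final String MARKER = "Total search elapsed time: ";

    /** Extracts the millisecond value from one log line, or returns -1 if the marker is absent. */
    static long parseElapsed(String line) {
        int at = line.indexOf(MARKER);
        if (at < 0) {
            return -1;
        }
        return Long.parseLong(line.substring(at + MARKER.length()).trim());
    }

    /** Nearest-rank percentile (p in 0..100) of the samples; the list must be non-empty. */
    static long percentile(List<Long> samples, double p) {
        List<Long> sorted = new ArrayList<Long>(samples);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args) {
        // Three lines copied from the log output above; a real run would read the whole file.
        String[] lines = {
            "test/test10.log:Total search elapsed time: 430",
            "test/test10.log:Total search elapsed time: 100",
            "test/test10.log:Total search elapsed time: 274",
        };
        List<Long> samples = new ArrayList<Long>();
        for (String line : lines) {
            long ms = parseElapsed(line);
            if (ms >= 0) {
                samples.add(ms);
            }
        }
        System.out.println("median=" + percentile(samples, 50)
                + " max=" + percentile(samples, 100));
    }
}
```

The maximum is worth reporting next to the median because a single outlier, such as the 2407 msec call in test11.log, is invisible in the median alone.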
================================ Updated client to account for issues with
connection setup interfering with rpc calls ======================
package com.serendio.hbase.coprocessor;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;

public class Search {
    public static final String TEST_TABLE = "c4";
    public static final String TEST_FAMILY = "info";
    public static final String TEST_QUALIFIER = "c1";

    /**
     * Opens the table once, reports the connection setup time, then times
     * each endpoint call separately.
     *
     * @param args args[0] is the number of endpoint calls to make
     */
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        HTableInterface table = null;
        try {
            table = new HTable(conf, TEST_TABLE);
        } catch (IOException e2) {
            e2.printStackTrace();
            return; // without a table there is nothing to time
        }
        long end = System.currentTimeMillis();
        System.out.println("Connection setup time: "
                + Long.toString(end - start));

        Map<byte[], Map<String, Double>> results = null;
        final Map<String, Double> input = new HashMap<String, Double>();
        input.put("test", 1.0);
        final int topN = 10;
        Thread.sleep(2000);

        int loop = Integer.parseInt(args[0]);
        for (int i = 0; i < loop; i++) {
            start = System.currentTimeMillis();
            try {
                // null start/end keys: invoke the endpoint on every region
                results = table.coprocessorExec(SearchProtocol.class, null,
                        null, new Call<SearchProtocol, Map<String, Double>>() {
                            @Override
                            public Map<String, Double> call(
                                    SearchProtocol instance) throws IOException {
                                return instance.foo(input, topN);
                            }
                        });
            } catch (IOException e1) {
                e1.printStackTrace();
            } catch (Throwable e1) {
                // coprocessorExec is declared to throw Throwable
                e1.printStackTrace();
            }
            end = System.currentTimeMillis();
            System.out.println("Total search elapsed time: "
                    + Long.toString(end - start));
        }
    }
}
Regards,
- kiru
________________________________
From: Kiru Pakkirisamy <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Friday, August 30, 2013 1:56 PM
Subject: Re: Coprocessor concurrency
Vlad,
I am running the clients in separate processes. But I think I found at least
part of the problem.
It is the HTable connection setup that is affecting it.
If I set up all the connections in all the processes for all the tables, and
only then call the endpoints, it goes through OK,
at least for the example below.
But with my real code, where one endpoint does at least 400-500 msec of
computation, it is degrading exponentially.
(I will reduce the handler count and try.) Thanks, I appreciate your response.
These things are blocking me from doing real scalability testing. We got a
single client down to 1.3 sec (target is under a sec) and now I want to move on
to scalability.
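The "set up every connection first, only then call the endpoints" ordering described above can be sketched as a small harness. This is an illustration under stated assumptions, not the test code from this thread: `ConcurrentLatencyHarness` and its `Client` interface are hypothetical, and it uses threads inside one JVM for brevity, whereas the tests here ran one process per client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical harness: keeps per-client setup out of the timed section. */
final class ConcurrentLatencyHarness {

    /** Stand-in for one client: setup() opens the connection, call() is the request to time. */
    interface Client {
        void setup() throws Exception;
        void call() throws Exception;
    }

    /**
     * Runs one thread per client. Each thread finishes setup() and then waits on
     * a shared latch, so no timed call overlaps another client's setup.
     * Returns the elapsed milliseconds of each call(), in client order.
     * The client list must be non-empty.
     */
    static List<Long> run(List<? extends Client> clients) throws Exception {
        final CountDownLatch ready = new CountDownLatch(clients.size());
        // One thread per client, so every client can reach the latch at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(clients.size());
        try {
            List<Future<Long>> futures = new ArrayList<Future<Long>>();
            for (final Client c : clients) {
                futures.add(pool.submit(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        try {
                            c.setup();
                        } finally {
                            ready.countDown(); // count down even on failure so others are not stuck
                        }
                        ready.await();
                        long start = System.nanoTime();
                        c.call();
                        return (System.nanoTime() - start) / 1000000L;
                    }
                }));
            }
            List<Long> elapsed = new ArrayList<Long>();
            for (Future<Long> f : futures) {
                elapsed.add(f.get());
            }
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Client> clients = new ArrayList<Client>();
        for (int i = 0; i < 4; i++) {
            clients.add(new Client() {
                @Override
                public void setup() throws Exception {
                    Thread.sleep(50); // placeholder for opening a table
                }

                @Override
                public void call() throws Exception {
                    Thread.sleep(10); // placeholder for the endpoint call
                }
            });
        }
        System.out.println("Elapsed msec per client: " + run(clients));
    }
}
```

In a real test, `setup()` would open the table and `call()` would invoke the endpoint. With one process per client the latch would have to be replaced by some cross-process signal, which this sketch does not attempt.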
Regards,
- kiru
________________________________
From: Vladimir Rodionov <[email protected]>
To: "[email protected]" <[email protected]>; Kiru Pakkirisamy
<[email protected]>
Sent: Friday, August 30, 2013 12:34 PM
Subject: RE: Coprocessor concurrency
Can you check the RS thread stack traces during your test? If there are any
concurrency/contention issues, they will show up in the stack traces.
Also, make sure that you run your client apps in separate processes.
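As an illustration of what contention looks like in a thread dump, the sketch below lists the threads of the current JVM that are BLOCKED on a monitor, together with the lock they are waiting for and its owner. The class `BlockedThreadReport` is hypothetical, and it only sees the JVM it runs in. For a region server, which is a separate process, a tool such as `jstack` run against its pid gives the same kind of information.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reports threads of the current JVM that are blocked on a monitor. */
final class BlockedThreadReport {

    /** Returns one formatted entry (name, lock, owner, stack) per BLOCKED thread. */
    static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<String>();
        // true, true: also collect locked monitors and ownable synchronizers
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info == null || info.getThreadState() != Thread.State.BLOCKED) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append(info.getThreadName())
              .append(" blocked on ").append(info.getLockName())
              .append(" held by ").append(info.getLockOwnerName())
              .append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            report.add(sb.toString());
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> report = blockedThreads();
        System.out.println(report.size() + " blocked thread(s)");
        for (String entry : report) {
            System.out.print(entry);
        }
    }
}
```

Many threads blocked on the same lock, with the same frames at the top of their stacks, is the pattern to look for.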
Best regards,
Vladimir Rodionov
Principal Platform Engineer
Carrier IQ, www.carrieriq.com
e-mail: [email protected]
________________________________________
From: Kiru Pakkirisamy [[email protected]]
Sent: Friday, August 30, 2013 1:49 AM
To: hbase mailing list
Subject: Coprocessor concurrency
See below a null endpoint which takes 60-70 msec on my 4-node EC2 cluster for a
table with 45 regions.
When I run 64 concurrent clients against it, the latency jumps to 3000-3700 msec.
(ZooKeeper maxClientCnxns is set to 0 (unlimited) and the HBase regionserver
handler count is 800. I hope I am not missing any configs for concurrency.)
----------------------------------------------------------------------------------------------------------------------------------------------------------
package com.serendio.hbase.coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;

// Aggregation implementation at a region.
public class SearchEndpoint extends BaseEndpointCoprocessor implements
        SearchProtocol {
    @Override
    public Map<String, Double> foo(Map<String, Double> input, int topN)
            throws IOException {
        // Implement your logic here
        // Map<String, Double> ret = new HashMap<String, Double>();
        // ret.put("foo", 1.0);
        return null;
    }
}
===================================================================
package com.serendio.hbase.coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

public interface SearchProtocol extends CoprocessorProtocol {
    public Map<String, Double> foo(Map<String, Double> input,
            int topN) throws IOException;
}
==========================================================================
package com.serendio.hbase.coprocessor;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;

public class Search {
    public static final String TEST_TABLE = "c4";
    public static final String TEST_FAMILY = "info";
    public static final String TEST_QUALIFIER = "c1";

    /**
     * Opens the table and times a single endpoint call.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        HTableInterface table = null;
        try {
            table = new HTable(conf, TEST_TABLE);
        } catch (IOException e2) {
            e2.printStackTrace();
            return; // without a table there is nothing to time
        }

        Map<byte[], Map<String, Double>> results = null;
        final Map<String, Double> input = new HashMap<String, Double>();
        input.put("test", 1.0);
        final int topN = 10;

        long start = System.currentTimeMillis();
        try {
            // null start/end keys: invoke the endpoint on every region
            results = table.coprocessorExec(SearchProtocol.class, null, null,
                    new Call<SearchProtocol, Map<String, Double>>() {
                        @Override
                        public Map<String, Double> call(SearchProtocol instance)
                                throws IOException {
                            return instance.foo(input, topN);
                        }
                    });
        } catch (IOException e1) {
            e1.printStackTrace();
        } catch (Throwable e1) {
            // coprocessorExec is declared to throw Throwable
            e1.printStackTrace();
        }
        long end = System.currentTimeMillis();
        System.out.println("Total search elapsed time: "
                + Long.toString(end - start));
    }
}
Regards,
- kiru