Repository: accumulo Updated Branches: refs/heads/1.4.6-SNAPSHOT 0f6392400 -> c8e165a31 refs/heads/1.5.2-SNAPSHOT 5363d781d -> 926133889 refs/heads/1.6.0-SNAPSHOT 3934ea6a2 -> 716ea0ee8 refs/heads/master 8856c1f66 -> 46b0f986d
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java index 06b9a7c,0000000..70156b2 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java @@@ -1,223 -1,0 +1,222 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.continuous; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined. + */ + +public class ContinuousVerify extends Configured implements Tool { + public static final VLongWritable DEF = new VLongWritable(-1); + + public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> { + + private LongWritable row = new LongWritable(); + private LongWritable ref = new LongWritable(); + private VLongWritable vrow = new VLongWritable(); + + private long corrupt = 0; + + @Override + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + long r = Long.parseLong(key.getRow().toString(), 16); + if (r < 0) + throw new IllegalArgumentException(); + + try { + ContinuousWalk.validate(key, data); + } catch (BadChecksumException bce) { + CounterUtils.increment(context.getCounter(Counts.CORRUPT)); + if (corrupt < 1000) { + System.out.println("ERROR Bad checksum : " + key); + } else if (corrupt == 1000) { + System.out.println("Too many bad checksums, not printing anymore!"); + } + corrupt++; + return; + } + + row.set(r); + + context.write(row, DEF); + byte[] val = data.get(); + + int offset = ContinuousWalk.getPrevRowOffset(val); + if (offset > 0) { + ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16)); + vrow.set(r); + context.write(ref, vrow); + } + } + } + + public static enum Counts { + UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT + } + + public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> { + private ArrayList<Long> refs = new ArrayList<Long>(); + + @Override + public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException { + + int defCount = 0; + + refs.clear(); + for (VLongWritable type : values) { + if (type.get() == -1) { + defCount++; + } else { + refs.add(type.get()); + } + } + + if (defCount == 0 && refs.size() > 0) { + StringBuilder sb = new StringBuilder(); + String comma = ""; + for (Long ref : refs) { + sb.append(comma); + comma = ","; + sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8)); + } + + context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString())); + CounterUtils.increment(context.getCounter(Counts.UNDEFINED)); + + } else if (defCount > 0 && refs.size() == 0) { + CounterUtils.increment(context.getCounter(Counts.UNREFERENCED)); + } else { + CounterUtils.increment(context.getCounter(Counts.REFERENCED)); + } + + } + } + + static class Opts extends ClientOnDefaultTable { + @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true) + String outputDir = "/tmp/continuousVerify"; + + @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class) + int maxMaps = 0; + + @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class) + int reducers = 0; + + @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline") + boolean scanOffline = false; + + public Opts() { + super("ci"); + } + } + + @Override + public int run(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(this.getClass().getName(), args); + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + + job.setInputFormatClass(AccumuloInputFormat.class); + opts.setAccumuloConfigs(job); + + Set<Range> ranges = null; + String clone = opts.getTableName(); + Connector conn = null; + + if (opts.scanOffline) { + Random random = new Random(); + clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl)); + conn = opts.getConnector(); + conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>()); + ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + conn.tableOperations().offline(clone); + AccumuloInputFormat.setInputTableName(job, clone); + AccumuloInputFormat.setOfflineTableScan(job, true); + } else { + ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps); + } + + AccumuloInputFormat.setRanges(job, ranges); + AccumuloInputFormat.setAutoAdjustRanges(job, false); + + job.setMapperClass(CMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(VLongWritable.class); + + job.setReducerClass(CReducer.class); + job.setNumReduceTasks(opts.reducers); + + job.setOutputFormatClass(TextOutputFormat.class); + + job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline); + + TextOutputFormat.setOutputPath(job, new Path(opts.outputDir)); + + job.waitForCompletion(true); + + if (opts.scanOffline) { + conn.tableOperations().delete(clone); + } + opts.stopTracing(); + return job.isSuccessful() ? 0 : 1; + } + + /** + * + * @param args + * instanceName zookeepers username password table columns outputpath - * @throws Exception + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args); + if (res != 0) + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java index c522914,0000000..f1dfcd2 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestClean.java @@@ -1,51 -1,0 +1,48 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.io.File; +import java.util.Arrays; + +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + +public class CacheTestClean { + - /** - * @param args - */ + public static void main(String[] args) throws Exception { + String rootDir = args[0]; + File reportDir = new File(args[1]); + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + if (zoo.exists(rootDir)) { + zoo.recursiveDelete(rootDir, NodeMissingPolicy.FAIL); + } + + if (!reportDir.exists()) { + reportDir.mkdir(); + } else { + File[] files = reportDir.listFiles(); + if (files.length != 0) + throw new Exception("dir " + reportDir + " is not empty: " + Arrays.asList(files)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java index 2b775c5,0000000..06c6fdb mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java @@@ -1,217 -1,0 +1,213 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.server.util.reflection.CounterUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +/** + * Runs the functional tests via map-reduce. + * + * First, be sure everything is compiled. + * + * Second, get a list of the tests you want to run: + * + * <pre> + * $ python test/system/auto/run.py -l > tests + * </pre> + * + * Put the list of tests into HDFS: + * + * <pre> + * $ hadoop fs -put tests /user/hadoop/tests + * </pre> + * + * Run the map-reduce job: + * + * <pre> + * $ ./bin/accumulo accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results + * </pre> + * + * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo + * instance. + * + */ +public class RunTests extends Configured implements Tool { + + static final public String JOB_NAME = "Functional Test Runner"; + private static final Logger log = Logger.getLogger(RunTests.class); + + private Job job = null; + + private static final int DEFAULT_TIMEOUT_FACTOR = 1; + + static class Opts extends Help { + @Parameter(names="--tests", description="newline separated list of tests to run", required=true) + String testFile; + @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true) + String outputPath; + @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false) + Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR; + } + + static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor"; + + static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> { + + private static final String REDUCER_RESULT_START = "::::: "; + private static final int RRS_LEN = REDUCER_RESULT_START.length(); + private Text result = new Text(); + String mapperTimeoutFactor = null; + + private static enum Outcome { + SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE + } + private static final Map<Character, Outcome> OUTCOME_COUNTERS; + static { + OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>(); + OUTCOME_COUNTERS.put('S', Outcome.SUCCESS); + OUTCOME_COUNTERS.put('F', Outcome.FAILURE); + OUTCOME_COUNTERS.put('E', Outcome.ERROR); + OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS); + OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE); + } + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString()); + log.info("Running test " + cmd); + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.directory(new File(context.getConfiguration().get("accumulo.home"))); + pb.redirectErrorStream(true); + Process p = pb.start(); + p.getOutputStream().close(); + InputStream out = p.getInputStream(); + InputStreamReader outr = new InputStreamReader(out, Constants.UTF8); + BufferedReader br = new BufferedReader(outr); + String line; + try { + while ((line = br.readLine()) != null) { + log.info("More: " + line); + if (line.startsWith(REDUCER_RESULT_START)) { + String resultLine = line.substring(RRS_LEN); + if (resultLine.length() > 0) { + Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0)); + if (outcome != null) { + CounterUtils.increment(context.getCounter(outcome)); + } + } + String taskAttemptId = context.getTaskAttemptID().toString(); + result.set(taskAttemptId + " " + resultLine); + context.write(value, result); + } + } + } catch (Exception ex) { + log.error(ex); + context.progress(); + } + + p.waitFor(); + } + + @Override + protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException { + mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR)); + } + } + + @Override + public int run(String[] args) throws Exception { + job = new Job(getConf(), JOB_NAME); + job.setJarByClass(this.getClass()); + Opts opts = new Opts(); + opts.parseArgs(RunTests.class.getName(), args); + + // this is like 1-2 tests per mapper + Configuration conf = job.getConfiguration(); + conf.setInt("mapred.max.split.size", 40); + conf.set("accumulo.home", System.getenv("ACCUMULO_HOME")); + + // Taking third argument as scaling factor to setting mapred.task.timeout + // and TIMEOUT_FACTOR + conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000); + conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + + // set input + job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, new Path(opts.testFile)); + + // set output + job.setOutputFormatClass(TextOutputFormat.class); + FileSystem fs = FileSystem.get(conf); + Path destination = new Path(opts.outputPath); + if (fs.exists(destination)) { + log.info("Deleting existing output directory " + opts.outputPath); + fs.delete(destination, true); + } + TextOutputFormat.setOutputPath(job, destination); + + // configure default reducer: put the results into one file + job.setNumReduceTasks(1); + + // set mapper + job.setMapperClass(TestMapper.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // don't do anything with the results (yet) a summary would be nice + job.setNumReduceTasks(0); + + // submit the job + log.info("Starting tests"); + return 0; + } + - /** - * @param args - * @throws Exception - */ + public static void main(String[] args) throws Exception { + RunTests tests = new RunTests(); + ToolRunner.run(new Configuration(), tests, args); + tests.job.waitForCompletion(true); + if (!tests.job.isSuccessful()) + System.exit(1); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java index a9b072e,0000000..85cddbb mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java @@@ -1,246 -1,0 +1,243 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.performance.metadata; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.security.SecurityConstants; +import org.apache.hadoop.io.Text; + +/** + * This little program can be used to write a lot of entries to the !METADATA table and measure the performance of varying numbers of threads doing !METADATA + * lookups using the batch scanner. + * + * + */ + +public class MetadataBatchScanTest { + - /** - * @param args - */ + public static void main(String[] args) throws Exception { + + final Connector connector = new ZooKeeperInstance("acu14", "localhost") + .getConnector(SecurityConstants.SYSTEM_PRINCIPAL, SecurityConstants.getSystemToken()); + + TreeSet<Long> splits = new TreeSet<Long>(); + Random r = new Random(42); + + while (splits.size() < 99999) { + splits.add((r.nextLong() & 0x7fffffffffffffffl) % 1000000000000l); + } + + Text tid = new Text("8"); + Text per = null; + + ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(); + + for (Long split : splits) { + Text er = new Text(String.format("%012d", split)); + KeyExtent ke = new KeyExtent(tid, er, per); + per = er; + + extents.add(ke); + } + + extents.add(new KeyExtent(tid, null, per)); + + if (args[0].equals("write")) { + + BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig()); + + for (KeyExtent extent : extents) { + Mutation mut = extent.getPrevRowUpdateMutation(); + new TServerInstance(AddressUtil.parseAddress("192.168.1.100", 4567), "DEADBEEF").putLocation(mut); + bw.addMutation(mut); + } + + bw.close(); + } else if (args[0].equals("writeFiles")) { + BatchWriter bw = connector.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig()); + + for (KeyExtent extent : extents) { + + Mutation mut = new Mutation(extent.getMetadataEntry()); + + String dir = "/t-" + UUID.randomUUID(); + + Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8))); + + for (int i = 0; i < 5; i++) { + mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8))); + } + + bw.addMutation(mut); + } + + bw.close(); + } else if (args[0].equals("scan")) { + + int numThreads = Integer.parseInt(args[1]); + final int numLoop = Integer.parseInt(args[2]); + int numLookups = Integer.parseInt(args[3]); + + HashSet<Integer> indexes = new HashSet<Integer>(); + while (indexes.size() < numLookups) { + indexes.add(r.nextInt(extents.size())); + } + + final List<Range> ranges = new ArrayList<Range>(); + for (Integer i : indexes) { + ranges.add(extents.get(i).toMetadataRange()); + } + + Thread threads[] = new Thread[numThreads]; + + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println(runScanTest(connector, numLoop, ranges)); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + long t1 = System.currentTimeMillis(); + + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + + long t2 = System.currentTimeMillis(); + + System.out.printf("tt : %6.2f%n", (t2 - t1) / 1000.0); + + } else { + throw new IllegalArgumentException(); + } + + } + + private static ScanStats runScanTest(Connector connector, int numLoop, List<Range> ranges) throws Exception { + Scanner scanner = null;/* + * connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); ColumnFQ.fetch(scanner, + * Constants.METADATA_LOCATION_COLUMN); ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN); + */ + + BatchScanner bs = connector.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 1); + bs.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); + Constants.METADATA_PREV_ROW_COLUMN.fetch(bs); + + bs.setRanges(ranges); + + // System.out.println(ranges); + + ScanStats stats = new ScanStats(); + for (int i = 0; i < numLoop; i++) { + ScanStat ss = scan(bs, ranges, scanner); + stats.merge(ss); + } + + return stats; + } + + private static class ScanStat { + long delta1; + long delta2; + int count1; + int count2; + } + + private static class ScanStats { + Stat delta1 = new Stat(); + Stat delta2 = new Stat(); + Stat count1 = new Stat(); + Stat count2 = new Stat(); + + void merge(ScanStat ss) { + delta1.addStat(ss.delta1); + delta2.addStat(ss.delta2); + count1.addStat(ss.count1); + count2.addStat(ss.count2); + } + + @Override + public String toString() { + return "[" + delta1 + "] [" + delta2 + "]"; + } + } + + private static ScanStat scan(BatchScanner bs, List<Range> ranges, Scanner scanner) { + + // System.out.println("ranges : "+ranges); + + ScanStat ss = new ScanStat(); + + long t1 = System.currentTimeMillis(); + int count = 0; + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : bs) { + count++; + } + long t2 = System.currentTimeMillis(); + + ss.delta1 = (t2 - t1); + ss.count1 = count; + + count = 0; + t1 = System.currentTimeMillis(); + /* + * for (Range range : ranges) { scanner.setRange(range); for (Entry<Key, Value> entry : scanner) { count++; } } + */ + + t2 = System.currentTimeMillis(); + + ss.delta2 = (t2 - t1); + ss.count2 = count; + + return ss; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 71970d3,0000000..e6fcd5b mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@@ -1,258 -1,0 +1,252 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.performance.thrift; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + - import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.thrift.InitialMultiScan; +import org.apache.accumulo.core.data.thrift.InitialScan; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.data.thrift.ScanResult; +import org.apache.accumulo.core.data.thrift.TColumn; +import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary; +import org.apache.accumulo.core.data.thrift.TKeyExtent; +import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.data.thrift.TRange; +import org.apache.accumulo.core.data.thrift.UpdateErrors; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.security.SecurityConstants; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; ++import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; + +import com.beust.jcommander.Parameter; + + +/** + * The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the !METADATA location entries for a table to + * point to it. This allows thrift performance to be measured by running any client code that writes to a table. + * + */ + +public class NullTserver { + + public static class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { + + private long updateSession = 1; + + public ThriftClientHandler(Instance instance, TransactionWatcher watcher) { + super(instance, watcher); + } + + @Override + public long startUpdate(TInfo tinfo, TCredentials credentials) { + return updateSession++; + } + + @Override + public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent keyExtent, List<TMutation> mutation) {} + + @Override + public UpdateErrors closeUpdate(TInfo tinfo, long updateID) { + return new UpdateErrors(new HashMap<TKeyExtent,Long>(), new ArrayList<TConstraintViolationSummary>(), new HashMap<TKeyExtent, SecurityErrorCode>()); + } + + @Override + public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime) { + return null; + } + + @Override + public void closeMultiScan(TInfo tinfo, long scanID) {} + + @Override + public void closeScan(TInfo tinfo, long scanID) {} + + @Override + public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) { + return null; + } + + @Override + public ScanResult continueScan(TInfo tinfo, long scanID) { + return null; + } + + @Override + public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent extent, ByteBuffer splitPoint) { + + } + + @Override + public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> batch, List<TColumn> columns, + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) { + return null; + } + + @Override + public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize, + List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated) { + return null; + } + + @Override + public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation) { + + } + + @Override + public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return null; + } + + @Override + public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException { + return null; + } + + @Override + public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return null; + } + + @Override + public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException, TException {} + + @Override + public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {} + + @Override + public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {} + + @Override + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {} + + @Override + public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return new ArrayList<ActiveScan>(); + } + + @Override + public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {} + + @Override + public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException { + + } + + @Override + public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException { + + } + + @Override + public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException { + + } + - /* - * (non-Javadoc) - * - * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo, - * org.apache.accumulo.core.security.thrift.Credentials, java.util.List) - */ + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { + } + + @Override + public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return new ArrayList<ActiveCompaction>(); + } + } + + static class Opts extends Help { + @Parameter(names={"-i", "--instance"}, description="instance name", required=true) + String iname = null; + @Parameter(names={"-z", "--keepers"}, description="comma-separated list of zookeeper host:ports", required=true) + String keepers = null; + @Parameter(names="--table", description="table to adopt", required=true) + String tableName = null; + @Parameter(names="--port", description="port number to use") + int port = DefaultConfiguration.getInstance().getPort(Property.TSERV_CLIENTPORT); + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(NullTserver.class.getName(), args); + + TransactionWatcher watcher = new TransactionWatcher(); + ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher); + Processor<Iface> processor = new Processor<Iface>(tch); + TServerUtils.startTServer(opts.port, processor, "NullTServer", "null tserver", 2, 1000, 10*1024*1024); + + InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), opts.port); + + // modify !METADATA + ZooKeeperInstance zki = new ZooKeeperInstance(opts.iname, opts.keepers); + String tableId = Tables.getTableId(zki, opts.tableName); + + // read the locations for the table + Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange(); + MetaDataTableScanner s = new MetaDataTableScanner(zki, SecurityConstants.getSystemCredentials(), tableRange); + long randomSessionID = opts.port; + TServerInstance instance = new TServerInstance(addr, randomSessionID); + List<Assignment> assignments = new ArrayList<Assignment>(); + while (s.hasNext()) { + TabletLocationState next = s.next(); + assignments.add(new Assignment(next.extent, instance)); + } + s.close(); + // point them to this server + MetaDataStateStore store = new MetaDataStateStore(); + store.setLocations(assignments); + + while (true) { + UtilWaitThread.sleep(10000); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java index 9d01929,0000000..7cb58c9 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Framework.java @@@ -1,129 -1,0 +1,126 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk; + +import java.io.File; +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.Properties; + +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; + +import com.beust.jcommander.Parameter; + +public class Framework { + + private static final Logger log = Logger.getLogger(Framework.class); + private HashMap<String,Node> nodes = new HashMap<String,Node>(); + private String configDir = null; + private static final Framework INSTANCE = new Framework(); + + /** + * @return Singleton instance of Framework + */ + public static Framework getInstance() { + return INSTANCE; + } + + public String getConfigDir() { + return configDir; + } + + public void setConfigDir(String confDir) { + configDir = confDir; + } + + /** + * Run random walk framework + * + * @param startName + * Full name of starting graph or test - * @param state - * @param confDir + */ + public int run(String startName, State state, String confDir) { + + try { + System.out.println("confDir " + confDir); + setConfigDir(confDir); + Node node = getNode(startName); + node.visit(state, new Properties()); + } catch (Exception e) { + log.error("Error during random walk", e); + return -1; + } + return 0; + } + + /** + * Creates node (if it does not already exist) and inserts into map + * + * @param id + * Name of node + * @return Node specified by id - * @throws Exception + */ + public Node getNode(String id) throws Exception { + + // check for node in nodes + if (nodes.containsKey(id)) { + return nodes.get(id); + } + + // otherwise create and put in nodes + Node node = null; + if (id.endsWith(".xml")) { + node = new Module(new File(configDir + "modules/" + id)); + } else { + node = (Test) Class.forName(id).newInstance(); + } + nodes.put(id, node); + return node; + } + + static class Opts extends org.apache.accumulo.core.cli.Help { + @Parameter(names="--configDir", required=true, description="directory containing the test configuration") + String configDir; + @Parameter(names="--logDir", required=true, description="location of the local logging directory") + String localLogPath; + @Parameter(names="--logId", required=true, description="a unique log identifier (like a hostname, or pid)") + String logId; + @Parameter(names="--module", required=true, description="the name of the module to run") + String module; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(Framework.class.getName(), args); + + Properties props = new Properties(); + FileInputStream fis = new FileInputStream(opts.configDir + "/randomwalk.conf"); + props.load(fis); + fis.close(); + + System.setProperty("localLog", opts.localLogPath + "/" + opts.logId); + System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + opts.logId); + + DOMConfigurator.configure(opts.configDir + "logger.xml"); + + State state = new State(props); + int retval = getInstance().run(opts.module, state, opts.configDir); + + System.exit(retval); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java index 1868ade,0000000..b74b6cd mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Node.java @@@ -1,64 -1,0 +1,63 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk; + +import java.util.Properties; + +import org.apache.log4j.Logger; + +/** + * Represents a point in graph of RandomFramework + */ +public abstract class Node { + + protected final Logger log = Logger.getLogger(this.getClass()); + long progress = System.currentTimeMillis(); + + /** + * Visits node + * + * @param state + * Random walk state passed between nodes - * @throws Exception + */ + public abstract void visit(State state, Properties props) throws Exception; + + @Override + public boolean equals(Object o) { + if (o == null) + return false; + return toString().equals(o.toString()); + } + + @Override + public String toString() { + return this.getClass().getName(); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + synchronized public void makingProgress() { + progress = System.currentTimeMillis(); + } + + synchronized public long lastProgress() { + return progress; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java index a0dd37c,0000000..4581b04 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/CheckBalance.java @@@ -1,110 -1,0 +1,107 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.randomwalk.concurrent; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.test.randomwalk.State; +import org.apache.accumulo.test.randomwalk.Test; + +/** + * + */ +public class CheckBalance extends Test { + + static final String LAST_UNBALANCED_TIME = "lastUnbalancedTime"; + static final String UNBALANCED_COUNT = "unbalancedCount"; + - /* (non-Javadoc) - * @see org.apache.accumulo.test.randomwalk.Node#visit(org.apache.accumulo.test.randomwalk.State, java.util.Properties) - */ + @Override + public void visit(State state, Properties props) throws Exception { + log.debug("checking balance"); + Map<String,Long> counts = new HashMap<String,Long>(); + Scanner scanner = state.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); + for (Entry<Key,Value> entry : scanner) { + String location = entry.getKey().getColumnQualifier().toString(); + Long count = counts.get(location); + if (count == null) + count = Long.valueOf(0); + counts.put(location, count + 1); + } + double total = 0.; + for (Long count : counts.values()) { + total += count.longValue(); + } + final double average = total / counts.size(); + final double sd = stddev(counts.values(), average); + log.debug("average " + average + ", standard deviation " + sd); + + // Check for balanced # of tablets on each node + double maxDifference = 2.0 * sd; + String unbalancedLocation = null; + long lastCount = 0L; + boolean balanced = true; + for (Entry<String,Long> entry : counts.entrySet()) { + long thisCount = entry.getValue().longValue(); + if (Math.abs(thisCount - average) > maxDifference) { + balanced = false; + log.debug("unbalanced: " + entry.getKey() + " has " + entry.getValue() + " tablets and the average is " + average); + unbalancedLocation = entry.getKey(); + lastCount = thisCount; + } + } + + // It is expected that the number of tablets will be uneven for short + // periods of time. Don't complain unless we've seen it only unbalanced + // over a 15 minute period and it's been at least three checks. + if (!balanced) { + Long last = state.getLong(LAST_UNBALANCED_TIME); + if (last != null && System.currentTimeMillis() - last > 15 * 60 * 1000) { + Integer count = state.getInteger(UNBALANCED_COUNT); + if (count == null) + count = Integer.valueOf(0); + if (count > 3) + throw new Exception("servers are unbalanced! location " + unbalancedLocation + " count " + lastCount + " too far from average " + average); + count++; + state.set(UNBALANCED_COUNT, count); + } + if (last == null) + state.set(LAST_UNBALANCED_TIME, System.currentTimeMillis()); + } else { + state.remove(LAST_UNBALANCED_TIME); + state.remove(UNBALANCED_COUNT); + } + } + + private static double stddev(Collection<Long> samples, double avg) { + int num = samples.size(); + double sqrtotal = 0.0; + for (Long s : samples) { + double diff = s.doubleValue() - avg; + sqrtotal += diff * diff; + } - return Math.sqrt(sqrtotal / (double) num); ++ return Math.sqrt(sqrtotal / num); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java ---------------------------------------------------------------------- diff --cc trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java index 049b2a2,0000000..dfa9f0c mode 100644,000000..100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java @@@ -1,132 -1,0 +1,122 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.trace.instrument.receivers; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; + +/** + * Find a Span collector via zookeeper and push spans there via Thrift RPC + * + */ +public class ZooSpanClient extends SendSpansViaThrift { + + private static final Logger log = Logger.getLogger(ZooSpanClient.class); + private static final int TOTAL_TIME_WAIT_CONNECT_MS = 10 * 1000; + private static final int TIME_WAIT_CONNECT_CHECK_MS = 100; + private static final Charset UTF8 = Charset.forName("UTF-8"); + + ZooKeeper zoo = null; + final String path; + final Random random = new Random(); + final List<String> hosts = new ArrayList<String>(); + + public ZooSpanClient(String keepers, final String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { + super(host, service, millis); + this.path = path; + zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() { + @Override + public void process(WatchedEvent event) { + try { + if (zoo != null) { + updateHosts(path, zoo.getChildren(path, null)); + } + } catch (Exception ex) { + log.error("unable to get destination hosts in zookeeper", ex); + } + } + }); + for (int i = 0; i < TOTAL_TIME_WAIT_CONNECT_MS; i += TIME_WAIT_CONNECT_CHECK_MS) { + if (zoo.getState().equals(States.CONNECTED)) + break; + try { + Thread.sleep(TIME_WAIT_CONNECT_CHECK_MS); + } catch (InterruptedException ex) { + break; + } + } + zoo.getChildren(path, true); + } + - /* - * (non-Javadoc) - * - * @see trace.instrument.receivers.AsyncSpanReceiver#flush() - */ + @Override + public void flush() { + if (!hosts.isEmpty()) + super.flush(); + } + - /* - * (non-Javadoc) - * - * @see trace.instrument.receivers.AsyncSpanReceiver#sendSpans() - */ + @Override + void sendSpans() { + if (hosts.isEmpty()) { + if (!sendQueue.isEmpty()) { + log.error("No hosts to send data to, dropping queued spans"); + synchronized (sendQueue) { + sendQueue.clear(); + sendQueue.notifyAll(); + } + } + } else { + super.sendSpans(); + } + } + + synchronized private void updateHosts(String path, List<String> children) { + log.debug("Scanning trace hosts in zookeeper: " + path); + try { + List<String> hosts = new ArrayList<String>(); + for (String child : children) { + byte[] data = zoo.getData(path + "/" + child, null, null); + hosts.add(new String(data, UTF8)); + } + this.hosts.clear(); + this.hosts.addAll(hosts); + log.debug("Trace hosts: " + this.hosts); + } catch (Exception ex) { + log.error("unable to get destination hosts in zookeeper", ex); + } + } + + @Override + synchronized protected String getSpanKey(Map<String,String> data) { + if (hosts.size() > 0) { + String host = hosts.get(random.nextInt(hosts.size())); + log.debug("sending data to " + host); + return host; + } + return null; + } +}