Inline'd this time

On 5/21/14, 12:58 PM, Slater, David M. wrote:
You are correct that the "bin" is largely redundant. I created that because I 
was not guaranteed that the guid was uniformly random (I have seen some that aren't 
uniformly distributed), and I'm not the one who specified it. There is another mechanism 
I didn't mention, which is that the bin is prepended by a timeblock (typically an hour 
span), and my data is streaming. So essentially, I create a number of splits for the next 
timeblock for X bins, and then when the data input moves into that time block it can 
ingest directly onto empty tablets.
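
The pre-splitting described above could be sketched roughly as follows. The row layout `timeblock|bin` with a two-digit bin is inferred from the metadata rows quoted later in this thread (e.g. `14006844|00`); the class and method names are hypothetical, and the real job may build its splits differently.

```java
import java.util.Locale;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper: builds one split point per bin for an upcoming timeblock.
final class TimeblockSplits {
    /** Returns split rows such as "14006844|00", "14006844|01", ... for numBins bins. */
    static SortedSet<String> splitsFor(long timeblock, int numBins) {
        SortedSet<String> splits = new TreeSet<>();
        for (int bin = 0; bin < numBins; bin++) {
            // Assumed layout: <timeblock>|<two-digit bin>
            splits.add(String.format(Locale.ROOT, "%d|%02d", timeblock, bin));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Print the split points that would be added before data for this timeblock arrives.
        for (String split : splitsFor(14006844L, 4)) {
            System.out.println(split);
        }
    }
}
```

In a real client these strings would presumably be converted to `Text` and passed to `TableOperations.addSplits` before ingest reaches the new timeblock.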

Gotcha.

I don't think rfile-info comes on 1.4, but I looked at the !METADATA table, and 
if I'm reading it correctly:

Oops, you're right, I think it was introduced in 1.5, but it's just a wrapper. You can invoke the PrintInfo class directly:

accumulo org.apache.accumulo.core.file.rfile.PrintInfo '/path/to/rfile.rf'

31;14006844|00 file:/t-0014fpy/A0014h4u.rf []    155454467,5450454

This is a 155 MB file with an index block of 5.45 MB. This is a typical size 
for a timeblock|bin combination.

After the data gets over a day old, I do a nightly job to merge the bins for 
each timeblock together, resulting in data like:
31;14000292|00 file:/t-0011bgk/C0011e06.rf []    1922144744,67390597
31;14000292|00 file:/t-0011bgk/C0011ed3.rf []    1942040855,68058489

This is about 4 GB with 140 MB of index. So it looks like the index size is 
about 3.5% of the files, if I'm reading it correctly.

I think you're confused about what those numbers mean. The two numbers in the Value are size in bytes and number of entries, not data size and index size.

This means that your entries are about 30 bytes in size, which seems in line with what you described given the encoding/compression Accumulo is doing.
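
To make that arithmetic concrete, here is a small sketch that reads the value as `size in bytes,number of entries` and divides the two. The class name is made up for illustration; the input strings are copied from the metadata rows quoted above.

```java
// Sketch: interpret a metadata "file" value as "<size in bytes>,<number of entries>"
// and derive the average on-disk size of one entry.
final class RFileStats {
    /** Average bytes per entry for a value such as "155454467,5450454". */
    static double averageEntryBytes(String metadataValue) {
        String[] parts = metadataValue.trim().split(",");
        long sizeBytes = Long.parseLong(parts[0]);
        long numEntries = Long.parseLong(parts[1]);
        return (double) sizeBytes / numEntries;
    }

    public static void main(String[] args) {
        // Values copied from the metadata rows quoted in this thread.
        String[] values = {"155454467,5450454", "1922144744,67390597", "1942040855,68058489"};
        for (String v : values) {
            System.out.printf("%s -> %.1f bytes/entry%n", v, averageEntryBytes(v));
        }
    }
}
```

All three files come out between 28 and 29 bytes per entry, which is where the "about 30 bytes" figure comes from.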

You could try playing with table.file.compress.blocksize. IIRC, if you reduce this value from the default 100k, you would get more blocks per RFile, which means that you get more index records, which, in turn, would mean you can find your records faster at the cost of a larger index.
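
A back-of-the-envelope sketch of that trade-off, assuming roughly one index entry per data block and that the block size applies to the data before compression. The data size and block sizes below are hypothetical round numbers, not measurements from this cluster.

```java
// Sketch: estimate how the number of data blocks (and so index entries)
// grows as the block size shrinks. All inputs are illustrative.
final class IndexEstimate {
    /** Approximate number of data blocks needed to hold the given amount of data. */
    static long estimatedBlocks(long uncompressedDataBytes, long blockSizeBytes) {
        if (blockSizeBytes <= 0) {
            throw new IllegalArgumentException("block size must be positive");
        }
        // Ceiling division: a partially filled last block still needs an index entry.
        return (uncompressedDataBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long dataBytes = 1L << 30; // hypothetical 1 GiB of uncompressed data
        for (long blockSize : new long[] {100L * 1024, 50L * 1024, 25L * 1024}) {
            System.out.printf("blocksize=%d -> ~%d index entries%n",
                    blockSize, estimatedBlocks(dataBytes, blockSize));
        }
    }
}
```

Halving the block size roughly doubles the number of index entries, so the index cache has to hold more in exchange for each lookup reading a smaller block.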

In total, there are about 440 tablets per server, with 4 servers, storing a total 
of about 2.1 TB of data (each server has a single 1 TB HDD).

I enabled bloom filters, but I didn't restart Accumulo. Is it necessary to 
restart Accumulo to do that, or are bloom filters normally generated? I have an 
index cache of 256M for each tserver.

After a quick glance at the code, it appears that every time a Reader is opened to a file, we check the configuration and use a BloomFilter if enabled. I don't think you need to restart the tservers.

Thanks,
David

-----Original Message-----
From: Josh Elser [mailto:[email protected]]
Sent: Wednesday, May 21, 2014 12:18 PM
To: [email protected]
Subject: Re: Improving Batchscanner Performance

I wouldn't expect that you'd see much difference moving the guid to the colfam 
(or colqual for that matter).

A few more questions that come to mind though...

* What's the purpose of the "bin"? Your guid is likely random anyways which 
will give you uniformity (which is what a bin prefix like that is usually meant to 
provide).

* How many splits do you have on this table? At least a few per tserver?

You could also try looking at the size of the index for a couple of rfiles for 
your table (`bin/accumulo rfile-info '/hdfs/path/to/rfile.rf'`). I would think 
that you should have faster lookups than what you noted.

On 5/20/14, 4:34 PM, Slater, David M. wrote:
10-100 entries per node (4 nodes total).

Would changing the data table structure change the batchscanner performance?

I'm using:
row             colFam          colQual         value
bin|guid        --              --              byte[]

would it be faster/slower to use:
row             colFam          colQual         value
bin             guid            --              byte[]

The difference would be that the first would include everything as a Collection 
of ranges, where the second would use a combination of ranges and setting 
column families.

-----Original Message-----
From: Josh Elser [mailto:[email protected]]
Sent: Tuesday, May 20, 2014 3:17 PM
To: [email protected]
Subject: Re: Improving Batchscanner Performance

10-100 entries/s seems slow, but that's mostly a gut feeling without context. 
Is this over more than one node? 10s of nodes?

A value of 1M would explain the pause that you see in the beginning. That 
parameter controls the size of the buffer that each tserver will fill before 
sending data back to the BatchScanner. Too small and you pay for the excessive 
RPCs, too large, and like you're seeing, it takes longer for you to get the 
first batch. You should be able to reduce that value and see a much quicker first 
result come out of the batchscanner.
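
A rough model of that pause, assuming a tserver only returns a batch once the buffer is full and reads at a steady rate. The rate and entry size below are illustrative values picked from the ranges mentioned in this thread, and the names are hypothetical.

```java
// Sketch: time for one tserver to fill its scan buffer before the first batch
// is sent back. This ignores every other condition that can end a batch early.
final class FirstBatchDelay {
    /** Seconds needed to fill a buffer of bufferBytes at the given read rate. */
    static double secondsToFillBuffer(long bufferBytes, double entriesPerSecond, double bytesPerEntry) {
        return bufferBytes / (entriesPerSecond * bytesPerEntry);
    }

    public static void main(String[] args) {
        double rate = 100.0;       // entries/s, upper end of the observed 10-100 entries/s
        double entryBytes = 100.0; // "100s of bytes" per record, taken as 100
        for (long buffer : new long[] {1L << 20, 64L << 10}) {
            System.out.printf("buffer=%d bytes -> %.1f s until the first batch%n",
                    buffer, secondsToFillBuffer(buffer, rate, entryBytes));
        }
    }
}
```

Under these assumptions a 1M buffer takes over a minute and a half to fill, while a 64K buffer fills in a few seconds, at the cost of more round trips.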

Number of rfiles could impact read performance as you have to do a merged-read 
over all of the rfiles for a tablet.
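
To illustrate why the file count matters, here is a generic k-way merge over sorted lists using a heap. It is only a sketch of the idea of a merged read, not Accumulo's actual iterator stack; each list stands in for the sorted keys of one rfile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch: merge several individually sorted "files" into one sorted stream.
final class MergedRead {
    /** One open source: its current head key plus the rest of its sorted keys. */
    private static final class Source implements Comparable<Source> {
        String head;
        final Iterator<String> rest;

        Source(Iterator<String> it) {
            this.rest = it;
            this.head = it.next();
        }

        @Override
        public int compareTo(Source other) {
            return head.compareTo(other.head);
        }
    }

    /** Returns all keys from all sources in sorted order. */
    static List<String> merge(List<List<String>> sortedFiles) {
        PriorityQueue<Source> heap = new PriorityQueue<>();
        for (List<String> file : sortedFiles) {
            if (!file.isEmpty()) {
                heap.add(new Source(file.iterator()));
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Source smallest = heap.poll(); // source holding the smallest current key
            out.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest); // re-insert with its next key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
                Arrays.asList("a", "d"), Arrays.asList("b", "c"))));
    }
}
```

Every key returned costs a heap operation proportional to the log of the number of open sources, and every source has to be opened and seeked first, so fewer files per tablet means less work per lookup.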

On 5/20/14, 3:08 PM, Slater, David M. wrote:
I'm getting query results around 10-100 entries/s. However, it takes some time 
after starting the data scan to actually have any positive query number. The 
ingest rate into this table is about 10k entries/s.

I don't think this would be a problem with table.scan.max.memory=1M, would it?

Maybe it's a problem with the number of rfiles on disk? Or perhaps the ingest 
is overwhelming the resources?

-----Original Message-----
From: Josh Elser [mailto:[email protected]]
Sent: Tuesday, May 20, 2014 2:42 PM
To: [email protected]
Subject: Re: Improving Batchscanner Performance

No, that is how it's done. The ranges that you provide to the BatchScanner are 
binned to tablets hosted by each tablet server. It will then query up to 
numQueryThreads tservers at once to fetch results in parallel.
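
The binning step can be pictured with a small sketch like the one below. It is a simplification that bins single rows against a sorted set of split points; the real client bins Ranges using tablet locations, and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch: group row IDs by the tablet that would hold them.
final class RangeBinning {
    /** Label for the last tablet, which has no end row. */
    static final String LAST_TABLET = "<default tablet>";

    /** Maps each tablet (named by its end row) to the rows that fall inside it. */
    static Map<String, List<String>> binRows(TreeSet<String> splitPoints, List<String> rows) {
        Map<String, List<String>> bins = new TreeMap<>();
        for (String row : rows) {
            // A tablet with end row E holds rows in (previous split, E], so the
            // owning tablet is the smallest split point that is >= row.
            String endRow = splitPoints.ceiling(row);
            String tablet = (endRow == null) ? LAST_TABLET : endRow;
            bins.computeIfAbsent(tablet, k -> new ArrayList<>()).add(row);
        }
        return bins;
    }

    public static void main(String[] args) {
        TreeSet<String> splits = new TreeSet<>(Arrays.asList("g", "p"));
        System.out.println(binRows(splits, Arrays.asList("a", "g", "h", "z", "b")));
    }
}
```

Once rows are grouped this way, the order of the input Collection no longer matters: each group can be sent to the server hosting that tablet in one request.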

The point I was making is that you can only bin ranges within the scope of a 
single BatchScanner, and if you were making repeated calls to your original 
function with differing arguments, you might be incurring some more penalty. 
Like Bob, fetching random sets of rows and data is what I was trying to lead 
you to.

If the bandwidth of fetching the data is not a factor, I would probably agree 
that random reads are an issue. Do you have more details you can give about how 
long it takes to fetch the data for N rows (e.g. number of key-values/second 
and/or amount of data/second)? Are you getting an even distribution across your 
tservers or hot-spotted on a few (the monitor should help here)? It can 
sometimes be a bit of a balancing act to optimize locality while avoiding 
hotspots.

On 5/20/14, 2:24 PM, Slater, David M. wrote:
Josh,

The data is not significantly larger than the rows that I'm fetching. In terms 
of bandwidth, the data returned is at least 2 orders of magnitude smaller than 
the ingest rate, so I don't think it's a network issue.

I'm guessing, as Bob suggested, that it has to do with fetching a "random" set 
of rows each time. I had assumed that the batchscanner would take the Collection of 
ranges (when setting batchScanner.setRanges()), sort them, and then fetch data based on 
tablet splits. I'm guessing, based on the discussion, that it is not done that way.

Does the BatchScanner fetch rows based on the ordering of the Collection?

Thanks,
David

-----Original Message-----
From: Josh Elser [mailto:[email protected]]
Sent: Tuesday, May 20, 2014 1:59 PM
To: [email protected]
Subject: Re: Improving Batchscanner Performance

You actually stated it exactly here:

     > I complete the first scan in its entirety

Loading the data into a Collection also implies that you're loading the 
complete set of rows and blocking until you find all rows, or until you fetch 
all of the data.

     > Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"),
     >     new Text("index"), "mytable", 10, 10000);
     > Collection<byte[]> data = getRowData(rows, "mytable", 10);

Both the BatchScanner and Scanner are returning KeyValue pairs in "batches". 
The client talks to server(s), reads some data and returns it to you. By virtue of you 
loading these results from the Iterator into a Collection, you are consuming *all* 
results before proceeding to fetch the data for the rows.

Now, if, like you said, looking up the rows is drastically faster than fetching 
the data, there's a question as to why this is. Is it safe to assume that the 
data is much larger than the rows you're fetching? Have you tried to see what 
the throughput of fetching this data is? If it's bounded by network speed, you 
could try compressing the data in an iterator server-side before returning it 
to the client.

You could also consider the locality of the rows that you're fetching -- are you fetching 
a "random" set of rows each time and paying a penalty of talking to each server 
to fetch the data when you could amortize the cost if you fetched the data for rows that 
are close together? A large amount of data being returned is likely going to trump the 
additional cost in talking to many servers.


On 5/20/14, 1:51 PM, Slater, David M. wrote:
Hi Josh,

I should have clarified - I am using a batchscanner for both lookups. I had 
thought of putting it into two different threads, but the first scan is 
typically an order of magnitude faster than the second.

The logic for upperbounding the results returned is outside of the method I 
provided. Since there is a one-to-one relationship between rowIDs and records 
on the second scan, I just limit the number of rows I send to this method.

As for blocking, I'm not sure exactly what you mean. I complete the first scan 
in its entirety before entering this method with the collection of Text 
rowIDs. The method for that is:

public Collection<Text> getRowIDs(Collection<Range> ranges, Text term, String tablename,
        int queryThreads, int limit) throws TableNotFoundException {
    Set<Text> guids = new HashSet<Text>();
    if (!ranges.isEmpty()) {
        BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
        try {
            scanner.setRanges(ranges);
            scanner.fetchColumnFamily(term);
            for (Map.Entry<Key, Value> entry : scanner) {
                guids.add(entry.getKey().getColumnQualifier());
                if (guids.size() > limit) {
                    return null; // more than `limit` row IDs matched; give up
                }
            }
        } finally {
            scanner.close(); // also runs on the early return above
        }
    }
    return guids;
}

Essentially, my query does:
Collection<Text> rows = getRowIDs(new Range("minRow", "maxRow"), new Text("index"), "mytable", 10, 10000);
Collection<byte[]> data = getRowData(rows, "mytable", 10);


-----Original Message-----
From: Josh Elser [mailto:[email protected]]
Sent: Tuesday, May 20, 2014 1:32 PM
To: [email protected]
Subject: Re: Improving Batchscanner Performance

Hi David,

Absolutely. What you have here is a classic producer-consumer model.
Your BatchScanner is producing results, which you then consume with your Scanner, 
and ultimately return those results to the client.

The problem with your below implementation is that you're not going to be 
polling your batchscanner as aggressively as you could be. You are blocking 
while you fetch each of those new Ranges from the Scanner before fetching 
new ranges. Have you considered splitting up the BatchScanner and Scanner code 
into two different threads?

You could easily use an ArrayBlockingQueue (or similar) to pass results from the 
BatchScanner to the Scanner. I would imagine that this would give you a fair 
improvement in performance.
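
A minimal sketch of that hand-off, with plain strings standing in for row IDs and a string transform standing in for the data lookup. The class, the sentinel, and the queue capacity are all illustrative; a real version would wrap the two scanners instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a producer thread feeds row IDs through a bounded queue to a consumer.
final class ScanPipeline {
    // Unique sentinel object; compared by identity so no real row ID can collide with it.
    private static final String DONE = new String("<done>");

    /** Streams rowIds through a bounded queue and returns the consumer's results. */
    static List<String> run(List<String> rowIds, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);

        // Producer: stands in for the index scan that yields row IDs.
        Thread producer = new Thread(() -> {
            try {
                for (String id : rowIds) {
                    queue.put(id); // blocks while the queue is full, bounding heap use
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer: stands in for the data lookup; starts work before the producer finishes.
        List<String> results = new ArrayList<>();
        try {
            while (true) {
                String id = queue.take();
                if (id == DONE) {
                    break;
                }
                results.add("data-for-" + id);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("r1", "r2", "r3"), 2));
    }
}
```

The bounded capacity is what keeps the producer from running arbitrarily far ahead of the consumer.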

Also, it doesn't appear that there's a reason you can't use a BatchScanner for 
both lookups?

One final warning, your current implementation could also hog heap very badly 
if your batchscanner returns too many records. The producer/consumer I proposed 
should help here a little bit, but you should still be asserting upper-bounds 
to avoid running out of heap space in your client.

On 5/20/14, 1:10 PM, Slater, David M. wrote:
Hey everyone,

I'm trying to improve the query performance of batchscans on my data table. I 
first scan over index tables, which returns a set of rowIDs that correspond to 
the records I am interested in. This set of records is fairly randomly (and 
uniformly) distributed across a large number of tablets, due to the randomness 
of the UID and the query itself. Then I want to scan over my data table, which 
is set up as follows:
row             colFam          colQual         value
rowUID          --              --              byte[] of data

These records are fairly small (100s of bytes), but numerous (I may return 
50000 or more). The method I use to obtain this follows. Essentially, I turn 
the rows returned from the first query into a set of ranges to input into the 
batchscanner, and then return those rows, retrieving the value from them.

// returns the data associated with the given collection of rows
public Collection<byte[]> getRowData(Collection<Text> rows, Text dataType, String tablename,
        int queryThreads) throws TableNotFoundException {
    List<byte[]> values = new ArrayList<byte[]>(rows.size());
    if (!rows.isEmpty()) {
        BatchScanner scanner = conn.createBatchScanner(tablename, new Authorizations(), queryThreads);
        try {
            // one single-row Range per row ID
            List<Range> ranges = new ArrayList<Range>(rows.size());
            for (Text row : rows) {
                ranges.add(new Range(row));
            }
            scanner.setRanges(ranges);
            for (Map.Entry<Key, Value> entry : scanner) {
                values.add(entry.getValue().get());
            }
        } finally {
            scanner.close(); // release the scanner even if the scan fails
        }
    }
    return values;
}

Is there a more efficient way to do this? I have index caches and bloom filters 
enabled (data caches are not), but I still seem to have a long query lag. Any 
thoughts on how I can improve this?

Thanks,
David
