You should double-check your data; you might find that it's null-padded or something like that, which would screw up the splits. You can do a scan from the shell, which might give you hints.
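If it's more convenient to check from the API than from the shell, a quick Scanner loop like the sketch below (reusing the same conn, TABLE_NAME, and LOGGER references as the code later in this thread) will show how the ingested rowIDs actually begin -- for example, whether they start with the expected lowercase characters or with something like null padding:

    // Rough sketch: sample some rowIDs and bucket them by first character to see
    // whether ingest really spreads across the a-z / 0-9 split points, or whether
    // rows begin with an unexpected byte (e.g. '\0' padding). A plain scan starts
    // at the lowest rows, which is exactly where odd prefixes will sort.
    Scanner scan = null;
    try
    {
        scan = conn.createScanner(TABLE_NAME, new Authorizations());
        Map<Character, Integer> firstChars = new TreeMap<Character, Integer>();
        int sampled = 0;
        for (Map.Entry<Key, Value> e : scan)
        {
            String row = e.getKey().getRow().toString();
            char first = row.isEmpty() ? '?' : row.charAt(0);
            Integer count = firstChars.get(first);
            firstChars.put(first, count == null ? 1 : count + 1);
            if (++sampled >= 5000)
            {
                break;
            }
        }
        LOGGER.info("First-character distribution of sampled rowIDs: " + firstChars);
    }
    catch (TableNotFoundException x)
    {
        LOGGER.fatal("The table " + TABLE_NAME + " could not be found.", x);
    }
    finally
    {
        if (scan != null)
        {
            scan.close();
        }
    }
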
On Tue, Jul 29, 2014 at 3:53 PM, Pelton, Aaron A. <[email protected]> wrote:

> I agree with the idea of pooling the writers.
>
> As for the discussion of the keys: I get what you are saying about choosing better keys for distribution based on the frequency of the chars in the English language. But for this test I'm just using Apache RandomStringUtils to create a 2-char random alpha sequence to prepend, so it should be a moderately distributed sampling of chars. However, let me emphasize that I'm seeing 1 tablet getting millions of entries in it, compared to the remaining 35 tablets having no entries or just about 1k. To me that says something isn't right.
>
> -----Original Message-----
> From: Josh Elser [mailto:[email protected]]
> Sent: Tuesday, July 29, 2014 4:20 PM
> To: [email protected]
> Subject: Re: Request for Configuration Help for basic test. Tservers dying and only one tablet being used
>
> On 7/29/14, 3:20 PM, Pelton, Aaron A. wrote:
> > To follow up on two of your statements/questions:
> >
> > 1. Good, pre-splitting your table should help with random data, but if you're only writing data to one tablet, you're stuck (very similar to hot-spotting reducers in MapReduce jobs).
> >
> > - OK, so it's good that the data is pre-split, but maybe this is conceptually something that I'm not grasping about Accumulo yet: I thought specifying the pre-splits is what causes the table to span multiple tablets on the various tservers initially. However, the core of the data appears to be in one specific tablet on one tserver. Each tserver appears to have a few tablets allocated to it for the table I'm working out of. So I'm confused as to how to get the data to write to more than just the one tablet/partition. I would almost think the keys I specified aren't being matched correctly against incoming data, then?
>
> No, it sounds like you have the right idea. Many tablets make up a table, and the split points for a table are what define those tablet boundaries. Consider a table where the rowIDs are English words (http://en.wikipedia.org/wiki/Letter_frequency#Relative_frequencies_of_the_first_letters_of_a_word_in_the_English_language).
>
> If you split your table on each letter (a-z), you would still see much more activity on the tablets which host words starting with 'a', 't', and 's', because you have significantly more data being ingested into those tablets.
>
> When designing a table (specifically the rowID of the key), it's desirable to make the rowID as distributed as possible across the entire table. This helps ensure even processing across all of your nodes. Does that make sense?
>
> > 2. What do you actually do when you receive an HTTP request to write to Accumulo? It sounds like you're reading data and then writing? Is each HTTP request creating its own BatchWriter? More insight into what a "write" looks like in your system (in terms of Accumulo API calls) would help us make recommendations about more efficient things you can do.
> >
> > Yes, each HTTP request gets its own reference to a writer or scanner, which is closed when the result is returned from the HTTP request. There are two REST services. One transforms the data, performs some indexing based on it, and then sends both data and index to a BatchWriter. The sample code for the data being written is below. The indexes being written are similar but use different family and qualifier values.
> >
> >     Text rowId = new Text(id + ":" + time);
> >     Text fam = new Text(COLUMN_FAMILY_KLV);
> >     Text qual = new Text("");
> >     Value val = new Value(data.getBytes());
> >
> >     Mutation mut = new Mutation(rowId);
> >     mut.put(fam, qual, val);
> >
> >     long memBuf = 1_000_000L;
> >     long timeout = 1000L;
> >     int numThreads = 10;
> >
> >     BatchWriter writer = null;
> >     try
> >     {
> >         writer = conn.createBatchWriter(TABLE_NAME, memBuf, timeout, numThreads);
> >         writer.addMutation(mut);
> >     }
> >     catch (Exception x)
> >     {
> >         // x.printStackTrace();
> >         logger.error(x.toString(), x);
> >         result = "ERROR";
> >     }
> >     finally
> >     {
> >         try
> >         {
> >             if (writer != null)
> >             {
> >                 writer.close();
> >             }
> >         }
> >         catch (Exception x)
> >         {
> >             // x.printStackTrace();
> >             logger.error(x.toString(), x);
> >             result = "ERROR";
> >         }
> >     }
>
> You could try to make a pool of BatchWriters instead of creating a new one for each HTTP thread. This might help amortize the RPC cost by sending more than one mutation at a time (the BatchWriter should be thread safe in this regard). You then just want to call flush() instead of closing the BatchWriter.
>
> I remember seeing that there are some optimizations within the BatchWriter to write a single Mutation, but if you're really trying to saturate your system, using fewer BatchWriters would likely help you realize more throughput.
>
> > At the beginning of the test, a known control range of data is created and uploaded. While ongoing writes occur for the duration of the test, queries against data in that control range are performed. The REST service that handles the read eventually hits this:
> >
> >     ArrayList<String> latlons = new ArrayList<String>();
> >     Authorizations auths = new Authorizations();
> >
> >     Scanner scan = null;
> >     try
> >     {
> >         scan = conn.createScanner(TABLE_NAME, auths);
> >         scan.setRange(new Range(id + ":0", id + "::")); // all times
> >         scan.fetchColumnFamily(new Text(COLUMN_FAMILY_KLV));
> >
> >         for (Map.Entry<Key, Value> e : scan)
> >         {
> >             // do stuff with e
> >         }
> >     }
> >     catch (TableNotFoundException x)
> >     {
> >         LOGGER.fatal("The table " + TABLE_NAME + " could not be found.", x);
> >     }
> >     finally
> >     {
> >         if (scan != null)
> >         {
> >             scan.close();
> >         }
> >     }
> >
> > -----Original Message-----
> > From: Josh Elser [mailto:[email protected]]
> > Sent: Tuesday, July 29, 2014 1:43 PM
> > To: [email protected]
> > Subject: Re: Request for Configuration Help for basic test. Tservers dying and only one tablet being used
> >
> > Some comments inline
> >
> > On 7/29/14, 1:07 PM, Pelton, Aaron A. wrote:
> >> Hi All,
> >>
> >> I am new to Accumulo and I apologize if the answers to my questions are already posted somewhere. I've done a fair amount of googling and poking around the manuals etc.
> >>
> >> I am just doing a simple test with two machines, one producing about 600 threads on the network to stream simultaneous writes to a REST service, and the other producing about 300 threads on the network to perform simultaneous queries to a REST service. The REST service has Accumulo API calls in it to write out and query data.
> >>
> >> I have inherited the following configuration:
> >>
> >> - Squirrel Bundle distribution of Accumulo 1.5.0
> >>
> >> - 1 master machine to start and stop Accumulo services on
> >>
> >> - 12 data nodes running tservers. The first three of these also run the zookeeper instances, and nodes 4-6 run tracers.
> >>
> >> I have noticed the following issues with configuration and changed them as follows:
> >>
> >> - Changed swappiness to 0 on all nodes
> >>
> >> - Was still getting OutOfMemoryExceptions after the above, after running the test for a long duration. Thus, increased the Java heap size from 1g to 4g, which is still far below the physical RAM on the nodes.
> >>
> >> - Increased the Java heap from 1g to 2g on the master node
> >>
> >> - I also increased the following properties:
> >>
> >>     <property>
> >>       <name>tserver.memory.maps.max</name>
> >>       <value>2G</value>
> >>     </property>
> >>
> >>     <property>
> >>       <name>tserver.cache.data.size</name>
> >>       <value>512M</value>
> >>     </property>
> >>
> >>     <property>
> >>       <name>tserver.cache.index.size</name>
> >>       <value>512M</value>
> >>     </property>
> >>
> >> - Changed the ulimit for virtual memory to unlimited
> >>
> >> - Changed the ulimit for files opened to 65536
> >>
> >> - Changed the ulimit for max user processes to 1024
> >
> > These all look good. Just keep in mind that tserver.cache.data.size and tserver.cache.index.size will be on the JVM heap, while tserver.memory.maps.max is off heap (assuming you're using the native maps, which you very well should be -- I assume Sqrrl's distro set this up for you).
> >
> >> - A Tomcat instance with a server socket accepting up to 1,000 threads / user connections to a REST service that eventually makes a read / write out to an Accumulo connector instance.
> >>
> >> - Changed the zookeeper connection limit max to 0 since this is just a test environment
> >>
> >> - Noticed that the code I had inherited didn't have close calls on the scanner objects in the REST service because it was originally designed for Accumulo 1.4, in which there wasn't such an API.
> >
> > Scanners can clean up after themselves, whereas BatchScanners don't. A close method was added to ScannerBase (the parent interface of Scanner and BatchScanner) to let you seamlessly swap out a Scanner with a BatchScanner (and vice versa) while not leaking any resources. In short, you can call Scanner#close, but it's just a no-op.
> >
> >> - This may be wrong, but in an effort to see my ~900 connections simultaneously get as much access to db writes/reads for servicing, I upped some thread counts for:
> >>
> >>     <property>
> >>       <name>tserver.server.threads.minimum</name>
> >>       <value>75</value>
> >>     </property>
> >>
> >>     <property>
> >>       <name>master.server.threads.minimum</name>
> >>       <value>300</value>
> >>     </property>
> >>
> >> I have a couple of problems to note:
> >>
> >> 1. Ingest speeds seem kinda slow. I would anticipate network overhead, but not enough to reduce writes to 125 records / sec when each record is only a few kB.
> >
> > What do you actually do when you receive an HTTP request to write to Accumulo? It sounds like you're reading data and then writing? Is each HTTP request creating its own BatchWriter? More insight into what a "write" looks like in your system (in terms of Accumulo API calls) would help us make recommendations about more efficient things you can do.
> >
> >> a. I believe this is due to the fact that I'm only seeing one tserver primarily active at ingesting, with one tablet in particular for the table receiving the bulk of the data.
> >>
> >> b. I have added pre-splits upon table creation for each letter of the alphabet, plus the digits 0-9. As this is a test with a simple loop creating ID values, I throw 2 random alpha chars in front of the generated number in my loop and use that as the ID, to hopefully distribute the IDs across tablets for this table. A record ID ingested might look like "bk1234:8876", whereby it has 2 random chars, the original ID value, a colon, and a timestamp. Sample pre-splitting: (Granted, the array could be constructed more gracefully, but for a quick test, meh.)
> >>
> >>     try
> >>     {
> >>         conn.tableOperations().create(TABLE_NAME);
> >>         final SortedSet<Text> sortedSplits = new TreeSet<Text>();
> >>         for (String binPrefix : new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
> >>                                                "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
> >>                                                "1", "2", "3", "4", "5", "6", "7", "8", "9", "0" })
> >>         {
> >>             sortedSplits.add(new Text(binPrefix));
> >>         }
> >>         conn.tableOperations().addSplits(TABLE_NAME, sortedSplits);
> >>     }
> >>     catch (TableExistsException | TableNotFoundException exception)
> >>     {
> >>         LOGGER.warn("Could not create table or sorted splits", exception);
> >>     }
> >
> > Good, pre-splitting your table should help with random data, but if you're only writing data to one tablet, you're stuck (very similar to hot-spotting reducers in MapReduce jobs).
> >
> >> 2. Tservers running on the data nodes halt after about 4 hours of processing. I'm attempting to ingest into the billions, hopefully trillions, of records range. Generally it is the ones that aren't under load in the beginning, until finally the one that is handling the bulk of the load crashes, typically last. In the beginning, I noticed the OutOfMemoryException in the tserver logs, but haven't seen that in the past few runs after the memory adjustments. In fact, the tserver log doesn't say anything about why it stopped. Also didn't notice anything unusual in the zookeeper log other than the occasional CancelledKeyException.
> >
> > Make sure you check the tserver_hostname.debug.log, tserver_hostname.out, and tserver_hostname.err files. OOMEs sometimes don't make it to the log file because of the JVM tearing down. You should be able to find something as to why the tserver stopped.
> >
> >> 3. Lastly, can anyone approximate, with the 12 nodes that I have, what kind of ingest speed I should see if things were configured correctly, in number of records per second, based on small record sizes of a few kB? And is anything obviously wrong with the configurations mentioned above that would improve throughput?
> >
> > Generally, a "normal" machine will be able to ingest about 200k records/sec at ~150 bytes per record, which works out to roughly 30MB/s.
> >
> > You might also want to try increasing tserver.mutation.queue.max to 1M in accumulo-site.xml (restart required). You can find some extra information about that in the release notes:
> > http://accumulo.apache.org/release_notes/1.5.1.html#known-issues. Not sure if Sqrrl's distribution has done this already for you.
> >
> >> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> >>
> >> --
> >>
> >> Sincerely,
> >>
> >> Aaron Pelton
> >>
>
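
For the "pooling the writers" idea agreed on at the top of the thread, a minimal sketch (not the original code; the class and method names below are made up for illustration) could share one long-lived BatchWriter across the HTTP request threads and call flush() instead of close(), using the same buffer settings as the per-request code quoted above:

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.MutationsRejectedException;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.data.Mutation;

    // Hypothetical wrapper: one long-lived BatchWriter shared by all HTTP request
    // threads instead of a new writer per request. Per the note in the thread,
    // addMutation() should be thread safe; flush() pushes buffered mutations to
    // the tservers without tearing the writer down.
    public class SharedWriter
    {
        private final BatchWriter writer;

        public SharedWriter(Connector conn, String tableName) throws TableNotFoundException
        {
            long memBuf = 1_000_000L; // same settings as the per-request code above
            long timeout = 1000L;
            int numThreads = 10;
            this.writer = conn.createBatchWriter(tableName, memBuf, timeout, numThreads);
        }

        // Called from each HTTP request thread.
        public void write(Mutation mut) throws MutationsRejectedException
        {
            writer.addMutation(mut);
        }

        // Call this when a request needs its writes pushed out, instead of close().
        public void flush() throws MutationsRejectedException
        {
            writer.flush();
        }

        // Only on application shutdown.
        public void shutdown() throws MutationsRejectedException
        {
            writer.close();
        }
    }

A real pool could hold several of these and hand them out round-robin, but even a single shared writer lets one BatchWriter batch mutations from many requests into fewer RPCs.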

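For reference, the tserver.mutation.queue.max change suggested near the end of the thread would go into accumulo-site.xml in the same <property> form as the other settings quoted above (a sketch of the entry, using the 1M value mentioned in the thread):

    <property>
      <name>tserver.mutation.queue.max</name>
      <value>1M</value>
    </property>

As noted in the thread, this requires a tserver restart, and the linked 1.5.1 release notes give more background.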