Hi Josh,

I have attached the Main class, the stack trace, and the Monitor class with some
changes.

I made some quick hacks to the original Monitor class so that the Monitor can
be started from my Main class.

Now I can get the data I wanted, but an exception is still thrown, and I am
trying to fix it.
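
For context, the attached Monitor caches its statistics in time-windowed lists (its inner MaxList class): each sample is a (timestamp, value) pair, and samples older than a fixed delta relative to the newest one are evicted. A minimal standalone sketch of that idea, using java.util's AbstractMap.SimpleEntry in place of Accumulo's Pair (an assumption for the sake of a self-contained example):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedList;

// Standalone sketch mirroring the attached Monitor's MaxList, with
// SimpleEntry standing in for Accumulo's Pair. Samples older than
// maxDelta ms relative to the newest sample are evicted on each add.
public class MaxListSketch {
	static class MaxList<T> extends LinkedList<SimpleEntry<Long, T>> {
		private static final long serialVersionUID = 1L;
		private final long maxDelta;

		MaxList(long maxDelta) {
			this.maxDelta = maxDelta;
		}

		@Override
		public boolean add(SimpleEntry<Long, T> obj) {
			boolean result = super.add(obj);
			// drop the oldest sample once the window grows past maxDelta
			if (obj.getKey() - get(0).getKey() > maxDelta)
				remove(0);
			return result;
		}
	}

	public static void main(String[] args) {
		MaxList<Double> load = new MaxList<>(60 * 60 * 1000L); // one-hour window
		load.add(new SimpleEntry<>(0L, 2.56));
		load.add(new SimpleEntry<>(30L * 60 * 1000, 1.10));
		load.add(new SimpleEntry<>(61L * 60 * 1000, 0.75)); // evicts the first sample
		System.out.println(load.size()); // prints 2
	}
}
```

This is why getLoadOverTime() and similar methods only ever return roughly the last hour of samples.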

Thanks and Regards.



*Rukshan Chathuranga.*

*Department Of Computer Science & Engineering,*

*Faculty Of Engineering,*
*University Of Moratuwa, Sri Lanka.*

*WEB: http://www.rukspot.com/*


On Sat, Jun 6, 2015 at 10:02 AM, Josh Elser <[email protected]> wrote:

> Rukshan,
>
> Double check when you start the Monitor that you have the MAC's
> accumulo-site.xml on the classpath. If you still see the ZooKeeper
> exception, please provide a stacktrace and some more information on what
> you've changed.
>
> Rukshan Chathuranga wrote:
>
>> hi,
>>
>> I run amc as separate application and try to write monitoring app
>> separately. For that i used the Monitor class with some changes.
>> I need to run them separately and get the statistics data using Monitor
>> class.
>>
>> Thanks and Regards.
>>
>>
>> /Rukshan Chathuranga./
>> /Department Of Computer Science & Engineering,
>> /
>> /Faculty Of Engineering,
>> /
>> /University Of Moratuwa. //Sri Lanka./
>> /WEB: http://www.rukspot.com/ <http://rukspot.com/>
>> /
>>
>>
>> On Fri, Jun 5, 2015 at 10:51 PM, Josh Elser <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>>     Hi Rukshan,
>>
>>     How did you start the Monitor?
>>
>>     The MiniAccumuloCluster is actually a thin wrapper around
>>     MiniAccumuloClusterImpl (in recent versions). If you cast to
>>     MiniAccumuloClusterImpl, you should be able to use the exec method
>>     to start the Monitor for you which will do the proper classpath
>>     setup for you.
>>
>>          ((MiniAccumuloClusterImpl) mac).exec(Monitor.class);
>>
>>     The authentication failure is commonly due to a security measure we
>>     have in place to ensure that rogue services cannot start
>>     participating in an Accumulo cluster. The value of `instance.secret`
>>     in accumulo-site.xml is used to control the ACL in ZooKeeper used to
>>     write to certain znodes (such as those used for service discovery).
>>     If you see a NoAuthException, it's likely that the service you
>>     started found the wrong accumulo-site.xml (or didn't find it at all).
>>
>>     - Josh
>>
>>     Rukshan Chathuranga wrote:
>>
>>         Hi,
>>
>>         i am try to get the Accumulo statistics using
>>         Monitor(org.apache.accumulo.monitor.Monitor) class. But when i
>>         try it i
>>         got the exception as below.
>>         Do you have any idea to fix this.
>>
>>         Note that i am running accumulo as mini cluster.
>>
>>         2015-06-05 21:27:58,052 [monitor.Monitor] WARN : Failed to get
>>         monitor
>>         lock org.apache.zookeeper.KeeperException$NoAuthException:
>>
>>
>>         /Rukshan Chathuranga./
>>         /Department Of Computer Science & Engineering,
>>         /
>>         /Faculty Of Engineering,
>>         /
>>         /University Of Moratuwa. //Sri Lanka./
>>         /WEB: http://www.rukspot.com/ <http://rukspot.com/>
>>         /
>>
>>
>>
2015-06-06 10:37:34,523 [client.ClientConfiguration] WARN : Found no 
client.conf in default paths. Using default client configuration values.
2015-06-06 10:37:35,132 [conf.ConfigSanityCheck] WARN : Use of instance.dfs.uri 
and instance.dfs.dir are deprecated. Consider using instance.volumes instead.
2015-06-06 10:37:35,313 [util.NativeCodeLoader] WARN : Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
2015-06-06 10:37:35,558 [vfs.AccumuloVFSClassLoader] WARN : ignoring classpath 
entry file:///lib/ext/[^.].*.jar
2015-06-06 10:37:35,574 [watcher.MonitorLog4jWatcher] INFO : Enabled 
log-forwarding
2015-06-06 10:37:35,574 [server.Accumulo] INFO : monitor starting
2015-06-06 10:37:35,574 [server.Accumulo] INFO : Instance 
eaca16a3-1dc4-4c00-ba9e-baff0485806b
2015-06-06 10:37:35,577 [server.Accumulo] INFO : Data Version 7
2015-06-06 10:37:35,577 [server.Accumulo] INFO : Attempting to talk to zookeeper
2015-06-06 10:37:35,682 [server.Accumulo] INFO : ZooKeeper connected and 
initialized, attempting to talk to HDFS
2015-06-06 10:37:35,682 [server.Accumulo] INFO : Connected to HDFS
2015-06-06 10:37:35,683 [watcher.MonitorLog4jWatcher] INFO : Changing monitor 
log4j address to rukshan-ThinkPad-T540p:53048
2015-06-06 10:37:35,683 [watcher.MonitorLog4jWatcher] INFO : Enabled 
log-forwarding
2015-06-06 10:37:35,683 [watcher.MonitorLog4jWatcher] INFO : Set watch for 
Monitor Log4j watcher
2015-06-06 10:37:35,686 [server.Accumulo] INFO : crypto.block.stream.size = 1K
2015-06-06 10:37:35,686 [server.Accumulo] INFO : crypto.cipher.algorithm.name = 
NullCipher
2015-06-06 10:37:35,687 [server.Accumulo] INFO : crypto.cipher.key.length = 128
2015-06-06 10:37:35,687 [server.Accumulo] INFO : crypto.cipher.suite = 
NullCipher
2015-06-06 10:37:35,687 [server.Accumulo] INFO : 
crypto.default.key.strategy.cipher.suite = NullCipher
2015-06-06 10:37:35,687 [server.Accumulo] INFO : 
crypto.default.key.strategy.hdfs.uri = 
2015-06-06 10:37:35,687 [server.Accumulo] INFO : 
crypto.default.key.strategy.key.location = /crypto/secret/keyEncryptionKey
2015-06-06 10:37:35,687 [server.Accumulo] INFO : crypto.module.class = 
NullCryptoModule
2015-06-06 10:37:35,687 [server.Accumulo] INFO : 
crypto.override.key.strategy.with.configured.strategy = false
2015-06-06 10:37:35,687 [server.Accumulo] INFO : 
crypto.secret.key.encryption.strategy.class = NullSecretKeyEncryptionStrategy
2015-06-06 10:37:35,688 [server.Accumulo] INFO : crypto.secure.rng = SHA1PRNG
2015-06-06 10:37:35,688 [server.Accumulo] INFO : crypto.secure.rng.provider = 
SUN
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.cycle.delay = 5m
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.cycle.start = 30s
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.file.archive = false
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.port.client = 50091
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.threads.delete = 16
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.trace.percent = 0.01
2015-06-06 10:37:35,688 [server.Accumulo] INFO : gc.trash.ignore = false
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.classpaths = 
      
      $ACCUMULO_HOME/lib/accumulo-server.jar,
      $ACCUMULO_HOME/lib/accumulo-core.jar,
      $ACCUMULO_HOME/lib/accumulo-start.jar,
      $ACCUMULO_HOME/lib/accumulo-fate.jar,
      $ACCUMULO_HOME/lib/accumulo-proxy.jar,
      $ACCUMULO_HOME/lib/[^.].*.jar,
      
      $ZOOKEEPER_HOME/zookeeper[^.].*.jar,
      
      $HADOOP_CONF_DIR,
      
      
      
      
      $HADOOP_PREFIX/[^.].*.jar,
      $HADOOP_PREFIX/lib/(?!slf4j)[^.].*.jar,
      
    
2015-06-06 10:37:35,688 [server.Accumulo] INFO : 
general.delegation.token.lifetime = 7d
2015-06-06 10:37:35,688 [server.Accumulo] INFO : 
general.delegation.token.update.interval = 1d
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.dynamic.classpaths = 
$ACCUMULO_HOME/lib/ext/[^.].*.jar
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.kerberos.keytab = 
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.kerberos.principal = 
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.legacy.metrics = false
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.maven.project.basedir 
= 
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.rpc.server.type = 
2015-06-06 10:37:35,688 [server.Accumulo] INFO : general.rpc.timeout = 120s
2015-06-06 10:37:35,689 [server.Accumulo] INFO : 
general.security.credential.provider.paths = 
2015-06-06 10:37:35,689 [server.Accumulo] INFO : 
general.server.message.size.max = 1G
2015-06-06 10:37:35,689 [server.Accumulo] INFO : 
general.server.simpletimer.threadpool.size = 1
2015-06-06 10:37:35,689 [server.Accumulo] INFO : general.vfs.cache.dir = 
/tmp/accumulo-vfs-cache-rukshan
2015-06-06 10:37:35,689 [server.Accumulo] INFO : general.vfs.classpaths = 
2015-06-06 10:37:35,689 [server.Accumulo] INFO : general.volume.chooser = 
org.apache.accumulo.server.fs.PerTableVolumeChooser
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.dfs.dir = /accumulo
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.dfs.uri = 
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.rpc.sasl.enabled = 
false
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.rpc.ssl.clientAuth = 
false
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.rpc.ssl.enabled = 
false
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.secret = <hidden>
2015-06-06 10:37:35,689 [server.Accumulo] INFO : 
instance.security.authenticator = 
org.apache.accumulo.server.security.handler.ZKAuthenticator
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.security.authorizor = 
org.apache.accumulo.server.security.handler.ZKAuthorizor
2015-06-06 10:37:35,689 [server.Accumulo] INFO : 
instance.security.permissionHandler = 
org.apache.accumulo.server.security.handler.ZKPermHandler
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.volumes = 
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.volumes.replacements 
= 
2015-06-06 10:37:35,689 [server.Accumulo] INFO : instance.zookeeper.host = 
localhost:2181
2015-06-06 10:37:35,690 [server.Accumulo] INFO : instance.zookeeper.timeout = 
30s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : logger.dir.walog = walogs
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.bulk.rename.threadpool.size = 20
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.bulk.retries = 3
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.bulk.threadpool.size = 5
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.bulk.timeout = 5m
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.fate.threadpool.size = 4
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.lease.recovery.interval 
= 5s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.port.client = 9999
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.recovery.delay = 10s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.recovery.max.age = 60m
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.recovery.time.max = 30m
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.replication.coordinator.minthreads = 4
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.replication.coordinator.port = 10001
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.replication.coordinator.threadcheck.time = 5s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.replication.status.scan.interval = 30s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.server.threadcheck.time 
= 1s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.server.threads.minimum 
= 20
2015-06-06 10:37:35,690 [server.Accumulo] INFO : master.tablet.balancer = 
org.apache.accumulo.server.master.balancer.TableLoadBalancer
2015-06-06 10:37:35,690 [server.Accumulo] INFO : 
master.walog.closer.implementation = 
org.apache.accumulo.server.master.recovery.HadoopLogCloser
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.banner.background = 
#304065
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.banner.color = #c4c4c4
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.banner.text = 
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.lock.check.interval = 
5s
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.log.date.format = 
yyyy/MM/dd HH:mm:ss,SSS
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.port.client = 50095
2015-06-06 10:37:35,690 [server.Accumulo] INFO : monitor.port.log4j = 4560
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.exclude.ciphers = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.include.ciphers = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.include.protocols 
= TLSv1,TLSv1.1,TLSv1.2
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.keyStore = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.keyStorePassword = 
<hidden>
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.keyStoreType = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.trustStore = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.trustStorePassword 
= <hidden>
2015-06-06 10:37:35,691 [server.Accumulo] INFO : monitor.ssl.trustStoreType = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.driver.delay = 0s
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.max.unit.size = 64M
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.max.work.queue = 
1000
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.name = 
2015-06-06 10:37:35,691 [server.Accumulo] INFO : 
replication.receipt.service.port = 10002
2015-06-06 10:37:35,691 [server.Accumulo] INFO : 
replication.receiver.min.threads = 1
2015-06-06 10:37:35,691 [server.Accumulo] INFO : 
replication.receiver.threadcheck.time = 30s
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.trace.percent = 0.1
2015-06-06 10:37:35,691 [server.Accumulo] INFO : replication.work.assigner = 
org.apache.accumulo.master.replication.UnorderedWorkAssigner
2015-06-06 10:37:35,692 [server.Accumulo] INFO : 
replication.work.assignment.sleep = 30s
2015-06-06 10:37:35,692 [server.Accumulo] INFO : replication.work.attempts = 10
2015-06-06 10:37:35,692 [server.Accumulo] INFO : 
replication.work.processor.delay = 0s
2015-06-06 10:37:35,692 [server.Accumulo] INFO : 
replication.work.processor.period = 0s
2015-06-06 10:37:35,692 [server.Accumulo] INFO : replication.worker.threads = 4
2015-06-06 10:37:35,692 [server.Accumulo] INFO : rpc.javax.net.ssl.keyStore = 
$ACCUMULO_CONF_DIR/ssl/keystore.jks
2015-06-06 10:37:35,692 [server.Accumulo] INFO : 
rpc.javax.net.ssl.keyStorePassword = <hidden>
2015-06-06 10:37:35,692 [server.Accumulo] INFO : rpc.javax.net.ssl.keyStoreType 
= jks
2015-06-06 10:37:35,693 [server.Accumulo] INFO : rpc.javax.net.ssl.trustStore = 
$ACCUMULO_CONF_DIR/ssl/truststore.jks
2015-06-06 10:37:35,693 [server.Accumulo] INFO : 
rpc.javax.net.ssl.trustStorePassword = <hidden>
2015-06-06 10:37:35,693 [server.Accumulo] INFO : 
rpc.javax.net.ssl.trustStoreType = jks
2015-06-06 10:37:35,693 [server.Accumulo] INFO : rpc.sasl.qop = auth
2015-06-06 10:37:35,693 [server.Accumulo] INFO : rpc.ssl.cipher.suites = 
2015-06-06 10:37:35,693 [server.Accumulo] INFO : rpc.ssl.client.protocol = TLSv1
2015-06-06 10:37:35,693 [server.Accumulo] INFO : 
rpc.ssl.server.enabled.protocols = TLSv1,TLSv1.1,TLSv1.2
2015-06-06 10:37:35,693 [server.Accumulo] INFO : rpc.useJsse = false
2015-06-06 10:37:35,693 [server.Accumulo] INFO : table.balancer = 
org.apache.accumulo.server.master.balancer.DefaultLoadBalancer
2015-06-06 10:37:35,693 [server.Accumulo] INFO : table.bloom.enabled = false
2015-06-06 10:37:35,693 [server.Accumulo] INFO : table.bloom.error.rate = 0.5%
2015-06-06 10:37:35,693 [server.Accumulo] INFO : table.bloom.hash.type = murmur
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.bloom.key.functor = 
org.apache.accumulo.core.file.keyfunctor.RowFunctor
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.bloom.load.threshold = 1
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.bloom.size = 1048576
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.cache.block.enable = 
false
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.cache.index.enable = true
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.classpath.context = 
2015-06-06 10:37:35,694 [server.Accumulo] INFO : 
table.compaction.major.everything.idle = 1h
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.compaction.major.ratio = 
3
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.compaction.minor.idle = 
5m
2015-06-06 10:37:35,694 [server.Accumulo] INFO : 
table.compaction.minor.logs.threshold = 3
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.durability = sync
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.failures.ignore = false
2015-06-06 10:37:35,694 [server.Accumulo] INFO : table.file.blocksize = 0B
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.file.compress.blocksize 
= 100K
2015-06-06 10:37:35,695 [server.Accumulo] INFO : 
table.file.compress.blocksize.index = 128K
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.file.compress.type = gz
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.file.max = 15
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.file.replication = 0
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.file.type = rf
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.formatter = 
org.apache.accumulo.core.util.format.DefaultFormatter
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.groups.enabled = 
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.interepreter = 
org.apache.accumulo.core.util.interpret.DefaultScanInterpreter
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.majc.compaction.strategy 
= org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.replication = false
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.scan.max.memory = 512K
2015-06-06 10:37:35,695 [server.Accumulo] INFO : 
table.security.scan.visibility.default = 
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.split.endrow.size.max = 
10K
2015-06-06 10:37:35,695 [server.Accumulo] INFO : table.split.threshold = 1G
2015-06-06 10:37:35,696 [server.Accumulo] INFO : table.volume.chooser = 
org.apache.accumulo.server.fs.RandomVolumeChooser
2015-06-06 10:37:35,696 [server.Accumulo] INFO : table.walog.enabled = true
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.password = <hidden>
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.port.client = 12234
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.span.receivers = 
org.apache.accumulo.tracer.ZooTraceClient
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.table = trace
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.token.property.password 
= <hidden>
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.token.type = 
org.apache.accumulo.core.client.security.tokens.PasswordToken
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.user = root
2015-06-06 10:37:35,696 [server.Accumulo] INFO : trace.zookeeper.path = /tracers
2015-06-06 10:37:35,696 [server.Accumulo] INFO : tserver.archive.walogs = false
2015-06-06 10:37:35,696 [server.Accumulo] INFO : 
tserver.assignment.concurrent.max = 2
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.assignment.duration.warning = 10m
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.bloom.load.concurrent.max = 4
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.bulk.assign.threads = 1
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.bulk.process.threads = 
1
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.bulk.retry.max = 5
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.bulk.timeout = 5m
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.cache.data.size = 15M
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.cache.index.size = 40M
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.client.timeout = 3s
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.compaction.major.concurrent.max = 3
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.compaction.major.delay 
= 30s
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.compaction.major.thread.files.open.max = 10
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.compaction.major.trace.percent = 0.1
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.compaction.minor.concurrent.max = 4
2015-06-06 10:37:35,697 [server.Accumulo] INFO : 
tserver.compaction.minor.trace.percent = 0.1
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.compaction.warn.time = 
10m
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.default.blocksize = 1M
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.dir.memdump = /tmp
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.files.open.idle = 1m
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.hold.time.max = 5m
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.memory.manager = 
org.apache.accumulo.server.tabletserver.LargestFirstMemoryManager
2015-06-06 10:37:35,697 [server.Accumulo] INFO : tserver.memory.maps.max = 256M
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.memory.maps.native.enabled = false
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.metadata.readahead.concurrent.max = 8
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.migrations.concurrent.max = 1
2015-06-06 10:37:35,698 [server.Accumulo] INFO : tserver.monitor.fs = true
2015-06-06 10:37:35,698 [server.Accumulo] INFO : tserver.mutation.queue.max = 1M
2015-06-06 10:37:35,698 [server.Accumulo] INFO : tserver.port.client = 9997
2015-06-06 10:37:35,698 [server.Accumulo] INFO : tserver.port.search = false
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.readahead.concurrent.max = 16
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.recovery.concurrent.max = 2
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.replication.batchwriter.replayer.memory = 50M
2015-06-06 10:37:35,698 [server.Accumulo] INFO : 
tserver.replication.default.replayer = 
org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.scan.files.open.max = 
100
2015-06-06 10:37:35,699 [server.Accumulo] INFO : 
tserver.server.message.size.max = 1G
2015-06-06 10:37:35,699 [server.Accumulo] INFO : 
tserver.server.threadcheck.time = 1s
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.server.threads.minimum 
= 20
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.session.idle.max = 1m
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.sort.buffer.size = 50M
2015-06-06 10:37:35,699 [server.Accumulo] INFO : 
tserver.tablet.split.midpoint.files.max = 30
2015-06-06 10:37:35,699 [server.Accumulo] INFO : 
tserver.total.mutation.queue.max = 50M
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.wal.blocksize = 0
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.wal.replication = 0
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.wal.sync = true
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.wal.sync.method = hsync
2015-06-06 10:37:35,699 [server.Accumulo] INFO : tserver.walog.max.size = 256M
2015-06-06 10:37:35,700 [server.Accumulo] INFO : tserver.workq.threads = 2
2015-06-06 10:37:35,718 [trace.DistributedTrace] INFO : SpanReceiver 
org.apache.accumulo.tracer.ZooTraceClient was loaded successfully.
0
0
2015-06-06 10:37:36,703 [server.Accumulo] WARN : System swappiness setting is 
greater than ten (60) which can cause time-sensitive operations to be delayed.  
Accumulo is time sensitive because it needs to maintain distributed lock 
agreement.
1433567255722 2.56, 
1
1433567255722 2.56, 
1
1433567255722 2.56, 
1
1433567255722 2.56, 
1
1433567255722 2.56, 
1
2015-06-06 10:37:41,384 [rpc.ThriftUtil] WARN : Failed to open transport to 
rukshan-ThinkPad-T540p:59324
org.apache.thrift.transport.TTransportException: java.net.ConnectException: 
Connection refused
java.net.ConnectException: Connection refused
        at 
org.apache.accumulo.core.rpc.ThriftUtil.createClientTransport(ThriftUtil.java:313)
        at 
org.apache.accumulo.core.client.impl.ThriftTransportPool.createNewTransport(ThriftTransportPool.java:478)
        at 
org.apache.accumulo.core.client.impl.ThriftTransportPool.getTransport(ThriftTransportPool.java:410)
        at 
org.apache.accumulo.core.client.impl.ThriftTransportPool.getTransport(ThriftTransportPool.java:388)
        at 
org.apache.accumulo.core.rpc.ThriftUtil.getClient(ThriftUtil.java:117)
        at 
org.apache.accumulo.core.rpc.ThriftUtil.getTServerClient(ThriftUtil.java:160)
        at 
mil.nga.giat.geowave.service.health.data.Monitor.fetchScans(Monitor.java:577)
        at 
mil.nga.giat.geowave.service.health.data.Monitor$2.run(Monitor.java:535)
        at 
org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:98)
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:82)
        at 
org.apache.accumulo.core.rpc.TTimeoutTransport.create(TTimeoutTransport.java:55)
        at 
org.apache.accumulo.core.rpc.TTimeoutTransport.create(TTimeoutTransport.java:48)
        at 
org.apache.accumulo.core.rpc.ThriftUtil.createClientTransport(ThriftUtil.java:310)
        ... 9 more
1433567255722 2.56, 1433567260762 0.0, 
2
1433567255722 2.56, 1433567260762 0.0, 
2
1433567255722 2.56, 1433567260762 0.0, 
2

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.master.thrift.DeadServer;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.util.TableInfoUtil;

public class Data {

	private static Instance instance;
	private static ServerConfigurationFactory config;
	private static AccumuloServerContext context;

	public static void main(String[] args) throws AccumuloException,
			AccumuloSecurityException, IOException, IllegalArgumentException,
			IllegalAccessException {
		String instanceName = "geowave";
		String zooServers = "127.0.0.1";
		Instance inst = new ZooKeeperInstance(instanceName, zooServers);
		Connector conn = inst.getConnector("root", "password");

		SecurityUtil.serverLogin(SiteConfiguration.getInstance());

		ServerOpts opts = new ServerOpts();
		final String app = "monitor";
		opts.parseArgs(app, args);
		String hostname = opts.getAddress();

		Accumulo.setupLogging(app);
		VolumeManager fs = VolumeManagerImpl.get();

		// instance = HdfsZooInstance.getInstance();
		instance = conn.getInstance();

		config = new ServerConfigurationFactory(inst);
		context = new AccumuloServerContext(config);
		Accumulo.init(fs, config, app);
		Monitor monitor = new Monitor();
		DistributedTrace.enable(hostname, app, config.getConfiguration());

		// Inject our Instance, config, and server context into the Monitor's
		// private static fields via reflection, since Monitor exposes no setters.
		Field[] f = monitor.getClass().getDeclaredFields();
		Field in = null;
		Field con = null;
		Field cont = null;
		for (int i = 0; i < f.length; i++) {
			if (f[i].getName().equals("instance")) {
				in = f[i];
			}
			if (f[i].getName().equals("config")) {
				con = f[i];
			}
			if (f[i].getName().equals("context")) {
				cont = f[i];
			}
		}

		if (in == null || con == null || cont == null) {
			throw new IllegalStateException(
					"Monitor is missing an expected instance/config/context field");
		}

		in.setAccessible(true);
		in.set(monitor, inst);
		con.setAccessible(true);
		con.set(monitor, config);
		cont.setAccessible(true);
		cont.set(monitor, context);

		try {
			monitor.run(hostname);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {

//			DistributedTrace.disable();
		}

		// monitor.fetchData();
		System.out.println(monitor.getDataCacheHitRateOverTime().size());

		while (true) {
			System.out.println(monitor.getDataCacheHitRateOverTime().size());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			List<Pair<Long, Double>> li = monitor.getLoadOverTime();
			for (int i = 0; i < li.size(); i++) {
				Pair<Long, Double> p = li.get(i);
				System.out.print(p.getFirst() + " " + p.getSecond() + ", ");
			}
			System.out.println();
		}
	}

}
package mil.nga.giat.geowave.service.health.data;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.ConnectException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.monitor.EmbeddedWebServer;
import org.apache.accumulo.monitor.ZooKeeperStatus;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.net.HostAndPort;

/**
 * Serve master statistics. In this modified copy the embedded web server is
 * disabled so the statistics can be consumed programmatically.
 */
public class Monitor {
	private static final Logger log = LoggerFactory.getLogger(Monitor.class);

	private static final int REFRESH_TIME = 5;
	private static long lastRecalc = 0L;
	private static double totalIngestRate = 0.0;
	private static double totalQueryRate = 0.0;
	private static double totalScanRate = 0.0;
	private static long totalEntries = 0L;
	private static int totalTabletCount = 0;
	private static long totalHoldTime = 0;
	private static long totalLookups = 0;
	private static int totalTables = 0;

	private static class MaxList<T> extends LinkedList<Pair<Long, T>> {
		private static final long serialVersionUID = 1L;

		private long maxDelta;

		public MaxList(long maxDelta) {
			this.maxDelta = maxDelta;
		}

		@Override
		public boolean add(Pair<Long, T> obj) {
			boolean result = super.add(obj);

			if (obj.getFirst() - get(0).getFirst() > maxDelta)
				remove(0);

			return result;
		}

	}

	private static final int MAX_TIME_PERIOD = 60 * 60 * 1000;
	private static final List<Pair<Long, Double>> loadOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> ingestRateOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> ingestByteRateOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Integer>> minorCompactionsOverTime = Collections
			.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Integer>> majorCompactionsOverTime = Collections
			.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> lookupsOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Integer>> queryRateOverTime = Collections
			.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Integer>> scanRateOverTime = Collections
			.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> queryByteRateOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> indexCacheHitRateOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static final List<Pair<Long, Double>> dataCacheHitRateOverTime = Collections
			.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
	private static EventCounter lookupRateTracker = new EventCounter();
	private static EventCounter indexCacheHitTracker = new EventCounter();
	private static EventCounter indexCacheRequestTracker = new EventCounter();
	private static EventCounter dataCacheHitTracker = new EventCounter();
	private static EventCounter dataCacheRequestTracker = new EventCounter();

	private static volatile boolean fetching = false;
	private static MasterMonitorInfo mmi;
	private static Map<String, Map<ProblemType, Integer>> problemSummary = Collections
			.emptyMap();
	private static Exception problemException;
	private static GCStatus gcStatus;

	private static Instance instance;

	private static ServerConfigurationFactory config;
	private static AccumuloServerContext context;

	private static EmbeddedWebServer server;

	private ZooLock monitorLock;

	private static class EventCounter {

		Map<String, Pair<Long, Long>> prevSamples = new HashMap<String, Pair<Long, Long>>();
		Map<String, Pair<Long, Long>> samples = new HashMap<String, Pair<Long, Long>>();
		Set<String> serversUpdated = new HashSet<String>();

		void startingUpdates() {
			serversUpdated.clear();
		}

		void updateTabletServer(String name, long sampleTime, long numEvents) {
			Pair<Long, Long> newSample = new Pair<Long, Long>(sampleTime,
					numEvents);
			Pair<Long, Long> lastSample = samples.get(name);

			if (lastSample == null || !lastSample.equals(newSample)) {
				samples.put(name, newSample);
				if (lastSample != null) {
					prevSamples.put(name, lastSample);
				}
			}
			serversUpdated.add(name);
		}

		void finishedUpdating() {
			// remove any tablet servers not updated
			samples.keySet().retainAll(serversUpdated);
			prevSamples.keySet().retainAll(serversUpdated);
		}

		double calculateRate() {
			double totalRate = 0;

			for (Entry<String, Pair<Long, Long>> entry : prevSamples.entrySet()) {
				Pair<Long, Long> prevSample = entry.getValue();
				Pair<Long, Long> sample = samples.get(entry.getKey());

				totalRate += (sample.getSecond() - prevSample.getSecond())
						/ ((sample.getFirst() - prevSample.getFirst()) / (double) 1000);
			}

			return totalRate;
		}

		long calculateCount() {
			long count = 0;

			for (Entry<String, Pair<Long, Long>> entry : prevSamples.entrySet()) {
				Pair<Long, Long> prevSample = entry.getValue();
				Pair<Long, Long> sample = samples.get(entry.getKey());

				count += sample.getSecond() - prevSample.getSecond();
			}

			return count;
		}
	}

	public static void fetchData() {
		double totalIngestRate = 0.;
		double totalIngestByteRate = 0.;
		double totalQueryRate = 0.;
		double totalQueryByteRate = 0.;
		double totalScanRate = 0.;
		long totalEntries = 0;
		int totalTabletCount = 0;
		long totalHoldTime = 0;
		long totalLookups = 0;
		boolean retry = true;

		// only recalc every so often
		long currentTime = System.currentTimeMillis();
		if (currentTime - lastRecalc < REFRESH_TIME * 1000)
			return;

		synchronized (Monitor.class) {
			if (fetching)
				return;
			fetching = true;
		}

		try {
			while (retry) {
				MasterClientService.Iface client = null;
				try {
					client = MasterClient.getConnection(context);
					if (client != null) {
						mmi = client.getMasterStats(Tracer.traceInfo(),
								context.rpcCreds());
						retry = false;
					} else {
						mmi = null;
					}
					Monitor.gcStatus = fetchGcStatus();
				} catch (Exception e) {
					mmi = null;
					log.info("Error fetching stats", e);
				} finally {
					if (client != null) {
						MasterClient.close(client);
					}
				}
				if (mmi == null)
					UtilWaitThread.sleep(1000);
			}
			if (mmi != null) {
				int majorCompactions = 0;
				int minorCompactions = 0;

				lookupRateTracker.startingUpdates();
				indexCacheHitTracker.startingUpdates();
				indexCacheRequestTracker.startingUpdates();
				dataCacheHitTracker.startingUpdates();
				dataCacheRequestTracker.startingUpdates();

				for (TabletServerStatus server : mmi.tServerInfo) {
					TableInfo summary = TableInfoUtil
							.summarizeTableStats(server);
					totalIngestRate += summary.ingestRate;
					totalIngestByteRate += summary.ingestByteRate;
					totalQueryRate += summary.queryRate;
					totalScanRate += summary.scanRate;
					totalQueryByteRate += summary.queryByteRate;
					totalEntries += summary.recs;
					totalHoldTime += server.holdTime;
					totalLookups += server.lookups;
					majorCompactions += summary.majors.running;
					minorCompactions += summary.minors.running;
					lookupRateTracker.updateTabletServer(server.name,
							server.lastContact, server.lookups);
					indexCacheHitTracker.updateTabletServer(server.name,
							server.lastContact, server.indexCacheHits);
					indexCacheRequestTracker.updateTabletServer(server.name,
							server.lastContact, server.indexCacheRequest);
					dataCacheHitTracker.updateTabletServer(server.name,
							server.lastContact, server.dataCacheHits);
					dataCacheRequestTracker.updateTabletServer(server.name,
							server.lastContact, server.dataCacheRequest);
				}

				lookupRateTracker.finishedUpdating();
				indexCacheHitTracker.finishedUpdating();
				indexCacheRequestTracker.finishedUpdating();
				dataCacheHitTracker.finishedUpdating();
				dataCacheRequestTracker.finishedUpdating();

				int totalTables = 0;
				for (TableInfo tInfo : mmi.tableMap.values()) {
					totalTabletCount += tInfo.tablets;
					totalTables++;
				}
				Monitor.totalIngestRate = totalIngestRate;
				Monitor.totalTables = totalTables;
				totalIngestByteRate = totalIngestByteRate / 1000000.0;
				Monitor.totalQueryRate = totalQueryRate;
				Monitor.totalScanRate = totalScanRate;
				totalQueryByteRate = totalQueryByteRate / 1000000.0;
				Monitor.totalEntries = totalEntries;
				Monitor.totalTabletCount = totalTabletCount;
				Monitor.totalHoldTime = totalHoldTime;
				Monitor.totalLookups = totalLookups;

				ingestRateOverTime.add(new Pair<Long, Double>(currentTime,
						totalIngestRate));
				ingestByteRateOverTime.add(new Pair<Long, Double>(currentTime,
						totalIngestByteRate));

				double totalLoad = 0.;
				for (TabletServerStatus status : mmi.tServerInfo) {
					if (status != null)
						totalLoad += status.osLoad;
				}
				loadOverTime
						.add(new Pair<Long, Double>(currentTime, totalLoad));

				minorCompactionsOverTime.add(new Pair<Long, Integer>(
						currentTime, minorCompactions));
				majorCompactionsOverTime.add(new Pair<Long, Integer>(
						currentTime, majorCompactions));

				lookupsOverTime.add(new Pair<Long, Double>(currentTime,
						lookupRateTracker.calculateRate()));

				queryRateOverTime.add(new Pair<Long, Integer>(currentTime,
						(int) totalQueryRate));
				queryByteRateOverTime.add(new Pair<Long, Double>(currentTime,
						totalQueryByteRate));

				scanRateOverTime.add(new Pair<Long, Integer>(currentTime,
						(int) totalScanRate));

				calcCacheHitRate(indexCacheHitRateOverTime, currentTime,
						indexCacheHitTracker, indexCacheRequestTracker);
				calcCacheHitRate(dataCacheHitRateOverTime, currentTime,
						dataCacheHitTracker, dataCacheRequestTracker);
			}
			// try {
			// Monitor.problemSummary =
			// ProblemReports.getInstance(getContext()).summarize();
			// Monitor.problemException = null;
			// } catch (Exception e) {
			// log.info("Failed to obtain problem reports ", e);
			// Monitor.problemSummary = Collections.emptyMap();
			// Monitor.problemException = e;
			// }

		} finally {
			synchronized (Monitor.class) {
				fetching = false;
				lastRecalc = currentTime;
			}
		}
	}

	private static void calcCacheHitRate(List<Pair<Long, Double>> hitRate,
			long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
		long req = cacheReq.calculateCount();
		if (req > 0)
			hitRate.add(new Pair<Long, Double>(currentTime, cacheHits
					.calculateCount() / (double) req));
		else
			hitRate.add(new Pair<Long, Double>(currentTime, null));
	}

	private static GCStatus fetchGcStatus() {
		GCStatus result = null;
		HostAndPort address = null;
		try {
			// Read the gc location from its lock
			ZooReaderWriter zk = ZooReaderWriter.getInstance();
			String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK;
			List<String> locks = zk.getChildren(path, null);
			if (locks != null && locks.size() > 0) {
				Collections.sort(locks);
				address = new ServerServices(new String(zk.getData(path + "/"
						+ locks.get(0), null), UTF_8))
						.getAddress(Service.GC_CLIENT);
				GCMonitorService.Client client = ThriftUtil.getClient(
						new GCMonitorService.Client.Factory(), address,
						new AccumuloServerContext(config));
				try {
					result = client.getStatus(Tracer.traceInfo(), getContext()
							.rpcCreds());
				} finally {
					ThriftUtil.returnClient(client);
				}
			}
		} catch (Exception ex) {
			log.warn("Unable to contact the garbage collector at " + address,
					ex);
		}
		return result;
	}

	public static void main(String[] args) throws Exception {
		SecurityUtil.serverLogin(SiteConfiguration.getInstance());

		ServerOpts opts = new ServerOpts();
		final String app = "monitor";
		opts.parseArgs(app, args);
		String hostname = opts.getAddress();

		Accumulo.setupLogging(app);
		VolumeManager fs = VolumeManagerImpl.get();
		instance = HdfsZooInstance.getInstance();
		config = new ServerConfigurationFactory(instance);
		context = new AccumuloServerContext(config);
		Accumulo.init(fs, config, app);
		Monitor monitor = new Monitor();
		DistributedTrace.enable(hostname, app, config.getConfiguration());
		try {
			monitor.run(hostname);
		} finally {
			DistributedTrace.disable();
		}
	}

	private static long START_TIME;

	public void run(String hostname) {
		// try {
		// getMonitorLock();
		// } catch (Exception e) {
		// log.error("Failed to get Monitor ZooKeeper lock");
		// throw new RuntimeException(e);
		// }

		Monitor.START_TIME = System.currentTimeMillis();
		int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
		try {
			log.debug("Creating monitor on port " + port);
			// server = new EmbeddedWebServer(hostname, port);
		} catch (Throwable ex) {
			log.error("Unable to start embedded web server", ex);
			throw new RuntimeException(ex);
		}

		/*
		 * server.addServlet(DefaultServlet.class, "/");
		 * server.addServlet(OperationServlet.class, "/op");
		 * server.addServlet(MasterServlet.class, "/master");
		 * server.addServlet(TablesServlet.class, "/tables");
		 * server.addServlet(TServersServlet.class, "/tservers");
		 * server.addServlet(ProblemServlet.class, "/problems");
		 * server.addServlet(GcStatusServlet.class, "/gc");
		 * server.addServlet(LogServlet.class, "/log");
		 * server.addServlet(XMLServlet.class, "/xml");
		 * server.addServlet(JSONServlet.class, "/json");
		 * server.addServlet(VisServlet.class, "/vis");
		 * server.addServlet(ScanServlet.class, "/scans");
		 * server.addServlet(Summary.class, "/trace/summary");
		 * server.addServlet(ListType.class, "/trace/listType");
		 * server.addServlet(ShowTrace.class, "/trace/show");
		 * server.addServlet(ReplicationServlet.class, "/replication"); if
		 * (server.isUsingSsl()) server.addServlet(ShellServlet.class,
		 * "/shell"); server.start();
		 */

		try {
			hostname = InetAddress.getLocalHost().getHostName();

			log.debug("Using " + hostname
					+ " to advertise monitor location in ZooKeeper");

			// String monitorAddress = HostAndPort.fromParts(hostname,
			// server.getPort()).toString();

			// ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance)
			// + Constants.ZMONITOR_HTTP_ADDR, monitorAddress.getBytes(UTF_8),
			// NodeExistsPolicy.OVERWRITE);
			// log.info("Set monitor address in zookeeper to " +
			// monitorAddress);
		} catch (Exception ex) {
			log.error("Unable to set monitor HTTP address in zookeeper", ex);
		}

		if (null != hostname) {
			// LogService.startLogListener(Monitor.getContext().getConfiguration(),
			// instance.getInstanceID(), hostname);
		} else {
			log.warn("Not starting log4j listener as we could not determine address to use");
		}

		new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()),
				"ZooKeeperStatus").start();

		// need to regularly fetch data so plot data is updated
		new Daemon(new LoggingRunnable(log, new Runnable() {

			@Override
			public void run() {
				while (true) {
					try {
						Monitor.fetchData();
					} catch (Exception e) {
						log.warn("{}", e.getMessage(), e);
					}

					UtilWaitThread.sleep(333);
				}

			}
		}), "Data fetcher").start();

		new Daemon(new LoggingRunnable(log, new Runnable() {
			@Override
			public void run() {
				while (true) {
					try {
						Monitor.fetchScans();
					} catch (Exception e) {
						log.warn("{}", e.getMessage(), e);
					}
					UtilWaitThread.sleep(5000);
				}
			}
		}), "Scan scanner").start();
	}

	public static class ScanStats {
		public final long scanCount;
		public final Long oldestScan;
		public final long fetched;

		ScanStats(List<ActiveScan> active) {
			this.scanCount = active.size();
			long oldest = -1;
			for (ActiveScan scan : active) {
				oldest = Math.max(oldest, scan.age);
			}
			this.oldestScan = oldest < 0 ? null : oldest;
			this.fetched = System.currentTimeMillis();
		}
	}

	static final Map<HostAndPort, ScanStats> allScans = new HashMap<HostAndPort, ScanStats>();

	public static Map<HostAndPort, ScanStats> getScans() {
		synchronized (allScans) {
			return new HashMap<HostAndPort, ScanStats>(allScans);
		}
	}

	protected static void fetchScans() throws Exception {
		if (instance == null)
			return;
		Connector c = context.getConnector();
		for (String server : c.instanceOperations().getTabletServers()) {
			final HostAndPort parsedServer = HostAndPort.fromString(server);
			Client tserver;
			try {
				tserver = ThriftUtil.getTServerClient(parsedServer, context);
			} catch (Exception e) {
				// Skip this tablet server rather than aborting the entire
				// fetch; the next refresh will retry it.
				log.warn("Unable to connect to tablet server {}", server, e);
				continue;
			}
			try {
				List<ActiveScan> scans = tserver.getActiveScans(null,
						context.rpcCreds());
				synchronized (allScans) {
					allScans.put(parsedServer, new ScanStats(scans));
				}
			} catch (Exception ex) {
				log.debug("Failed to get active scans from {}", server, ex);
			} finally {
				ThriftUtil.returnClient(tserver);
			}
		}
		// Age off old scan information
		Iterator<Entry<HostAndPort, ScanStats>> entryIter = allScans.entrySet()
				.iterator();
		long now = System.currentTimeMillis();
		while (entryIter.hasNext()) {
			Entry<HostAndPort, ScanStats> entry = entryIter.next();
			if (now - entry.getValue().fetched > 5 * 60 * 1000) {
				entryIter.remove();
			}
		}
	}

	/**
	 * Get the monitor lock in ZooKeeper
	 */
	private void getMonitorLock() throws KeeperException, InterruptedException {
		final String zRoot = ZooUtil.getRoot(instance);
		final String monitorPath = zRoot + Constants.ZMONITOR;
		final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK;

		// Ensure that everything is kosher with ZK as this has changed.
		ZooReaderWriter zoo = ZooReaderWriter.getInstance();
		if (zoo.exists(monitorPath)) {
			byte[] data = zoo.getData(monitorPath, null);
			// If the node isn't empty, it's from a previous install (has
			// hostname:port for HTTP server)
			if (0 != data.length) {
				// Recursively delete from that parent node
				zoo.recursiveDelete(monitorPath, NodeMissingPolicy.SKIP);

				// And then make the nodes that we expect for the incoming
				// ephemeral nodes
				zoo.putPersistentData(monitorPath, new byte[0],
						NodeExistsPolicy.FAIL);
				zoo.putPersistentData(monitorLockPath, new byte[0],
						NodeExistsPolicy.FAIL);
			} else if (!zoo.exists(monitorLockPath)) {
				// monitor node in ZK exists and is empty as we expect
				// but the monitor/lock node does not
				zoo.putPersistentData(monitorLockPath, new byte[0],
						NodeExistsPolicy.FAIL);
			}
		} else {
			// 1.5.0 and earlier
			zoo.putPersistentData(zRoot + Constants.ZMONITOR, new byte[0],
					NodeExistsPolicy.FAIL);
			if (!zoo.exists(monitorLockPath)) {
				// Somehow the monitor node exists but not monitor/lock
				zoo.putPersistentData(monitorLockPath, new byte[0],
						NodeExistsPolicy.FAIL);
			}
		}

		// Get a ZooLock for the monitor
		while (true) {
			MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
			monitorLock = new ZooLock(monitorLockPath);
			monitorLock.lockAsync(monitorLockWatcher, new byte[0]);

			monitorLockWatcher.waitForChange();

			if (monitorLockWatcher.acquiredLock) {
				break;
			}

			if (!monitorLockWatcher.failedToAcquireLock) {
				throw new IllegalStateException("monitor lock in unknown state");
			}

			monitorLock.tryToCancelAsyncLockOrUnlock();

			UtilWaitThread.sleep(getContext().getConfiguration()
					.getTimeInMillis(Property.MONITOR_LOCK_CHECK_INTERVAL));
		}

		log.info("Got Monitor lock.");
	}

	/**
	 * Async Watcher for monitor lock
	 */
	private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher {

		boolean acquiredLock = false;
		boolean failedToAcquireLock = false;

		@Override
		public void lostLock(LockLossReason reason) {
			Halt.halt("Monitor lock in zookeeper lost (reason = " + reason
					+ "), exiting!", -1);
		}

		@Override
		public void unableToMonitorLockNode(final Throwable e) {
			Halt.halt(-1, new Runnable() {
				@Override
				public void run() {
					log.error("No longer able to monitor Monitor lock node", e);
				}
			});

		}

		@Override
		public synchronized void acquiredLock() {
			if (acquiredLock || failedToAcquireLock) {
				Halt.halt("Zoolock in unexpected state AL " + acquiredLock
						+ " " + failedToAcquireLock, -1);
			}

			acquiredLock = true;
			notifyAll();
		}

		@Override
		public synchronized void failedToAcquireLock(Exception e) {
			log.warn("Failed to get monitor lock " + e);

			if (acquiredLock) {
				Halt.halt("Zoolock in unexpected state FAL " + acquiredLock
						+ " " + failedToAcquireLock, -1);
			}

			failedToAcquireLock = true;
			notifyAll();
		}

		public synchronized void waitForChange() {
			while (!acquiredLock && !failedToAcquireLock) {
				try {
					wait();
				} catch (InterruptedException e) {
				}
			}
		}
	}

	public static MasterMonitorInfo getMmi() {
		return mmi;
	}

	public static int getTotalTables() {
		return totalTables;
	}

	public static int getTotalTabletCount() {
		return totalTabletCount;
	}

	public static long getTotalEntries() {
		return totalEntries;
	}

	public static double getTotalIngestRate() {
		return totalIngestRate;
	}

	public static double getTotalQueryRate() {
		return totalQueryRate;
	}

	public static double getTotalScanRate() {
		return totalScanRate;
	}

	public static long getTotalHoldTime() {
		return totalHoldTime;
	}

	public static Exception getProblemException() {
		return problemException;
	}

	public static Map<String, Map<ProblemType, Integer>> getProblemSummary() {
		return problemSummary;
	}

	public static GCStatus getGcStatus() {
		return gcStatus;
	}

	public static long getTotalLookups() {
		return totalLookups;
	}

	public static long getStartTime() {
		return START_TIME;
	}

	public static List<Pair<Long, Double>> getLoadOverTime() {
		synchronized (loadOverTime) {
			return new ArrayList<Pair<Long, Double>>(loadOverTime);
		}
	}

	public static List<Pair<Long, Double>> getIngestRateOverTime() {
		synchronized (ingestRateOverTime) {
			return new ArrayList<Pair<Long, Double>>(ingestRateOverTime);
		}
	}

	public static List<Pair<Long, Double>> getIngestByteRateOverTime() {
		synchronized (ingestByteRateOverTime) {
			return new ArrayList<Pair<Long, Double>>(ingestByteRateOverTime);
		}
	}

	public static List<Pair<Long, Integer>> getMinorCompactionsOverTime() {
		synchronized (minorCompactionsOverTime) {
			return new ArrayList<Pair<Long, Integer>>(minorCompactionsOverTime);
		}
	}

	public static List<Pair<Long, Integer>> getMajorCompactionsOverTime() {
		synchronized (majorCompactionsOverTime) {
			return new ArrayList<Pair<Long, Integer>>(majorCompactionsOverTime);
		}
	}

	public static List<Pair<Long, Double>> getLookupsOverTime() {
		synchronized (lookupsOverTime) {
			return new ArrayList<Pair<Long, Double>>(lookupsOverTime);
		}
	}

	public static double getLookupRate() {
		return lookupRateTracker.calculateRate();
	}

	public static List<Pair<Long, Integer>> getQueryRateOverTime() {
		synchronized (queryRateOverTime) {
			return new ArrayList<Pair<Long, Integer>>(queryRateOverTime);
		}
	}

	public static List<Pair<Long, Integer>> getScanRateOverTime() {
		synchronized (scanRateOverTime) {
			return new ArrayList<Pair<Long, Integer>>(scanRateOverTime);
		}
	}

	public static List<Pair<Long, Double>> getQueryByteRateOverTime() {
		synchronized (queryByteRateOverTime) {
			return new ArrayList<Pair<Long, Double>>(queryByteRateOverTime);
		}
	}

	public static List<Pair<Long, Double>> getIndexCacheHitRateOverTime() {
		synchronized (indexCacheHitRateOverTime) {
			return new ArrayList<Pair<Long, Double>>(indexCacheHitRateOverTime);
		}
	}

	public static List<Pair<Long, Double>> getDataCacheHitRateOverTime() {
		synchronized (dataCacheHitRateOverTime) {
			return new ArrayList<Pair<Long, Double>>(dataCacheHitRateOverTime);
		}
	}

	public static boolean isUsingSsl() {
		// The embedded web server is never started in this modified copy,
		// so guard against a null server to avoid an NPE.
		return server != null && server.isUsingSsl();
	}

	public static AccumuloServerContext getContext() {
		return context;
	}
}
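The MaxList above implements a time-based sliding window: each entry is a (timestamp, value) pair, and entries older than maxDelta relative to the newest sample are evicted from the head. A minimal standalone sketch of that idea, runnable without any Accumulo dependency (SimpleEntry stands in for Accumulo's Pair; note this sketch evicts all stale entries with a while loop, whereas the original's single if evicts at most one per add):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedList;

public class MaxListSketch {
	static class MaxList<T> extends LinkedList<SimpleEntry<Long, T>> {
		private static final long serialVersionUID = 1L;
		private final long maxDelta;

		MaxList(long maxDelta) {
			this.maxDelta = maxDelta;
		}

		@Override
		public boolean add(SimpleEntry<Long, T> obj) {
			boolean result = super.add(obj);
			// Evict head entries once they fall outside the time window
			while (obj.getKey() - get(0).getKey() > maxDelta) {
				remove(0);
			}
			return result;
		}
	}

	public static void main(String[] args) {
		MaxList<Double> window = new MaxList<>(1000); // 1-second window
		window.add(new SimpleEntry<>(0L, 1.0));
		window.add(new SimpleEntry<>(500L, 2.0));
		window.add(new SimpleEntry<>(2000L, 3.0)); // evicts both older samples
		System.out.println(window.size()); // prints 1
	}
}
```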

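The EventCounter rate computation can likewise be illustrated in isolation: for each tablet server it keeps the previous and current (sampleTime, eventCount) pair and sums deltaEvents / deltaSeconds across servers. A simplified sketch under those assumptions (class and method names here are illustrative, not Accumulo API):

```java
import java.util.HashMap;
import java.util.Map;

public class RateSketch {
	// server name -> {sampleTimeMillis, eventCount}
	private final Map<String, long[]> prevSamples = new HashMap<>();
	private final Map<String, long[]> samples = new HashMap<>();

	void update(String server, long sampleTimeMillis, long numEvents) {
		long[] last = samples.get(server);
		long[] next = {sampleTimeMillis, numEvents};
		// Only roll the samples forward when the new sample differs,
		// mirroring EventCounter.updateTabletServer above.
		if (last == null || last[0] != next[0] || last[1] != next[1]) {
			samples.put(server, next);
			if (last != null) {
				prevSamples.put(server, last);
			}
		}
	}

	double calculateRate() {
		double total = 0;
		for (Map.Entry<String, long[]> e : prevSamples.entrySet()) {
			long[] prev = e.getValue();
			long[] cur = samples.get(e.getKey());
			// events per second between the two most recent samples
			total += (cur[1] - prev[1]) / ((cur[0] - prev[0]) / 1000.0);
		}
		return total;
	}

	public static void main(String[] args) {
		RateSketch r = new RateSketch();
		r.update("tserver1", 0L, 0L);
		r.update("tserver1", 1000L, 50L); // 50 events over 1 second
		System.out.println(r.calculateRate()); // prints 50.0
	}
}
```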