http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java new file mode 100644 index 0000000..e9255c6 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.configuration; + +import org.apache.phoenix.pherf.PherfConstants; +import org.apache.phoenix.pherf.exception.FileLoaderException; +import org.apache.phoenix.pherf.util.ResourceList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class XMLConfigParser { + + private static final Logger logger = LoggerFactory.getLogger(XMLConfigParser.class); + private String filePattern; + private List<DataModel> dataModels; + private List<Scenario> scenarios = null; + private ResourceList resourceList; + private Collection<Path> paths = null; + + public XMLConfigParser(String pattern) throws Exception { + init(pattern); + } + + public List<DataModel> getDataModels() { + return dataModels; + } + + public synchronized Collection<Path> getPaths(String strPattern) throws Exception { + if (paths != null) { + return paths; + } + paths = getResources(strPattern); + return paths; + } + + public synchronized List<Scenario> getScenarios() throws Exception { + if (scenarios != null) { + return scenarios; + } + + scenarios = (List<Scenario>) Collections.synchronizedCollection(new ArrayList<Scenario>()); + for (Path path : getPaths(getFilePattern())) { + try { + List<Scenario> scenarioList = XMLConfigParser.readDataModel(path).getScenarios(); + for (Scenario scenario : scenarioList) { + scenarios.add(scenario); + } + } catch (JAXBException e) { + e.printStackTrace(); + } + } + return scenarios; + } + + public String getFilePattern() { + return filePattern; + } + + /** + * Unmarshall an XML data file + * + * @param file Name of File + * @return + * @throws JAXBException + */ + // TODO Remove static calls + public static 
DataModel readDataModel(Path file) throws JAXBException { + JAXBContext jaxbContext = JAXBContext.newInstance(DataModel.class); + Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller(); + String fName = PherfConstants.RESOURCE_SCENARIO + "/" + file.getFileName().toString(); + logger.info("Open config file: " + fName); + return (DataModel) jaxbUnmarshaller + .unmarshal(XMLConfigParser.class.getResourceAsStream(fName)); + } + + // TODO Remove static calls + public static String parseSchemaName(String fullTableName) { + String ret = null; + if (fullTableName.contains(".")) { + ret = fullTableName.substring(0, fullTableName.indexOf(".")); + } + return ret; + } + + // TODO Remove static calls + public static String parseTableName(String fullTableName) { + String ret = fullTableName; + if (fullTableName.contains(".")) { + ret = fullTableName.substring(fullTableName.indexOf(".") + 1, fullTableName.length()); + } + return ret; + } + + // TODO Remove static calls + public static void writeDataModel(DataModel data, OutputStream output) throws JAXBException { + // create JAXB context and initializing Marshaller + JAXBContext jaxbContext = JAXBContext.newInstance(DataModel.class); + Marshaller jaxbMarshaller = jaxbContext.createMarshaller(); + + // for getting nice formatted output + jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); + + // Writing to console + jaxbMarshaller.marshal(data, output); + } + + private void init(String pattern) throws Exception { + if (dataModels != null) { + return; + } + this.filePattern = pattern; + this.dataModels = new ArrayList<>(); + this.resourceList = new ResourceList(PherfConstants.RESOURCE_SCENARIO); + this.paths = getResources(this.filePattern); + if (this.paths.isEmpty()) { + throw new FileLoaderException( + "Could not load the resource files using the pattern: " + pattern); + } + for (Path path : this.paths) { + System.out.println("Adding model for path:" + path.toString()); + 
this.dataModels.add(XMLConfigParser.readDataModel(path)); + } + } + + private Collection<Path> getResources(String pattern) throws Exception { + Collection<Path> resourceFiles = new ArrayList<Path>(); + resourceFiles = resourceList.getResourceList(pattern); + return resourceFiles; + } +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderException.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderException.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderException.java new file mode 100644 index 0000000..63784eb --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.exception; + +public class FileLoaderException extends PherfException { + public FileLoaderException(String message) throws Exception { + super(message); + } + public FileLoaderException(String message, Exception e) { + super(message, e); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderRuntimeException.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderRuntimeException.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderRuntimeException.java new file mode 100644 index 0000000..4ab751b --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/FileLoaderRuntimeException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.exception; + +public class FileLoaderRuntimeException extends PherfRuntimeException { + public FileLoaderRuntimeException(String message) throws Exception { + super(message); + } + public FileLoaderRuntimeException(String message, Exception e) { + super(message, e); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/PherfException.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/PherfException.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/PherfException.java new file mode 100644 index 0000000..1748b26 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/exception/PherfException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Base class for the checked exceptions raised by Pherf.
 */
public class PherfException extends Exception {

    /**
     * Creates the exception with a description only.
     *
     * <p>The constructor no longer declares {@code throws Exception}: nothing in it can
     * throw, and the declaration forced every caller to handle a second, unrelated
     * checked exception just to create this one. Subclass constructors that still
     * declare {@code throws Exception} keep compiling.
     *
     * @param message description of the failure
     */
    public PherfException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the failure that triggered it.
     *
     * @param message description of the failure
     * @param e       underlying exception, kept as the cause
     */
    public PherfException(String message, Exception e) {
        super(message, e);
    }
}
/**
 * Base class for the unchecked exceptions raised by Pherf.
 */
public class PherfRuntimeException extends RuntimeException {

    /**
     * Creates the exception with a description only.
     *
     * <p>The constructor no longer declares {@code throws Exception}: nothing in it can
     * throw, and the declaration made creating this <em>unchecked</em> exception require
     * checked-exception handling at every call site. Subclass constructors that still
     * declare {@code throws Exception} keep compiling.
     *
     * @param message description of the failure
     */
    public PherfRuntimeException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the failure that triggered it.
     *
     * @param message description of the failure
     * @param e       underlying exception, kept as the cause
     */
    public PherfRuntimeException(String message, Exception e) {
        super(message, e);
    }
}
+ */ + +package org.apache.phoenix.pherf.jmx; + +import org.apache.phoenix.pherf.jmx.monitors.*; + +public enum MonitorDetails { + FREE_MEMORY("org.apache.phoenix.pherf:type=RuntimeFreeMemory", new FreeMemoryMonitor()), + TOTAL_MEMORY("org.apache.phoenix.pherf:type=RuntimeTotalMemory", new TotalMemoryMonitor()), + MAX_MEMORY("org.apache.phoenix.pherf:type=RuntimeMaxMemory", new MaxMemoryMonitor()), + HEAP_MEMORY_USAGE("org.apache.phoenix.pherf:type=HeapMemoryUsage", new HeapMemoryMonitor()), + NON_HEAP_MEMORY_USAGE("org.apache.phoenix.pherf:type=NonHeapMemoryUsage", new NonHeapMemoryMonitor()), + OBJECT_PENDING_FINALIZATION("org.apache.phoenix.pherf:type=ObjectPendingFinalizationCount", new ObjectPendingFinalizationCountMonitor()), + GARBAGE_COLLECTOR_ELAPSED_TIME("org.apache.phoenix.pherf:type=GarbageCollectorElapsedTime", new GarbageCollectorElapsedTimeMonitor()), + CPU_LOAD_AVERAGE("org.apache.phoenix.pherf:type=CPULoadAverage", new CPULoadAverageMonitor()), + THREAD_COUNT("org.apache.phoenix.pherf:type=PherfThreads",new ThreadMonitor()); + + private final String monitorName; + private final Monitor monitor; + + private MonitorDetails(String monitorName, Monitor monitor) { + this.monitorName = monitorName; + this.monitor = monitor; + } + + @Override + public String toString() { + return monitorName; + } + + public Monitor getMonitor() { + return monitor; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java new file mode 100644 index 0000000..391db58 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx; + +import org.apache.phoenix.pherf.PherfConstants; +import org.apache.phoenix.pherf.exception.FileLoaderRuntimeException; +import org.apache.phoenix.pherf.jmx.monitors.Monitor; +import org.apache.phoenix.pherf.result.file.ResultFileDetails; +import org.apache.phoenix.pherf.result.impl.CSVResultHandler; +import org.apache.phoenix.pherf.result.Result; +import org.apache.phoenix.pherf.result.ResultHandler; +import org.apache.phoenix.util.DateUtil; + +import javax.management.*; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class starts JMX stats for the configured monitors. Monitors should be configured in MonitorDetails Enum. + * Each stat implements {@link org.apache.phoenix.pherf.jmx.monitors.Monitor}. + * + * For the duration of any Pherf run, when the configured {@link org.apache.phoenix.pherf.PherfConstants#MONITOR_FREQUENCY} + * is reached a snapshot of each monitor is taken and dumped out to a log file. + */ +public class MonitorManager implements Runnable { + // List of MonitorDetails for all the running monitors. 
+ // TODO Move this out to config. Possible use Guice and use IOC to inject it in. + private static final List<MonitorDetails> MONITOR_DETAILS_LIST = + Arrays.asList(MonitorDetails.values()); + private final ResultHandler resultHandler; + private final long monitorFrequency; + private AtomicLong rowCount; + private volatile boolean shouldStop = false; + private volatile boolean isRunning = false; + + public MonitorManager() throws Exception { + this(PherfConstants.MONITOR_FREQUENCY); + } + + /** + * + * @param monitorFrequency Frequency at which monitor stats are written to a log file. + * @throws Exception + */ + public MonitorManager(long monitorFrequency) throws Exception { + this.monitorFrequency = monitorFrequency; + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + + // Register all the monitors to JMX + for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) { + StandardMBean bean = new StandardMBean(monitorDetails.getMonitor(), Monitor.class); + ObjectName monitorThreadStatName = new ObjectName(monitorDetails.toString()); + try { + mbs.registerMBean(bean, monitorThreadStatName); + } catch (InstanceAlreadyExistsException e) { + mbs.unregisterMBean(monitorThreadStatName); + mbs.registerMBean(bean, monitorThreadStatName); + } + } + rowCount = new AtomicLong(0); + this.resultHandler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV); + } + + @Override + public void run() { + try { + while (!shouldStop()) { + isRunning = true; + List rowValues = new ArrayList<String>(); + synchronized (resultHandler) { + for (MonitorDetails monitorDetails : MONITOR_DETAILS_LIST) { + rowValues.clear(); + try { + StandardMBean bean = new StandardMBean(monitorDetails.getMonitor(), Monitor.class); + + Calendar calendar = new GregorianCalendar(); + rowValues.add(monitorDetails); + + rowValues.add(((Monitor) bean.getImplementation()).getStat()); + rowValues.add(DateUtil.DEFAULT_MS_DATE_FORMATTER.format(calendar.getTime())); + Result + 
result = new Result(ResultFileDetails.CSV, ResultFileDetails.CSV_MONITOR.getHeader().toString(), rowValues); + resultHandler.write(result); + } catch (Exception e) { + throw new FileLoaderRuntimeException("Could not log monitor result.", e); + } + rowCount.getAndIncrement(); + } + try { + Thread.sleep(getMonitorFrequency()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + } + } finally { + try { + isRunning = false; + if (resultHandler != null) { + resultHandler.flush(); + resultHandler.close(); + + } + } catch (Exception e) { + throw new FileLoaderRuntimeException("Could not close monitor results.", e); + } + } + + } + + public long getMonitorFrequency() { + return monitorFrequency; + } + + public synchronized boolean shouldStop() { + return shouldStop; + } + + public synchronized void stop() { + this.shouldStop = true; + } + + public synchronized long getRowCount() { + return rowCount.get(); + } + + public synchronized boolean isRunning() { + return isRunning; + } + + /** + * This method should really only be used for testing + * + * @return List < {@link org.apache.phoenix.pherf.result.Result} > + * @throws IOException + */ + public synchronized List<Result> readResults() throws Exception { + ResultHandler handler = null; + try { + if (resultHandler.isClosed()) { + handler = new CSVResultHandler(PherfConstants.MONITOR_FILE_NAME, ResultFileDetails.CSV); + return handler.read(); + } else { + return resultHandler.read(); + } + } catch (Exception e) { + throw new FileLoaderRuntimeException("Could not close monitor results.", e); + } finally { + if (handler != null) { + handler.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/Stat.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/Stat.java 
/**
 * Read-only wrapper around a single monitor reading.
 *
 * @param <T> type of the wrapped reading
 */
public class Stat<T> {
    /** The wrapped reading; assigned once in the constructor. */
    private final T value;

    /**
     * @param stat reading to wrap
     */
    public Stat(T stat) {
        value = stat;
    }

    /**
     * @return the string form of the wrapped reading
     */
    @Override
    public String toString() {
        return value.toString();
    }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; + +public class CPULoadAverageMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<MemoryMXBean> stat = new Stat(ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ExampleMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ExampleMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ExampleMonitor.java new file mode 100644 index 0000000..1dd72e7 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ExampleMonitor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.util.concurrent.atomic.AtomicLong; + +public class ExampleMonitor implements Monitor { + private final AtomicLong counter = new AtomicLong(); + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(new Long(counter.getAndIncrement())); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/FreeMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/FreeMemoryMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/FreeMemoryMonitor.java new file mode 100644 index 0000000..848863c --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/FreeMemoryMonitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +public class FreeMemoryMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(new Long(Runtime.getRuntime().freeMemory())); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/GarbageCollectorElapsedTimeMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/GarbageCollectorElapsedTimeMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/GarbageCollectorElapsedTimeMonitor.java new file mode 100644 index 0000000..7dc6798 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/GarbageCollectorElapsedTimeMonitor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; + +public class GarbageCollectorElapsedTimeMonitor implements Monitor { + + @Override + public Stat getStat() { + List<GarbageCollectorMXBean> beans = ManagementFactory.getGarbageCollectorMXBeans(); + long average = 0; + Stat<Long> stat = null; + if (beans.size() > 0) { + for (GarbageCollectorMXBean bean : beans) { + average += bean.getCollectionTime(); + } + stat = new Stat(average / beans.size()); + } else { + stat = new Stat(0); + } + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/HeapMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/HeapMemoryMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/HeapMemoryMonitor.java new file mode 100644 index 0000000..41f4746 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/HeapMemoryMonitor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.ManagementFactory; + +public class HeapMemoryMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed()); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/MaxMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/MaxMemoryMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/MaxMemoryMonitor.java new file mode 100644 index 0000000..d53e552 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/MaxMemoryMonitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +public class MaxMemoryMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(new Long(Runtime.getRuntime().maxMemory())); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/Monitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/Monitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/Monitor.java new file mode 100644 index 0000000..d856398 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/Monitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import javax.management.MXBean; + +@MXBean +public interface Monitor { + + public Stat getStat(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/NonHeapMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/NonHeapMemoryMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/NonHeapMemoryMonitor.java new file mode 100644 index 0000000..4f0a67b --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/NonHeapMemoryMonitor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.ManagementFactory; + +public class NonHeapMemoryMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage().getUsed()); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ObjectPendingFinalizationCountMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ObjectPendingFinalizationCountMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ObjectPendingFinalizationCountMonitor.java new file mode 100644 index 0000000..254bf8c --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ObjectPendingFinalizationCountMonitor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; + +public class ObjectPendingFinalizationCountMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<MemoryMXBean> stat = new Stat(ManagementFactory.getMemoryMXBean().getObjectPendingFinalizationCount()); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ThreadMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ThreadMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ThreadMonitor.java new file mode 100644 index 0000000..260af71 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/ThreadMonitor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +import java.lang.management.ManagementFactory; + +public class ThreadMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Integer> stat = new Stat(new Integer(ManagementFactory.getThreadMXBean().getThreadCount())); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/TotalMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/TotalMemoryMonitor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/TotalMemoryMonitor.java new file mode 100644 index 0000000..6d7336a --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/monitors/TotalMemoryMonitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.jmx.monitors; + +import org.apache.phoenix.pherf.jmx.Stat; + +public class TotalMemoryMonitor implements Monitor { + + @Override + public Stat getStat() { + Stat<Long> stat = new Stat(new Long(Runtime.getRuntime().totalMemory())); + return stat; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java new file mode 100644 index 0000000..8ddce34 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/loaddata/DataLoader.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.loaddata; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.phoenix.pherf.result.ResultUtil; +import org.apache.phoenix.pherf.util.ResourceList; +import org.apache.phoenix.pherf.util.RowCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.pherf.PherfConstants; +import org.apache.phoenix.pherf.configuration.Column; +import org.apache.phoenix.pherf.configuration.DataModel; +import org.apache.phoenix.pherf.configuration.Scenario; +import org.apache.phoenix.pherf.configuration.XMLConfigParser; +import org.apache.phoenix.pherf.exception.PherfException; +import org.apache.phoenix.pherf.result.DataLoadThreadTime; +import org.apache.phoenix.pherf.result.DataLoadTimeSummary; +import org.apache.phoenix.pherf.rules.DataValue; +import org.apache.phoenix.pherf.rules.RulesApplier; +import org.apache.phoenix.pherf.util.PhoenixUtil; + +public class DataLoader { + private static final Logger logger = LoggerFactory.getLogger(DataLoader.class); + private final PhoenixUtil pUtil = new PhoenixUtil(); + private final XMLConfigParser parser; + private final RulesApplier rulesApplier; + private final ResultUtil resultUtil; + private final ExecutorService pool; + private final Properties properties; + + private final int threadPoolSize; + private final int batchSize; + + public DataLoader(XMLConfigParser parser) throws Exception { + this(new ResourceList().getProperties(), parser); + } + + /** + * Default the writers to use up all available cores for threads. 
+ * + * @param parser + * @throws Exception + */ + public DataLoader(Properties properties, XMLConfigParser parser) throws Exception { + this.parser = parser; + this.properties = properties; + this.rulesApplier = new RulesApplier(this.parser); + this.resultUtil = new ResultUtil(); + int size = Integer.parseInt(properties.getProperty("pherf.default.dataloader.threadpool")); + this.threadPoolSize = (size == 0) ? Runtime.getRuntime().availableProcessors() : size; + this.pool = Executors.newFixedThreadPool(this.threadPoolSize); + String bSize = properties.getProperty("pherf.default.dataloader.batchsize"); + this.batchSize = (bSize == null) ? PherfConstants.DEFAULT_BATCH_SIZE : Integer.parseInt(bSize); + } + + public void execute() throws Exception { + try { + DataModel model = getParser().getDataModels().get(0); + DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary(); + DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime(); + + for (Scenario scenario : getParser().getScenarios()) { + List<Future> writeBatches = new ArrayList<Future>(); + logger.info("\nLoading " + scenario.getRowCount() + + " rows for " + scenario.getTableName()); + long start = System.currentTimeMillis(); + + RowCalculator rowCalculator = new RowCalculator(getThreadPoolSize(), scenario.getRowCount()); + for (int i = 0; i < getThreadPoolSize(); i++) { + List<Column> phxMetaCols = pUtil.getColumnsFromPhoenix( + scenario.getSchemaName(), + scenario.getTableNameWithoutSchemaName(), + pUtil.getConnection()); + int threadRowCount = rowCalculator.getNext(); + logger.info("Kick off thread (#" + i + ")for upsert with (" + threadRowCount + ") rows."); + Future<Info> write = upsertData(scenario, phxMetaCols, + scenario.getTableName(), threadRowCount, dataLoadThreadTime); + writeBatches.add(write); + } + + if (writeBatches.isEmpty()) { + throw new PherfException( + "Holy shit snacks! Throwing up hands in disbelief and exiting. 
Could not write data for some unknown reason."); + } + + int sumRows = 0, sumDuration = 0; + // Wait for all the batch threads to complete + for (Future<Info> write : writeBatches) { + Info writeInfo = write.get(); + sumRows += writeInfo.getRowCount(); + sumDuration += writeInfo.getDuration(); + logger.info("Executor writes complete with row count (" + + writeInfo.getRowCount() + + ") in Ms (" + + writeInfo.getDuration() + ")"); + } + logger.info("Writes completed with total row count (" + sumRows + + ") with total time of(" + sumDuration + ") Ms"); + dataLoadTimeSummary.add(scenario.getTableName(), sumRows, (int) (System.currentTimeMillis() - start)); + + + // always update stats for Phoenix base tables + updatePhoenixStats(scenario.getTableName()); + } + resultUtil.write(dataLoadTimeSummary); + resultUtil.write(dataLoadThreadTime); + + } finally { + pool.shutdown(); + } + } + + /** + * TODO Move this method to PhoenixUtil + * Update Phoenix table stats + * + * @param tableName + * @throws Exception + */ + public void updatePhoenixStats(String tableName) throws Exception { + logger.info("Updating stats for " + tableName); + pUtil.executeStatement("UPDATE STATISTICS " + tableName); + } + + public void printTableColumns(Scenario scenario) throws Exception { + Connection connection = null; + try { + connection = pUtil.getConnection(); + List<Column> columnList = pUtil.getColumnsFromPhoenix( + scenario.getSchemaName(), + scenario.getTableNameWithoutSchemaName(), connection); + + logger.debug("\n\nColumns from metadata:"); + for (Column column : columnList) { + logger.debug("\nColumn name: [" + column.getName() + + "]; type: [" + column.getType() + "]; length: [" + + column.getLength() + "]"); + } + + if (null != scenario.getDataOverride()) { + logger.debug("\n\nColumns from override:"); + for (Column column : scenario.getDataOverride().getColumn()) { + logger.debug("\nColumn name: [" + column.getName() + "]; DataSequence: [" + column.getDataSequence() + + "]; length: 
[" + column.getLength() + "]"); + } + } + + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + // Swallow since we are closing anyway + e.printStackTrace(); + } + } + } + } + + public Future<Info> upsertData(final Scenario scenario, + final List<Column> columns, final String tableName, + final int rowCount, final DataLoadThreadTime dataLoadThreadTime) { + Future<Info> future = pool.submit(new Callable<Info>() { + @Override + public Info call() throws Exception { + int rowsCreated = 0; + Info info = null; + long start = 0, duration = 0, totalDuration = 0; + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Connection connection = null; + try { + connection = pUtil.getConnection(); + long logStartTime = System.currentTimeMillis(); + for (int i = 0; i < rowCount; i++) { + String sql = buildSql(columns, tableName); + PreparedStatement stmt = connection + .prepareStatement(sql); + stmt = buildStatement(scenario, columns, stmt, simpleDateFormat); + start = System.currentTimeMillis(); + rowsCreated += stmt.executeUpdate(); + stmt.close(); + if ((i % getBatchSize()) == 0) { + connection.commit(); + duration = System.currentTimeMillis() - start; + logger.info("Committed Batch. Total " + tableName + " rows for this thread (" + this.hashCode() + ") in (" + + duration + ") Ms"); + + if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) { + dataLoadThreadTime.add(tableName, Thread.currentThread().getName(), i, System.currentTimeMillis() - logStartTime); + logStartTime = System.currentTimeMillis(); + } + } + } + } finally { + if (connection != null) { + try { + connection.commit(); + duration = System.currentTimeMillis() - start; + logger.info("Committed Final Batch. 
Duration (" + duration + ") Ms"); + connection.close(); + } catch (SQLException e) { + // Swallow since we are closing anyway + e.printStackTrace(); + } + } + } + totalDuration = System.currentTimeMillis() - start; + return new Info(totalDuration, rowsCreated); + } + }); + return future; + } + + private PreparedStatement buildStatement(Scenario scenario, + List<Column> columns, PreparedStatement statement, SimpleDateFormat simpleDateFormat) throws Exception { + int count = 1; + for (Column column : columns) { + + DataValue dataValue = getRulesApplier().getDataForRule(scenario, + column); + switch (column.getType()) { + case VARCHAR: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.VARCHAR); + } else { + statement.setString(count, dataValue.getValue()); + } + break; + case CHAR: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.CHAR); + } else { + statement.setString(count, dataValue.getValue()); + } + break; + case DECIMAL: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.DECIMAL); + } else { + statement.setBigDecimal(count, + new BigDecimal(dataValue.getValue())); + } + break; + case INTEGER: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.INTEGER); + } else { + statement.setInt(count, + Integer.parseInt(dataValue.getValue())); + } + break; + case DATE: + if (dataValue.getValue().equals("")) { + statement.setNull(count, Types.DATE); + } else { + Date date = new java.sql.Date(simpleDateFormat.parse(dataValue.getValue()).getTime()); + statement.setDate(count, date); + } + break; + default: + break; + } + count++; + } + return statement; + } + + private String buildSql(final List<Column> columns, final String tableName) { + StringBuilder builder = new StringBuilder(); + builder.append("upsert into "); + builder.append(tableName); + builder.append(" ("); + int count = 1; + for (Column column : columns) { + builder.append(column.getName()); + if (count < columns.size()) 
{ + builder.append(","); + } else { + builder.append(")"); + } + count++; + } + builder.append(" VALUES ("); + for (int i = 0; i < columns.size(); i++) { + if (i < columns.size() - 1) { + builder.append("?,"); + } else { + builder.append("?)"); + } + } + return builder.toString(); + } + + public XMLConfigParser getParser() { + return parser; + } + + public RulesApplier getRulesApplier() { + return rulesApplier; + } + + public int getBatchSize() { + return batchSize; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + private class Info { + + private final int rowCount; + private final long duration; + + public Info(long duration, int rows) { + this(0, 0, 0, duration, rows); + } + + public Info(int regionSize, int completedIterations, int timesSeen, + long duration, int rows) { + this.duration = duration; + this.rowCount = rows; + } + + public long getDuration() { + return duration; + } + + public int getRowCount() { + return rowCount; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java new file mode 100644 index 0000000..47aa2bc --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadThreadTime.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.result; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.phoenix.pherf.PherfConstants; + +public class DataLoadThreadTime { + private List<WriteThreadTime> threadTime = new ArrayList<WriteThreadTime>(); + + public List<WriteThreadTime> getThreadTime() { + return threadTime; + } + + public void setThreadTime(List<WriteThreadTime> threadTime) { + this.threadTime = threadTime; + } + + public void add(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) { + threadTime.add(new WriteThreadTime(tableName, threadName, rowsUpserted, timeInMsPerMillionRows)); + } + + public String getCsvTitle() { + return "TABLE_NAME,THREAD_NAME,ROWS_UPSERTED,TIME_IN_MS_PER_" + PherfConstants.LOG_PER_NROWS + "_ROWS\n"; + } +} + +class WriteThreadTime { + private String tableName; + private String threadName; + private int rowsUpserted; + private long timeInMsPerMillionRows; + + public WriteThreadTime(String tableName, String threadName, int rowsUpserted, long timeInMsPerMillionRows) { + this.tableName = tableName; + this.threadName = threadName; + this.rowsUpserted = rowsUpserted; + this.timeInMsPerMillionRows = timeInMsPerMillionRows; + } + + public String getTableName() { + return tableName; + } + public void setTableName(String tableName) { + this.tableName = tableName; + } + public String getThreadName() { + return threadName; + } + public void setThreadName(String threadName) { + this.threadName = threadName; + } + public long getTimeInMsPerMillionRows() { + return 
timeInMsPerMillionRows; + } + public void setTimeInMsPerMillionRows(long timeInMsPerMillionRows) { + this.timeInMsPerMillionRows = timeInMsPerMillionRows; + } + + public List<ResultValue> getCsvRepresentation(ResultUtil util) { + List<ResultValue> rowValues = new ArrayList<>(); + rowValues.add(new ResultValue(util.convertNull(getTableName()))); + rowValues.add(new ResultValue(util.convertNull(getThreadName()))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowsUpserted())))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getTimeInMsPerMillionRows())))); + + return rowValues; + } + + public int getRowsUpserted() { + return rowsUpserted; + } + + public void setRowsUpserted(int rowsUpserted) { + this.rowsUpserted = rowsUpserted; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java new file mode 100644 index 0000000..ae4838b --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataLoadTimeSummary.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.result; + +import java.util.ArrayList; +import java.util.List; + +public class DataLoadTimeSummary { + private List<TableLoadTime> tableLoadTime = new ArrayList<TableLoadTime>(); + + public List<TableLoadTime> getTableLoadTime() { + return tableLoadTime; + } + + public void add(String tableName, int rowCount, int durationInMs) { + tableLoadTime.add(new TableLoadTime(tableName, rowCount, durationInMs)); + } + + public void setTableLoadTime(List<TableLoadTime> tableLoadTime) { + this.tableLoadTime = tableLoadTime; + } + +} + +class TableLoadTime { + private int durationInMs; + private String tableName; + private int rowCount; + + public TableLoadTime(String tableName, int rowCount, int durationInMs) { + this.tableName = tableName; + this.rowCount = rowCount; + this.durationInMs = durationInMs; + } + + public List<ResultValue> getCsvRepresentation(ResultUtil util) { + List<ResultValue> rowValues = new ArrayList<>(); + rowValues.add(new ResultValue(util.convertNull(getTableName()))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRowCount())))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getDurationInMs())))); + + return rowValues; + } + + public int getDurationInMs() { + return durationInMs; + } + + public void setDurationInMs(int durationInMs) { + this.durationInMs = durationInMs; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public int getRowCount() { + return rowCount; 
+ } + + public void setRowCount(int rowCount) { + this.rowCount = rowCount; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java new file mode 100644 index 0000000..71ffeaf --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/DataModelResult.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.pherf.result; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.phoenix.pherf.configuration.DataModel; + +@XmlRootElement(namespace = "org.apache.phoenix.pherf.result") +public class DataModelResult extends DataModel { + private List<ScenarioResult> scenarioResult = new ArrayList<ScenarioResult>(); + private String zookeeper; + + public List<ScenarioResult> getScenarioResult() { + return scenarioResult; + } + + public void setScenarioResult(List<ScenarioResult> scenarioResult) { + this.scenarioResult = scenarioResult; + } + + public DataModelResult() { + } + + private DataModelResult(String name, String release, String zookeeper) { + this.setName(name); + this.setRelease(release); + this.zookeeper = zookeeper; + } + + /** + * Copy constructor + * + * @param dataModelResult + */ + public DataModelResult(DataModelResult dataModelResult) { + this(dataModelResult.getName(), dataModelResult.getRelease(), dataModelResult.getZookeeper()); + this.scenarioResult = dataModelResult.getScenarioResult(); + } + + public DataModelResult(DataModel dataModel, String zookeeper) { + this(dataModel.getName(), dataModel.getRelease(), zookeeper); + } + + public DataModelResult(DataModel dataModel) { + this(dataModel, null); + } + + @XmlAttribute() + public String getZookeeper() { + return zookeeper; + } + + public void setZookeeper(String zookeeper) { + this.zookeeper = zookeeper; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java new file mode 100644 index 0000000..ac50301 --- /dev/null +++ 
b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QueryResult.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.result; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.phoenix.pherf.PherfConstants.RunMode; +import org.apache.phoenix.pherf.configuration.Query; +import org.apache.phoenix.util.DateUtil; + +public class QueryResult extends Query { + private List<ThreadTime> threadTimes = new ArrayList<ThreadTime>(); + + public synchronized List<ThreadTime> getThreadTimes() { + return this.threadTimes; + } + + public synchronized void setThreadTimes(List<ThreadTime> threadTimes) { + this.threadTimes = threadTimes; + } + + public QueryResult(Query query) { + this.setStatement(query.getStatement()); + this.setExpectedAggregateRowCount(query.getExpectedAggregateRowCount()); + this.setTenantId(query.getTenantId()); + this.setDdl(query.getDdl()); + this.setQueryGroup(query.getQueryGroup()); + this.setId(query.getId()); + } + + public QueryResult() { + } + + public Date getStartTime() { + Date startTime = null; + for (ThreadTime tt : getThreadTimes()) { + Date currStartTime = tt.getStartTime(); + if 
(null != currStartTime) { + if (null == startTime) { + startTime = currStartTime; + } else if (currStartTime.compareTo(startTime) < 0) { + startTime = currStartTime; + } + } + } + return startTime; + } + + public int getAvgMaxRunTimeInMs() { + int totalRunTime = 0; + for (ThreadTime tt : getThreadTimes()) { + if (null != tt.getMaxTimeInMs()) { + totalRunTime += tt.getMaxTimeInMs().getElapsedDurationInMs(); + } + } + return totalRunTime / getThreadTimes().size(); + } + + public int getAvgMinRunTimeInMs() { + int totalRunTime = 0; + for (ThreadTime tt : getThreadTimes()) { + if (null != tt.getMinTimeInMs()) { + totalRunTime += tt.getMinTimeInMs().getElapsedDurationInMs(); + } + } + return totalRunTime / getThreadTimes().size(); + } + + public int getAvgRunTimeInMs() { + int totalRunTime = 0; + for (ThreadTime tt : getThreadTimes()) { + if (null != tt.getAvgTimeInMs()) { + totalRunTime += tt.getAvgTimeInMs(); + } + } + return totalRunTime / getThreadTimes().size(); + } + + public List<ResultValue> getCsvRepresentation(ResultUtil util) { + List<ResultValue> rowValues = new ArrayList<>(); + rowValues.add(new ResultValue(util.convertNull(getStartTimeText()))); + rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup()))); + rowValues.add(new ResultValue(util.convertNull(this.getStatement()))); + rowValues.add(new ResultValue(util.convertNull(this.getTenantId()))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getAvgMaxRunTimeInMs())))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getAvgRunTimeInMs())))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getAvgMinRunTimeInMs())))); + rowValues.add(new ResultValue(util.convertNull(String.valueOf(getRunCount())))); + return rowValues; + } + + private int getRunCount() { + int totalRunCount = 0; + for (ThreadTime tt : getThreadTimes()) { + totalRunCount += tt.getRunCount(); + } + return totalRunCount; + } + + public List<List<ResultValue>> 
getCsvDetailedRepresentation(ResultUtil util, RunMode runMode) { + List<List<ResultValue>> rows = new ArrayList<>(); + for (ThreadTime tt : getThreadTimes()) { + for (List<ResultValue> runTime : runMode == RunMode.PERFORMANCE ? + tt.getCsvPerformanceRepresentation(util) : + tt.getCsvFunctionalRepresentation(util)) { + List<ResultValue> rowValues = new ArrayList<>(); + rowValues.add(new ResultValue(util.convertNull(getStartTimeText()))); + rowValues.add(new ResultValue(util.convertNull(this.getQueryGroup()))); + rowValues.add(new ResultValue(util.convertNull(this.getStatement()))); + rowValues.add(new ResultValue(util.convertNull(this.getTenantId()))); + rowValues.addAll(runTime); + rows.add(rowValues); + } + } + return rows; + } + + private String getStartTimeText() { + return (null == this.getStartTime()) + ? "" + : DateUtil.DEFAULT_MS_DATE_FORMATTER.format(this.getStartTime()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java new file mode 100644 index 0000000..c76c2e5 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/QuerySetResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.phoenix.pherf.result;

import java.util.ArrayList;
import java.util.List;

import org.apache.phoenix.pherf.configuration.QuerySet;

/**
 * A {@link QuerySet} paired with the {@link QueryResult}s collected for it.
 */
public class QuerySetResult extends QuerySet {

    private List<QueryResult> queryResults = new ArrayList<>();

    /**
     * No-argument constructor; the query result list starts out empty.
     */
    public QuerySetResult() {
    }

    /**
     * Creates a result that carries over the concurrency, number of executions,
     * execution duration and execution type of the given query set.
     *
     * @param querySet the query set whose settings are copied
     */
    public QuerySetResult(QuerySet querySet) {
        setConcurrency(querySet.getConcurrency());
        setNumberOfExecutions(querySet.getNumberOfExecutions());
        setExecutionDurationInMs(querySet.getExecutionDurationInMs());
        setExecutionType(querySet.getExecutionType());
    }

    /**
     * @param queryResults list that replaces the current query results
     */
    public void setQueryResults(List<QueryResult> queryResults) {
        this.queryResults = queryResults;
    }

    /**
     * @return the query results held by this object (the live list, not a copy)
     */
    public List<QueryResult> getQueryResults() {
        return queryResults;
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java new file mode 100644 index 0000000..104e388 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/Result.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.phoenix.pherf.result;

import org.apache.phoenix.pherf.result.file.ResultFileDetails;

import java.util.List;

/**
 * Common container for Pherf results: one row of values plus an optional header.
 */
public class Result {
    private final List<ResultValue> resultValues;
    private final ResultFileDetails type;
    private final String header;

    /**
     * Creates a result row.
     *
     * @param type          {@link org.apache.phoenix.pherf.result.file.ResultFileDetails}
     *                      Currently unused, but gives metadata about the contents of the result.
     * @param header        Used for CSV, otherwise pass null. For CSV pass comma separated
     *                      string of header fields.
     * @param messageValues {@code List<ResultValue>}; all fields combined represent the
     *                      data for a row to be written. The list is stored as given,
     *                      not copied.
     */
    public Result(ResultFileDetails type, String header, List<ResultValue> messageValues) {
        this.type = type;
        this.header = header;
        this.resultValues = messageValues;
    }

    /**
     * @return the header string supplied at construction (may be null)
     */
    public String getHeader() {
        return header;
    }

    /**
     * @return the row values supplied at construction
     */
    public List<ResultValue> getResultValues() {
        return resultValues;
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java new file mode 100644 index 0000000..f650cbb --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.pherf.result; + +import org.apache.phoenix.pherf.result.file.ResultFileDetails; + +import java.util.List; + +/** + * This is a common interface for working with Pherf results in various output formats.
Implementations of this + * interface can deal with particular details for that format while giving callers a simple API to report output + * against. + */
public interface ResultHandler {

    /**
     * Writes the given result through this handler.
     *
     * @param result the result to write
     * @throws Exception if the write fails
     */
    void write(Result result) throws Exception;

    /**
     * Flushes this handler.
     *
     * @throws Exception if the flush fails
     */
    void flush() throws Exception;

    /**
     * Closes this handler.
     *
     * @throws Exception if the close fails
     */
    void close() throws Exception;

    /**
     * Reads results back through this handler.
     *
     * @return the results read, as a list of {@link Result}
     * @throws Exception if the read fails
     */
    List<Result> read() throws Exception;

    /**
     * @return whether this handler is closed
     */
    boolean isClosed();

    /**
     * @return the {@link ResultFileDetails} associated with this handler
     */
    ResultFileDetails getResultFileDetails();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b516559/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java ---------------------------------------------------------------------- diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java new file mode 100644 index 0000000..523feb4 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/result/ResultManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.phoenix.pherf.result;

import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.PherfConstants.RunMode;
import org.apache.phoenix.pherf.result.file.ResultFileDetails;
import org.apache.phoenix.pherf.result.impl.CSVResultHandler;
import org.apache.phoenix.pherf.result.impl.ImageResultHandler;
import org.apache.phoenix.pherf.result.impl.XMLResultHandler;

import java.util.Arrays;
import java.util.List;

/**
 * Fans a {@link DataModelResult} out to a pool of {@link ResultHandler}s and
 * writes combined result files.
 */
public class ResultManager {
    private final List<ResultHandler> resultHandlers;
    private final ResultUtil util;
    private final PherfConstants.RunMode runMode;

    /**
     * Creates a manager with the default handler pool: an XML handler, an image
     * handler, a detailed CSV handler (performance or functional, chosen by
     * {@code runMode}) and an aggregate-performance CSV handler.
     *
     * @param fileNameSeed seed passed to every default handler
     * @param runMode      run mode used when writing and to pick the detailed CSV type
     */
    public ResultManager(String fileNameSeed, PherfConstants.RunMode runMode) {
        this(runMode, Arrays.asList(
                new XMLResultHandler(fileNameSeed, ResultFileDetails.XML),
                new ImageResultHandler(fileNameSeed, ResultFileDetails.IMAGE),
                new CSVResultHandler(
                        fileNameSeed,
                        runMode == RunMode.PERFORMANCE ? ResultFileDetails.CSV_DETAILED_PERFORMANCE
                                : ResultFileDetails.CSV_DETAILED_FUNCTIONAL),
                new CSVResultHandler(fileNameSeed, ResultFileDetails.CSV_AGGREGATE_PERFORMANCE)
        ));
    }

    /**
     * Creates a manager over an explicit handler pool.
     *
     * @param runMode        run mode passed along on every write
     * @param resultHandlers handlers that each receive every written result
     */
    public ResultManager(PherfConstants.RunMode runMode, List<ResultHandler> resultHandlers) {
        this.resultHandlers = resultHandlers;
        util = new ResultUtil();
        this.runMode = runMode;
    }

    /**
     * Write out the result to each writer in the pool. Every handler is flushed
     * and closed afterwards, whether or not writing succeeded; failures during
     * that cleanup are printed and do not propagate.
     *
     * @param result {@link DataModelResult}
     * @throws Exception if preparing the result directory or writing fails
     */
    public synchronized void write(DataModelResult result) throws Exception {
        try {
            util.ensureBaseResultDirExists();
            final DataModelResult dataModelResultCopy = new DataModelResult(result);
            for (ResultHandler handler : resultHandlers) {
                util.write(handler, dataModelResultCopy, runMode);
            }
        } finally {
            for (ResultHandler handler : resultHandlers) {
                flushAndCloseQuietly(handler);
            }
        }
    }

    /**
     * Write a combined set of results for each result in the list.
     *
     * @param dataModelResults {@code List<DataModelResult>} to combine into one detailed CSV file
     * @throws Exception if preparing the result directory, writing, flushing or closing fails
     */
    public synchronized void write(List<DataModelResult> dataModelResults) throws Exception {
        util.ensureBaseResultDirExists();

        // NOTE(review): the combined file always uses CSV_DETAILED_PERFORMANCE even though
        // runMode is passed to util.write - confirm this is intended for functional runs.
        CSVResultHandler detailsCSVWriter =
                new CSVResultHandler(PherfConstants.COMBINED_FILE_NAME, ResultFileDetails.CSV_DETAILED_PERFORMANCE);
        try {
            for (DataModelResult dataModelResult : dataModelResults) {
                util.write(detailsCSVWriter, dataModelResult, runMode);
            }
        } finally {
            try {
                detailsCSVWriter.flush();
            } finally {
                // Always close, even when flush() throws, so the writer is not leaked.
                detailsCSVWriter.close();
            }
        }
    }

    /**
     * Flushes and closes one handler as best-effort cleanup: {@code close()} is
     * attempted even if {@code flush()} throws, and any exception is printed
     * rather than propagated so the remaining handlers still get cleaned up.
     */
    private void flushAndCloseQuietly(ResultHandler handler) {
        if (handler == null) {
            return;
        }
        try {
            try {
                handler.flush();
            } finally {
                handler.close();
            }
        } catch (Exception e) {
            // Deliberate best-effort: report and carry on with the next handler.
            e.printStackTrace();
        }
    }
}