/*
 * Dependencies required in build.gradle (or the equivalent entries in pom.xml):
 *   compile 'ch.qos.logback:logback-classic:1.1.2'
 *   compile 'org.apache.zookeeper:zookeeper:3.4.6'
 */
package om.services.cl;
import ch.qos.logback.classic.Level;
import om.services.Utils;
import org.apache.zookeeper.ZooKeeperMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
/**
* CL tool for executing bulk of operations read from a file into zookeeper
*/
public class ZkBulkLoad {
private ZooKeeperMain zkMain;
private int currentLineNumber;
private List<String> failureMessages = new ArrayList<>();
public static void main(String[] args) throws IOException,
InterruptedException {
final String PROP_KEY_BULK_FILE = "zkBulkLoad.bulkFile";
final String PROP_KEY_LOG_LEVEL = "zkBulkLoad.logLevel";
final Level DEFAULT_LOG_LEVEL = Level.WARN;
setRootLogLevel(PROP_KEY_LOG_LEVEL, DEFAULT_LOG_LEVEL);
ZooKeeperMain zkMain = new ZooKeeperMain(args);
final String bulkFilePath =
Utils.getMandatoryProperty(PROP_KEY_BULK_FILE);
final ZkBulkLoad zkBulkLoad = new ZkBulkLoad(zkMain);
try {
final Result result = zkBulkLoad.load(new File(bulkFilePath));
if (!result.getFailures().isEmpty()) {
System.err.println(result.getFailures());
}
} finally {
zkBulkLoad.close();
}
}
public ZkBulkLoad(final ZooKeeperMain zkMain) {
this.zkMain = zkMain;
}
public Result load(final File bulkFile) throws IOException {
resetState();
final Stream<String> lines =
Files.lines(bulkFile.toPath()).filter(line -> !line.isEmpty());
try {
lines.forEach(this::executeLine);
return new Result(failureMessages);
} finally {
if (lines!=null) {
lines.close();
}
if (!failureMessages.isEmpty()) {
failureMessages.add(0, String.format("\nThere were %s
failures:\n", failureMessages.size()));
}
}
}
public void close() {}
// ----------------------------------------------- private
---------------------------------------------------------
private static void setRootLogLevel(final String propKeyLogLevel,
final Level defaultLogLevel) {
final ch.qos.logback.classic.Logger rootLogger =
(ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
final String providedLogLevel = System.getProperty(propKeyLogLevel);
if (providedLogLevel == null) {
System.out.printf("property %s was not provided, setting root
logger level by default to WARN\n", propKeyLogLevel);
rootLogger.setLevel(defaultLogLevel);
} else {
Level logLevel = Level.toLevel(providedLogLevel,
defaultLogLevel);
System.out.printf("property %s was provided [%s]. setting root
logger level to [%s]\n", propKeyLogLevel, providedLogLevel, logLevel);
rootLogger.setLevel(logLevel);
}
}
private void resetState() {
currentLineNumber = 1;
failureMessages.clear();
}
private void executeLine(String line) {
try {
System.out.println("executing: " + line);
zkMain.executeLine(line);
} catch (Exception e) {
failureMessages.add("line " + currentLineNumber + ": " +
e.getMessage());
}
currentLineNumber++;
}
class Result {
List<String> failures = new ArrayList<>();
public Result(List<String> failures) {
this.failures = failures;
}
public List<String> getFailures() {
return failures;
}
public void setFailures(List<String> failures) {
this.failures = failures;
}
}
}
// --
// View this message in context:
// http://zookeeper-user.578899.n2.nabble.com/Bulk-load-or-ingest-tp7579884p7580267.html
// Sent from the zookeeper-user mailing list archive at Nabble.com.