Good afternoon all,
I am testing capabilities of Beam with Accumulo. I wrote a function to see
what would happen if writing to Accumulo.
The function is completely running, just *nothing* is being inserted into
Accumulo. There are no errors or any other indicator that something is
wrong.
I am using the DirectRunner and everything "appears" to be going out (i.e.,
running), but the system is outputting only one thing for every "Key/Value
Pair" (mutation) of data that is written:
"Received 1"
Where does this come from? Any pointers would greatly be appreciated.
Here is the function:
ublic class SysLogPipeline {
public static void main (String[] args) throws
AccumuloSecurityException, AccumuloException, TableNotFoundException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
.apply("WordCountDataModel", ParDo.of(new
WordCountDataModel()));
//.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));
pipeline.run().waitUntilFinish();
}
@DefaultCoder(VarIntCoder.class)
static class WordCountDataModel extends DoFn<String, Void> {
// Log and count parse errors.
//private static final Logger LOG =
LoggerFactory.getLogger(WordCountDataModel.class);
@ProcessElement
public void processElement(ProcessContext c) throws
AccumuloSecurityException, AccumuloException, TableNotFoundException {
System.out.println("Started to the beginning");
Instance inst = new ZooKeeperInstance("training","localhost");
Connector conn = inst.getConnector("root", new
PasswordToken("secret"));
BatchWriter writer = conn.createBatchWriter("test", new
BatchWriterConfig());
String[] components = c.element().split("[^a-zA-Z']+");
for (String x : components) {
System.out.println(x);
Mutation m;
m = new Mutation(x);
m.put("test".getBytes(), "hello".getBytes(),
components[0].getBytes());
writer.addMutation(m);
}
System.out.println("Made it to the end");
}
}
/*static class WriteToAccumulo extends DoFn <Mutation,Integer> {
// Log and count parse errors.
final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);
WriteToAccumulo() throws AccumuloSecurityException,
AccumuloException, TableNotFoundException {
}
@ProcessElement
public void processElement(ProcessContext c) throws
AccumuloException, AccumuloSecurityException, TableNotFoundException {
Instance inst = new ZooKeeperInstance("training","localhost");
Connector conn = inst.getConnector("root", new
PasswordToken("secret"));
BatchWriter writer = conn.createBatchWriter("test", new
BatchWriterOpts().getBatchWriterConfig());
writer.addMutation(c.element());
writer.close();
c.output(1);
}
*/
}