Hi! Thanks for following up with questions!

I see you're using BatchWriter. Based on the docs [1], it has flush() and
close() methods, and I presume you'd want to be calling one of those. I'm not
familiar with Accumulo, but batching writers usually wait to do anything until
methods like those are called.

S

[1] https://accumulo.apache.org/1.5/apidocs/org/apache/accumulo/core/client/BatchWriter.html

On Mon, Jan 23, 2017 at 3:21 PM Wyatt Frelot <[email protected]> wrote:

> Good afternoon all,
>
> I am testing capabilities of Beam with Accumulo. I wrote a function to see
> what would happen if writing to Accumulo.
>
> The function is completely running, just *nothing* is being inserted into
> Accumulo. There are no errors or any other indicator that something is
> wrong.
>
> I am using the DirectRunner and everything "appears" to be going out
> (i.e., running), but the system is outputting only one thing for every
> "Key/Value Pair" (mutation) of data that is written:
>
> "Received 1"
>
> Where does this come from? Any pointers would greatly be appreciated.
>
> Here is the function:
>
> public class SysLogPipeline {
>     public static void main (String[] args)
>             throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
>                 .apply("WordCountDataModel", ParDo.of(new WordCountDataModel()));
>                 //.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));
>
>         pipeline.run().waitUntilFinish();
>
>     }
>     @DefaultCoder(VarIntCoder.class)
>     static class WordCountDataModel extends DoFn<String, Void> {
>
>         // Log and count parse errors.
>         //private static final Logger LOG = LoggerFactory.getLogger(WordCountDataModel.class);
>
>         @ProcessElement
>         public void processElement(ProcessContext c)
>                 throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>             System.out.println("Started to the beginning");
>             Instance inst = new ZooKeeperInstance("training","localhost");
>             Connector conn = inst.getConnector("root", new PasswordToken("secret"));
>             BatchWriter writer = conn.createBatchWriter("test", new BatchWriterConfig());
>             String[] components = c.element().split("[^a-zA-Z']+");
>             for (String x : components) {
>                 System.out.println(x);
>                 Mutation m;
>                 m = new Mutation(x);
>                 m.put("test".getBytes(), "hello".getBytes(), components[0].getBytes());
>                 writer.addMutation(m);
>
>             }
>             System.out.println("Made it to the end");
>
>         }
>     }
>
>     /*static class WriteToAccumulo extends DoFn <Mutation,Integer> {
>         // Log and count parse errors.
>         final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);
>
>         WriteToAccumulo()
>                 throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c)
>                 throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
>             Instance inst = new ZooKeeperInstance("training","localhost");
>             Connector conn = inst.getConnector("root", new PasswordToken("secret"));
>
>
>             BatchWriter writer = conn.createBatchWriter("test", new BatchWriterOpts().getBatchWriterConfig());
>             writer.addMutation(c.element());
>             writer.close();
>             c.output(1);
>         }
>     */
>
> }
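
P.S. To make the flush()/close() point from my reply concrete, here is a minimal plain-Java sketch of how a buffering writer typically behaves. BufferingWriter, its "table" list, and BufferedWriteSketch are hypothetical stand-ins I made up for illustration. This is not Accumulo's BatchWriter, and I have not run anything against Accumulo itself.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point (hypothetical name): compares a writer that is never
// closed with one that is closed via try-with-resources.
final class BufferedWriteSketch {
    public static void main(String[] args) {
        List<String> table = new ArrayList<>(); // stands in for the remote table

        // Case 1: items are added but the writer is never flushed or closed.
        BufferingWriter leaked = new BufferingWriter(table);
        leaked.add("lost");
        System.out.println("without close: " + table.size()); // prints "without close: 0"

        // Case 2: try-with-resources calls close(), which flushes the buffer.
        try (BufferingWriter w = new BufferingWriter(table)) {
            w.add("a");
            w.add("b");
        }
        System.out.println("with close: " + table.size()); // prints "with close: 2"
    }
}

// Hypothetical stand-in for a batching writer; NOT Accumulo's BatchWriter.
final class BufferingWriter implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sink;

    BufferingWriter(List<String> sink) {
        this.sink = sink;
    }

    // Queues the item locally; nothing reaches the sink yet.
    void add(String item) {
        buffer.add(item);
    }

    // Pushes everything queued so far to the sink and empties the buffer.
    void flush() {
        sink.addAll(buffer);
        buffer.clear();
    }

    // Flushes whatever is still queued.
    @Override
    public void close() {
        flush();
    }
}
```

If BatchWriter behaves similarly, the likely fix in the quoted WordCountDataModel would be to call writer.close() (or at least writer.flush()) after the for loop in processElement, as the commented-out WriteToAccumulo already does.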
