No problem - I understood what you meant and didn't read it as offensive. I definitely have times where I need to talk to someone else to point out something obvious when I've been staring at my code too long. :)
S

On Mon, Jan 23, 2017 at 3:40 PM Wyatt Frelot <[email protected]> wrote:

Stephen,

Thank you for pointing the obvious...just skipped over it in my thinking. That solved my problem

Wyatt

On Mon, Jan 23, 2017 at 6:35 PM Stephen Sisk <[email protected]> wrote:

hi!

Thanks for following up with questions! I see you're using BatchWriter - based on the docs [1] I see that there are flush() and close() calls. I presume you'd want to be calling one of those. I'm not familiar with Accumulo, but usually batching writers wait to do anything until methods like those are called.

S

[1] https://accumulo.apache.org/1.5/apidocs/org/apache/accumulo/core/client/BatchWriter.html

On Mon, Jan 23, 2017 at 3:21 PM Wyatt Frelot <[email protected]> wrote:

Good afternoon all,

I am testing capabilities of Beam with Accumulo. I wrote a function to see what would happen if writing to Accumulo. The function is completely running, just *nothing* is being inserted into Accumulo. There are no errors or any other indicator that something is wrong.

I am using the DirectRunner and everything "appears" to be going out (i.e., running), but the system is outputting only one thing for every "Key/Value Pair" (mutation) of data that is written:

"Received 1"

Where does this come from? Any pointers would greatly be appreciated.

Here is the function:

public class SysLogPipeline {

    public static void main (String[] args) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
                .apply("WordCountDataModel", ParDo.of(new WordCountDataModel()));
                //.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));

        pipeline.run().waitUntilFinish();
    }

    @DefaultCoder(VarIntCoder.class)
    static class WordCountDataModel extends DoFn<String, Void> {
        // Log and count parse errors.
        //private static final Logger LOG = LoggerFactory.getLogger(WordCountDataModel.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
            System.out.println("Started to the beginning");
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new PasswordToken("secret"));
            BatchWriter writer = conn.createBatchWriter("test", new BatchWriterConfig());

            String[] components = c.element().split("[^a-zA-Z']+");
            for (String x : components) {
                System.out.println(x);
                Mutation m;
                m = new Mutation(x);
                m.put("test".getBytes(), "hello".getBytes(), components[0].getBytes());
                writer.addMutation(m);
            }
            System.out.println("Made it to the end");
        }
    }

    /*static class WriteToAccumulo extends DoFn <Mutation,Integer> {
        // Log and count parse errors.
        final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);

        WriteToAccumulo() throws AccumuloSecurityException, AccumuloException, TableNotFoundException { }

        @ProcessElement
        public void processElement(ProcessContext c) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new PasswordToken("secret"));
            BatchWriter writer = conn.createBatchWriter("test", new BatchWriterOpts().getBatchWriterConfig());
            writer.addMutation(c.element());
            writer.close();
            c.output(1);
        }
    */
}
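
For anyone finding this thread later, the effect Stephen describes can be sketched without an Accumulo cluster. The block below is a minimal illustration, not Accumulo's API: BufferingWriter, writeWithoutClose and writeWithClose are hypothetical names invented for the example, and the stand-in hands records to its sink only on flush() or close(). A real batching writer may also send data on its own schedule, so treat this as a sketch of the failure mode in the pipeline above (mutations added, writer never closed), not as a description of BatchWriter internals.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedWriterSketch {

    /** Hypothetical stand-in for a batching writer; NOT the Accumulo BatchWriter. */
    static class BufferingWriter implements AutoCloseable {
        private final List<String> sink;                       // plays the role of the table
        private final List<String> buffer = new ArrayList<>(); // records not yet delivered

        BufferingWriter(List<String> sink) {
            this.sink = sink;
        }

        /** Queues a record; nothing reaches the sink yet. */
        void add(String record) {
            buffer.add(record);
        }

        /** Delivers everything queued so far. */
        void flush() {
            sink.addAll(buffer);
            buffer.clear();
        }

        /** Closing flushes whatever is still queued. */
        @Override
        public void close() {
            flush();
        }
    }

    /** Mirrors the pipeline in the question: records are added, the writer is abandoned. */
    static List<String> writeWithoutClose(String[] words) {
        List<String> sink = new ArrayList<>();
        BufferingWriter writer = new BufferingWriter(sink);
        for (String word : words) {
            writer.add(word);
        }
        return sink; // still empty: the buffer was never flushed
    }

    /** Same loop, but try-with-resources guarantees close() runs. */
    static List<String> writeWithClose(String[] words) {
        List<String> sink = new ArrayList<>();
        try (BufferingWriter writer = new BufferingWriter(sink)) {
            for (String word : words) {
                writer.add(word);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        String[] words = {"alpha", "beta"};
        System.out.println("without close: " + writeWithoutClose(words).size()); // prints "without close: 0"
        System.out.println("with close: " + writeWithClose(words).size());       // prints "with close: 2"
    }
}
```

In the DoFn above, the corresponding change would presumably be a writer.close() (or flush()) after the for loop, which is what the commented-out WriteToAccumulo already does after addMutation().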
