Stephen,

I probably could have said that a little better; please excuse how I
expressed my appreciation in the previous email. That definitely solved my
problem, and I appreciate you taking the time to respond. I had just skipped
over it while thinking the problem through.

Thanks again!

Wyatt

On Mon, Jan 23, 2017 at 6:39 PM Wyatt Frelot <[email protected]> wrote:

> Stephen,
>
> Thank you for pointing out the obvious... I just skipped over it in my
> thinking. That solved my problem.
>
> Wyatt
>
> On Mon, Jan 23, 2017 at 6:35 PM Stephen Sisk <[email protected]> wrote:
>
> hi!
>
> Thanks for following up with questions!
>
> I see you're using BatchWriter - based on the docs [1] I see that there
> are flush() and close() calls. I presume you'd want to be calling one of
> those. I'm not familiar with Accumulo, but usually batching writers wait to
> do anything until methods like those are called.
>
> S
>
> [1]
> https://accumulo.apache.org/1.5/apidocs/org/apache/accumulo/core/client/BatchWriter.html
>
>
> On Mon, Jan 23, 2017 at 3:21 PM Wyatt Frelot <[email protected]> wrote:
>
> Good afternoon all,
>
> I am testing the capabilities of Beam with Accumulo. I wrote a function to
> see what would happen when writing to Accumulo.
>
> The function runs to completion, but *nothing* is being inserted into
> Accumulo. There are no errors or any other indication that something is
> wrong.
>
> I am using the DirectRunner and everything "appears" to be going out
> (i.e., running), but the system is outputting only one thing for every
> "Key/Value Pair" (mutation) of data that is written:
>
> "Received 1"
>
> Where does this come from? Any pointers would be greatly appreciated.
>
> Here is the function:
>
> public class SysLogPipeline {
>     public static void main(String[] args) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
>                 .apply("WordCountDataModel", ParDo.of(new WordCountDataModel()));
>                 //.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));
>
>         pipeline.run().waitUntilFinish();
>
>     }
>     @DefaultCoder(VarIntCoder.class)
>     static class WordCountDataModel extends DoFn<String, Void> {
>
>         // Log and count parse errors.
>         //private static final Logger LOG = LoggerFactory.getLogger(WordCountDataModel.class);
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>             System.out.println("Started to the beginning");
>             Instance inst = new ZooKeeperInstance("training","localhost");
>             Connector conn = inst.getConnector("root", new PasswordToken("secret"));
>             BatchWriter writer = conn.createBatchWriter("test", new BatchWriterConfig());
>             String[] components = c.element().split("[^a-zA-Z']+");
>             for (String x : components) {
>                 System.out.println(x);
>                 Mutation m;
>                 m = new Mutation(x);
>                 m.put("test".getBytes(), "hello".getBytes(), components[0].getBytes());
>                 writer.addMutation(m);
>
>             }
>             System.out.println("Made it to the end");
>
>         }
>     }
>
>     /*static class WriteToAccumulo extends DoFn <Mutation,Integer>  {
>         // Log and count parse errors.
>         final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);
>
>         WriteToAccumulo() throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
>             Instance inst = new ZooKeeperInstance("training","localhost");
>             Connector conn = inst.getConnector("root", new PasswordToken("secret"));
>
>
>             BatchWriter writer = conn.createBatchWriter("test", new BatchWriterOpts().getBatchWriterConfig());
>             writer.addMutation(c.element());
>             writer.close();
>             c.output(1);
>         }
> */
>
>     }
>
>
>
>
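
For anyone who finds this thread later: the behavior Stephen describes (a
batching writer holds mutations in memory until flush() or close() is
called) can be illustrated with a small self-contained sketch. ToyBatchWriter
and BatchingDemo below are hypothetical stand-ins written for illustration,
not Accumulo classes, and a plain List plays the role of the table.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingDemo {
    // Hypothetical stand-in for a batching writer; NOT Accumulo's BatchWriter.
    // It only illustrates the buffer-until-flush behavior discussed above.
    static class ToyBatchWriter implements AutoCloseable {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> sink; // stands in for the target table

        ToyBatchWriter(List<String> sink) {
            this.sink = sink;
        }

        // Queues the mutation in memory; nothing reaches the sink yet.
        void addMutation(String mutation) {
            buffer.add(mutation);
        }

        // Pushes everything queued so far to the sink.
        void flush() {
            sink.addAll(buffer);
            buffer.clear();
        }

        // Flushes any pending mutations before the writer is discarded.
        @Override
        public void close() {
            flush();
        }
    }

    // Splits a line into words and queues one mutation per word.
    // The writer is closed only when closeWriter is true.
    static List<String> writeWords(String line, boolean closeWriter) {
        List<String> table = new ArrayList<>();
        ToyBatchWriter writer = new ToyBatchWriter(table);
        for (String word : line.split("[^a-zA-Z']+")) {
            if (!word.isEmpty()) {
                writer.addMutation(word);
            }
        }
        if (closeWriter) {
            writer.close();
        }
        return table;
    }

    public static void main(String[] args) {
        // Without close(): the mutations never leave the buffer.
        System.out.println(writeWords("hello beam accumulo", false).size()); // prints 0
        // With close(): all three mutations reach the table.
        System.out.println(writeWords("hello beam accumulo", true).size());  // prints 3
    }
}
```

In the pipeline quoted above, the equivalent change would presumably be a
writer.close() call after the for loop in processElement, as the
commented-out WriteToAccumulo already does.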
