No problem - I understood what you meant and didn't read it as offensive. I
definitely have times where I need to talk to someone else to point out
something obvious when I've been staring at my code too long. :)

S

On Mon, Jan 23, 2017 at 3:40 PM Wyatt Frelot <[email protected]> wrote:

Stephen,

Thank you for pointing out the obvious...I just skipped over it in my
thinking. That solved my problem.

Wyatt

On Mon, Jan 23, 2017 at 6:35 PM Stephen Sisk <[email protected]> wrote:

hi!

Thanks for following up with questions!

I see you're using BatchWriter - based on the docs [1] I see that there are
flush() and close() calls. I presume you'd want to be calling one of those.
I'm not familiar with Accumulo, but usually batching writers wait to do
anything until methods like those are called.
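
To illustrate the general pattern, here is a minimal, self-contained sketch.
Note that BufferingWriter is a hypothetical stand-in I made up for
illustration, not the Accumulo BatchWriter API, and the "table" is just an
in-memory list. It only shows the behavior I'm describing: records sit in a
buffer until flush() or close() is called.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingWriterSketch {

    /** Hypothetical stand-in for a batching writer; NOT the Accumulo API. */
    static class BufferingWriter implements AutoCloseable {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> sink; // stands in for the remote table

        BufferingWriter(List<String> sink) {
            this.sink = sink;
        }

        /** Queues a record locally; nothing reaches the sink yet. */
        void add(String record) {
            buffer.add(record);
        }

        /** Pushes everything queued so far to the sink. */
        void flush() {
            sink.addAll(buffer);
            buffer.clear();
        }

        /** Closing flushes whatever is still queued. */
        @Override
        public void close() {
            flush();
        }
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>();

        // Writer that is never flushed or closed: the record stays buffered.
        BufferingWriter leaked = new BufferingWriter(table);
        leaked.add("row1");
        System.out.println("without close: " + table.size()); // prints 0

        // try-with-resources calls close(), which flushes the buffer.
        try (BufferingWriter writer = new BufferingWriter(table)) {
            writer.add("row2");
        }
        System.out.println("after close: " + table.size()); // prints 1
    }
}
```

In your function, I'd guess the equivalent is calling writer.close() (or at
least writer.flush()) once the loop over the components is done.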

S

[1]
https://accumulo.apache.org/1.5/apidocs/org/apache/accumulo/core/client/BatchWriter.html


On Mon, Jan 23, 2017 at 3:21 PM Wyatt Frelot <[email protected]> wrote:

Good afternoon all,

I am testing the capabilities of Beam with Accumulo. I wrote a function to
see what would happen when writing to Accumulo.

The function runs to completion, but *nothing* is being inserted into
Accumulo. There are no errors or any other indication that something is
wrong.

I am using the DirectRunner and everything "appears" to be going out (i.e.,
running), but the system is outputting only one thing for every "Key/Value
Pair" (mutation) of data that is written:

"Received 1"

Where does this come from? Any pointers would be greatly appreciated.

Here is the function:

public class SysLogPipeline {
    public static void main(String[] args) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
                .apply("WordCountDataModel", ParDo.of(new WordCountDataModel()));
                //.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));

        pipeline.run().waitUntilFinish();

    }
    @DefaultCoder(VarIntCoder.class)
    static class WordCountDataModel extends DoFn<String, Void> {

        // Log and count parse errors.
        //private static final Logger LOG = LoggerFactory.getLogger(WordCountDataModel.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
            System.out.println("Started to the beginning");
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new PasswordToken("secret"));
            BatchWriter writer = conn.createBatchWriter("test", new BatchWriterConfig());
            String[] components = c.element().split("[^a-zA-Z']+");
            for (String x : components) {
                System.out.println(x);
                Mutation m;
                m = new Mutation(x);
                m.put("test".getBytes(), "hello".getBytes(), components[0].getBytes());
                writer.addMutation(m);

            }
            System.out.println("Made it to the end");

        }
    }

    /*static class WriteToAccumulo extends DoFn<Mutation, Integer> {
        // Log and count parse errors.
        final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);

        WriteToAccumulo() throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new PasswordToken("secret"));

            BatchWriter writer = conn.createBatchWriter("test", new BatchWriterOpts().getBatchWriterConfig());
            writer.addMutation(c.element());
            writer.close();
            c.output(1);
        }
*/

    }
