Good afternoon all,

I am testing capabilities of Beam with Accumulo. I wrote a function to see
what would happen if writing to Accumulo.

The function is completely running, just *nothing* is being inserted into
Accumulo. There are no errors or any other indicator that something is
wrong.

I am using the DirectRunner and everything "appears" to be going out (i.e.,
running), but the system is outputting only one thing for every "Key/Value
Pair" (mutation) of data that is written:

"Received 1"

Where does this come from? Any pointers would greatly be appreciated.

Here is the function:

ublic class SysLogPipeline {
    public static void main (String[] args) throws
AccumuloSecurityException, AccumuloException, TableNotFoundException {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.Read.from("/cloud/accumulo/INSTALL.md"))
                .apply("WordCountDataModel", ParDo.of(new
WordCountDataModel()));
                //.apply("WriteToAccumulo", ParDo.of(new WriteToAccumulo()));

        pipeline.run().waitUntilFinish();

    }
    @DefaultCoder(VarIntCoder.class)
    static class WordCountDataModel extends DoFn<String, Void> {

        // Log and count parse errors.
        //private static final Logger LOG =
LoggerFactory.getLogger(WordCountDataModel.class);

        @ProcessElement
        public void processElement(ProcessContext c) throws
AccumuloSecurityException, AccumuloException, TableNotFoundException {
            System.out.println("Started to the beginning");
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new
PasswordToken("secret"));
            BatchWriter writer = conn.createBatchWriter("test", new
BatchWriterConfig());
            String[] components = c.element().split("[^a-zA-Z']+");
            for (String x : components) {
                System.out.println(x);
                Mutation m;
                m = new Mutation(x);
                m.put("test".getBytes(), "hello".getBytes(),
components[0].getBytes());
                writer.addMutation(m);

            }
            System.out.println("Made it to the end");

        }
    }

    /*static class WriteToAccumulo extends DoFn <Mutation,Integer>  {
        // Log and count parse errors.
        final Logger LOG = LoggerFactory.getLogger(WriteToAccumulo.class);

        WriteToAccumulo() throws AccumuloSecurityException,
AccumuloException, TableNotFoundException {
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws
AccumuloException, AccumuloSecurityException, TableNotFoundException {
            Instance inst = new ZooKeeperInstance("training","localhost");
            Connector conn = inst.getConnector("root", new
PasswordToken("secret"));


            BatchWriter writer = conn.createBatchWriter("test", new
BatchWriterOpts().getBatchWriterConfig());
            writer.addMutation(c.element());
            writer.close();
            c.output(1);
        }
*/

    }

Reply via email to