Just as a note, you might want to look into the WholeRowIterator, which 
returns each row as a single key-value pair that you then decode into its 
individual entries. That saves you from checking every entry as it arrives to 
see whether you have moved on to a new row.
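
To illustrate the difference, here is a minimal plain-Java sketch that models 
the idea without the Accumulo dependency. RowGroupingSketch, Entry, groupByRow 
and edgeCounts are made-up names for illustration, not Accumulo API; the real 
iterator does the grouping on the server and hands each row back as one 
key-value pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowGroupingSketch {
    /** Hypothetical stand-in for one sorted table entry. */
    public static final class Entry {
        final String row;
        final String family;
        final String qualifier;

        public Entry(String row, String family, String qualifier) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
        }
    }

    /** Groups sorted entries by row so each row is handled as one unit. */
    public static Map<String, List<Entry>> groupByRow(List<Entry> sorted) {
        Map<String, List<Entry>> rows = new LinkedHashMap<>();
        for (Entry e : sorted) {
            rows.computeIfAbsent(e.row, r -> new ArrayList<>()).add(e);
        }
        return rows;
    }

    /** Counts the entries in each row; no row-boundary bookkeeping needed. */
    public static Map<String, Integer> edgeCounts(List<Entry> sorted) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        groupByRow(sorted).forEach((row, entries) -> counts.put(row, entries.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry("node1", "to", "node2"),
                new Entry("node1", "to", "node3"),
                new Entry("node2", "to", "node1"));
        System.out.println(edgeCounts(entries)); // prints {node1=2, node2=1}
    }
}
```

Once a row arrives as a unit, the count is just the size of the group, so the 
"am I on a new row yet?" check in process() goes away.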

-----Original Message-----
From: Peter Tillotson [mailto:[email protected]] 
Sent: Thursday, April 18, 2013 9:28 AM
To: [email protected]
Subject: Using iterators to add columns to a row



Apologies in advance - this is some of my first Accumulo code, but I suspect 
there is a much better way to do this. 


Basically I'm trying to add an edge count column to each row of my table, so I 
get rows along the following lines
 - node1 {  to:count:3, to:node2:, to:node3:, to:node3:  }

But on the client side I only need to write 
 - node1 {  to:node2:, to:node3:, to:node3:  }


I'd like to use the same approach to add indexes to separate column families, 
and combiners to aggregate.  


Aside from the inefficiency of a BatchWriter for each mutation 
 - is this the correct approach? or
 - is there a simpler way to achieve this?


Many thanks in advance


Peter T
 
--- code compiles but not tested ---

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.hadoop.io.Text;

public class EdgeCountIterator extends SortedKeyIterator
{
    private boolean isDisabled = false;
    private Connector connector;
    private Key currentRowStart = null;
    private String tableId;
    private int count = 0;
    
    @Override
    public void init(SortedKeyValueIterator<Key, Value> source,
            Map<String, String> options, IteratorEnvironment env) throws IOException
    {
        super.init(source, options, env);
        if(env.getIteratorScope() == IteratorScope.scan)
        {
            isDisabled = true;
            return;
        }
        
        String user = options.get("username");
        String password = options.get("password");
        String instanceId = options.get("instance");
        tableId = options.get("tableId");
        
        AuthInfo authInfo = new AuthInfo();
        authInfo.setUser(user);
        authInfo.setPassword(password.getBytes());
        authInfo.setInstanceId(instanceId);
        
        try
        {
            connector  = HdfsZooInstance.getInstance().getConnector(authInfo);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void next() throws IOException
    {
        if( isDisabled )
        {
            super.next();
            return;
        }
        
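        // Drain the source, passing every entry to process() so rows get counted.
        SortedKeyValueIterator<Key, Value> source = getSource();
        while(source.hasTop())
        {
            Key key = source.getTopKey();
            Value val = source.getTopValue();
            process(key, val);
            source.next();
        }
        // Flush the count for the final row, but only if any entry was seen.
        if( currentRowStart != null )
        {
            doMutations(currentRowStart);
        }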
        super.next();
    }
    
    public void process( Key key, Value val )
    {
        if(currentRowStart == null)
        {
            currentRowStart = key;
        }
        else
        {
            if( !currentRowStart.getRow().equals(key.getRow()) )
            {
                doMutations(currentRowStart);
                currentRowStart = key;
                count = 0;
            }
        }
        count++;
    }

    public void doMutations( Key rowStartKey )
    {
        BatchWriter writer = null;
        try
        {
            writer = connector.createBatchWriter(tableId, 0, 0, 2);
            Mutation mutation = new Mutation( rowStartKey.getRow() );
            Text colQ = new Text("count");
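            // Encode the count as 4 big-endian bytes. Use the backing array:
            // after putInt() the buffer's position is at its limit, so the
            // buffer itself has no remaining bytes to read.
            byte[] countBytes = ByteBuffer.allocate(4).putInt(count).array();
            mutation.put(rowStartKey.getColumnFamily(),
                    colQ,
                    new Value(countBytes));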
            writer.addMutation(mutation);
        }
        catch(Exception e)
        {
            throw new RuntimeException(e);
            
        }
        finally
        {
            if(writer != null)
            {
                try
                {
                    writer.close();
                }
                catch (MutationsRejectedException e)
                {
                    throw new RuntimeException(e);              
                }
            }
        }
        
    }
}
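
On the count value written in doMutations: a ByteBuffer's position advances as 
you write to it, so after putInt() a 4-byte buffer has nothing left to read 
unless it is flipped or its backing array is used. A minimal plain-Java sketch 
of the round trip follows; CountEncodingSketch, encode and decode are made-up 
names for illustration.

```java
import java.nio.ByteBuffer;

public class CountEncodingSketch {
    /** Encodes a count as 4 big-endian bytes via the buffer's backing array. */
    public static byte[] encode(int count) {
        return ByteBuffer.allocate(4).putInt(count).array();
    }

    /** Decodes the 4-byte form back into an int. */
    public static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(3);
        // The write moved the position to the limit, so nothing remains to read.
        System.out.println(b.remaining());      // prints 0
        System.out.println(decode(encode(3)));  // prints 3
    }
}
```

Whoever reads the count column back needs to decode it the same way, 4 bytes 
in big-endian order.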


