Author: brandonwilliams Date: Thu Jul 21 20:10:54 2011 New Revision: 1149341
URL: http://svn.apache.org/viewvc?rev=1149341&view=rev Log: Use a UDF-specific context signature. Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2869 Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1149341&r1=1149340&r2=1149341&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Thu Jul 21 20:10:54 2011 @@ -68,8 +68,6 @@ public class CassandraStorage extends Lo public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; - private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema"; - private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); @@ -78,6 +76,8 @@ public class CassandraStorage extends Lo private boolean slice_reverse = false; private String keyspace; private String column_family; + private String loadSignature; + private String storeSignature; private Configuration conf; private RecordReader reader; @@ -112,7 +112,7 @@ public class CassandraStorage extends Lo if (!reader.nextKeyValue()) return null; - CfDef cfDef = getCfDef(); + CfDef cfDef = getCfDef(loadSignature); ByteBuffer key = (ByteBuffer)reader.getCurrentKey(); SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); assert key != null && cf != null; @@ -165,11 +165,11 @@ public class CassandraStorage extends Lo return pair; } - private CfDef getCfDef() + private CfDef getCfDef(String signature) { UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(CassandraStorage.class); - return cfdefFromString(property.getProperty(getSchemaContextKey())); + return cfdefFromString(property.getProperty(signature)); } private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException @@ -289,7 +289,7 @@ public class CassandraStorage extends Lo } ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); - initSchema(); + initSchema(loadSignature); } @Override @@ -298,9 +298,16 @@ public class CassandraStorage extends Lo return location; } + @Override + public void setUDFContextSignature(String signature) + { + this.loadSignature = signature; + } + /* StoreFunc methods */ public void setStoreFuncUDFContextSignature(String signature) { + this.storeSignature = signature; } public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException @@ -314,7 +321,7 @@ public class CassandraStorage extends Lo setLocationFromUri(location); ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); - initSchema(); + initSchema(storeSignature); } public OutputFormat getOutputFormat() @@ -346,7 +353,7 @@ public class CassandraStorage extends Lo ByteBuffer key = objToBB(t.get(0)); DefaultDataBag pairs = (DefaultDataBag) t.get(1); ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); - CfDef cfDef = getCfDef(); + CfDef cfDef = getCfDef(storeSignature); List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); try @@ -404,7 +411,6 @@ public class CassandraStorage extends Lo column.timestamp = System.currentTimeMillis() * 1000; mutation.column_or_supercolumn = new ColumnOrSuperColumn(); mutation.column_or_supercolumn.column = column; - mutationList.add(mutation); } } mutationList.add(mutation); @@ -412,7 +418,7 @@ public class CassandraStorage extends Lo } catch (ClassCastException e) { - throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily"); + throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e); } try { @@ -430,14 +436,13 @@ public class CassandraStorage extends Lo /* Methods to get the column family schema from Cassandra */ - private void initSchema() + private void initSchema(String signature) { UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(CassandraStorage.class); - String schemaContextKey = getSchemaContextKey(); // Only get the schema if we haven't already gotten it - if (!property.containsKey(schemaContextKey)) + if (!property.containsKey(signature)) { Cassandra.Client client = null; try @@ -455,7 +460,7 @@ public class CassandraStorage extends Lo break; } } - property.setProperty(schemaContextKey, cfdefToString(cfDef)); + property.setProperty(signature, cfdefToString(cfDef)); } catch (TException e) { @@ -521,14 +526,4 @@ public class CassandraStorage extends Lo } return cfDef; } - - private String getSchemaContextKey() - { - StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX); - sb.append('.'); - sb.append(keyspace); - sb.append('.'); - sb.append(column_family); - return sb.toString(); - } }