Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 e9e91d7b5 -> bb1deac0b
  refs/heads/trunk 235bae7b3 -> dc6080107


Revert CASSANDRA-5417 changes to hadoop
Patch by slebresne, reviewed by brandonwilliams for CASSANDRA-7241


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb1deac0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb1deac0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb1deac0

Branch: refs/heads/cassandra-2.1
Commit: bb1deac0b9073ca3ee590731fbbf505867a890d1
Parents: e9e91d7
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed May 28 13:21:59 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed May 28 13:22:37 2014 -0500

----------------------------------------------------------------------
 .../hadoop/ColumnFamilyInputFormat.java         |  7 +-
 .../hadoop/ColumnFamilyRecordReader.java        | 70 ++++++++++----------
 .../hadoop/pig/AbstractCassandraStorage.java    |  2 +-
 3 files changed, 39 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
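
For anyone following along: this revert restores the pre-CASSANDRA-5417 public
contract of ColumnFamilyInputFormat, which hands each row to the job as a
SortedMap<ByteBuffer, Cell> keyed by raw column name, rather than the
SortedMap<CellName, Cell> introduced by 5417. A minimal sketch of a Mapper
written against the restored signature (the class name and the Text output
types below are illustrative, not part of this patch):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.SortedMap;

import org.apache.cassandra.db.Cell;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: a job consuming the restored key/value types.
public class ExampleCellMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, Text>
{
    @Override
    protected void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context)
            throws IOException, InterruptedException
    {
        for (Map.Entry<ByteBuffer, Cell> entry : columns.entrySet())
        {
            // Column names are plain ByteBuffers again, so jobs that decoded
            // them directly (here as UTF-8) work as they did before 5417.
            String name = ByteBufferUtil.string(entry.getKey());
            String value = ByteBufferUtil.string(entry.getValue().value());
            context.write(new Text(ByteBufferUtil.string(key)), new Text(name + "=" + value));
        }
    }
}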


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index a2c7a36..686d486 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,15 +44,15 @@ import org.apache.hadoop.mapreduce.*;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<CellName, Cell>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
     
-    public RecordReader<ByteBuffer, SortedMap<CellName, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = HadoopCompat.newMapContext(
                 jobConf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index b6b12ec..0b52904 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,8 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -45,8 +46,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<CellName, Cell>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
 
@@ -54,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private ColumnFamilySplit split;
     private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<CellName, Cell>> currentRow;
+    private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow;
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
@@ -93,7 +94,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return currentRow.left;
     }
 
-    public SortedMap<CellName, Cell> getCurrentValue()
+    public SortedMap<ByteBuffer, Cell> getCurrentValue()
     {
         return currentRow.right;
     }
@@ -211,12 +212,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
     {
         protected List<KeySlice> rows;
         protected int totalRead = 0;
         protected final boolean isSuper;
-        protected final CellNameType comparator;
+        protected final AbstractType<?> comparator;
         protected final AbstractType<?> subComparator;
         protected final IPartitioner partitioner;
 
@@ -254,7 +255,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                         cfDef.column_type = ByteBufferUtil.string(type);
                 }
 
-                comparator = CellNames.fromAbstractType(TypeParser.parse(cfDef.comparator_type), true);
+                comparator = TypeParser.parse(cfDef.comparator_type);
                 subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
             }
             catch (ConfigurationException e)
@@ -298,21 +299,21 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (org.apache.cassandra.thrift.Column column : super_column.columns)
             {
                 Cell c = unthriftifySimple(column);
-                cells.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
+                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
             }
             return cells;
         }
 
         protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column)
         {
-            return new BufferCell(comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
+            return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp);
         }
 
         private Cell unthriftifyCounter(CounterColumn column)
         {
             //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
             //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell.
-            return new BufferCell(comparator.cellFromByteBuffer(column.name), ByteBufferUtil.bytes(column.value), 0);
+            return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0);
         }
 
         private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column)
@@ -321,7 +322,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (CounterColumn column : super_column.columns)
             {
                 Cell c = unthriftifyCounter(column);
-                cells.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
+                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
             }
             return cells;
         }
@@ -402,7 +403,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
         {
             maybeInit();
             if (rows == null)
@@ -410,12 +411,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             totalRead++;
             KeySlice ks = rows.get(i++);
-            SortedMap<CellName, Cell> map = new TreeMap<CellName, Cell>(comparator);
+            AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
+            SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp);
             for (ColumnOrSuperColumn cosc : ks.columns)
             {
                 List<Cell> cells = unthriftify(cosc);
                 for (Cell cell : cells)
-                    map.put(cell.name(), cell);
+                    map.put(cell.name().toByteBuffer(), cell);
             }
             return Pair.create(ks.key, map);
         }
@@ -423,8 +425,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>> wideColumns;
-        private Composite lastColumn = Composites.EMPTY;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns;
+        private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
         private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         private void maybeInit()
@@ -453,7 +455,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             try
             {
-                rows = client.get_paged_slice(cfName, keyRange, lastColumn.toByteBuffer(), consistencyLevel);
+                rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel);
                 int n = 0;
                 for (KeySlice row : rows)
                     n += row.columns.size();
@@ -472,14 +474,14 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
 
-            Pair<ByteBuffer, SortedMap<CellName, Cell>> next = wideColumns.next();
-            lastColumn = next.right.values().iterator().next().name();
+            Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next();
+            lastColumn = next.right.keySet().iterator().next().duplicate();
 
             maybeIncreaseRowCounter(next);
             return next;
@@ -490,7 +492,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<CellName, Cell>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))
@@ -500,7 +502,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>>
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
         {
             private final Iterator<KeySlice> rows;
             private Iterator<ColumnOrSuperColumn> columns;
@@ -521,30 +523,28 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 columns = currentRow.columns.iterator();
             }
 
-            protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
             {
-                CellNameType cellType = subComparator == null
-                                      ? comparator
-                                      : new CompoundDenseCellNameType(Arrays.asList(comparator.asAbstractType(), subComparator));
+                AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
                 while (true)
                 {
                     if (columns.hasNext())
                     {
                         ColumnOrSuperColumn cosc = columns.next();
-                        SortedMap<CellName, Cell> map;
+                        SortedMap<ByteBuffer, Cell> map;
                         List<Cell> cells = unthriftify(cosc);
                         if (cells.size() == 1)
                         {
-                            map = ImmutableSortedMap.of(cells.get(0).name(), cells.get(0));
+                            map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0));
                         }
                         else
                         {
                             assert isSuper;
-                            map = new TreeMap<CellName, Cell>(cellType);
+                            map = new TreeMap<ByteBuffer, Cell>(comp);
                             for (Cell cell : cells)
-                                map.put(cell.name(), cell);
+                                map.put(cell.name().toByteBuffer(), cell);
                         }
-                        return Pair.<ByteBuffer, SortedMap<CellName, Cell>>create(currentRow.key, map);
+                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map);
                     }
 
                     if (!rows.hasNext())
@@ -561,7 +561,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     // to the old. Thus, expect a small performance hit.
     // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
     // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<CellName, Cell> value) throws IOException
+    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException
     {
         if (this.nextKeyValue())
         {
@@ -582,9 +582,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return ByteBuffer.wrap(new byte[this.keyBufferSize]);
     }
 
-    public SortedMap<CellName, Cell> createValue()
+    public SortedMap<ByteBuffer, Cell> createValue()
     {
-        return new TreeMap<CellName, Cell>();
+        return new TreeMap<ByteBuffer, Cell>();
     }
 
     public long getPos() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 9737d67..361baa4 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -130,7 +130,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         if(comparator instanceof AbstractCompositeType)
             setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
         else
-            setTupleValue(pair, 0, cassandraToObj(comparator, col.name().toByteBuffer()));
+            setTupleValue(pair, 0, cassandraToObj(comparator, colName));
+            setTupleValue(pair, 0, cassandraToObj(comparator, colName));
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
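
One note on the super column path above: unthriftifySuperColumn and
unthriftifySuperCounter now name each cell with
CellNames.simpleDense(CompositeType.build(super_column.name, subcolumn name)),
so consumers see a single composite ByteBuffer per cell. A rough sketch of how
such a name can be taken apart again with the matching comparators (the UTF8
types and values here are illustrative assumptions, not from this patch):

import java.nio.ByteBuffer;

import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.UTF8Type;

// Illustrative only: composing and splitting a super-column cell name the
// way unthriftifySuperColumn now builds it.
public class SuperColumnNameSketch
{
    public static void main(String[] args)
    {
        ByteBuffer superName = UTF8Type.instance.decompose("events");
        ByteBuffer subName = UTF8Type.instance.decompose("timestamp");

        // Mirrors CompositeType.build(super_column.name, c.name().toByteBuffer()).
        ByteBuffer composed = CompositeType.build(superName, subName);

        // A consumer holding the same comparator/subComparator pair can split
        // the composite name back into its two components.
        CompositeType type = CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance);
        ByteBuffer[] parts = type.split(composed);
        System.out.println(UTF8Type.instance.compose(parts[0])); // events
        System.out.println(UTF8Type.instance.compose(parts[1])); // timestamp
    }
}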
