Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 362e13206 -> e4c344c58
  refs/heads/cassandra-3.9 c7547e0de -> 1f014b2ca
  refs/heads/trunk e42352763 -> 9242c85cf


Avoid deserialization error after altering column type

This makes sure the column used when serializing intra-node messages is
"current". Previously, we would use the column type recorded at
deserialization time, which might not be "current" if the type had since
been changed by an ALTER TYPE.

patch by slebresne; reviewed by thobbs for CASSANDRA-11820


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4c344c5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4c344c5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4c344c5

Branch: refs/heads/cassandra-3.0
Commit: e4c344c58f8ca8f69224855080de4ec266fb671e
Parents: 362e132
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Mon Jun 27 14:17:27 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Thu Jun 30 11:14:01 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/rows/BufferCell.java    | 16 +++----
 src/java/org/apache/cassandra/db/rows/Cell.java |  4 +-
 .../cassandra/db/rows/UnfilteredSerializer.java | 30 +++++++++----
 .../org/apache/cassandra/cql3/CQLTester.java    | 39 +++++++++++++++--
 .../cql3/validation/operations/AlterTest.java   | 46 +++++++++++++-------
 6 files changed, 98 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae37d2c..573f704 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
 Merged from 2.2:
  * MemoryUtil.getShort() should return an unsigned short also for 
architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java 
b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 22b629a..db0ded5 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -228,7 +228,7 @@ public class BufferCell extends AbstractCell
         private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Wether 
the cell has the same timestamp than the row this is a cell of.
         private final static int USE_ROW_TTL_MASK            = 0x10; // Wether 
the cell has the same ttl than the row this is a cell of.
 
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo 
rowLiveness, SerializationHeader header) throws IOException
+        public void serialize(Cell cell, ColumnDefinition column, 
DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) 
throws IOException
         {
             assert cell != null;
             boolean hasValue = cell.value().hasRemaining();
@@ -260,11 +260,11 @@ public class BufferCell extends AbstractCell
             if (isExpiring && !useRowTTL)
                 header.writeTTL(cell.ttl(), out);
 
-            if (cell.column().isComplex())
-                cell.column().cellPathSerializer().serialize(cell.path(), out);
+            if (column.isComplex())
+                column.cellPathSerializer().serialize(cell.path(), out);
 
             if (hasValue)
-                header.getType(cell.column()).writeValue(cell.value(), out);
+                header.getType(column).writeValue(cell.value(), out);
         }
 
         public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnDefinition column, SerializationHeader header, SerializationHelper 
helper) throws IOException
@@ -308,7 +308,7 @@ public class BufferCell extends AbstractCell
             return new BufferCell(column, timestamp, ttl, localDeletionTime, 
value, path);
         }
 
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, 
SerializationHeader header)
+        public long serializedSize(Cell cell, ColumnDefinition column, 
LivenessInfo rowLiveness, SerializationHeader header)
         {
             long size = 1; // flags
             boolean hasValue = cell.value().hasRemaining();
@@ -325,11 +325,11 @@ public class BufferCell extends AbstractCell
             if (isExpiring && !useRowTTL)
                 size += header.ttlSerializedSize(cell.ttl());
 
-            if (cell.column().isComplex())
-                size += 
cell.column().cellPathSerializer().serializedSize(cell.path());
+            if (column.isComplex())
+                size += 
column.cellPathSerializer().serializedSize(cell.path());
 
             if (hasValue)
-                size += 
header.getType(cell.column()).writtenLength(cell.value());
+                size += header.getType(column).writtenLength(cell.value());
 
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index b10ce06..d10cc74 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -145,11 +145,11 @@ public abstract class Cell extends ColumnData
 
     public interface Serializer
     {
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo 
rowLiveness, SerializationHeader header) throws IOException;
+        public void serialize(Cell cell, ColumnDefinition column, 
DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) 
throws IOException;
 
         public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnDefinition column, SerializationHeader header, SerializationHelper 
helper) throws IOException;
 
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, 
SerializationHeader header);
+        public long serializedSize(Cell cell, ColumnDefinition column, 
LivenessInfo rowLiveness, SerializationHeader header);
 
         // Returns if the skipped cell was an actual cell (i.e. it had its 
presence flag).
         public boolean skip(DataInputPlus in, ColumnDefinition column, 
SerializationHeader header) throws IOException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index e4202c9..dc6f187 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.SearchIterator;
 
 /**
  * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -177,16 +178,25 @@ public class UnfilteredSerializer
         if (!hasAllColumns)
             Columns.serializer.serializeSubset(Collections2.transform(row, 
ColumnData::column), headerColumns, out);
 
+        SearchIterator<ColumnDefinition, ColumnDefinition> si = 
headerColumns.iterator();
         for (ColumnData data : row)
         {
+            // We can obtain the column for data directly from data.column(). 
However, if the cell/complex data
+            // originates from a sstable, the column we'll get will have the 
type used when the sstable was serialized,
+            // and if that type have been recently altered, that may not be 
the type we want to serialize the column
+            // with. So we use the ColumnDefinition from the "header" which is 
"current". Also see #11810 for what
+            // happens if we don't do that.
+            ColumnDefinition column = si.next(data.column());
+            assert column != null;
+
             if (data.column.isSimple())
-                Cell.serializer.serialize((Cell) data, out, pkLiveness, 
header);
+                Cell.serializer.serialize((Cell) data, column, out, 
pkLiveness, header);
             else
-                writeComplexColumn((ComplexColumnData) data, 
hasComplexDeletion, pkLiveness, header, out);
+                writeComplexColumn((ComplexColumnData) data, column, 
hasComplexDeletion, pkLiveness, header, out);
         }
     }
 
-    private void writeComplexColumn(ComplexColumnData data, boolean 
hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, 
DataOutputPlus out)
+    private void writeComplexColumn(ComplexColumnData data, ColumnDefinition 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header, DataOutputPlus out)
     throws IOException
     {
         if (hasComplexDeletion)
@@ -194,7 +204,7 @@ public class UnfilteredSerializer
 
         out.writeUnsignedVInt(data.cellsCount());
         for (Cell cell : data)
-            Cell.serializer.serialize(cell, out, rowLiveness, header);
+            Cell.serializer.serialize(cell, column, out, rowLiveness, header);
     }
 
     private void serialize(RangeTombstoneMarker marker, SerializationHeader 
header, DataOutputPlus out, long previousUnfilteredSize, int version)
@@ -274,18 +284,22 @@ public class UnfilteredSerializer
         if (!hasAllColumns)
             size += 
Columns.serializer.serializedSubsetSize(Collections2.transform(row, 
ColumnData::column), header.columns(isStatic));
 
+        SearchIterator<ColumnDefinition, ColumnDefinition> si = 
headerColumns.iterator();
         for (ColumnData data : row)
         {
+            ColumnDefinition column = si.next(data.column());
+            assert column != null;
+
             if (data.column.isSimple())
-                size += Cell.serializer.serializedSize((Cell) data, 
pkLiveness, header);
+                size += Cell.serializer.serializedSize((Cell) data, column, 
pkLiveness, header);
             else
-                size += sizeOfComplexColumn((ComplexColumnData) data, 
hasComplexDeletion, pkLiveness, header);
+                size += sizeOfComplexColumn((ComplexColumnData) data, column, 
hasComplexDeletion, pkLiveness, header);
         }
 
         return size;
     }
 
-    private long sizeOfComplexColumn(ComplexColumnData data, boolean 
hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
+    private long sizeOfComplexColumn(ComplexColumnData data, ColumnDefinition 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header)
     {
         long size = 0;
 
@@ -294,7 +308,7 @@ public class UnfilteredSerializer
 
         size += TypeSizes.sizeofUnsignedVInt(data.cellsCount());
         for (Cell cell : data)
-            size += Cell.serializer.serializedSize(cell, rowLiveness, header);
+            size += Cell.serializer.serializedSize(cell, column, rowLiveness, 
header);
 
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index fe03db4..a213edf 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -131,6 +131,7 @@ public abstract class CQLTester
 
     public static ResultMessage lastSchemaChangeResult;
 
+    private List<String> keyspaces = new ArrayList<>();
     private List<String> tables = new ArrayList<>();
     private List<String> types = new ArrayList<>();
     private List<String> functions = new ArrayList<>();
@@ -262,10 +263,12 @@ public abstract class CQLTester
         usePrepared = USE_PREPARED_VALUES;
         reusePrepared = REUSE_PREPARED;
 
+        final List<String> keyspacesToDrop = copy(keyspaces);
         final List<String> tablesToDrop = copy(tables);
         final List<String> typesToDrop = copy(types);
         final List<String> functionsToDrop = copy(functions);
         final List<String> aggregatesToDrop = copy(aggregates);
+        keyspaces = null;
         tables = null;
         types = null;
         functions = null;
@@ -290,6 +293,9 @@ public abstract class CQLTester
                     for (int i = typesToDrop.size() - 1; i >= 0; i--)
                         schemaChange(String.format("DROP TYPE IF EXISTS 
%s.%s", KEYSPACE, typesToDrop.get(i)));
 
+                    for (int i = keyspacesToDrop.size() - 1; i >= 0; i--)
+                        schemaChange(String.format("DROP KEYSPACE IF EXISTS 
%s", keyspacesToDrop.get(i)));
+
                     // Dropping doesn't delete the sstables. It's not a huge 
deal but it's cleaner to cleanup after us
                     // Thas said, we shouldn't delete blindly before the 
TransactionLogs.SSTableTidier for the table we drop
                     // have run or they will be unhappy. Since those taks are 
scheduled on StorageService.tasks and that's
@@ -501,6 +507,22 @@ public abstract class CQLTester
         schemaChange(fullQuery);
     }
 
+    protected String createKeyspace(String query)
+    {
+        String currentKeyspace = createKeyspaceName();
+        String fullQuery = String.format(query, currentKeyspace);
+        logger.info(fullQuery);
+        schemaChange(fullQuery);
+        return currentKeyspace;
+    }
+
+    protected String createKeyspaceName()
+    {
+        String currentKeyspace = "keyspace_" + seqNumber.getAndIncrement();
+        keyspaces.add(currentKeyspace);
+        return currentKeyspace;
+    }
+
     protected String createTable(String query)
     {
         String currentTable = createTableName();
@@ -519,8 +541,7 @@ public abstract class CQLTester
 
     protected void createTableMayThrow(String query) throws Throwable
     {
-        String currentTable = "table_" + seqNumber.getAndIncrement();
-        tables.add(currentTable);
+        String currentTable = createTableName();
         String fullQuery = formatQuery(query);
         logger.info(fullQuery);
         QueryProcessor.executeOnceInternal(fullQuery);
@@ -825,6 +846,16 @@ public abstract class CQLTester
      */
     public static void assertRowsIgnoringOrder(UntypedResultSet result, 
Object[]... rows)
     {
+        assertRowsIgnoringOrderInternal(result, false, rows);
+    }
+
+    public static void assertRowsIgnoringOrderAndExtra(UntypedResultSet 
result, Object[]... rows)
+    {
+        assertRowsIgnoringOrderInternal(result, true, rows);
+    }
+
+    private static void assertRowsIgnoringOrderInternal(UntypedResultSet 
result, boolean ignoreExtra, Object[]... rows)
+    {
         if (result == null)
         {
             if (rows.length > 0)
@@ -855,7 +886,7 @@ public abstract class CQLTester
 
         com.google.common.collect.Sets.SetView<List<ByteBuffer>> extra = 
com.google.common.collect.Sets.difference(actualRows, expectedRows);
         com.google.common.collect.Sets.SetView<List<ByteBuffer>> missing = 
com.google.common.collect.Sets.difference(expectedRows, actualRows);
-        if (!extra.isEmpty() || !missing.isEmpty())
+        if ((!ignoreExtra && !extra.isEmpty()) || !missing.isEmpty())
         {
             List<String> extraRows = makeRowStrings(extra, meta);
             List<String> missingRows = makeRowStrings(missing, meta);
@@ -876,7 +907,7 @@ public abstract class CQLTester
                 Assert.fail("Missing " + missing.size() + " row(s) in result: 
\n    " + missingRows.stream().collect(Collectors.joining("\n    ")));
         }
 
-        assert expectedRows.size() == actualRows.size();
+        assert ignoreExtra || expectedRows.size() == actualRows.size();
     }
 
     private static List<String> makeRowStrings(Iterable<List<ByteBuffer>> 
rows, List<ColumnSpecification> meta)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 9f8bea2..509aeac 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -24,6 +24,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -130,37 +132,33 @@ public class AlterTest extends CQLTester
         assertInvalidThrow(SyntaxException.class, "CREATE KEYSPACE ks1");
         assertInvalidThrow(ConfigurationException.class, "CREATE KEYSPACE ks1 
WITH replication= { 'replication_factor' : 1 }");
 
-        execute("CREATE KEYSPACE ks1 WITH replication={ 'class' : 
'SimpleStrategy', 'replication_factor' : 1 }");
-        execute("CREATE KEYSPACE ks2 WITH replication={ 'class' : 
'SimpleStrategy', 'replication_factor' : 1 } AND durable_writes=false");
+        String ks1 = createKeyspace("CREATE KEYSPACE %s WITH replication={ 
'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        String ks2 = createKeyspace("CREATE KEYSPACE %s WITH replication={ 
'class' : 'SimpleStrategy', 'replication_factor' : 1 } AND 
durable_writes=false");
 
-        assertRows(execute("SELECT keyspace_name, durable_writes FROM 
system_schema.keyspaces"),
-                   row("ks1", true),
+        assertRowsIgnoringOrderAndExtra(execute("SELECT keyspace_name, 
durable_writes FROM system_schema.keyspaces"),
                    row(KEYSPACE, true),
                    row(KEYSPACE_PER_TEST, true),
-                   row("ks2", false));
+                   row(ks1, true),
+                   row(ks2, false));
 
-        execute("ALTER KEYSPACE ks1 WITH replication = { 'class' : 
'NetworkTopologyStrategy', 'dc1' : 1 } AND durable_writes=False");
-        execute("ALTER KEYSPACE ks2 WITH durable_writes=true");
+        schemaChange("ALTER KEYSPACE " + ks1 + " WITH replication = { 'class' 
: 'NetworkTopologyStrategy', 'dc1' : 1 } AND durable_writes=False");
+        schemaChange("ALTER KEYSPACE " + ks2 + " WITH durable_writes=true");
 
-        assertRows(execute("SELECT keyspace_name, durable_writes, replication 
FROM system_schema.keyspaces"),
-                   row("ks1", false, map("class", 
"org.apache.cassandra.locator.NetworkTopologyStrategy", "dc1", "1")),
+        assertRowsIgnoringOrderAndExtra(execute("SELECT keyspace_name, 
durable_writes, replication FROM system_schema.keyspaces"),
                    row(KEYSPACE, true, map("class", 
"org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")),
                    row(KEYSPACE_PER_TEST, true, map("class", 
"org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")),
-                   row("ks2", true, map("class", 
"org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")));
+                   row(ks1, false, map("class", 
"org.apache.cassandra.locator.NetworkTopologyStrategy", "dc1", "1")),
+                   row(ks2, true, map("class", 
"org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")));
 
-        execute("USE ks1");
+        execute("USE " + ks1);
 
         assertInvalidThrow(ConfigurationException.class, "CREATE TABLE cf1 (a 
int PRIMARY KEY, b int) WITH compaction = { 'min_threshold' : 4 }");
 
         execute("CREATE TABLE cf1 (a int PRIMARY KEY, b int) WITH compaction = 
{ 'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 7 }");
-        assertRows(execute("SELECT table_name, compaction FROM 
system_schema.tables WHERE keyspace_name='ks1'"),
+        assertRows(execute("SELECT table_name, compaction FROM 
system_schema.tables WHERE keyspace_name='" + ks1 + "'"),
                    row("cf1", map("class", 
"org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy",
                                   "min_threshold", "7",
                                   "max_threshold", "32")));
-
-        // clean-up
-        execute("DROP KEYSPACE ks1");
-        execute("DROP KEYSPACE ks2");
     }
 
     /**
@@ -324,4 +322,20 @@ public class AlterTest extends CQLTester
         createTable("CREATE TABLE %s (key blob, column1 blob, value blob, 
PRIMARY KEY ((key), column1)) WITH COMPACT STORAGE");
         assertInvalidThrow(InvalidRequestException.class, "ALTER TABLE %s 
ALTER column1 TYPE ascii");
     }
+
+    @Test
+    public void testAlterToBlob() throws Throwable
+    {
+        // This tests for the bug from #11820 in particular
+
+        createTable("CREATE TABLE %s (a int PRIMARY KEY, b int)");
+
+        execute("INSERT INTO %s (a, b) VALUES (1, 1)");
+
+        executeNet(Server.CURRENT_VERSION, "ALTER TABLE %s ALTER b TYPE BLOB");
+
+        assertRowsNet(Server.CURRENT_VERSION, 
executeNet(Server.CURRENT_VERSION, "SELECT * FROM %s WHERE a = 1"),
+            row(1, ByteBufferUtil.bytes(1))
+        );
+    }
 }

Reply via email to