Author: slebresne Date: Mon Jul 4 14:36:11 2011 New Revision: 1142690 URL: http://svn.apache.org/viewvc?rev=1142690&view=rev Log: Reset CF and SC deletion time after gc_grace patch by slebresne; reviewed by jbellis for CASSANDRA-2317
Added: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Jul 4 14:36:11 2011 @@ -10,6 +10,7 @@ * clean up tmp files after failed compaction (CASSANDRA-2468) * restrict repair streaming to specific columnfamilies (CASSANDRA-2280) * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589) + * reset CF and SC deletion times after gc_grace (CASSANDRA-2317) 0.8.2 Added: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1142690&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Mon Jul 4 14:36:11 2011 @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.ICompactSerializer2; +import org.apache.cassandra.io.util.IIterableColumns; +import org.apache.cassandra.utils.FBUtilities; + +public abstract class AbstractColumnContainer implements IColumnContainer, IIterableColumns +{ + private static Logger logger = LoggerFactory.getLogger(AbstractColumnContainer.class); + + protected final ConcurrentSkipListMap<ByteBuffer, IColumn> columns; + protected final AtomicReference<DeletionInfo> deletionInfo = new AtomicReference<DeletionInfo>(new DeletionInfo()); + + protected AbstractColumnContainer(ConcurrentSkipListMap<ByteBuffer, IColumn> columns) + { + this.columns = columns; + } + + @Deprecated // TODO this is a hack to set initial value outside constructor + public void delete(int localtime, long timestamp) + { + deletionInfo.set(new DeletionInfo(timestamp, localtime)); + } + + public void delete(AbstractColumnContainer cc2) + { + // Keeping deletion info for max markedForDeleteAt value + DeletionInfo current; + DeletionInfo cc2Info = cc2.deletionInfo.get(); + while (true) + { + current = deletionInfo.get(); + if (current.markedForDeleteAt >= cc2Info.markedForDeleteAt || deletionInfo.compareAndSet(current, cc2Info)) + break; + } + } + + public boolean isMarkedForDelete() + { + return getMarkedForDeleteAt() > Long.MIN_VALUE; + } + + public long getMarkedForDeleteAt() + { + return deletionInfo.get().markedForDeleteAt; + } + + public int getLocalDeletionTime() + { + return deletionInfo.get().localDeletionTime; + } + + public AbstractType getComparator() + { + return (AbstractType)columns.comparator(); + } + + public void maybeResetDeletionTimes(int gcBefore) + { + while (true) + { + DeletionInfo current = deletionInfo.get(); + // Stop if either we don't need to change the deletion info (it's + // still MIN_VALUE or not expired yet) or we've succesfully changed it + if (current.localDeletionTime == Integer.MIN_VALUE + || current.localDeletionTime > gcBefore + || deletionInfo.compareAndSet(current, new DeletionInfo())) + break; + } + } + + /** + * We need to go through each column in the column container and resolve it before adding + */ + public void addAll(AbstractColumnContainer cc) + { + for (IColumn column : cc.getSortedColumns()) + addColumn(column); + delete(cc); + } + + /* + * If we find an old column that has the same name + * the ask it to resolve itself else add the new column + */ + public void addColumn(IColumn column) + { + ByteBuffer name = column.name(); + IColumn oldColumn; + while ((oldColumn = columns.putIfAbsent(name, column)) != null) + { + if (oldColumn instanceof SuperColumn) + { + assert column instanceof SuperColumn; + ((SuperColumn) oldColumn).putColumn((SuperColumn)column); + break; // Delegated to SuperColumn + } + else + { + // calculate reconciled col from old (existing) col and new col + IColumn reconciledColumn = column.reconcile(oldColumn); + if (columns.replace(name, oldColumn, reconciledColumn)) + break; + + // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. + // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) + } + } + } + + abstract protected void putColumn(SuperColumn sc); + + public IColumn getColumn(ByteBuffer name) + { + return columns.get(name); + } + + public SortedSet<ByteBuffer> getColumnNames() + { + return columns.keySet(); + } + + public Collection<IColumn> getSortedColumns() + { + return columns.values(); + } + + public Collection<IColumn> getReverseSortedColumns() + { + return columns.descendingMap().values(); + } + + public Map<ByteBuffer, IColumn> getColumnsMap() + { + return columns; + } + + public void remove(ByteBuffer columnName) + { + columns.remove(columnName); + } + + public int getColumnCount() + { + return columns.size(); + } + + public boolean isEmpty() + { + return columns.isEmpty(); + } + + public int getEstimatedColumnCount() + { + return getColumnCount(); + } + + public Iterator<IColumn> iterator() + { + return columns.values().iterator(); + } + + private static class DeletionInfo + { + public final long markedForDeleteAt; + public final int localDeletionTime; + + public DeletionInfo() + { + this(Long.MIN_VALUE, Integer.MIN_VALUE); + } + + public DeletionInfo(long markedForDeleteAt, int localDeletionTime) + { + this.markedForDeleteAt = markedForDeleteAt; + this.localDeletionTime = localDeletionTime; + } + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Mon Jul 4 14:36:11 2011 @@ -22,16 +22,9 @@ import static org.apache.cassandra.db.DB import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.SortedSet; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -42,10 +35,8 @@ import org.apache.cassandra.io.IColumnSe import org.apache.cassandra.io.util.IIterableColumns; import org.apache.cassandra.utils.FBUtilities; -public class ColumnFamily implements IColumnContainer, IIterableColumns +public class ColumnFamily extends AbstractColumnContainer { - private static Logger logger = LoggerFactory.getLogger(ColumnFamily.class); - /* The column serializer for this Column Family. Create based on config. */ private static ColumnFamilySerializer serializer = new ColumnFamilySerializer(); private final CFMetaData cfm; @@ -71,23 +62,25 @@ public class ColumnFamily implements ICo } private transient IColumnSerializer columnSerializer; - final AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE); - final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE); - private ConcurrentSkipListMap<ByteBuffer, IColumn> columns; public ColumnFamily(CFMetaData cfm) { + this(cfm, new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator)); + } + + private ColumnFamily(CFMetaData cfm, ConcurrentSkipListMap<ByteBuffer, IColumn> map) + { + super(map); assert cfm != null; this.cfm = cfm; columnSerializer = cfm.cfType == ColumnFamilyType.Standard ? Column.serializer() : SuperColumn.serializer(cfm.subcolumnComparator); - columns = new ConcurrentSkipListMap<ByteBuffer, IColumn>(cfm.comparator); } public ColumnFamily cloneMeShallow() { ColumnFamily cf = new ColumnFamily(cfm); - cf.markedForDeleteAt.set(markedForDeleteAt.get()); - cf.localDeletionTime.set(localDeletionTime.get()); + // since deletion info is immutable, aliasing it is fine + cf.deletionInfo.set(deletionInfo.get()); return cf; } @@ -103,8 +96,9 @@ public class ColumnFamily implements ICo public ColumnFamily cloneMe() { - ColumnFamily cf = cloneMeShallow(); - cf.columns = columns.clone(); + ColumnFamily cf = new ColumnFamily(cfm, columns.clone()); + // since deletion info is immutable, aliasing it is fine + cf.deletionInfo.set(deletionInfo.get()); return cf; } @@ -121,32 +115,11 @@ public class ColumnFamily implements ICo return cfm; } - /* - * We need to go through each column - * in the column family and resolve it before adding - */ - public void addAll(ColumnFamily cf) - { - for (IColumn column : cf.getSortedColumns()) - addColumn(column); - delete(cf); - } - public IColumnSerializer getColumnSerializer() { return columnSerializer; } - public int getColumnCount() - { - return columns.size(); - } - - public boolean isEmpty() - { - return columns.isEmpty(); - } - public boolean isSuper() { return getType() == ColumnFamilyType.Super; @@ -214,82 +187,6 @@ public class ColumnFamily implements ICo } /* - * If we find an old column that has the same name - * the ask it to resolve itself else add the new column . - */ - public void addColumn(IColumn column) - { - ByteBuffer name = column.name(); - IColumn oldColumn; - while ((oldColumn = columns.putIfAbsent(name, column)) != null) - { - if (oldColumn instanceof SuperColumn) - { - ((SuperColumn) oldColumn).putColumn(column); - break; // Delegated to SuperColumn - } - else - { - // calculate reconciled col from old (existing) col and new col - IColumn reconciledColumn = column.reconcile(oldColumn); - if (columns.replace(name, oldColumn, reconciledColumn)) - break; - - // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. - // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) - } - } - } - - public IColumn getColumn(ByteBuffer name) - { - return columns.get(name); - } - - public SortedSet<ByteBuffer> getColumnNames() - { - return columns.keySet(); - } - - public Collection<IColumn> getSortedColumns() - { - return columns.values(); - } - - public Collection<IColumn> getReverseSortedColumns() - { - return columns.descendingMap().values(); - } - - public Map<ByteBuffer, IColumn> getColumnsMap() - { - return columns; - } - - public void remove(ByteBuffer columnName) - { - columns.remove(columnName); - } - - @Deprecated // TODO this is a hack to set initial value outside constructor - public void delete(int localtime, long timestamp) - { - localDeletionTime.set(localtime); - markedForDeleteAt.set(timestamp); - } - - public void delete(ColumnFamily cf2) - { - FBUtilities.atomicSetMax(localDeletionTime, cf2.getLocalDeletionTime()); // do this first so we won't have a column that's "deleted" but has no local deletion time - FBUtilities.atomicSetMax(markedForDeleteAt, cf2.getMarkedForDeleteAt()); - } - - public boolean isMarkedForDelete() - { - return markedForDeleteAt.get() > Long.MIN_VALUE; - } - - /* * This function will calculate the difference between 2 column families. * The external input is assumed to be a superset of internal. */ @@ -330,11 +227,6 @@ public class ColumnFamily implements ICo return null; } - public AbstractType getComparator() - { - return (AbstractType)columns.comparator(); - } - int size() { int size = 0; @@ -382,16 +274,6 @@ public class ColumnFamily implements ICo column.updateDigest(digest); } - public long getMarkedForDeleteAt() - { - return markedForDeleteAt.get(); - } - - public int getLocalDeletionTime() - { - return localDeletionTime.get(); - } - public static AbstractType getComparatorFor(String table, String columnFamilyName, ByteBuffer superColumnName) { return superColumnName == null @@ -414,16 +296,6 @@ public class ColumnFamily implements ICo addAll(cf); } - public int getEstimatedColumnCount() - { - return getColumnCount(); - } - - public Iterator<IColumn> iterator() - { - return columns.values().iterator(); - } - public long serializedSize() { int size = boolSize_ // bool @@ -449,4 +321,9 @@ public class ColumnFamily implements ICo column.validateFields(metadata); } } + + protected void putColumn(SuperColumn sc) + { + throw new UnsupportedOperationException("Unsupported operation for a column family"); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul 4 14:36:11 2011 @@ -97,8 +97,8 @@ public class ColumnFamilySerializer impl public void serializeCFInfo(ColumnFamily columnFamily, DataOutput dos) throws IOException { - dos.writeInt(columnFamily.localDeletionTime.get()); - dos.writeLong(columnFamily.markedForDeleteAt.get()); + dos.writeInt(columnFamily.getLocalDeletionTime()); + dos.writeLong(columnFamily.getMarkedForDeleteAt()); } public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jul 4 14:36:11 2011 @@ -775,6 +775,10 @@ public class ColumnFamilyStore implement // (we want this to be deterministic to avoid confusion.) if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <= gcBefore) return null; + + // If there is non deleted columns, we still need to reset the column family + // deletion times since gc_grace seconds had elapsed + cf.maybeResetDeletionTimes(gcBefore); return cf; } @@ -844,6 +848,12 @@ public class ColumnFamilyStore implement { cf.remove(c.name()); } + else + { + // If there is non deleted columns, we still need to reset the column family + // deletion times since gc_grace seconds had elapsed + c.maybeResetDeletionTimes(gcBefore); + } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Mon Jul 4 14:36:11 2011 @@ -32,6 +32,7 @@ public interface IColumnContainer public boolean isMarkedForDelete(); public long getMarkedForDeleteAt(); + public int getLocalDeletionTime(); public AbstractType getComparator(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jul 4 14:36:11 2011 @@ -190,7 +190,7 @@ public class RowMutation implements IMut else if (path.columnName == null) { SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator()); - sc.markForDeleteAt(localDeleteTime, timestamp); + sc.delete(localDeleteTime, timestamp); columnFamily.addColumn(sc); } else Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Mon Jul 4 14:36:11 2011 @@ -28,8 +28,6 @@ import java.util.Comparator; import java.util.IdentityHashMap; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.marshal.AbstractType; @@ -42,7 +40,7 @@ import org.apache.cassandra.utils.FBUtil import org.cliffc.high_scale_lib.NonBlockingHashMap; -public class SuperColumn implements IColumn, IColumnContainer +public class SuperColumn extends AbstractColumnContainer implements IColumn { private static NonBlockingHashMap<Comparator, SuperColumnSerializer> serializers = new NonBlockingHashMap<Comparator, SuperColumnSerializer>(); public static SuperColumnSerializer serializer(AbstractType comparator) @@ -56,10 +54,7 @@ public class SuperColumn implements ICol return serializer; } - private ByteBuffer name_; - private ConcurrentSkipListMap<ByteBuffer, IColumn> columns_; - private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE); - private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE); + private ByteBuffer name; public SuperColumn(ByteBuffer name, AbstractType comparator) { @@ -68,54 +63,41 @@ public class SuperColumn implements ICol SuperColumn(ByteBuffer name, ConcurrentSkipListMap<ByteBuffer, IColumn> columns) { + super(columns); assert name != null; assert name.remaining() <= IColumn.MAX_NAME_LENGTH; - name_ = name; - columns_ = columns; - } - - public AbstractType getComparator() - { - return (AbstractType)columns_.comparator(); + this.name = name; } public SuperColumn cloneMeShallow() { - SuperColumn sc = new SuperColumn(name_, getComparator()); - sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get()); + SuperColumn sc = new SuperColumn(name, getComparator()); + // since deletion info is immutable, aliasing it is fine + sc.deletionInfo.set(deletionInfo.get()); return sc; } public IColumn cloneMe() { - SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns_)); - sc.markForDeleteAt(localDeletionTime.get(), markedForDeleteAt.get()); + SuperColumn sc = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns)); + // since deletion info is immutable, aliasing it is fine + sc.deletionInfo.set(deletionInfo.get()); return sc; } - public boolean isMarkedForDelete() - { - return markedForDeleteAt.get() > Long.MIN_VALUE; - } - public ByteBuffer name() { - return name_; + return name; } public Collection<IColumn> getSubColumns() { - return columns_.values(); - } - - public Collection<IColumn> getSortedColumns() - { - return getSubColumns(); + return getSortedColumns(); } public IColumn getSubColumn(ByteBuffer columnName) { - IColumn column = columns_.get(columnName); + IColumn column = columns.get(columnName); assert column == null || column instanceof Column; return column; } @@ -143,12 +125,7 @@ public class SuperColumn implements ICol * We need to keep the way we are calculating the column size in sync with the * way we are calculating the size for the column family serializer. */ - return DBConstants.shortSize_ + name_.remaining() + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size(); - } - - public void remove(ByteBuffer columnName) - { - columns_.remove(columnName); + return DBConstants.shortSize_ + name.remaining() + DBConstants.intSize_ + DBConstants.longSize_ + DBConstants.intSize_ + size(); } public long timestamp() @@ -159,7 +136,7 @@ public class SuperColumn implements ICol public long mostRecentLiveChangeAt() { long max = Long.MIN_VALUE; - for (IColumn column : columns_.values()) + for (IColumn column : getSubColumns()) { if (!column.isMarkedForDelete() && column.timestamp() > max) { @@ -174,42 +151,24 @@ public class SuperColumn implements ICol throw new UnsupportedOperationException("This operation is not supported for Super Columns."); } + @Override public void addColumn(IColumn column) { assert column instanceof Column : "A super column can only contain simple columns"; - - ByteBuffer name = column.name(); - IColumn oldColumn; - while ((oldColumn = columns_.putIfAbsent(name, column)) != null) - { - IColumn reconciledColumn = column.reconcile(oldColumn); - if (columns_.replace(name, oldColumn, reconciledColumn)) - break; - - // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. - // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) - } + super.addColumn((Column)column); } /* * Go through each sub column if it exists then as it to resolve itself * if the column does not exist then create it. */ - public void putColumn(IColumn column) + protected void putColumn(SuperColumn column) { - assert column instanceof SuperColumn; - for (IColumn subColumn : column.getSubColumns()) { addColumn(subColumn); } - FBUtilities.atomicSetMax(localDeletionTime, column.getLocalDeletionTime()); // do this first so we won't have a column that's "deleted" but has no local deletion time - FBUtilities.atomicSetMax(markedForDeleteAt, column.getMarkedForDeleteAt()); - } - - public long getMarkedForDeleteAt() - { - return markedForDeleteAt.get(); + delete(column); } public IColumn diff(IColumn columnNew) @@ -217,7 +176,7 @@ public class SuperColumn implements ICol IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator()); if (columnNew.getMarkedForDeleteAt() > getMarkedForDeleteAt()) { - ((SuperColumn)columnDiff).markForDeleteAt(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt()); + ((SuperColumn)columnDiff).delete(columnNew.getLocalDeletionTime(), columnNew.getMarkedForDeleteAt()); } // (don't need to worry about columnNew containing subColumns that are shadowed by @@ -225,7 +184,7 @@ public class SuperColumn implements ICol // takes care of those for us.) for (IColumn subColumn : columnNew.getSubColumns()) { - IColumn columnInternal = columns_.get(subColumn.name()); + IColumn columnInternal = columns.get(subColumn.name()); if(columnInternal == null ) { columnDiff.addColumn(subColumn); @@ -248,19 +207,19 @@ public class SuperColumn implements ICol public void updateDigest(MessageDigest digest) { - assert name_ != null; - digest.update(name_.duplicate()); + assert name != null; + digest.update(name.duplicate()); DataOutputBuffer buffer = new DataOutputBuffer(); try { - buffer.writeLong(markedForDeleteAt.get()); + buffer.writeLong(getMarkedForDeleteAt()); } catch (IOException e) { throw new RuntimeException(e); } digest.update(buffer.getData(), 0, buffer.getLength()); - for (IColumn column : columns_.values()) + for (IColumn column : getSubColumns()) { column.updateDigest(digest); } @@ -270,14 +229,14 @@ public class SuperColumn implements ICol { StringBuilder sb = new StringBuilder(); sb.append("SuperColumn("); - sb.append(comparator.getString(name_)); + sb.append(comparator.getString(name)); if (isMarkedForDelete()) { sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-"); } sb.append(" ["); - sb.append(getComparator().getColumnsString(columns_.values())); + sb.append(getComparator().getColumnsString(getSubColumns())); sb.append("])"); return sb.toString(); @@ -285,26 +244,14 @@ public class SuperColumn implements ICol public boolean isLive() { - return mostRecentLiveChangeAt() > markedForDeleteAt.get(); - } - - public int getLocalDeletionTime() - { - return localDeletionTime.get(); - } - - @Deprecated // TODO this is a hack to set initial value outside constructor - public void markForDeleteAt(int localDeleteTime, long timestamp) - { - this.localDeletionTime.set(localDeleteTime); - this.markedForDeleteAt.set(timestamp); + return mostRecentLiveChangeAt() > getMarkedForDeleteAt(); } public IColumn shallowCopy() { - SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator()); - sc.localDeletionTime = localDeletionTime; - sc.markedForDeleteAt = markedForDeleteAt; + SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name()), this.getComparator()); + // since deletion info is immutable, aliasing it is fine + sc.deletionInfo.set(deletionInfo.get()); return sc; } @@ -312,11 +259,9 @@ public class SuperColumn implements ICol { // we don't try to intern supercolumn names, because if we're using Cassandra correctly it's almost // certainly just going to pollute our interning map with unique, dynamic values - SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator()); - sc.localDeletionTime = localDeletionTime; - sc.markedForDeleteAt = markedForDeleteAt; - - for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet()) + SuperColumn sc = (SuperColumn)shallowCopy(); + + for(Map.Entry<ByteBuffer, IColumn> c : columns.entrySet()) { sc.addColumn(c.getValue().localCopy(cfs)); } @@ -414,7 +359,7 @@ class SuperColumnSerializer implements I { throw new IOException("Invalid localDeleteTime read: " + localDeleteTime); } - superColumn.markForDeleteAt(localDeleteTime, markedForDeleteAt); + superColumn.delete(localDeleteTime, markedForDeleteAt); return superColumn; } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Mon Jul 4 14:36:11 2011 @@ -129,10 +129,10 @@ public class QueryFilter // time of the cf, if that is greater. long deletedAt = c.getMarkedForDeleteAt(); if (returnCF.getMarkedForDeleteAt() > deletedAt) - ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); + ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt()); c = filter.filterSuperColumn((SuperColumn)c, gcBefore); - ((SuperColumn)c).markForDeleteAt(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be + ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be } curCF.clear(); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Mon Jul 4 14:36:11 2011 @@ -54,7 +54,7 @@ public class RowTest extends SchemaLoade sc1.addColumn(column("subcolumn", "A", 0)); SuperColumn sc2 = new SuperColumn(ByteBufferUtil.bytes("one"), AsciiType.instance); - sc2.markForDeleteAt(0, 0); + sc2.delete(0, 0); SuperColumn scDiff = (SuperColumn)sc1.diff(sc2); assertEquals(scDiff.getSubColumns().size(), 0); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Mon Jul 4 14:36:11 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.concurrent.ExecutionException; @@ -31,6 +32,7 @@ import org.apache.cassandra.db.Decorated import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.SSTableReader; @@ -184,7 +186,7 @@ public class CompactionsPurgeTest extend } @Test - public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException + public void testCompactionPurgeCachedRow() throws IOException, ExecutionException, InterruptedException { CompactionManager.instance.disableAutoCompaction(); @@ -230,4 +232,96 @@ public class CompactionsPurgeTest extend for (IColumn c : cf) assert !c.isMarkedForDelete(); } + + @Test + public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + String tableName = "Keyspace1"; + String cfName = "Standard1"; + Table table = Table.open(tableName); + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + DecoratedKey key = Util.dk("key3"); + RowMutation rm; + + // inserts + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + } + rm.apply(); + + // deletes row with timestamp such that not all columns are deleted + rm = new RowMutation(tableName, key.key); + rm.delete(new QueryPath(cfName, null, null), 4); + rm.apply(); + + // flush and major compact (with tombstone purging) + cfs.forceBlockingFlush(); + Util.compactAll(cfs).get(); + + // re-inserts with timestamp lower than delete + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 5; i++) + { + rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + } + rm.apply(); + + // Check that the second insert did went in + ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + assertEquals(10, cf.getColumnCount()); + for (IColumn c : cf) + assert !c.isMarkedForDelete(); + } + + @Test + public void testCompactionPurgeTombstonedSuperColumn() throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + String tableName = "Keyspace1"; + String cfName = "Super5"; + Table table = Table.open(tableName); + ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + + DecoratedKey key = Util.dk("key5"); + RowMutation rm; + + ByteBuffer scName = ByteBufferUtil.bytes("sc"); + + // inserts + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 10; i++) + { + rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + } + rm.apply(); + + // deletes supercolumn with timestamp such that not all columns go + rm = new RowMutation(tableName, key.key); + rm.delete(new QueryPath(cfName, scName, null), 4); + rm.apply(); + + // flush and major compact + cfs.forceBlockingFlush(); + Util.compactAll(cfs).get(); + + // re-inserts with timestamp lower than delete + rm = new RowMutation(tableName, key.key); + for (int i = 0; i < 5; i++) + { + rm.add(new QueryPath(cfName, scName, ByteBuffer.wrap(String.valueOf(i).getBytes())), ByteBufferUtil.EMPTY_BYTE_BUFFER, i); + } + rm.apply(); + + // Check that the second insert did went in + ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); + SuperColumn sc = (SuperColumn)cf.getColumn(scName); + assert sc != null; + assertEquals(10, sc.getColumnCount()); + } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1142690&r1=1142689&r2=1142690&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Mon Jul 4 14:36:11 2011 @@ -131,7 +131,7 @@ public class RowResolverTest extends Sch // subcolumn is newer than a tombstone on its parent, but not newer than the row deletion ColumnFamily scf1 = ColumnFamily.create("Keyspace1", "Super1"); SuperColumn sc = superColumn(scf1, "super-foo", column("one", "A", 1)); - sc.markForDeleteAt((int) (System.currentTimeMillis() / 1000), 0); + sc.delete((int) (System.currentTimeMillis() / 1000), 0); scf1.addColumn(sc); ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");