Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/184#discussion_r160281918
  
    --- Diff: 
src/java/org/apache/cassandra/db/LegacySystemKeyspaceMigrator.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.schema.SchemaConstants;
    +import org.apache.cassandra.cql3.QueryProcessor;
    +import org.apache.cassandra.cql3.UntypedResultSet;
    +import org.apache.cassandra.db.marshal.BytesType;
    +import org.apache.cassandra.db.marshal.Int32Type;
    +import org.apache.cassandra.db.marshal.UTF8Type;
    +import org.apache.cassandra.db.marshal.UUIDType;
    +
    +/**
    + * Migrate 3.0 versions of some tables to 4.0. In this case it's just 
extra columns and some keys
    + * that are changed.
    + *
    + * Can't just add the additional columns because they are primary key 
columns and C* doesn't support changing
    + * key columns even if it's just clustering columns.
    + */
    +public class LegacySystemKeyspaceMigrator
    +{
    +    static final String legacyPeersName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
    +    static final String peersName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
    +    static final String legacyPeerEventsName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
    +    static final String peerEventsName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
    +    static final String legacyTransferredRangesName = 
String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, 
SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
    +    static final String transferredRangesName = String.format("%s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
    +
    +    private static final Logger logger = 
LoggerFactory.getLogger(LegacySystemKeyspaceMigrator.class);
    +
    +    private LegacySystemKeyspaceMigrator() {}
    +
    +    public static void migrate()
    +    {
    +        migratePeers();
    +        migratePeerEvents();
    +        migrateLegacyTransferredRanges();
    +    }
    +
    +    private static void migratePeers()
    +    {
    +        ColumnFamilyStore newPeers = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
    +
    +        if (!newPeers.isEmpty())
    +             return;
    +
    +        logger.info("{} table was empty, migrating legacy {}, if this 
fails you should fix the issue and then truncate {} to have it try again.",
    +                                  peersName, legacyPeersName, peersName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeersName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "data_center, "
    +                                      + "host_id, "
    +                                      + "preferred_ip, "
    +                                      + "preferred_port, "
    +                                      + "rack, "
    +                                      + "release_version, "
    +                                      + "native_address, "
    +                                      + "native_port, "
    +                                      + "schema_version, "
    +                                      + "tokens) "
    +                                      + " values ( ?, ?, ? , ? , ?, ?, ?, 
?, ?, ?, ?, ?)",
    +                                      peersName);
    +
    +        UntypedResultSet rows = 
QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    +            QueryProcessor.executeInternal(insert,
    +                                           row.has("peer") ? 
row.getInetAddress("peer") : null,
    +                                           
DatabaseDescriptor.getStoragePort(),
    +                                           row.has("data_center") ? 
row.getString("data_center") : null,
    +                                           row.has("host_id") ? 
row.getUUID("host_id") : null,
    +                                           row.has("preferred_ip") ? 
row.getInetAddress("preferred_ip") : null,
    +                                           
DatabaseDescriptor.getStoragePort(),
    +                                           row.has("rack") ? 
row.getString("rack") : null,
    +                                           row.has("release_version") ? 
row.getString("release_version") : null,
    +                                           row.has("rpc_address") ? 
row.getInetAddress("rpc_address") : null,
    +                                           
DatabaseDescriptor.getNativeTransportPort(),
    +                                           row.has("schema_version") ? 
row.getUUID("schema_version") : null,
    +                                           row.has("tokens") ? 
row.getSet("tokens", UTF8Type.instance) : null);
    +            transferred++;
    +        }
    +        logger.info("Migrated {} rows from legacy {} to {}", transferred, 
legacyPeersName, peersName);
    +    }
    +
    +    private static void migratePeerEvents()
    +    {
    +        ColumnFamilyStore newPeerEvents = 
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEER_EVENTS_V2);
    +
    +        if (!newPeerEvents.isEmpty())
    +            return;
    +
    +        logger.info("{} table was empty, migrating legacy {}", 
peerEventsName, legacyPeerEventsName);
    +
    +        String query = String.format("SELECT * FROM %s",
    +                                     legacyPeerEventsName);
    +
    +        String insert = String.format("INSERT INTO %s ( "
    +                                      + "peer, "
    +                                      + "peer_port, "
    +                                      + "hints_dropped) "
    +                                      + " values ( ?, ?, ? )",
    +                                      peerEventsName);
    +
    +        UntypedResultSet rows = 
QueryProcessor.executeInternalWithPaging(query, 1000);
    +        int transferred = 0;
    +        for (UntypedResultSet.Row row : rows)
    +        {
    +            logger.debug("Transferring row {}", transferred);
    +            QueryProcessor.executeInternal(insert,
    +                                           row.has("peer") ? 
row.getInetAddress("peer") : null,
    +                                           
DatabaseDescriptor.getStoragePort(),
    +                                           row.has("hints_dropped") ? 
row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance) : null);
    +            transferred++;
    +        }
    +        logger.info("Migrated {} rows from legacy {} to {}", transferred, 
legacyPeerEventsName, peerEventsName);
    +    }
    +
    +    static void migrateLegacyTransferredRanges()
    --- End diff --
    
    pretty naming nit: remove 'legacy' from method name as you didn't do that 
on the other methods.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to