Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/255#discussion_r213555484
  
    --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.tools.fqltool;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.Collectors;
    +
    +import com.google.common.primitives.Longs;
    +import com.google.common.util.concurrent.FluentFuture;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +import com.datastax.driver.core.BatchStatement;
    +import com.datastax.driver.core.ConsistencyLevel;
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.SimpleStatement;
    +import org.apache.cassandra.audit.FullQueryLogger;
    +import org.apache.cassandra.cql3.QueryOptions;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.binlog.BinLog;
    +
    +public abstract class FQLQuery implements Comparable<FQLQuery>
    +{
    +    public final long queryTime;
    +    public final QueryOptions queryOptions;
    +    public final int protocolVersion;
    +    public final String keyspace;
    +
    +    public FQLQuery(String keyspace, int protocolVersion, QueryOptions 
queryOptions, long queryTime)
    +    {
    +        this.queryTime = queryTime;
    +        this.queryOptions = queryOptions;
    +        this.protocolVersion = protocolVersion;
    +        this.keyspace = keyspace;
    +    }
    +
    +    public abstract ListenableFuture<ResultHandler.ComparableResultSet> 
execute(Session session);
    +
    +    /**
    +     * used when storing the queries executed
    +     */
    +    public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
    +
    +    /**
    +     * Make sure we catch any query errors
    +     *
    +     * On error, this creates a failed ComparableResultSet with the 
exception set to be able to store
    +     * this fact in the result file and handle comparison of failed result 
sets.
    +     */
    +    ListenableFuture<ResultHandler.ComparableResultSet> 
handleErrors(ListenableFuture<ResultSet> result)
    +    {
    +        FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = 
FluentFuture.from(result)
    +                                                                           
        .transform(DriverResultSet::new, MoreExecutors.directExecutor());
    +        return fluentFuture.catching(Throwable.class, 
DriverResultSet::failed, MoreExecutors.directExecutor());
    +    }
    +
    +    public boolean equals(Object o)
    +    {
    +        if (this == o) return true;
    +        if (!(o instanceof FQLQuery)) return false;
    +        FQLQuery fqlQuery = (FQLQuery) o;
    +        return queryTime == fqlQuery.queryTime &&
    +               protocolVersion == fqlQuery.protocolVersion &&
    +               
queryOptions.getValues().equals(fqlQuery.queryOptions.getValues()) &&
    +               Objects.equals(keyspace, fqlQuery.keyspace);
    +    }
    +
    +    public int hashCode()
    +    {
    +        return Objects.hash(queryTime, queryOptions, protocolVersion, 
keyspace);
    +    }
    +
    +    public int compareTo(FQLQuery other)
    +    {
    +        return Longs.compare(queryTime, other.queryTime);
    +    }
    +
    +    public static class Single extends FQLQuery
    +    {
    +        public final String query;
    +        public final List<ByteBuffer> values;
    +
    +        public Single(String keyspace, int protocolVersion, QueryOptions 
queryOptions, long queryTime, String queryString, List<ByteBuffer> values)
    +        {
    +            super(keyspace, protocolVersion, queryOptions, queryTime);
    +            this.query = queryString;
    +            this.values = values;
    +        }
    +
    +        @Override
    +        public String toString()
    +        {
    +            return String.format("Query = %s, Options = %s, Values = %s",
    +                                 query,
    +                                 queryOptions,
    +                                 
values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")));
    +        }
    +
    +        public ListenableFuture<ResultHandler.ComparableResultSet> 
execute(Session session)
    +        {
    +            SimpleStatement ss = new SimpleStatement(query, 
values.toArray());
    +            
ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
    +            ListenableFuture<ResultSet> future = session.executeAsync(ss);
    +            return handleErrors(future);
    +        }
    +
    +        public BinLog.ReleaseableWriteMarshallable toMarshallable()
    +        {
    +            return new FullQueryLogger.WeighableMarshallableQuery(query, 
keyspace, queryOptions, queryTime);
    +        }
    +
    +        public int compareTo(FQLQuery other)
    +        {
    +            int cmp = super.compareTo(other);
    +
    +            if (cmp == 0)
    +            {
    +                if (other instanceof Batch)
    +                    return -1;
    +
    +                Single singleQuery = (Single) other;
    +
    +                cmp = query.compareTo(singleQuery.query);
    +                if (cmp == 0)
    +                {
    +                    if (values.size() != singleQuery.values.size())
    +                        return values.size() - singleQuery.values.size();
    +                    for (int i = 0; i < values.size(); i++)
    +                    {
    +                        cmp = 
values.get(i).compareTo(singleQuery.values.get(i));
    +                        if (cmp != 0)
    +                            return cmp;
    +                    }
    +                }
    +            }
    +            return cmp;
    +        }
    +
    +        public boolean equals(Object o)
    +        {
    +            if (this == o) return true;
    +            if (!(o instanceof Single)) return false;
    +            if (!super.equals(o)) return false;
    +            Single single = (Single) o;
    +            return Objects.equals(query, single.query) &&
    +                   Objects.equals(values, single.values);
    +        }
    +
    +        public int hashCode()
    +        {
    +            return Objects.hash(super.hashCode(), query, values);
    +        }
    +    }
    +
    +    public static class Batch extends FQLQuery
    +    {
    +        public final BatchStatement.Type batchType;
    +        public final List<Single> queries;
    +
    +        public Batch(String keyspace, int protocolVersion, QueryOptions 
queryOptions, long queryTime, BatchStatement.Type batchType, List<String> 
queries, List<List<ByteBuffer>> values)
    +        {
    +            super(keyspace, protocolVersion, queryOptions, queryTime);
    +            this.batchType = batchType;
    +            this.queries = new ArrayList<>(queries.size());
    +            for (int i = 0; i < queries.size(); i++)
    +                this.queries.add(new Single(keyspace, protocolVersion, 
queryOptions, queryTime, queries.get(i), values.get(i)));
    +        }
    +
    +        public ListenableFuture<ResultHandler.ComparableResultSet> 
execute(Session session)
    +        {
    +            BatchStatement bs = new BatchStatement(batchType);
    +            
bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name()));
    +            for (Single query : queries)
    +            {
    +                bs.add(new SimpleStatement(query.query, 
query.values.toArray()));
    +            }
    +            ListenableFuture<ResultSet> future = session.executeAsync(bs);
    +            return handleErrors(future);
    +        }
    +
    +        public int compareTo(FQLQuery other)
    +        {
    +            int cmp = super.compareTo(other);
    +
    +            if (cmp == 0)
    +            {
    +                if (other instanceof Single)
    +                    return 1;
    +
    +                Batch otherBatch = (Batch) other;
    +                if (queries.size() != otherBatch.queries.size())
    --- End diff --
    
    Only the length of of the queries lists are compared, not the actual 
contents of the queries lists. (`Single.compareTo()` compares the query 
strings).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to