dcapwell commented on a change in pull request #1429:
URL: https://github.com/apache/cassandra/pull/1429#discussion_r793054978



##########
File path: src/java/org/apache/cassandra/cql3/QueryProcessor.java
##########
@@ -391,6 +397,61 @@ public static UntypedResultSet executeInternal(String 
query, Object... values)
             return null;
     }
 
+    public static Future<UntypedResultSet> execute(InetAddressAndPort address, 
String query, Object... values)
+    {
+        Prepared prepared = prepareInternal(query);
+        QueryOptions options = makeInternalOptions(prepared.statement, values);
+        if (prepared.statement instanceof SelectStatement)
+        {
+            SelectStatement select = (SelectStatement) prepared.statement;
+            int nowInSec = FBUtilities.nowInSeconds();
+            ReadQuery readQuery = select.getQuery(options, nowInSec);
+            List<ReadCommand> commands;
+            if (readQuery instanceof ReadCommand)
+            {
+                commands = Collections.singletonList((ReadCommand) readQuery);
+            }
+            else if (readQuery instanceof SinglePartitionReadQuery.Group)
+            {
+                List<? extends SinglePartitionReadQuery> queries = 
((SinglePartitionReadQuery.Group<? extends SinglePartitionReadQuery>) 
readQuery).queries;
+                queries.forEach(a -> {
+                    if (!(a instanceof ReadCommand))
+                        throw new IllegalArgumentException("Queries found 
which are not ReadCommand: " + a.getClass());
+                });
+                commands = (List<ReadCommand>) (List<?>) queries;
+            }
+            else
+            {
+                throw new IllegalArgumentException("Unable to handle; only 
expected ReadCommands but given " + readQuery.getClass());
+            }
+            Future<List<Message<ReadResponse>>> future = 
FutureCombiner.allOf(commands.stream()
+                                                                               
       .map(rc -> Message.out(rc.verb(), rc))
+                                                                               
       .map(m -> MessagingService.instance().<ReadResponse>sendWithResult(m, 
address))
+                                                                               
       .collect(Collectors.toList()));
+
+            ResultSetBuilder result = new 
ResultSetBuilder(select.getResultMetadata(), 
select.getSelection().newSelectors(options), null);
+            return future.map(list -> {
+                int i = 0;
+                for (Message<ReadResponse> m : list)
+                {
+                    ReadResponse rsp = m.payload;
+                    try (PartitionIterator it = 
UnfilteredPartitionIterators.filter(rsp.makeIterator(commands.get(i++)), 
nowInSec))
+                    {
+                        while (it.hasNext())
+                        {
+                            try (RowIterator partition = it.next())
+                            {
+                                select.processPartition(partition, options, 
result, nowInSec);

Review comment:
       was wondering if we should pull this into a util; PartitionIterator -> 
ResultSet is specific to SelectStatement




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to