Allow native functions in user-defined aggregates. Patch by Robert Stupp; reviewed by Aleksey Yeschenko for CASSANDRA-9542.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6312c5f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6312c5f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6312c5f Branch: refs/heads/trunk Commit: d6312c5f4d6b1dbc16518becef1b7a785cea73b3 Parents: 5931eb4 Author: Robert Stupp <sn...@snazy.de> Authored: Mon Jun 22 17:19:20 2015 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Mon Jun 22 17:19:20 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/cql3/CQL.textile | 4 +-- .../statements/CreateAggregateStatement.java | 24 ++++++++----- .../cassandra/schema/LegacySchemaTables.java | 25 ++++++++++--- .../apache/cassandra/cql3/AggregationTest.java | 37 +++++++++++++++++--- test/unit/org/apache/cassandra/cql3/UFTest.java | 1 + 6 files changed, 74 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f821136..93753e5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Allow native functions in user-defined aggregates (CASSANDRA-9542) * Don't repair system_distributed by default (CASSANDRA-9621) * Fix mixing min, max, and count aggregates for blob type (CASSANRA-9622) * Rename class for DATE type in Java driver (CASSANDRA-9563) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/doc/cql3/CQL.textile ---------------------------------------------------------------------- diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile index 689ac94..ec1f660 100644 --- a/doc/cql3/CQL.textile +++ b/doc/cql3/CQL.textile @@ -708,9 +708,9 @@ Signatures for user-defined aggregates follow the "same rules":#functionSignatur The optional @INITCOND@ defines the 
initial state value for the aggregate. It defaults to @null@. A non-@null@ @INITCOND@ must be specified for state functions that are declared with @RETURNS NULL ON NULL INPUT@. -@SFUNC@ references an existing function to be used as the state modifying function. The type of first argument of the state function must match @STYPE@. The remaining argument types of the state function must match the argument types of the aggregate function. State is not updated for state functions declared with @RETURNS NULL ON NULL INPUT@ and called with @null@. +@SFUNC@ references an existing function to be used as the state modifying function. The type of first argument of the state function must match @STYPE@. The remaining argument types of the state function must match the argument types of the aggregate function. State is not updated for state functions declared with @RETURNS NULL ON NULL INPUT@ and called with @null@. Functions from the system keyspace are resolved before functions in the current keyspace. -The optional @FINALFUNC@ is called just before the aggregate result is returned. It must take only one argument with type @STYPE@. The return type of the @FINALFUNC@ may be a different type. A final function declared with @RETURNS NULL ON NULL INPUT@ means that the aggregate's return value will be @null@, if the last state is @null@. +The optional @FINALFUNC@ is called just before the aggregate result is returned. It must take only one argument with type @STYPE@. The return type of the @FINALFUNC@ may be a different type. A final function declared with @RETURNS NULL ON NULL INPUT@ means that the aggregate's return value will be @null@, if the last state is @null@. Functions from the system keyspace are resolved before functions in the current keyspace. If no @FINALFUNC@ is defined, the overall return type of the aggregate function is @STYPE@. If a @FINALFUNC@ is defined, it is the return type of that function. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java index 8b6c8d6..039993f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java @@ -86,7 +86,11 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement argTypes.add(prepareType("arguments", rawType)); AbstractType<?> stateType = prepareType("state type", stateTypeRaw); - Function f = Functions.find(stateFunc, stateArguments(stateType, argTypes)); + + List<AbstractType<?>> stateArgs = stateArguments(stateType, argTypes); + stateFunc = validateFunctionKeyspace(stateFunc, stateArgs); + + Function f = Functions.find(stateFunc, stateArgs); if (!(f instanceof ScalarFunction)) throw new InvalidRequestException("State function " + stateFuncSig(stateFunc, stateTypeRaw, argRawTypes) + " does not exist or is not a scalar function"); stateFunction = (ScalarFunction)f; @@ -97,7 +101,9 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement if (finalFunc != null) { - f = Functions.find(finalFunc, Collections.<AbstractType<?>>singletonList(stateType)); + List<AbstractType<?>> finalArgs = Collections.<AbstractType<?>>singletonList(stateType); + finalFunc = validateFunctionKeyspace(finalFunc, finalArgs); + f = Functions.find(finalFunc, finalArgs); if (!(f instanceof ScalarFunction)) throw new InvalidRequestException("Final function " + finalFunc + '(' + stateTypeRaw + ") does not exist or is not a scalar function"); finalFunction = (ScalarFunction) f; @@ -141,18 +147,20 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement if 
(!functionName.hasKeyspace()) throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session"); - stateFunc = validateFunctionKeyspace(stateFunc); - - if (finalFunc != null) - finalFunc = validateFunctionKeyspace(finalFunc); - ThriftValidation.validateKeyspaceNotSystem(functionName.keyspace); } - private FunctionName validateFunctionKeyspace(FunctionName func) + private FunctionName validateFunctionKeyspace(FunctionName func, List<AbstractType<?>> argTypes) { if (!func.hasKeyspace()) + { + // If state/final function has no keyspace, check SYSTEM keyspace before logged keyspace. + FunctionName nativeName = FunctionName.nativeFunction(func.name); + if (Functions.find(nativeName, argTypes) != null) + return nativeName; + return new FunctionName(functionName.keyspace, func.name); + } else if (!SystemKeyspace.NAME.equals(func.keyspace) && !functionName.keyspace.equals(func.keyspace)) throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user function in keyspace %s; " + "user functions can only be used in the keyspace they are defined in", http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/src/java/org/apache/cassandra/schema/LegacySchemaTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java index 1840829..b8f6421 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java @@ -1393,11 +1393,11 @@ public class LegacySchemaTables adder.resetCollection("argument_types"); adder.add("return_type", aggregate.returnType().toString()); - adder.add("state_func", aggregate.stateFunction().name().name); + adder.add("state_func", aggregate.stateFunction().name().toString()); if (aggregate.stateType() != null) 
adder.add("state_type", aggregate.stateType().toString()); if (aggregate.finalFunction() != null) - adder.add("final_func", aggregate.finalFunction().name().name); + adder.add("final_func", aggregate.finalFunction().name().toString()); if (aggregate.initialCondition() != null) adder.add("initcond", aggregate.initialCondition()); @@ -1439,8 +1439,8 @@ public class LegacySchemaTables AbstractType<?> returnType = parseType(row.getString("return_type")); - FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func")); - FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; + FunctionName stateFunc = aggregateParseFunctionName(ksName, row.getString("state_func")); + FunctionName finalFunc = row.has("final_func") ? aggregateParseFunctionName(ksName, row.getString("final_func")) : null; AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null; ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null; @@ -1454,6 +1454,23 @@ public class LegacySchemaTables } } + private static FunctionName aggregateParseFunctionName(String ksName, String func) + { + int i = func.indexOf('.'); + + // function name can be abbreviated (pre 2.2rc2) - it is in the same keyspace as the aggregate + if (i == -1) + return new FunctionName(ksName, func); + + String ks = func.substring(0, i); + String f = func.substring(i + 1); + + // only aggregate's function keyspace and system keyspace are allowed + assert ks.equals(ksName) || ks.equals(SystemKeyspace.NAME); + + return new FunctionName(ks, f); + } + public static Mutation makeDropAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/test/unit/org/apache/cassandra/cql3/AggregationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java index 4281262..38c4759 100644 --- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java @@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.functions.Functions; import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.exceptions.FunctionExecutionException; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.serializers.Int32Serializer; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.messages.ResultMessage; @@ -1096,7 +1097,7 @@ public class AggregationTest extends CQLTester "FINALFUNC " + shortFunctionName(fFinal) + ' ' + "INITCOND 1"); - assertInvalidMessage(String.format("Statement on keyspace %s cannot refer to a user function in keyspace %s; user functions can only be used in the keyspace they are defined in", + assertInvalidMessage(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; user types can only be used in the keyspace they are defined in", KEYSPACE_PER_TEST, KEYSPACE), "CREATE AGGREGATE " + KEYSPACE_PER_TEST + ".test_wrong_ks(int) " + "SFUNC " + fStateWrong + ' ' + @@ -1104,7 +1105,7 @@ public class AggregationTest extends CQLTester "FINALFUNC " + shortFunctionName(fFinal) + ' ' + "INITCOND 1"); - assertInvalidMessage(String.format("Statement on keyspace %s cannot refer to a user function in keyspace %s; user functions can only be used in the keyspace they are defined in", + assertInvalidMessage(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; user types can 
only be used in the keyspace they are defined in", KEYSPACE_PER_TEST, KEYSPACE), "CREATE AGGREGATE " + KEYSPACE_PER_TEST + ".test_wrong_ks(int) " + "SFUNC " + shortFunctionName(fState) + ' ' + @@ -1404,7 +1405,7 @@ public class AggregationTest extends CQLTester "AS 'return \"fin\" + a;'"); String aCON = createAggregate(KEYSPACE, - "text, text", + "text", "CREATE AGGREGATE %s(text) " + "SFUNC " + shortFunctionName(fCON) + ' ' + "STYPE text " + @@ -1428,7 +1429,7 @@ public class AggregationTest extends CQLTester "AS 'return \"fin\" + a;'"); String aRNON = createAggregate(KEYSPACE, - "int, int", + "int", "CREATE AGGREGATE %s(text) " + "SFUNC " + shortFunctionName(fRNON) + ' ' + "STYPE text " + @@ -1447,4 +1448,32 @@ public class AggregationTest extends CQLTester assertRows(execute("SELECT " + aRNON + "(b) FROM %s"), row("fin")); } + + @Test + public void testSystemKsFuncs() throws Throwable + { + + String fAdder = createFunction(KEYSPACE, + "int, int", + "CREATE FUNCTION %s(a int, b int) " + + "CALLED ON NULL INPUT " + + "RETURNS int " + + "LANGUAGE java " + + "AS 'return (a != null ? a : 0) + (b != null ? 
b : 0);'"); + + String aAggr = createAggregate(KEYSPACE, + "int", + "CREATE AGGREGATE %s(int) " + + "SFUNC " + shortFunctionName(fAdder) + ' ' + + "STYPE int " + + "FINALFUNC intasblob"); + + createTable("CREATE TABLE %s (a int primary key, b int)"); + execute("INSERT INTO %s (a, b) VALUES (1, 1)"); + execute("INSERT INTO %s (a, b) VALUES (2, 2)"); + execute("INSERT INTO %s (a, b) VALUES (3, 3)"); + + assertRows(execute("SELECT " + aAggr + "(b) FROM %s"), row(Int32Serializer.instance.serialize(6))); + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6312c5f/test/unit/org/apache/cassandra/cql3/UFTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java index 1b3326b..40f2dd3 100644 --- a/test/unit/org/apache/cassandra/cql3/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/UFTest.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.cassandra.cql3; + import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer;