slinkydeveloper commented on a change in pull request #18352:
URL: https://github.com/apache/flink/pull/18352#discussion_r784104335
##
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java
##
@@ -243,6 +244,36 @@ public static void prepareInstance(ReadableConfig config,
UserDefinedFunction fu
cleanFunction(config, function);
}
+/**
+ * Returns whether a {@link UserDefinedFunction} can be easily serialized
and identified by only
+ * a fully qualified class name. It must have a default constructor and no
serializable fields.
+ */
+public static boolean isClassNameSerializable(UserDefinedFunction
function) {
+final Class functionClass = function.getClass();
+if (!InstantiationUtil.hasPublicNullaryConstructor(functionClass)) {
+// function must be parameterized
+return false;
+}
+Class currentClass = functionClass;
+while (!currentClass.equals(UserDefinedFunction.class)) {
+for (Field field : currentClass.getDeclaredFields()) {
+if (!Modifier.isTransient(field.getModifiers())
+&& !Modifier.isStatic(field.getModifiers())) {
+// function seems to be stateful
+return false;
+}
+}
+currentClass = currentClass.getSuperclass();
+}
Review comment:
That sounds like a very magic way to find out whether a function is
stateful or not. Don't we have a marker interface for that?
##
File path:
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java
##
@@ -143,34 +162,27 @@ private void testErrorMessage() {
@Nullable String expectedErrorMessage;
-TestSpec(Class functionClass) {
+TestSpec(
+@Nullable Class functionClass,
+@Nullable UserDefinedFunction functionInstance,
+@Nullable CatalogFunction catalogFunction) {
this.functionClass = functionClass;
-this.functionInstance = null;
-this.catalogFunction = null;
-}
-
-TestSpec(UserDefinedFunction functionInstance) {
-this.functionClass = null;
this.functionInstance = functionInstance;
-this.catalogFunction = null;
-}
-
-TestSpec(CatalogFunction catalogFunction) {
-this.functionClass = null;
-this.functionInstance = null;
this.catalogFunction = catalogFunction;
}
static TestSpec forClass(Class
function) {
-return new TestSpec(function);
+return new TestSpec(function, null, null);
}
static TestSpec forInstance(UserDefinedFunction function) {
-return new TestSpec(function);
+return new TestSpec(null, function, null);
}
-static TestSpec forCatalogFunction(String className, FunctionLanguage
language) {
-return new TestSpec(new CatalogFunctionMock(className, language));
+static TestSpec forCatalogFunction(
+String className,
+@SuppressWarnings("SameParameterValue") FunctionLanguage
language) {
Review comment:
What warning is this one?
##
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
##
@@ -58,9 +60,13 @@
/** Returns a unique, serialized representation for this function. */
public final String functionIdentifier() {
+final String className = getClass().getName();
+if (isClassNameSerializable(this)) {
+return className;
+}
final String md5 =
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
-return getClass().getName().replace('.', '$').concat("$").concat(md5);
+return className.concat("$").concat(md5);
Review comment:
Does this logic works when the function is implemented as a scala
`object`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org