Hi Robert Metzger,

I am very happy to share my code:

public class ConcatString {
    public List<String> list = new ArrayList<>();

    public void add(String toString) {
        if (list != null) {
            if (list.size() < 100) {
                list.add(toString);
            }
        }
    }
}
  
> Are you registering your custom types in the ExecutionConfig? (If so, it
> increases the chances of this error occurring)

Let me describe my scenario. We have built a SQL platform based on Flink and
want to support user-defined UDFs/UDAFs, so that users only submit SQL and do
not need to write any other code. As for the serialization problem, it does
exist.


I currently work around this problem in two steps.

First, register a Kryo serializer for ArrayList:

    this.env.getConfig().registerTypeWithKryoSerializer(ArrayList.class, org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer.class);

Second, make ConcatString extend ArrayList:
   


public class ConcatString extends ArrayList<String> {

    @Override
    public boolean add(String toString) {
        if (this.size() < 1000) {
            super.add(toString);
            return true;
        }
        return false;
    }

    public List<String> getList() {
        return this;
    }
}
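
To illustrate the second step, here is a minimal sketch of a capped ArrayList subclass surviving a serialization round trip. It assumes that the registered JavaSerializer delegates to standard Java serialization, and it uses only the JDK (no Flink or Kryo on the classpath). The outer class name ConcatStringRoundTrip and the serialize/deserialize helpers are hypothetical names introduced for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;

public class ConcatStringRoundTrip {

    // Same shape as the workaround class; the cap of 1000 is taken from it.
    // The class is static so that it does not capture an outer instance.
    public static class ConcatString extends ArrayList<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean add(String value) {
            if (size() < 1000) {
                return super.add(value);
            }
            return false;
        }
    }

    // Assumption: plain Java serialization stands in for the Kryo JavaSerializer.
    public static byte[] serialize(ConcatString acc) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(acc);
        }
        return bytes.toByteArray();
    }

    public static ConcatString deserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (ConcatString) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConcatString acc = new ConcatString();
        acc.add("a");
        acc.add("b");

        ConcatString copy = deserialize(serialize(acc));
        System.out.println("same contents: " + copy.equals(acc));
        System.out.println("restored type: " + copy.getClass().getSimpleName());
    }
}
```

Because ArrayList already implements Serializable, the subclass needs no extra serialization code, which is presumably why extending ArrayList is convenient here.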


Best,
forideal







At 2020-08-14 21:46:45, "Robert Metzger" <rmetz...@apache.org> wrote:

Hi Forideal,


When using RocksDB, we need to serialize the data (to store it on disk), 
whereas when using the memory backend, the data (in this case 
RedConcat.ConcatString instances) is on the heap, thus we won't run into this 
issue.
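
The difference between the two backends can be sketched with two toy stores, assuming nothing about Flink internals: one keeps live objects on the heap, the other keeps only bytes. All names here (StateBackendSketch, Accumulator, HeapStore, SerializingStore) are hypothetical, and plain Java serialization stands in for Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateBackendSketch {

    /** Hypothetical accumulator; deliberately NOT Serializable. */
    public static class Accumulator {
        public final List<String> list = new ArrayList<>();
    }

    /** Heap-style store: keeps the live object, so no serializer is invoked. */
    public static class HeapStore {
        private final Map<String, Object> state = new HashMap<>();

        public void put(String key, Object value) {
            state.put(key, value);
        }

        public Object get(String key) {
            return state.get(key);
        }
    }

    /** Disk-style store: keeps only bytes, so every put and get serializes. */
    public static class SerializingStore {
        private final Map<String, byte[]> state = new HashMap<>();

        public void put(String key, Object value) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // fails for types that cannot be serialized
            }
            state.put(key, bytes.toByteArray());
        }

        public Object get(String key) throws IOException, ClassNotFoundException {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(state.get(key)))) {
                return in.readObject();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Accumulator acc = new Accumulator();
        acc.list.add("a");

        HeapStore heap = new HeapStore();
        heap.put("k", acc);
        System.out.println("heap returns same instance: " + (heap.get("k") == acc));

        SerializingStore disk = new SerializingStore();
        try {
            disk.put("k", acc);
        } catch (NotSerializableException e) {
            System.out.println("serializing store rejected: " + e.getMessage());
        }
    }
}
```

The same accumulator works in the heap-style store and fails only once a serializer is involved, which mirrors how a job can run with in-memory state and break after switching to RocksDB.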



Are you registering your custom types in the ExecutionConfig? (If so, it 
increases the chances of this error occurring)


Could you share the code of RedConcat.ConcatString as well?


I would not be surprised if this is a bug in Flink. Using a UDAF with custom 
types is probably not a very common use case.


Best,
Robert






On Fri, Aug 14, 2020 at 12:39 PM forideal <fszw...@163.com> wrote:

Hi,

I wrote a UDAF following this article:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#aggregation-functions

When using in-memory state, the task runs normally. However, when I chose
RocksDB as the state backend, I encountered the error below. Thank you for
helping me look into this problem.


The following is the error content:
com.esotericsoftware.kryo.KryoException: Encountered unregistered classID: 87
Serialization trace:
list (com.red.data.platform.RedConcat$ConcatString)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.util.InstantiationUtil.deserializeFromByt


public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {

    public class ConcatString {
        public List<String> list = new ArrayList<>();

        public void add(String toString) {
            if (list != null) {
                if (list.size() < 100) {
                    list.add(toString);
                }
            }
        }
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }

    @Override
    public ConcatString createAccumulator() {
        return new ConcatString();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
    }




Best,
forideal
        




 
