I'm still getting the same error, even after adding the flink-avro dependency to the client.
Jayant Ameta

On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <bupt_...@163.com> wrote:

> Hi Jayant,
>
> I don't know why Flink is using the Avro serializer, which is usually used
> for POJO classes, but judging from the error message, I think you can add
> flink-avro as a dependency and try again.
>
> Best,
> Jiayi Liao
>
> Original Message
> *Sender:* Jayant Ameta<wittyam...@gmail.com>
> *Recipient:* bupt_ljy<bupt_...@163.com>
> *Cc:* trohrmann<trohrm...@apache.org>; Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
> *Date:* Tuesday, Nov 13, 2018 16:15
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Thanks Jiayi,
> I updated the client code to use the keyed stream's key. The key is a
> Tuple2<UUID, String>.
>
> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>     client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>         "rules",
>         Tuple2.of(uuid, "test"),
>         TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>         descriptor);
>
> I'm now getting a different exception. I'm NOT using Avro as a custom
> serializer. Not sure what causes this issue.
>
> Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
>   at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>   at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>   at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>   at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>   at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>   at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
>
> Jayant Ameta
>
>
> On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:
>
>> Hi, Jayant
>>
>> The key you pass to the getKvState function should be the key of the
>> keyed stream, not the key of the map. From what I've seen on
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
>> this feature only supports managed keyed state.
>>
>> By the way, I think we should improve the error message that Jayant
>> ran into.
>>
>> Best,
>> Jiayi Liao
>>
>> Original Message
>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>> *Recipient:* trohrmann<trohrm...@apache.org>
>> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
>> *Date:* Tuesday, Nov 13, 2018 13:39
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Till,
>> Here is the client snippet. Here Rule is a custom POJO that I use.
>>
>> public static void main(String[] args)
>>     throws IOException, InterruptedException, ExecutionException {
>>   UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>>
>>   QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>>   ExecutionConfig config = new ExecutionConfig();
>>   client.setExecutionConfig(config);
>>
>>   MapStateDescriptor<UUID, Rule> descriptor =
>>       new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>>   CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>       client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>>           "rules",
>>           uuid, TypeInformation.of(UUID.class), descriptor);
>>
>>   while (!resultFuture.isDone()) {
>>     Thread.sleep(1000);
>>   }
>>   resultFuture.whenComplete((result, throwable) -> {
>>     if (throwable != null) {
>>       throwable.printStackTrace();
>>     } else {
>>       try {
>>         System.out.println(result.get(uuid));
>>       } catch (Exception e) {
>>         e.printStackTrace();
>>       }
>>     }
>>   });
>> }
>>
>>
>> Below is the stack trace:
>>
>> Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
>>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
>>   at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.EOFException
>>   at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
>>   at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>   ... 9 more
>>
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>   at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>   at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>   at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>   at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>   at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>   at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>   at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
>>   at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>   at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>   at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>   at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
>>   at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>   at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>   at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> Jayant Ameta
>>
>>
>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Could you send us a small example program which we can use to reproduce
>>> the problem?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com> wrote:
>>>
>>>> Yeah, it IS using Kryo serializer.
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>>>> serialized using a Kryo serializer? You can do this by setting a
>>>>> breakpoint in the constructor of the `AbstractKeyedStateBackend`.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>
>>>>>> Hi, Jayant
>>>>>>
>>>>>> Your code looks good to me.
>>>>>> I also tried serializing and deserializing the UUID class with Kryo,
>>>>>> and it all looks okay.
>>>>>>
>>>>>> I'm not sure what is causing this problem. Maybe you can write a very
>>>>>> simple demo to see whether it works.
>>>>>>
>>>>>>
>>>>>> Jiayi Liao, Best
>>>>>>
>>>>>> Original Message
>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
>>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>>
>>>>>> Hi Jiayi,
>>>>>> Any further help on this?
>>>>>>
>>>>>> Jayant Ameta
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com> wrote:
>>>>>>
>>>>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>>>>
>>>>>>> Jayant Ameta
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Can you show us the descriptor used in the code below?
>>>>>>>>
>>>>>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>>>>>>     "rule",
>>>>>>>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>
>>>>>>>>
>>>>>>>> Jiayi Liao, Best
>>>>>>>>
>>>>>>>>
>>>>>>>> Original Message
>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
>>>>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>>>>
>>>>>>>> Also, I haven't provided any custom serializer in my Flink job.
>>>>>>>> Shouldn't the same configuration work for the queryable state client?
>>>>>>>>
>>>>>>>> Jayant Ameta
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Gordon,
>>>>>>>>> Following is the stack trace that I'm getting:
>>>>>>>>>
>>>>>>>>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>>>>>>>>>  Caused by: java.lang.RuntimeException: Failed request 0.
>>>>>>>>>  Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>>>>>>>>> Serialization trace:
>>>>>>>>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>>>>>>>>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>>>>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>>>>>>>   at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>>>>>>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>>>>>>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>>>>>>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>>>>>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>> I am not using any custom serializer, as mentioned by Jiayi.
>>>>>>>>>
>>>>>>>>> Jayant Ameta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jayant,
>>>>>>>>>>
>>>>>>>>>> There should be a Serializer parameter in the constructor of
>>>>>>>>>> the StateDescriptor. You should create a new serializer like this:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> By the way, can you show us your Kryo exception, as Gordon asked?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Original Message
>>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<bupt_...@163.com>
>>>>>>>>>> *Cc:* user<user@flink.apache.org>
>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>>>>>>
>>>>>>>>>> Hi Jayant,
>>>>>>>>>>
>>>>>>>>>> What is the Kryo exception message that you are getting?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Gordon
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I've not configured any serializer in the descriptor (neither in the
>>>>>>>>>> Flink job nor in the state query client).
>>>>>>>>>> Which serializer should I use?
>>>>>>>>>>
>>>>>>>>>> Jayant Ameta
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Your code seems right. Are you sure that you're using the same
>>>>>>>>>>> serializer as the Flink program does? Could you show the
>>>>>>>>>>> serializer in the descriptor?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>
>>>>>>>>>>> Original Message
>>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>>> *Recipient:* user<user@flink.apache.org>
>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>>>>>>>>
>>>>>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>>>>>
>>>>>>>>>>> Key: UUID
>>>>>>>>>>> MapState<UUID, String>
>>>>>>>>>>>
>>>>>>>>>>> Client code snippet:
>>>>>>>>>>>
>>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>>>>>>>>>         "rule",
>>>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Any better way to query it?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Jayant Ameta
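
A note on the "Unable to deserialize key and namespace" failure reported in this thread: its EOFException is raised inside TupleSerializer and StringSerializer, which appears consistent with the server expecting a Tuple2<UUID, String> key while the client sent only a UUID. Below is a minimal sketch of that failure mode using plain java.io streams, not Flink's serializers. The class KeyShapeMismatchDemo and its helper methods are hypothetical names made up for illustration, and the byte layout is an assumption that does not match Flink's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

public class KeyShapeMismatchDemo {

    // Hypothetical stand-in for a client that serializes only the UUID (16 bytes).
    static byte[] writeUuidKey(UUID id) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical stand-in for a client that serializes the full (UUID, String) key.
    static byte[] writeTupleKey(UUID id, String tag) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
            out.writeUTF(tag);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical stand-in for the server side, which always expects (UUID, String).
    static String readTupleKey(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            UUID id = new UUID(in.readLong(), in.readLong());
            String tag = in.readUTF(); // runs out of bytes if only a UUID was written
            return id + "/" + tag;
        } catch (EOFException e) {
            throw new IllegalStateException(
                "Unable to deserialize key: bytes ended before the String field", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

        // Matching key shape on both sides: the round trip succeeds.
        System.out.println(readTupleKey(writeTupleKey(id, "test")));

        // Mismatched key shape: the reader hits end-of-input in the String field.
        try {
            readTupleKey(writeUuidKey(id));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getName() + ")");
        }
    }
}
```

The point of the sketch is only that the reader's expected key shape has to match what the writer produced; in the thread, that corresponds to passing the keyed stream's key and its TypeInformation to getKvState rather than the map key.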