Hi 从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serialzier
Best, Congxian chengwenfeng <[email protected]> 于2019年11月12日周二 下午2:47写道: > 大家好: > 我在测试Querable State功能的时候,发现 > 语法 > dataStream.keyby(key).process(); 这种语法下,简单的状态和复杂的POJO都可以查询 > 但在 > studentAnswerDataStream.connect(learningStrategyDataStream) > .keyBy(val->val.getCourseId()+"_"+val.getTaskId() > , val->val.getCourseId()+"_"+val.getTaskId()) > .process() 这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来 > > > > > 错误: > > > Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Failed request 0. > Caused by: java.lang.RuntimeException: Failed request 9. > Caused by: java.lang.RuntimeException: Error while processing request > with ID 9. Caused by: java.io.IOException: Unable to deserialize key and > namespace. This indicates a mismatch in the key/namespace serializers used > by the KvState instance and this access. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) > at > org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Unexpected magic number 48. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) > ... 10 more > > > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41) > Caused by: java.lang.RuntimeException: Failed request 0. > Caused by: java.lang.RuntimeException: Failed request 9. > Caused by: java.lang.RuntimeException: Error while processing request > with ID 9. Caused by: java.io.IOException: Unable to deserialize key and > namespace. This indicates a mismatch in the key/namespace serializers used > by the KvState instance and this access. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109) > at > org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Unexpected magic number 48. > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99) > ... 10 more > > > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > > > 使用版本1.9.1 > > > 代码如下 > > > 状态代码 > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Collector; > > > public class QueryableStateDemo2 { > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new > SourceFunction<StudentAnswer>() { > @Override > public void run(SourceContext<StudentAnswer> ctx) throws Exception { > StudentAnswer studentAnswer = new StudentAnswer(); > studentAnswer.setCourseId(100L); > studentAnswer.setTaskId("ug01"); > studentAnswer.setUserId(1L); > studentAnswer.setAnswer("答案"); > ctx.collect(studentAnswer); > while (true) { > Thread.sleep(1000 * 60); > } > } > > @Override > public void cancel() { > > } > }); > > DataStream<LearningStrategy> learningStrategyDataStream = > env.addSource(new SourceFunction<LearningStrategy>() { > @Override > public void run(SourceContext<LearningStrategy> ctx) throws Exception { > LearningStrategy learningStrategy = new LearningStrategy(); > learningStrategy.setCourseId(100L); > learningStrategy.setTaskId("ug01"); > ctx.collect(learningStrategy); > while (true) { > Thread.sleep(1000 * 60); > } > } > > @Override > public void cancel() { > > } > }); > > studentAnswerDataStream.connect(learningStrategyDataStream) > .keyBy(val->val.getCourseId()+"_"+val.getTaskId() > , val->val.getCourseId()+"_"+val.getTaskId()) > .process(new KeyedCoProcessFunction<String, StudentAnswer, > LearningStrategy, String>() { > > > private transient ValueState<StudentAnswer> leftBuffer; > private transient ValueState<LearningStrategy> rightBuffer; > > > @Override > public void open(Configuration conf) { > ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new > ValueStateDescriptor<>( > "left_buffer", TypeInformation.of(StudentAnswer.class)); > leftBufferDescriptor.setQueryable("left_buffer_query"); > > ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new > ValueStateDescriptor<>( > "right_buffer", TypeInformation.of(LearningStrategy.class)); > rightBufferDescriptor.setQueryable("right_buffer_query"); > > leftBuffer = getRuntimeContext().getState(leftBufferDescriptor); > rightBuffer = getRuntimeContext().getState(rightBufferDescriptor); > } > > @Override > public void processElement1(StudentAnswer value, Context context, > Collector<String> collector) throws Exception { > System.out.println("processElement1:" + value); > leftBuffer.update(value); > String key = context.getCurrentKey(); > collector.collect(key); > } > > @Override > public void processElement2(LearningStrategy value, Context context, > Collector<String> collector) throws Exception { > System.out.println("processElement2:" + value); > rightBuffer.update(value); > String key = context.getCurrentKey(); > collector.collect(key); > } > }).print("结果"); > > > env.execute("State"); > } > > > } > > 客户端代码: > > > import org.apache.flink.api.common.ExecutionConfig; > import org.apache.flink.api.common.JobID; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeHint; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.queryablestate.client.QueryableStateClient; > > import java.util.concurrent.CompletableFuture; > > public class GetQueryableState2 { > > public static void main(String[] args) throws Exception > { > > > QueryableStateClient client = new QueryableStateClient("localhost", 9069); > > ExecutionConfig executionConfig = new ExecutionConfig(); > client.setExecutionConfig(executionConfig); > > > ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new > ValueStateDescriptor<>( > "left_buffer", TypeInformation.of(StudentAnswer.class)); > > > String key = "100_ug01"; > > JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f"); > > CompletableFuture<ValueState<StudentAnswer>> resultFuture = > client.getKvState(jobId, "left_buffer_query", key > , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor); > > > ValueState<StudentAnswer> leftBuffer = resultFuture.get(); > System.out.println("结果:"+leftBuffer.value()); > > // now handle the returned value > // resultFuture.thenAccept(response -> > // { > // try { > // Tuple2<String, Long> res = response.value(); > // > // System.out.println("Queried sum value: " + res); > // > // } catch (Exception e) > // { > // e.printStackTrace(); > // } > // System.out.println("Exiting future ..."); > // }); > Thread.sleep(1000L*10); > } > > } > > > > > Domain如下 > public class BaseDomain implements Serializable { > > protected String bn = "2019"; > protected String version = "1.0"; > > public String getBn() { > return bn; > } > > public void setBn(String bn) { > this.bn = bn; > } > > public String getVersion() { > return version; > } > > public void setVersion(String version) { > this.version = version; > } > } > > > public class LearningStrategy extends BaseDomain { > private Long courseId; > private String taskId; > private Byte pushOrder = 1; > > public Long getCourseId() { > return courseId; > } > > public void setCourseId(Long courseId) { > this.courseId = courseId; > } > > public String getTaskId() { > return taskId; > } > > public void setTaskId(String taskId) { > this.taskId = taskId; > } > > public Byte getPushOrder() { > return pushOrder; > } > > public void setPushOrder(Byte pushOrder) { > this.pushOrder = pushOrder; > } > > @Override > public String toString() { > return "LearningStrategy{" + > "bn='" + bn + '\'' + > ", version='" + version + '\'' + > ", courseId=" + courseId + > ", taskId='" + taskId + '\'' + > ", pushOrder=" + pushOrder + > '}'; > } > } > > > public class StudentAnswer extends BaseDomain{ > private Long courseId; > private String taskId; > private Long userId; > private String answer; > > public Long getCourseId() { > return courseId; > } > > public void setCourseId(Long courseId) { > this.courseId = courseId; > } > > public String getTaskId() { > return taskId; > } > > public void setTaskId(String taskId) { > this.taskId = taskId; > } > > public Long getUserId() { > return userId; > } > > public void setUserId(Long userId) { > this.userId = userId; > } > > public String getAnswer() { > return answer; > } > > public void setAnswer(String answer) { > this.answer = answer; > } > > @Override > public String toString() { > return "StudentAnswer{" + > "bn='" + bn + '\'' + > ", version='" + version + '\'' + > ", courseId=" + courseId + > ", taskId='" + taskId + '\'' + > ", userId=" + userId + > ", answer='" + answer + '\'' + > '}'; > } > }
