Hi

从错误栈来看,应该是 serializer 不一致导致的,可以再检查下相应的 key/namespace serialzier

Best,
Congxian


chengwenfeng <[email protected]> 于2019年11月12日周二 下午2:47写道:

> 大家好:
> 我在测试Querable State功能的时候,发现
> 语法
>         dataStream.keyby(key).process();  这种语法下,简单的状态和复杂的POJO都可以查询
> 但在
>        studentAnswerDataStream.connect(learningStrategyDataStream)
>                 .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
>                         , val->val.getCourseId()+"_"+val.getTaskId())
>                 .process()  这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来
>
>
>
>
> 错误:
>
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unexpected magic number 48.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
> ... 10 more
>
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
> Caused by: java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unexpected magic number 48.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
> ... 10 more
>
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 使用版本1.9.1
>
>
> 代码如下
>
>
> 状态代码
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Collector;
>
>
> public class QueryableStateDemo2 {
>
>  public static void main(String[] args) throws Exception {
>
>  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  env.setParallelism(1);
>
>  DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new
> SourceFunction<StudentAnswer>() {
>  @Override
>  public void run(SourceContext<StudentAnswer> ctx) throws Exception {
>  StudentAnswer studentAnswer = new StudentAnswer();
>  studentAnswer.setCourseId(100L);
>  studentAnswer.setTaskId("ug01");
>  studentAnswer.setUserId(1L);
>  studentAnswer.setAnswer("答案");
>  ctx.collect(studentAnswer);
>  while (true) {
>  Thread.sleep(1000 * 60);
>  }
>  }
>
>  @Override
>  public void cancel() {
>
>  }
>  });
>
>  DataStream<LearningStrategy> learningStrategyDataStream =
> env.addSource(new SourceFunction<LearningStrategy>() {
>  @Override
>  public void run(SourceContext<LearningStrategy> ctx) throws Exception {
>  LearningStrategy learningStrategy = new LearningStrategy();
>  learningStrategy.setCourseId(100L);
>  learningStrategy.setTaskId("ug01");
>  ctx.collect(learningStrategy);
>  while (true) {
>  Thread.sleep(1000 * 60);
>  }
>  }
>
>  @Override
>  public void cancel() {
>
>  }
>  });
>
>  studentAnswerDataStream.connect(learningStrategyDataStream)
>  .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
>  , val->val.getCourseId()+"_"+val.getTaskId())
>  .process(new KeyedCoProcessFunction<String, StudentAnswer,
> LearningStrategy, String>() {
>
>
>  private transient ValueState<StudentAnswer> leftBuffer;
>  private transient ValueState<LearningStrategy> rightBuffer;
>
>
>  @Override
>  public void open(Configuration conf) {
>  ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new
> ValueStateDescriptor<>(
>  "left_buffer", TypeInformation.of(StudentAnswer.class));
>  leftBufferDescriptor.setQueryable("left_buffer_query");
>
>  ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new
> ValueStateDescriptor<>(
>  "right_buffer", TypeInformation.of(LearningStrategy.class));
>  rightBufferDescriptor.setQueryable("right_buffer_query");
>
>  leftBuffer = getRuntimeContext().getState(leftBufferDescriptor);
>  rightBuffer = getRuntimeContext().getState(rightBufferDescriptor);
>  }
>
>  @Override
>  public void processElement1(StudentAnswer value, Context context,
> Collector<String> collector) throws Exception {
>  System.out.println("processElement1:" + value);
>  leftBuffer.update(value);
>  String key = context.getCurrentKey();
>  collector.collect(key);
>  }
>
>  @Override
>  public void processElement2(LearningStrategy value, Context context,
> Collector<String> collector) throws Exception {
>  System.out.println("processElement2:" + value);
>  rightBuffer.update(value);
>  String key = context.getCurrentKey();
>  collector.collect(key);
>  }
>  }).print("结果");
>
>
>  env.execute("State");
>  }
>
>
> }
>
> 客户端代码:
>
>
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.JobID;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.queryablestate.client.QueryableStateClient;
>
> import java.util.concurrent.CompletableFuture;
>
> public class GetQueryableState2 {
>
>  public static void main(String[] args) throws Exception
>  {
>
>
>  QueryableStateClient client = new QueryableStateClient("localhost", 9069);
>
>  ExecutionConfig executionConfig = new ExecutionConfig();
>  client.setExecutionConfig(executionConfig);
>
>
>  ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new
> ValueStateDescriptor<>(
>  "left_buffer", TypeInformation.of(StudentAnswer.class));
>
>
>  String key = "100_ug01";
>
>  JobID jobId = JobID.fromHexString("0b4ed273b44f0cff6065705c6e4ea17f");
>
>  CompletableFuture<ValueState<StudentAnswer>> resultFuture =
>  client.getKvState(jobId, "left_buffer_query", key
>  , BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);
>
>
>  ValueState<StudentAnswer> leftBuffer = resultFuture.get();
>  System.out.println("结果:"+leftBuffer.value());
>
>  // now handle the returned value
> // resultFuture.thenAccept(response ->
> // {
> // try {
> // Tuple2<String, Long> res = response.value();
> //
> // System.out.println("Queried sum value: " + res);
> //
> // } catch (Exception e)
> // {
> // e.printStackTrace();
> // }
> // System.out.println("Exiting future ...");
> // });
>  Thread.sleep(1000L*10);
>  }
>
> }
>
>
>
>
> Domain如下
> public class BaseDomain implements Serializable {
>
>  protected String bn = "2019";
>  protected String version = "1.0";
>
>  public String getBn() {
>  return bn;
>  }
>
>  public void setBn(String bn) {
>  this.bn = bn;
>  }
>
>  public String getVersion() {
>  return version;
>  }
>
>  public void setVersion(String version) {
>  this.version = version;
>  }
> }
>
>
> public class LearningStrategy extends BaseDomain {
>  private Long courseId;
>  private String taskId;
>  private Byte pushOrder = 1;
>
>  public Long getCourseId() {
>  return courseId;
>  }
>
>  public void setCourseId(Long courseId) {
>  this.courseId = courseId;
>  }
>
>  public String getTaskId() {
>  return taskId;
>  }
>
>  public void setTaskId(String taskId) {
>  this.taskId = taskId;
>  }
>
>  public Byte getPushOrder() {
>  return pushOrder;
>  }
>
>  public void setPushOrder(Byte pushOrder) {
>  this.pushOrder = pushOrder;
>  }
>
>  @Override
>  public String toString() {
>  return "LearningStrategy{" +
>  "bn='" + bn + '\'' +
>  ", version='" + version + '\'' +
>  ", courseId=" + courseId +
>  ", taskId='" + taskId + '\'' +
>  ", pushOrder=" + pushOrder +
>  '}';
>  }
> }
>
>
> public class StudentAnswer extends BaseDomain{
>  private Long courseId;
>  private String taskId;
>  private Long userId;
>  private String answer;
>
>  public Long getCourseId() {
>  return courseId;
>  }
>
>  public void setCourseId(Long courseId) {
>  this.courseId = courseId;
>  }
>
>  public String getTaskId() {
>  return taskId;
>  }
>
>  public void setTaskId(String taskId) {
>  this.taskId = taskId;
>  }
>
>  public Long getUserId() {
>  return userId;
>  }
>
>  public void setUserId(Long userId) {
>  this.userId = userId;
>  }
>
>  public String getAnswer() {
>  return answer;
>  }
>
>  public void setAnswer(String answer) {
>  this.answer = answer;
>  }
>
>  @Override
>  public String toString() {
>  return "StudentAnswer{" +
>  "bn='" + bn + '\'' +
>  ", version='" + version + '\'' +
>  ", courseId=" + courseId +
>  ", taskId='" + taskId + '\'' +
>  ", userId=" + userId +
>  ", answer='" + answer + '\'' +
>  '}';
>  }
> }

回复