大家好:
我在测试 Queryable State 功能的时候,发现:
使用语法
dataStream.keyBy(key).process(); 这种语法下,简单的状态和复杂的 POJO 都可以查询;
但在
studentAnswerDataStream.connect(learningStrategyDataStream)
.keyBy(val->val.getCourseId()+"_"+val.getTaskId()
, val->val.getCourseId()+"_"+val.getTaskId())
.process() 这种语法情况下,简单的状态可以,但复杂的POJO无法反序列化回来
错误:
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 9.
Caused by: java.lang.RuntimeException: Error while processing request with ID
9. Caused by: java.io.IOException: Unable to deserialize key and namespace.
This indicates a mismatch in the key/namespace serializers used by the KvState
instance and this access.
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 9.
Caused by: java.lang.RuntimeException: Error while processing request with ID
9. Caused by: java.io.IOException: Unable to deserialize key and namespace.
This indicates a mismatch in the key/namespace serializers used by the KvState
instance and this access.
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
at
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected magic number 48.
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
... 10 more
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
使用的 Flink 版本:1.9.1
代码如下
状态代码(作业端):
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class QueryableStateDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<StudentAnswer> studentAnswerDataStream = env.addSource(new
SourceFunction<StudentAnswer>() {
@Override
public void run(SourceContext<StudentAnswer> ctx) throws Exception {
StudentAnswer studentAnswer = new StudentAnswer();
studentAnswer.setCourseId(100L);
studentAnswer.setTaskId("ug01");
studentAnswer.setUserId(1L);
studentAnswer.setAnswer("答案");
ctx.collect(studentAnswer);
while (true) {
Thread.sleep(1000 * 60);
}
}
@Override
public void cancel() {
}
});
DataStream<LearningStrategy> learningStrategyDataStream = env.addSource(new
SourceFunction<LearningStrategy>() {
@Override
public void run(SourceContext<LearningStrategy> ctx) throws Exception {
LearningStrategy learningStrategy = new LearningStrategy();
learningStrategy.setCourseId(100L);
learningStrategy.setTaskId("ug01");
ctx.collect(learningStrategy);
while (true) {
Thread.sleep(1000 * 60);
}
}
@Override
public void cancel() {
}
});
studentAnswerDataStream.connect(learningStrategyDataStream)
.keyBy(val->val.getCourseId()+"_"+val.getTaskId()
, val->val.getCourseId()+"_"+val.getTaskId())
.process(new KeyedCoProcessFunction<String, StudentAnswer, LearningStrategy,
String>() {
private transient ValueState<StudentAnswer> leftBuffer;
private transient ValueState<LearningStrategy> rightBuffer;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<StudentAnswer> leftBufferDescriptor = new
ValueStateDescriptor<>(
"left_buffer", TypeInformation.of(StudentAnswer.class));
leftBufferDescriptor.setQueryable("left_buffer_query");
ValueStateDescriptor<LearningStrategy> rightBufferDescriptor = new
ValueStateDescriptor<>(
"right_buffer", TypeInformation.of(LearningStrategy.class));
rightBufferDescriptor.setQueryable("right_buffer_query");
leftBuffer = getRuntimeContext().getState(leftBufferDescriptor);
rightBuffer = getRuntimeContext().getState(rightBufferDescriptor);
}
@Override
public void processElement1(StudentAnswer value, Context context,
Collector<String> collector) throws Exception {
System.out.println("processElement1:" + value);
leftBuffer.update(value);
String key = context.getCurrentKey();
collector.collect(key);
}
@Override
public void processElement2(LearningStrategy value, Context context,
Collector<String> collector) throws Exception {
System.out.println("processElement2:" + value);
rightBuffer.update(value);
String key = context.getCurrentKey();
collector.collect(key);
}
}).print("结果");
env.execute("State");
}
}
客户端代码:
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import java.util.concurrent.CompletableFuture;
/**
 * Client that looks up the {@code left_buffer} value state through the queryable-state name
 * {@code left_buffer_query} and prints the value stored for the key {@code "100_ug01"}.
 *
 * <p>Usage: {@code GetQueryableState2 [jobIdHex]}. The job ID changes with every job
 * submission, so it can be supplied as the first argument; when omitted, the previously
 * hard-coded ID is used.
 */
public class GetQueryableState2 {

    /** Job ID used when none is given on the command line. */
    private static final String DEFAULT_JOB_ID = "0b4ed273b44f0cff6065705c6e4ea17f";

    /**
     * Queries the state once, prints the result and releases the client.
     *
     * @param args optional; {@code args[0]} is the hex string of the job ID to query
     * @throws Exception if the client cannot be created or the query fails
     */
    public static void main(String[] args) throws Exception {
        String jobIdHex = args.length > 0 ? args[0] : DEFAULT_JOB_ID;
        JobID jobId = JobID.fromHexString(jobIdHex);
        String key = "100_ug01";

        // Must describe the state exactly as the job does (same name and value type).
        ValueStateDescriptor<StudentAnswer> leftBufferDescriptor =
                new ValueStateDescriptor<>(
                        "left_buffer", TypeInformation.of(StudentAnswer.class));

        QueryableStateClient client = new QueryableStateClient("localhost", 9069);
        try {
            client.setExecutionConfig(new ExecutionConfig());

            // The key type info given here determines how the lookup key is serialized;
            // it has to match the key serializer used by the job's keyed state.
            CompletableFuture<ValueState<StudentAnswer>> resultFuture =
                    client.getKvState(jobId, "left_buffer_query", key,
                            BasicTypeInfo.STRING_TYPE_INFO, leftBufferDescriptor);

            // Blocks until the response (or the failure) arrives.
            ValueState<StudentAnswer> leftBuffer = resultFuture.get();
            System.out.println("结果:" + leftBuffer.value());
        } finally {
            // Release the client's resources even when the query fails.
            client.shutdownAndWait();
        }
    }
}
Domain如下
/**
 * Common base for the domain objects: carries a batch number ({@code bn}) and a
 * {@code version}, both of which start out with a default value.
 */
public class BaseDomain implements Serializable {

    /** Defaults to {@code "2019"}; directly readable by subclasses. */
    protected String bn = "2019";

    /** Defaults to {@code "1.0"}; directly readable by subclasses. */
    protected String version = "1.0";

    /** @return the current {@code bn} value */
    public String getBn() {
        return this.bn;
    }

    /** @param bn the new {@code bn} value */
    public void setBn(String bn) {
        this.bn = bn;
    }

    /** @return the current version string */
    public String getVersion() {
        return this.version;
    }

    /** @param version the new version string */
    public void setVersion(String version) {
        this.version = version;
    }
}
/**
 * Learning strategy for one task of a course. Holds the course ID, the task ID and a
 * push order that defaults to {@code 1}.
 */
public class LearningStrategy extends BaseDomain {

    private Long courseId;
    private String taskId;
    private Byte pushOrder = 1;

    /** @return the course ID, or {@code null} if not set */
    public Long getCourseId() {
        return this.courseId;
    }

    /** @param courseId the course ID */
    public void setCourseId(Long courseId) {
        this.courseId = courseId;
    }

    /** @return the task ID, or {@code null} if not set */
    public String getTaskId() {
        return this.taskId;
    }

    /** @param taskId the task ID */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /** @return the push order */
    public Byte getPushOrder() {
        return this.pushOrder;
    }

    /** @param pushOrder the push order */
    public void setPushOrder(Byte pushOrder) {
        this.pushOrder = pushOrder;
    }

    /** Renders all fields, including the inherited {@code bn} and {@code version}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("LearningStrategy{");
        text.append("bn='").append(bn).append('\'');
        text.append(", version='").append(version).append('\'');
        text.append(", courseId=").append(courseId);
        text.append(", taskId='").append(taskId).append('\'');
        text.append(", pushOrder=").append(pushOrder);
        text.append('}');
        return text.toString();
    }
}
/**
 * A student's answer to one task of a course. Holds the course ID, task ID, user ID and
 * the answer text.
 */
public class StudentAnswer extends BaseDomain {

    private Long courseId;
    private String taskId;
    private Long userId;
    private String answer;

    /** @return the course ID, or {@code null} if not set */
    public Long getCourseId() {
        return this.courseId;
    }

    /** @param courseId the course ID */
    public void setCourseId(Long courseId) {
        this.courseId = courseId;
    }

    /** @return the task ID, or {@code null} if not set */
    public String getTaskId() {
        return this.taskId;
    }

    /** @param taskId the task ID */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /** @return the user ID, or {@code null} if not set */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the user ID */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the answer text, or {@code null} if not set */
    public String getAnswer() {
        return this.answer;
    }

    /** @param answer the answer text */
    public void setAnswer(String answer) {
        this.answer = answer;
    }

    /** Renders all fields, including the inherited {@code bn} and {@code version}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("StudentAnswer{");
        text.append("bn='").append(bn).append('\'');
        text.append(", version='").append(version).append('\'');
        text.append(", courseId=").append(courseId);
        text.append(", taskId='").append(taskId).append('\'');
        text.append(", userId=").append(userId);
        text.append(", answer='").append(answer).append('\'');
        text.append('}');
        return text.toString();
    }
}