POJO??-??????????????
public class User {
private String name;
private String age;
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
'}';
}
public User(String name, String age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
}
------------------------------------------------------------------
??????
public class FlinkTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<String>() {
@Override
public void
run(SourceContext<String> sourceContext) throws Exception {
while (true) {
sourceContext.collect("xxx");
TimeUnit.SECONDS.sleep(5);
}
}
@Override
public void cancel() {
}
}).keyBy(s -> s).process(new
KeyedProcessFunction<String, String, Object>() {
ValueState<User> state;
@Override
public void open(Configuration
parameters) throws Exception {
super.open(parameters);
final
ValueStateDescriptor<User> vsd = new ValueStateDescriptor<>("valueState",
User.class);
state =
getRuntimeContext().getState(vsd);
}
@Override
public void processElement(String s,
Context context, Collector<Object> collector) throws Exception {
final User value =
state.value();
if (value == null) {
state.update(new User("zhangsan", "26"));
} else {
System.out.println(value);
}
}
}).uid("PRO-1");
env.execute();
}
}
?????????? User{name='zhangsan', age='26'}/n
????????????checkpoint????????????
????POJO??
public class User {
private String name;
private String age;
private String addr;
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
", addr='" + addr +
'\'' +
'}';
}
public User(String name, String age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
}
POJO????????????addr????????????????checkpoint????????
???????? User{name='null', age='zhangsan', addr='26'}/n
????????????addr????????????????????????????name??????????????