POJO??-??????????????
public class User {

    private String name;
    private String age;


    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age='" + age + '\'' +
                '}';
    }


    public User(String name, String age) {
        this.name = name;
        this.age = age;
    }


    public String getName() {
        return name;
    }


    public void setName(String name) {
        this.name = name;
    }


    public String getAge() {
        return age;
    }


    public void setAge(String age) {
        this.age = age;
    }


}
------------------------------------------------------------------
??????
public class FlinkTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp; &nbsp; &nbsp; &nbsp; env.addSource(new SourceFunction<String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void 
run(SourceContext<String&gt; sourceContext) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (true) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
sourceContext.collect("xxx");
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
TimeUnit.SECONDS.sleep(5);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void cancel() {


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(s -&gt; s).process(new 
KeyedProcessFunction<String, String, Object&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ValueState<User&gt; state;


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void open(Configuration 
parameters) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final 
ValueStateDescriptor<User&gt; vsd = new ValueStateDescriptor<&gt;("valueState", 
User.class);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 
getRuntimeContext().getState(vsd);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void processElement(String s, 
Context context, Collector<Object&gt; collector) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final User value = 
state.value();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (value == null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
state.update(new User("zhangsan", "26"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.out.println(value);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).uid("PRO-1");
&nbsp; &nbsp; &nbsp; &nbsp; env.execute();
&nbsp; &nbsp; }
}

?????????? User{name='zhangsan', age='26'}/n
????????????checkpoint????????????
????POJO??
public class User {


&nbsp; &nbsp; private String name;
&nbsp; &nbsp; private String age;
&nbsp; &nbsp; private String addr;


&nbsp; &nbsp; @Override
&nbsp; &nbsp; public String toString() {
&nbsp; &nbsp; &nbsp; &nbsp; return "User{" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "name='" + name + '\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", age='" + age + '\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", addr='" + addr + 
'\'' +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; '}';
&nbsp; &nbsp; }


&nbsp; &nbsp; public User(String name, String age) {
&nbsp; &nbsp; &nbsp; &nbsp; this.name = name;
&nbsp; &nbsp; &nbsp; &nbsp; this.age = age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getName() {
&nbsp; &nbsp; &nbsp; &nbsp; return name;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setName(String name) {
&nbsp; &nbsp; &nbsp; &nbsp; this.name = name;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getAge() {
&nbsp; &nbsp; &nbsp; &nbsp; return age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setAge(String age) {
&nbsp; &nbsp; &nbsp; &nbsp; this.age = age;
&nbsp; &nbsp; }


&nbsp; &nbsp; public String getAddr() {
&nbsp; &nbsp; &nbsp; &nbsp; return addr;
&nbsp; &nbsp; }


&nbsp; &nbsp; public void setAddr(String addr) {
&nbsp; &nbsp; &nbsp; &nbsp; this.addr = addr;
&nbsp; &nbsp; }
}

POJO????????????addr????????????????checkpoint????????
???????? User{name='null', age='zhangsan', addr='26'}/n
????????????addr????????????????????????????name??????????????

回复