Hello Ivan!

Could you please create a JIRA issue for this?
That seems the proper place to discuss it.

It seems to be a bug: the two versions of the code you posted look equivalent,
so the behavior should be consistent.
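
For reference, here is a toy pure-Python model of the behavior I would expect
from both versions. It is a sketch, not the PyFlink API: `TtlValueState` and
`simulate` are hypothetical names, and a simulated clock stands in for
processing time and `time.sleep`. It assumes `OnCreateAndWrite` refreshes the
timestamp only on writes and `NeverReturnExpired` hides a value once the TTL has
elapsed since that write. With a 1 s TTL and elements arriving 1.5 s apart,
every read should come back as `None`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TtlValueState:
    """Hypothetical toy model of a TTL-enabled value state (not PyFlink's class).

    Models UpdateType.OnCreateAndWrite (only writes refresh the timestamp) and
    StateVisibility.NeverReturnExpired (expired values read back as None).
    """
    ttl_seconds: float
    _value: Optional[str] = None
    _last_write: float = 0.0

    def value(self, now: float) -> Optional[str]:
        # NeverReturnExpired: once ttl has elapsed since the last write,
        # hide (and drop) the stored value.
        if self._value is not None and now - self._last_write >= self.ttl_seconds:
            self._value = None
        return self._value

    def update(self, new_value: str, now: float) -> None:
        # OnCreateAndWrite: writes refresh the timestamp; reads never do.
        self._value = new_value
        self._last_write = now


def simulate(num_elements: int = 10, ttl: float = 1.0, gap: float = 1.5) -> List[Optional[str]]:
    """Replay the reproducer's loop, advancing a simulated clock by `gap` per element."""
    state = TtlValueState(ttl_seconds=ttl)
    observed: List[Optional[str]] = []
    for i in range(num_elements):
        now = i * gap  # element i arrives `gap` seconds after element i - 1
        current = state.value(now)
        observed.append(current)
        if current is None:
            state.update(f"written at t={now}", now)
    return observed


if __name__ == "__main__":
    for read in simulate():
        print(read)
```

With `gap=0.5` (elements arriving faster than the TTL) the model alternates
between `None` and the stored value, so it is the 1.5 s sleep in the reproducer
that makes every read expected to be empty.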
On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka <ivanpetra...@gmail.com>, wrote:
> Note that the Java code prints `State: null`, `State: null`, as I was
> expecting, unlike the PyFlink code.
> On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka <ivanpetra...@gmail.com>, wrote:
> > Hi! I’ve created a basic PyFlink pipeline with state TTL, and it does not
> > seem to work. I have reproduced the exact same code in Java, and there it works!
> >
> > Is this a PyFlink bug? If so, how can I report it? If not, what else can I
> > try?
> >
> > Flink: 1.18.0
> > image: flink:1.18.0-scala_2.12-java11
> >
> > Code to reproduce. I expect this code to print `<current_datetime> None`
> > every time, but it prints `<current_datetime>` and the stored state value.
> >
> > ```python
> > import time
> >
> > from datetime import datetime
> >
> > from pyflink.common import Time, Types
> > from pyflink.datastream import (
> >     KeyedProcessFunction,
> >     RuntimeContext,
> >     StreamExecutionEnvironment,
> > )
> > from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
> >
> >
> > class Processor(KeyedProcessFunction):
> >     def open(self, runtime_context: RuntimeContext):
> >         state_descriptor = ValueStateDescriptor(
> >             name="my_state",
> >             value_type_info=Types.STRING(),
> >         )
> >
> >         # Expire the value 1 s after it was created or last written,
> >         # and never return an expired value to the caller.
> >         state_descriptor.enable_time_to_live(
> >             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
> >             .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
> >             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >             .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >             .build()
> >         )
> >
> >         self.state = runtime_context.get_state(state_descriptor)
> >
> >     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
> >         current_state = self.state.value()
> >
> >         print(datetime.now(), current_state)
> >
> >         if current_state is None:
> >             self.state.update(str(datetime.now()))
> >
> >         time.sleep(1.5)
> >
> >
> > if __name__ == "__main__":
> >     # - Init environment
> >
> >     environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
> >
> >     # - Setup pipeline
> >
> >     (
> >         environment.from_collection(collection=list(range(10)))
> >         .key_by(lambda value: 0)
> >         .process(Processor())
> >     )
> >
> >     # - Execute pipeline
> >
> >     environment.execute("ttl_test")
> > ```
> >
> > ```java
> > import org.apache.flink.api.common.state.StateTtlConfig;
> > import org.apache.flink.api.common.state.ValueState;
> > import org.apache.flink.api.common.state.ValueStateDescriptor;
> > import org.apache.flink.api.common.time.Time;
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > import org.apache.flink.util.Collector;
> >
> > import java.io.IOException;
> > import java.time.LocalDateTime;
> >
> > public class GameHistoryProcessor extends KeyedProcessFunction<Integer, String, String> {
> >
> >
> >     private transient ValueState<String> state;
> >
> >
> >     @Override
> >     public void open(Configuration parameters) {
> >         var stateTtlConfig = StateTtlConfig
> >                 .newBuilder(Time.seconds(1))
> > //                .cleanupFullSnapshot()
> >                 .cleanupIncrementally(10, true)
> >                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >                 .build();
> >
> >         var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
> >         stateDescriptor.enableTimeToLive(stateTtlConfig);
> >
> >         state = getRuntimeContext().getState(stateDescriptor);
> >
> >     }
> >
> >     @Override
> >     public void processElement(String event, Context context, Collector<String> collector)
> >             throws IOException, InterruptedException {
> >         // Use a separate local so it does not shadow the `state` field.
> >         var currentState = state.value();
> >         System.out.println("State: " + currentState);
> >
> >         if (currentState == null) {
> >             currentState = LocalDateTime.now().toString();
> >             state.update(currentState);
> >         }
> >
> >         Thread.sleep(1500);
> >     }
> > }
> > ```
