Hello Ivan! Could you please create a JIRA issue out of this? That seem the proper place where to discuss this.
It seems a bug as the two versions of the code you posted look identical, and the behavior should be consistent. On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka <ivanpetra...@gmail.com>, wrote: > Note, that in Java code, it prints `State: Null`, `State: Null`, as I was > expecting in, unlike pyflink code > On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka <ivanpetra...@gmail.com>, wrote: > > Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to > > work. I have reproduced the exact same code in Java and it works! > > > > Is this a pyflink bug? If so - how can I report it? If not - what can I try > > to do? > > > > Flink: 1.18.0 > > image: flink:1.18.0-scala_2.12-java11 > > > > Code to reproduce. I expect this code to print: <current_datetime, None> > > all the time. But it prints <current_datetime> and state value > > > > ```python > > import time > > > > from datetime import datetime > > > > from pyflink.common import Time, Types > > from pyflink.datastream import KeyedProcessFunction, RuntimeContext, > > StreamExecutionEnvironment, TimeCharacteristic > > from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor > > > > > > class Processor(KeyedProcessFunction): > > def open(self, runtime_context: RuntimeContext): > > state_descriptor = ValueStateDescriptor( > > name="my_state", > > value_type_info=Types.STRING(), > > ) > > > > state_descriptor.enable_time_to_live( > > ttl_config=StateTtlConfig.new_builder(Time.seconds(1)) > > .cleanup_incrementally(cleanup_size=10, > > run_cleanup_for_every_record=True) > > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) > > > > .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > > .build() > > ) > > > > self.state = runtime_context.get_state(state_descriptor) > > > > def process_element(self, value: int, ctx: > > KeyedProcessFunction.Context): > > current_state = self.state.value() > > > > print(datetime.now(), current_state) > > > > if current_state is None: > > self.state.update(str(datetime.now())) > > > > time.sleep(1.5) > > > > > > if __name__ == "__main__": > > # - Init environment > > > > environment = > > StreamExecutionEnvironment.get_execution_environment().set_parallelism(1) > > > > # - Setup pipeline > > > > ( > > environment.set_parallelism(1) > > .from_collection( > > collection=list(range(10)), > > ) > > .key_by(lambda value: 0) > > .process(Processor()) > > > > > > > > ) > > > > # - Execute pipeline > > > > environment.execute("ttl_test") > > > > > > > > ``` > > > > ```java > > import org.apache.flink.api.common.state.StateTtlConfig; > > import org.apache.flink.api.common.state.ValueState; > > import org.apache.flink.api.common.state.ValueStateDescriptor; > > import org.apache.flink.api.common.time.Time; > > import org.apache.flink.configuration.Configuration; > > import org.apache.flink.metrics.Histogram; > > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > > import org.apache.flink.util.Collector; > > > > import java.io.IOException; > > import java.time.LocalDateTime; > > > > public class GameHistoryProcessor extends KeyedProcessFunction<Integer, > > String, String> { > > > > > > private transient ValueState<String> state; > > > > > > @Override > > public void open(Configuration parameters) { > > var stateTtlConfig = StateTtlConfig > > .newBuilder(Time.seconds(1)) > > // .cleanupFullSnapshot() > > .cleanupIncrementally(10, true) > > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > > .build(); > > > > var stateDescriptor = new ValueStateDescriptor<>("state", > > String.class); > > stateDescriptor.enableTimeToLive(stateTtlConfig); > > > > state = getRuntimeContext().getState(stateDescriptor); > > > > } > > > > @Override > > public void processElement(String event, Context context, > > Collector<String> collector) throws IOException, InterruptedException { > > var state = state.value(); > > System.out.println("State: " + state); > > > > if (state == null) { > > state = LocalDateTime.now().toString(); > > state.update(state); > > } > > > > Thread.sleep(1500); > > } > > }```