Note that the Java code prints `State: Null`, `State: Null`, as I was
expecting, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka <ivanpetra...@gmail.com>, wrote:
> Hi! I’ve created a basic pyflink pipeline with state TTL, and the TTL does
> not seem to work. I have reproduced the exact same code in Java and it works
> there!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I try
> to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print <current_datetime, None>
> every time, but it prints <current_datetime> followed by the stored state
> value instead.
>
> ```python
> import time
>
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext,
> StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
> def open(self, runtime_context: RuntimeContext):
> state_descriptor = ValueStateDescriptor(
> name="my_state",
> value_type_info=Types.STRING(),
> )
>
> state_descriptor.enable_time_to_live(
> ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
> .cleanup_incrementally(cleanup_size=10,
> run_cleanup_for_every_record=True)
> .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build()
> )
>
> self.state = runtime_context.get_state(state_descriptor)
>
> def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
> current_state = self.state.value()
>
> print(datetime.now(), current_state)
>
> if current_state is None:
> self.state.update(str(datetime.now()))
>
> time.sleep(1.5)
>
>
> if __name__ == "__main__":
> # - Init environment
>
> environment =
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
> # - Setup pipeline
>
> (
> environment.set_parallelism(1)
> .from_collection(
> collection=list(range(10)),
> )
> .key_by(lambda value: 0)
> .process(Processor())
>
>
>
> )
>
> # - Execute pipeline
>
> environment.execute("ttl_test")
>
>
>
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction<Integer,
> String, String> {
>
>
> private transient ValueState<String> state;
>
>
> @Override
> public void open(Configuration parameters) {
> var stateTtlConfig = StateTtlConfig
> .newBuilder(Time.seconds(1))
> // .cleanupFullSnapshot()
> .cleanupIncrementally(10, true)
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build();
>
> var stateDescriptor = new ValueStateDescriptor<>("state",
> String.class);
> stateDescriptor.enableTimeToLive(stateTtlConfig);
>
> state = getRuntimeContext().getState(stateDescriptor);
>
> }
>
> @Override
> public void processElement(String event, Context context,
> Collector<String> collector) throws IOException, InterruptedException {
> var state = state.value();
> System.out.println("State: " + state);
>
> if (state == null) {
> state = LocalDateTime.now().toString();
> state.update(state);
> }
>
> Thread.sleep(1500);
> }
> }```