I think that's because you declared it as transient field.

Move the declaration inside of "open" function to resolve that

On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <ahmad.has...@gmail.com> wrote:

> 2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task
>                   - Window(SlidingProcessingTimeWindows(180000, 180000),
> TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1)
> (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.
>
> java.lang.NullPointerException: The state properties must not be null
>
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)
>
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)
>
> at
> com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at
> org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> On Sat, 20 Oct 2018 at 11:29, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Ahmad,
>>
>> Can you try to dump thread info from the Task Manager's JVM instance?
>>
>> Thanks, vino.
>>
>> Ahmad Hassan <ahmad.has...@gmail.com> 于2018年10月20日周六 下午4:24写道:
>>
>>> Flink 1.6.0. Valuestate initialises successful but mapstate hangs
>>>
>>> Regards
>>>
>>> On 20 Oct 2018, at 02:55, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>> Hi Ahmad,
>>>
>>> Which version of Flink do you use?
>>>
>>> Thanks, vino.
>>>
>>> Ahmad Hassan <ahmad.has...@gmail.com> 于2018年10月19日周五 下午11:32写道:
>>>
>>>> Hi,
>>>>
>>>> Initializing mapstate hangs in window function. However if i use
>>>> valuestate then it is initialized succcessfully. I am using rocksdb to
>>>> store the state.
>>>>
>>>> public class MyWindowFunction extends RichWindowFunction<Event,
>>>> Payload, Tuple, TimeWindow>
>>>> {
>>>> private transient MapStateDescriptor<String, String> productsDescriptor
>>>> = new MapStateDescriptor<>(
>>>> "mapState", String.class, String.class);
>>>>
>>>> @Override
>>>> public void apply(Tuple key, TimeWindow window, final Iterable<Event>
>>>> input,
>>>> final Collector<Payload> out)
>>>> {
>>>> // do something
>>>> }
>>>>
>>>> @Override
>>>> public void open(Configuration parameters) throws Exception
>>>> {
>>>> System.out.println("## open init window state ");
>>>> * MapState<String, String> state =
>>>> this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs
>>>> here*
>>>> System.out.println("## open window state " + state);
>>>> }
>>>> }
>>>>
>>>> Thanks for the help.
>>>>
>>>

Reply via email to