Great, let me know if that helped ;)

Best,
D.

On Mon, Aug 16, 2021 at 4:36 PM László Ciople <ciople.las...@gmail.com>
wrote:

> The events are JSON dictionaries, and the key is a field that holds the
> device ID; if that field doesn't exist, the *hashCode* of the device
> object in the dictionary is used instead. So this could be the problem.
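A minimal sketch of a deterministic key for this case. It assumes each event has already been parsed into a `java.util.Map<String, Object>`, and that the field names `deviceId` and `device` (hypothetical, adjust them to the real schema) hold the ID and the device object. Rather than using the device object's `hashCode()`, it falls back to a canonical string with map keys sorted at every level. Equal content then yields an equal `String` key, and `String.hashCode()` is defined by the string's characters on every JVM.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DeviceKeyExtractor {
    // Hypothetical field names; adjust to the real event schema.
    static final String ID_FIELD = "deviceId";
    static final String DEVICE_FIELD = "device";

    // Returns a String key. String.hashCode() depends only on the characters,
    // so every TaskManager derives the same hash for the same device.
    static String keyOf(Map<String, Object> event) {
        Object id = event.get(ID_FIELD);
        if (id != null) {
            return "id:" + id;
        }
        // Fallback: a canonical rendering of the device object, not its hashCode().
        return "device:" + canonical(event.get(DEVICE_FIELD));
    }

    // Renders nested maps and lists with map keys sorted, so maps with equal
    // content give an equal string regardless of the Map implementation's order.
    static String canonical(Object value) {
        if (value instanceof Map) {
            TreeMap<String, Object> sorted = new TreeMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                sorted.put(String.valueOf(e.getKey()), e.getValue());
            }
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<String, Object> e : sorted.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append(e.getKey()).append('=').append(canonical(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) {
                    sb.append(',');
                }
                sb.append(canonical(list.get(i)));
            }
            return sb.append(']').toString();
        }
        // Strings, numbers, booleans and null.
        return String.valueOf(value);
    }
}
```

In the job this could be wired in with something like `keyBy(event -> DeviceKeyExtractor.keyOf(event))`. The canonical form is not collision-free (values containing `,` or `=` can collide), but a collision only merges two devices under one key. It never sends the same device to different subtasks.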
>
> On Mon, Aug 16, 2021 at 5:33 PM David Morávek <d...@apache.org> wrote:
>
>> Hi László,
>>
>> My intuition is that you have a non-deterministic shuffle key. If you
>> perform any per-key operation, you need to make sure that the same key
>> always ends up in the same partition. Put simply, the key needs to have a
>> consistent *hashCode* and *equals* across different JVMs.
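To make the routing concrete, here is a simplified, JDK-only model of how a keyed stream picks a subtask: key hash, then key group, then subtask index. The mixing function is a stand-in, not Flink's actual hash. `maxParallelism = 128` and `parallelism = 2` are example values, and `DeviceKey` is a hypothetical key class. The point is that the sending and the receiving side each compute the key group from `hashCode()` on their own, so they only agree if equal keys produce equal hash codes.

```java
import java.util.Objects;

// Hypothetical key type whose hashCode is derived from its content,
// so every JVM computes the same value for equal keys.
final class DeviceKey {
    private final String deviceId;

    DeviceKey(String deviceId) {
        this.deviceId = Objects.requireNonNull(deviceId);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DeviceKey && ((DeviceKey) o).deviceId.equals(deviceId);
    }

    @Override
    public int hashCode() {
        // String.hashCode is specified by its characters, hence JVM-independent.
        return deviceId.hashCode();
    }
}

final class KeyGroupSketch {
    // Simplified key-group assignment. The bit mixing is a stand-in,
    // NOT Flink's real hash function.
    static int keyGroup(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    // Maps a key group onto one of `parallelism` subtasks as contiguous ranges.
    static int subtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 2;
        // Two equal keys, e.g. the sender's object and the receiver's deserialized copy.
        DeviceKey sent = new DeviceKey("device-42");
        DeviceKey received = new DeviceKey("device-42");
        int sentGroup = keyGroup(sent, maxParallelism);
        int receivedGroup = keyGroup(received, maxParallelism);
        System.out.println("sender routes to subtask " + subtask(sentGroup, maxParallelism, parallelism)
                + ", receiver expects subtask " + subtask(receivedGroup, maxParallelism, parallelism));
    }
}
```

If the two sides disagree, the receiving subtask ends up looking up state for a key group it does not own, which is consistent with the NullPointerException in `StateTable.put()` below.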
>>
>> A common mistake is a hash code that is not deterministic because it
>> defaults to `System.identityHashCode(..)` (this applies to any custom
>> object that doesn't override hashCode, to arrays, to enums, ...).
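A small JDK-only illustration of those three cases. `RawDevice` and `DeviceType` are hypothetical stand-ins for a custom class without a `hashCode()` override and for an enum. `Enum.hashCode()` is final and inherited from `Object`, so it cannot be overridden. Keying on `name()` (or another content-derived value) is one way around it, and `Arrays.hashCode(...)` does the same job for arrays.

```java
import java.util.Arrays;

// No equals/hashCode override: falls back to Object.hashCode, i.e. an identity
// hash that differs between instances and between JVMs.
final class RawDevice {
    final String id;

    RawDevice(String id) {
        this.id = id;
    }
}

// Hypothetical enum; its hashCode() is identity-based and final.
enum DeviceType { PHONE, LAPTOP }

final class StableKeys {
    // Arrays: hash the contents instead of calling array.hashCode().
    static int arrayKey(byte[] bytes) {
        return Arrays.hashCode(bytes);
    }

    // Enums: key on the constant's name, which is the same on every JVM.
    static String enumKey(DeviceType type) {
        return type.name();
    }

    // Custom objects: key on a field with a well-defined hashCode (here a String).
    static String objectKey(RawDevice device) {
        return device.id;
    }

    public static void main(String[] args) {
        RawDevice a = new RawDevice("d-1");
        RawDevice b = new RawDevice("d-1");
        // Identity-based: usually different for a and b even though the content is equal.
        System.out.println("identity hashes: " + a.hashCode() + " vs " + b.hashCode());
        // Content-based: always equal for equal content.
        System.out.println("stable object keys equal: " + objectKey(a).equals(objectKey(b)));
        System.out.println("stable array keys equal: "
                + (arrayKey(new byte[] {1, 2}) == arrayKey(new byte[] {1, 2})));
        System.out.println("enum key: " + enumKey(DeviceType.PHONE));
    }
}
```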
>>
>> What are you using as key for this operation?
>>
>> Best,
>> D.
>>
>> On Mon, Aug 16, 2021 at 4:00 PM László Ciople <ciople.las...@gmail.com>
>> wrote:
>>
>>> Hello,
>>> I am trying to write a Flink application that receives data from Kafka,
>>> processes keyed, windowed streams and sends the results to a
>>> different topic.
>>> Shortly after the job starts, it fails with a NullPointerException in
>>> StateTable.put(). I am really confused by this error, because I am not
>>> explicitly working with state, and I cannot find a reference to my own
>>> code in the stack trace. I'd really appreciate it if anyone could help me
>>> figure out what's going on. Here's the stack trace:
>>>
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,026 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>>> Window(TumblingEventTimeWindows(1800000), EventTimeTrigger,
>>> CredentialStuffingAlerter) -> Map -> Sink:
>>> xdr.azure.analytics.login.credential_stuffing (2/2)
>>> (4b3004e64d8e3202535dd5521bff3584) switched from RUNNING to FAILED on
>>> senso-api-lciople-credential-stuffing-taskmanager-1-2 @
>>> ip-192-168-96-68.eu-central-1.compute.internal (dataPort=44941).
>>> 2021-08-16 16:28:54 java.lang.NullPointerException: null
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:351) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>>> 2021-08-16 16:28:54 at java.lang.Thread.run(Unknown Source) ~[?:?]
>>> 2021-08-16 16:28:54 2021-08-16 13:28:54,040 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink
>>> Streaming Job (80895c82f109899577d166b4388c157d) switched from state
>>> RUNNING to RESTARTING.
>>>
>>
