[jira] [Commented] (FLINK-16571) Throw exception when current key is out of range

2020-03-25 Thread (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066793#comment-17066793
 ] 

 commented on FLINK-16571:
--

Would that be [~1571238203] (Alex T/x0x8o on github)?

> Throw exception when current key is out of range
> 
>
> Key: FLINK-16571
> URL: https://issues.apache.org/jira/browse/FLINK-16571
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.9.2
>Reporter: 
>Priority: Trivial
>
> * I've got a stream of records that are keyed by a class whose hashCode 
> isn't stable across JVM instances.
> * The records are then processed by a parallel operator, which is running on 
> several task managers on the cluster.
> * A given task manager receives a record, calculates the hashCode of the key 
> and sends it to another task manager instance according to its slot 
> allocation.
> * The other task manager instance receives the record for a given parallel 
> slot (allocated by the other instance), calculates the hash for the record 
> (which doesn't match the original hash), and then I get this error:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
>   at 
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:90)
> {code}
> It took me a while to figure out what's going on, and while the issue is on 
> my side, it would have helped if the InternalKeyContextImpl had complained in 
> the first place that the hashCode was out of range.
> InternalKeyContextImpl#setCurrentKeyGroupIndex should complain when 
> currentKeyGroupIndex isn't in the keyGroupRange, suggesting that the hash 
> code function isn't stable.
> A few notes:
> * The reason I'm getting the issue is that on the Oracle JDK, Enum's hashCode 
> uses the identity hash (memory address) of the enum constant. I should use 
> the ordinal when hashing to ensure stability.
> * The issue is more likely to happen if cluster.evenly-spread-out-slots is on 
> (which forces the job to be distributed to different instances).
> Here's an example that replicates the issue. It has to run using the Oracle 
> JVM on a cluster of a few nodes, with cluster.evenly-spread-out-slots on:
> {code:java}
> import java.lang.reflect.Field;
> import java.util.Random;
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.KeyGroupRange;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public class JobThatFails {
>   private static final Logger LOG = LoggerFactory.getLogger(JobThatFails.class);
> 
>   private enum EnumKey {
>     ONE,
>     TWO,
>     THREE,
>     FOUR,
>     FIVE,
>     SIX,
>     SEVEN,
>     EIGHT,
>     NINE,
>     TEN
>   }
> 
>   void defineGraph(StreamExecutionEnvironment env) {
>     int parallelism = 4;
>     DataStream<Tuple2<EnumKey, Integer>> src = null;
>     for (int i = 0; i < 10; ++i) {
>       DataStream<Tuple2<EnumKey, Integer>> eachSrc =
>           env.addSource(new IntegerSourceFunction());
>       if (src == null) {
>         src = eachSrc;
>       } else {
>         src = src.union(eachSrc);
>       }
>     }
>     src.keyBy(p -> p.f0)
>         .map(new StatefulCounter())
>         .name(StatefulCounter.class.getSimpleName())
>         .setParallelism(parallelism)
>         .addSink(
>             new SinkFunction<Integer>() {
>               @Override
>               public void invoke(Integer value, Context context) {}
>             });
>   }
> 
>   private static class IntegerSourceFunction
>       implements SourceFunction<Tuple2<EnumKey, Integer>> {
>     volatile boolean running;
> 
>     private IntegerSourceFunction() {}
> 
>     @Override
>     public void run(SourceContext<Tuple2<EnumKey, Integer>> ctx) throws Exception {
>       running = true;
>       Random random = new Random();
>       while (running) {
>         for (EnumKey e : EnumKey.values()) {
>           ctx.collect(Tuple2.of(e, random.nextInt(10)));
>         }
>         Thread.sleep(1000);
>       }
>     }
> 
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
> 
>   class StatefulCounter extends RichMapFunction<Tuple2<EnumKey, Integer>, Integer> {
>     private transient ValueState<Integer>

[jira] [Commented] (FLINK-16571) Throw exception when current key is out of range

2020-03-25 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066768#comment-17066768
 ] 

Benchao Li commented on FLINK-16571:


[~0x26dres] I'm not very familiar with this part. If you have further concerns, 
maybe you can ask the committers responsible for this part for help.


[jira] [Commented] (FLINK-16571) Throw exception when current key is out of range

2020-03-25 Thread (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066738#comment-17066738
 ] 

 commented on FLINK-16571:
--

Are you talking about these:
{code}
A type cannot be a key if:
* it is a POJO type but does not override the hashCode() method and relies on 
the Object.hashCode() implementation.
* it is an array of any type.
{code}

It does make sense to me; these are similar constraints to the ones that apply 
to key objects in a HashMap. But for Flink the constraints are stricter, as it 
expects the hashCode() value to be the same on any JVM, since data gets 
distributed across different task manager instances.
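
To make the "same on any JVM" requirement concrete, here's a minimal sketch of 
how the key group is derived from the key's hashCode. I'm assuming 
KeyGroupRangeAssignment#assignToKeyGroup behaves here the way it does in 1.9; 
treat the exact entry point as an assumption on my part:

{code:java}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
  public static void main(String[] args) {
    int maxParallelism = 128;

    // A key with an identity-based hashCode, like Enum on the Oracle JDK.
    Object key = new Object();

    // The sender derives the key group from key.hashCode(). A different JVM
    // deserializing the same logical key recomputes this from a different
    // hashCode, so the record can land in a key group outside the receiving
    // subtask's KeyGroupRange; that is exactly the out-of-range case.
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    System.out.println("key group on this JVM: " + keyGroup);
  }
}
{code}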



[jira] [Commented] (FLINK-16571) Throw exception when current key is out of range

2020-03-24 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066286#comment-17066286
 ] 

Benchao Li commented on FLINK-16571:


[~0x26dres] Thanks for reporting this. IMHO, this is by design for now, and the 
documentation points this out; see the keyBy section in 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations]
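
To make the documented constraint concrete, a small sketch (not from the docs, 
just an illustration): Enum's hashCode() is identity-based, while ordinal() and 
name() are fully specified by the language and therefore stable across JVMs.

{code:java}
public class StableKeyDemo {
  private enum EnumKey { ONE, TWO }

  public static void main(String[] args) {
    EnumKey key = EnumKey.ONE;

    // Unstable across JVMs: Enum.hashCode() is final and identity-based,
    // so each JVM run can print a different value here.
    System.out.println("identity hash: " + key.hashCode());

    // Stable across JVMs: both values are defined by the Java language spec.
    System.out.println("ordinal hash: " + Integer.hashCode(key.ordinal()));
    System.out.println("name hash:    " + key.name().hashCode());
  }
}
{code}

So keying the example stream by {{p.f0.name()}} instead of {{p.f0}} would avoid 
the problem.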

 


[jira] [Commented] (FLINK-16571) Throw exception when current key is out of range

2020-03-24 Thread (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17065986#comment-17065986
 ] 

 commented on FLINK-16571:
--

[~libenchao], I was hoping we could start the discussion about this issue. 

While this issue isn't a big problem in itself, it took me a lot of time to 
debug, and I'd like to save other users some time.
My idea for a fix would be to throw an exception if the key group index is out 
of the key group range:
https://github.com/apache/flink/compare/master...arthurandres:FLINK-16571-throw-error-when-key-hash-is-out-of-range?expand=1
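
In rough terms the branch adds a guard like the one below. This is a sketch 
rather than the exact code from the branch; I'm assuming KeyGroupRange#contains 
as the accessor, and the real InternalKeyContextImpl has more members than 
shown:

{code:java}
import org.apache.flink.runtime.state.KeyGroupRange;

// Sketch only: the real class lives in org.apache.flink.runtime.state.heap.
final class RangeCheckingKeyContext {
  private final KeyGroupRange keyGroupRange;
  private int currentKeyGroupIndex;

  RangeCheckingKeyContext(KeyGroupRange keyGroupRange) {
    this.keyGroupRange = keyGroupRange;
  }

  void setCurrentKeyGroupIndex(int keyGroupIndex) {
    // Fail fast instead of letting StateTable#put hit a NullPointerException.
    if (!keyGroupRange.contains(keyGroupIndex)) {
      throw new IllegalArgumentException(
          "Key group " + keyGroupIndex + " is outside of " + keyGroupRange
              + ". This usually means the key's hashCode() is not stable across JVMs.");
    }
    this.currentKeyGroupIndex = keyGroupIndex;
  }
}
{code}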

Another solution would be to change the transport layer. 
We would send the hash code value with the serialized key over the network and 
check that it matches the hash code value of the deserialized value when it is 
received. 
But that would be a much bigger change, and it would not feel natural to send 
the hash code over the network anyway.
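
(For completeness, a hypothetical sketch of that alternative: wrap the key 
serializer so the sender's hash travels with the key and the receiver verifies 
it after deserialization. Serializer here is a stand-in interface, not Flink's 
TypeSerializer, and all names are illustrative only.)

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

interface Serializer<T> {
  void serialize(T value, DataOutput out) throws IOException;
  T deserialize(DataInput in) throws IOException;
}

final class HashVerifyingSerializer<T> implements Serializer<T> {
  private final Serializer<T> inner;

  HashVerifyingSerializer(Serializer<T> inner) {
    this.inner = inner;
  }

  @Override
  public void serialize(T value, DataOutput out) throws IOException {
    out.writeInt(value.hashCode()); // sender-side hash travels with the key
    inner.serialize(value, out);
  }

  @Override
  public T deserialize(DataInput in) throws IOException {
    int senderHash = in.readInt();
    T value = inner.deserialize(in);
    if (value.hashCode() != senderHash) { // receiver recomputes and compares
      throw new IOException(
          "Key hashCode is not stable across JVMs: sender computed " + senderHash
              + ", receiver computed " + value.hashCode());
    }
    return value;
  }
}
{code}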

What do you think?
