Hello Jessy, Currently StateFun Kafka ingress interprets the keys of the record as the destination address. So you'd have to attach a key to use that specific ingress.
If this is not an option for you, you can consider @Tim's suggestion or create a JIRA with a feature request, which we will be happy to follow up on, if enough people are interested :-) Cheers, Igal On Thu, Jun 10, 2021 at 5:11 PM Timothy Bess <tdbga...@gmail.com> wrote: > Hi Jessy, > > I had this issue as well, here's the resolution > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-2-2-2-Checkpoint-restore-NPE-td44022.html>. > I ended up forking the version of statefun I used and removing the null > check to default to empty string, but I'm going to switch to the solution > Igal suggested. > > Thanks, > > Tim > > On Thu, Jun 10, 2021 at 10:46 AM Jessy Ping <tech.user.str...@gmail.com> > wrote: > >> Hi all, >> >> I am trying to consume data from azure eventhub using the kafka ingress >> and i am getting the following error. >> >> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress >> requires a UTF-8 key set for each record.* >> >> While sending the data to the Event hub using my data producer, I am not >> sending it with any KEY. And the same data can be consumed without any >> issues with a normal flink application . >> >> I can see the error is raising from >> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey() >> >> Why is this check important and how to resolve this for eventhubs.? >> >> private byte[] requireNonNullKey(byte[] key) { >> if (key == null) { >> IngressType tpe = >> ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE; >> throw new IllegalStateException( >> "The " >> + tpe.namespace() >> + "/" >> + tpe.type() >> + " ingress requires a UTF-8 key set for each record."); >> } >> return key; >> } >> >> Thanks >> Jessy >> On Thu, 10 Jun 2021 at 20:13, Jessy Ping <tech.user.str...@gmail.com> >> wrote: >> >>> Hi all, >>> >>> I am trying to consume data from azure eventhub using the kafka ingress >>> and i am getting the following error. >>> >>> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress >>> requires a UTF-8 key set for each record.* >>> >>> While sending the data to the Event hub using my data producer, I am not >>> sending it with any KEY. And the same data can be consumed without any >>> issues with a normal flink application . >>> >>> I can see the error is raising from >>> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey() >>> >>> >>>