Hello Jessy,

Currently StateFun Kafka ingress interprets the keys of the record as the
destination address.
So you'd have to attach a key to use that specific ingress.

If this is not an option for you, you can consider @Tim's suggestion or
create a JIRA with a feature request,
which we will be happy to follow up on, if enough people are interested :-)

Cheers,
Igal



On Thu, Jun 10, 2021 at 5:11 PM Timothy Bess <tdbga...@gmail.com> wrote:

> Hi Jessy,
>
> I had this issue as well, here's the resolution
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-2-2-2-Checkpoint-restore-NPE-td44022.html>.
> I ended up forking the version of statefun I used and removing the null
> check to default to empty string, but I'm going to switch to the solution
> Igal suggested.
>
> Thanks,
>
> Tim
>
> On Thu, Jun 10, 2021 at 10:46 AM Jessy Ping <tech.user.str...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am trying to consume data from azure eventhub using the kafka ingress
>> and i am getting the following error.
>>
>> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
>> requires a UTF-8 key set for each record.*
>>
>> While sending the data to the Event hub using my data producer, I am not
>> sending it with any KEY. And the same data can be consumed without any
>> issues with a normal flink application .
>>
>> I can see the error is raising from
>> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey()
>>
>> Why is this check important and how to resolve this for eventhubs.?
>>
>> private byte[] requireNonNullKey(byte[] key) {
>>   if (key == null) {
>>     IngressType tpe = 
>> ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE;
>>     throw new IllegalStateException(
>>         "The "
>>             + tpe.namespace()
>>             + "/"
>>             + tpe.type()
>>             + " ingress requires a UTF-8 key set for each record.");
>>   }
>>   return key;
>> }
>>
>> Thanks
>> Jessy
>> On Thu, 10 Jun 2021 at 20:13, Jessy Ping <tech.user.str...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to consume data from azure eventhub using the kafka ingress
>>> and i am getting the following error.
>>>
>>> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
>>> requires a UTF-8 key set for each record.*
>>>
>>> While sending the data to the Event hub using my data producer, I am not
>>> sending it with any KEY. And the same data can be consumed without any
>>> issues with a normal flink application .
>>>
>>> I can see the error is raising from
>>> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey()
>>>
>>>
>>>

Reply via email to