That does allow me to set up the ConsumerConfigConstants.
 - This does have one downside: SourceFunctionSpec has a different
TYPE than KinesisIngressSpec, so the hashed operator ID does not match.
Thus I had to restore with allowNonRestoredState.

But it is worth it.  Thank you.
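
For anyone finding this thread later, here is a minimal sketch of the consumer properties involved, under stated assumptions. The helper class `KinesisConsumerProps` and the names in the comments (`MyEvent`, `MyEventSchema`, `"my-stream"`, `"example"`, `"kinesis-in"`) are hypothetical. The key strings are what I believe the connector's `AWSConfigConstants` and `ConsumerConfigConstants` fields resolve to; in a real job, reference the constants directly rather than these literals. The Flink wiring is left in comments so the block compiles with only the JDK.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or StateFun.
final class KinesisConsumerProps {

    // Assumed string values of the connector constants; prefer
    // AWSConfigConstants.AWS_REGION and ConsumerConfigConstants.* in real code.
    static final String AWS_REGION = "aws.region";
    static final String SHARD_GETRECORDS_MAX =
            "flink.shard.getrecords.maxrecordcount";
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS =
            "flink.shard.getrecords.intervalmillis";

    // Builds the Properties object that would be handed to FlinkKinesisConsumer.
    static Properties build(String region, int maxRecords, long intervalMillis) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(SHARD_GETRECORDS_MAX, Integer.toString(maxRecords));
        props.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS,
                Long.toString(intervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("us-west-2", 5000, 250L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));

        // With the Kinesis connector and statefun-flink-io on the classpath,
        // the wiring described in the quoted reply would look roughly like
        // this (not compiled here; type and stream names are placeholders):
        //
        //   FlinkKinesisConsumer<MyEvent> consumer =
        //       new FlinkKinesisConsumer<>("my-stream", new MyEventSchema(), props);
        //   SourceFunctionSpec<MyEvent> spec = new SourceFunctionSpec<>(
        //       new IngressIdentifier<>(MyEvent.class, "example", "kinesis-in"),
        //       consumer);
        //   binder.bindIngress(spec);
    }
}
```

Because the ingress is then bound through a SourceFunctionSpec, the operator ID differs from the one in an existing savepoint, which is why the restore needed allowNonRestoredState.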



On Thu, Apr 29, 2021 at 4:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Ammon,
>
> Unfortunately you're right. I think the Flink Kinesis Consumer specific
> configs, e.g. keys in the ConsumerConfigConstants class, were overlooked in
> the initial design.
>
> One way to work around this is to use the `SourceFunctionSpec` [1]. Using
> that spec, you can use any Flink SourceFunction (e.g. a
> FlinkKinesisConsumer) as the ingress.
> Simply instantiate a `SourceFunctionSpec` with the desired ID, and provide
> a custom FlinkKinesisConsumer that you create directly (which should allow
> you to provide the ConsumerConfigConstants).
>
> As a side note, I've created this JIRA to address the issue you
> encountered, as I believe this should be supported in the native StateFun
> Kinesis ingress [2].
>
> Cheers,
> Gordon
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/datastream/SourceFunctionSpec.java
> [2] https://issues.apache.org/jira/browse/FLINK-22529
>
> On Thu, Apr 29, 2021 at 7:25 AM Ammon Diether <adiet...@gmail.com> wrote:
>
>>
>> When using Flink Stateful Functions' KinesisIngressBuilder, I do not see
>> a way to set things like ConsumerConfigConstants.SHARD_GETRECORDS_MAX or
>> ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS.
>>
>> Looking at KinesisSourceProvider, it appears that this is the spot that
>> creates the FlinkKinesisConsumer. The function named
>> propertiesFromSpec(kinesisIngressSpec) only allows for AWS properties and a
>> few startup position properties.
>> ConsumerConfigConstants.SHARD_GETRECORDS_MAX cannot be provided.
>>
>> Is there an obvious workaround?
>>
>>
