That does allow me to set up the ConsumerConfigConstants. This does have one downside: the SourceFunctionSpec has a different TYPE than KinesisFunctionSpec, so the hashed operator ID does not match. Thus I had to use allowNonRestoredState.
But it is worth it. Thank you.

On Thu, Apr 29, 2021 at 4:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Ammon,
>
> Unfortunately you're right. I think the Flink Kinesis Consumer specific
> configs, e.g. keys in the ConsumerConfigConstants class, were overlooked in
> the initial design.
>
> One way to workaround this is to use the `SourceFunctionSpec` [1]. Using
> that spec, you can use any Flink SourceFunction (e.g. a
> FlinkKinesisConsumer) as the ingress.
> Simply instantiate a `SourceFunctionSpec` with the desired ID, and provide
> a custom FlinkKinesisConsumer that you create directly (which should allow
> you to provide the ConsumerConfigConstants).
>
> As a side note, I've created this JIRA to address the issue you
> encountered, as I believe this should be supported in the native StateFun
> Kinesis ingress [2].
>
> Cheers,
> Gordon
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/datastream/SourceFunctionSpec.java
> [2] https://issues.apache.org/jira/browse/FLINK-22529
>
> On Thu, Apr 29, 2021 at 7:25 AM Ammon Diether <adiet...@gmail.com> wrote:
>
>> When using Flink Stateful Function's KinesisIngressBuilder, I do not see
>> a way to set things like ConsumerConfigConstants.SHARD_GETRECORDS_MAX or
>> ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
>>
>> Looking at KinesisSourceProvider, it appears that this is the spot that
>> creates the FlinkKinesisConsumer. The function named
>> propertiesFromSpec(kinesisIngressSpec) only allows for AWS properties and a
>> few startup position properties.
>> ConsumerConfigConstants.SHARD_GETRECORDS_MAX cannot be provided.
>>
>> Is there an obvious workaround?
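
For anyone finding this thread later, here is a rough sketch of the properties side of the workaround discussed above. It only uses java.util.Properties so that it runs without Flink on the classpath. The key strings are my assumption of the literal values behind AWSConfigConstants.AWS_REGION, ConsumerConfigConstants.SHARD_GETRECORDS_MAX and ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS; in a real job, reference the constants directly. The class name, region, and numeric values are hypothetical.

```java
import java.util.Properties;

public class KinesisConsumerProps {
    // Assumed literal values of the Flink connector constants; prefer the
    // constants themselves (AWSConfigConstants / ConsumerConfigConstants) in real code.
    static final String AWS_REGION = "aws.region";
    static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

    // Builds the Properties object that would be handed to a FlinkKinesisConsumer.
    static Properties build(String region, int maxRecords, long intervalMillis) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        props.setProperty(SHARD_GETRECORDS_MAX, Integer.toString(maxRecords));
        props.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(intervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Properties props = build("us-west-2", 5000, 500L);
        props.forEach((k, v) -> System.out.println(k + " = " + v));

        // With the Flink Kinesis connector and StateFun on the classpath, the
        // remaining steps would look roughly like this (MyEvent, MyEventSchema
        // and MY_INGRESS_ID are placeholders, not names from this thread):
        //
        //   FlinkKinesisConsumer<MyEvent> consumer =
        //       new FlinkKinesisConsumer<>("my-stream", new MyEventSchema(), props);
        //   SourceFunctionSpec<MyEvent> spec =
        //       new SourceFunctionSpec<>(MY_INGRESS_ID, consumer);
    }
}
```

Because the resulting ingress is a SourceFunctionSpec rather than the native Kinesis ingress spec, restoring from an existing savepoint runs into the operator ID mismatch described at the top of this message.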