Hello Saravanan,

Yes, you are correct. EFO uses AWS SDK v2, and the builder does not set any
proxy configuration [1]. The polling (non-EFO) mechanism uses AWS SDK v1,
which has a more general configuration deserialiser, and hence the proxy is
configurable there. I do not believe there is a workaround for this without
modifying the connector.
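
As a rough illustration of the first half of such a change, the sketch below
reads proxy settings out of the consumer Properties and validates them. This
is not code from the connector: the class name EfoProxySettings and both
property keys are hypothetical, chosen only for this example. The resulting
host and port would still need to be passed to the SDK v2 HTTP client builder
(I believe via ProxyConfiguration on the Netty async client), which is the
step the connector does not perform today.

```java
import java.util.Optional;
import java.util.Properties;

/** Hypothetical helper, not part of the Flink Kinesis connector. */
final class EfoProxySettings {
    // Hypothetical keys; the connector does not define EFO proxy properties.
    static final String PROXY_HOST = "example.efo.proxy.host";
    static final String PROXY_PORT = "example.efo.proxy.port";

    private final String host;
    private final int port;

    private EfoProxySettings(String host, int port) {
        this.host = host;
        this.port = port;
    }

    String getHost() {
        return host;
    }

    int getPort() {
        return port;
    }

    /**
     * Returns the proxy settings if a proxy host is configured, or empty if
     * no proxy was requested. Throws IllegalArgumentException when a host is
     * given but the port is missing, non-numeric, or out of range.
     */
    static Optional<EfoProxySettings> fromProperties(Properties config) {
        String host = config.getProperty(PROXY_HOST);
        if (host == null || host.trim().isEmpty()) {
            return Optional.empty();
        }
        String portText = config.getProperty(PROXY_PORT);
        if (portText == null) {
            throw new IllegalArgumentException(
                    PROXY_PORT + " must be set when " + PROXY_HOST + " is set");
        }
        int port;
        try {
            port = Integer.parseInt(portText.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    PROXY_PORT + " is not a number: " + portText, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException(
                    PROXY_PORT + " is out of range: " + port);
        }
        return Optional.of(new EfoProxySettings(host.trim(), port));
    }
}
```

Returning an empty Optional when no host is set keeps the current behaviour
for users who do not run behind a proxy.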

If you are in a position to make a contribution to add support, we would
appreciate this. Otherwise I can take this one. Please let me know your
thoughts.

[1]
https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113

Thanks,
Danny Cranmer.

On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan <
[email protected]> wrote:

> Hello,
>
> We are using the Flink Kinesis connector to process streaming data from
> Kinesis. We are running the application behind a proxy. After setting
> proxyhost and proxyport, the connector works with the default publisher
> type (Polling), but it doesn’t work when we set the publisher type to
> Enhanced Fan-Out (EFO). We tried different connector versions, but the
> behaviour is the same. I am wondering if the proxy settings are ignored
> for the EFO type. I am looking forward to your feedback/recommendations.
>
> Flink version: 1.3.5
> Java version: 11
>
> Here is the error log:
>
> 2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure cause: org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: a367945-consumer-stream-dit
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>         Suppressed: java.lang.NullPointerException
>                 at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
>                 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>                 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
>                 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>                 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>                 at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Network is unreachable: kinesis.us-east-1.amazonaws.com/3.227.250.203:443
>         at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
>         at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
>         ... 9 more
>
> Thanks
>
> -Saravan
>
