Hello Saravanan, Yes you are correct. EFO uses AWS SDK v2 and the builder does not set proxy configuration [1]. The polling (non EFO) mechanism is using AWS SDK v1 which has a more general configuration deserialiser, and hence proxy is configurable. I do not believe there is a workaround for this without modifying the connector.
If you are in a position to make a contribution to add support, we would appreciate this. Otherwise I can take this one. Please let me know your thoughts. [1] https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113 Thanks, Danny Cranmer. On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan < [email protected]> wrote: > Hello, > > We are using Flink kinesis connector for processing the streaming data > from kinesis. We are running the application behind the proxy. After the > proxyhost and proxyport settings, the Connector works with default > publisher type(Polling) but it doesn’t work when we enable the publisher > type as Enhanced fanout (EFO). We tried with different connector version > but it the behaviours is same. I am wondering if the proxy settings are > ignored for EFO type. I am looking forward to your > feedback/recommendations. > > > > Flink version: 1.3.5 > > Java version: 11 > > > > Here is the error log: > > 2022-01-17 18:59:20,707 WARN org.apache.flink.runtime.taskmanager.Task > [] - Source: Custom Source -> Sink: Print to Std. Out > (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED > with failure cause: > org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: > Error registering stream: a367945-consumer-stream-dit > > at > org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125) > > at > org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106) > > at > org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75) > > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429) > > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365) > > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536) > > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) > > Suppressed: java.lang.NullPointerException > > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) > > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) > > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > > at java.base/java.lang.Thread.run(Thread.java:834) > > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: > Unable to execute HTTP request: Network is unreachable: > kinesis.us-east-1.amazonaws.com/3.227.250.203:443 > > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) > > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101) > > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191) > > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100) > > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90) > > at > org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122) > > ... 9 more > > > > > > > > Thanks > > -Saravan >
