Hey Saravanan,

Please read the contribution guide [1]. It is a good idea to review the code style guidelines [2] to reduce PR churn for nits.
If you can, please raise a Jira ticket and mention me, and I will assign it to you.

[1] https://flink.apache.org/contributing/how-to-contribute.html
[2] https://flink.apache.org/contributing/code-style-and-quality-preamble.html

Thanks,

On Sun, Jan 23, 2022 at 6:20 PM Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com> wrote:

> Hi Danny,
>
> I should be able to make the contribution to add proxy support. Please let
> me know the contribution process.
>
> Thanks
> -Saravan
>
> *From: *Danny Cranmer <dannycran...@apache.org>
> *Date: *Wednesday, January 19, 2022 at 3:10 AM
> *To: *Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com>
> *Cc: *user@flink.apache.org <user@flink.apache.org>
> *Subject: *Re: Flink Kinesis connector - EFO connection error with http
> proxy settings
>
> *NOTICE: This email is from an external sender - do not click on links or
> attachments unless you recognize the sender and know the content is safe.*
>
> Hello Saravanan,
>
> Yes, you are correct. EFO uses AWS SDK v2, and the builder does not set
> proxy configuration [1]. The polling (non-EFO) mechanism uses AWS SDK v1,
> which has a more general configuration deserialiser, and hence the proxy is
> configurable. I do not believe there is a workaround for this without
> modifying the connector.
>
> If you are in a position to make a contribution to add support, we would
> appreciate this. Otherwise I can take this one. Please let me know your
> thoughts.
>
> [1] https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113
>
> Thanks,
> Danny Cranmer.
>
> On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com> wrote:
>
> Hello,
>
> We are using the Flink Kinesis connector to process streaming data from
> Kinesis. We are running the application behind a proxy. After applying the
> proxyhost and proxyport settings, the connector works with the default
> publisher type (Polling), but it does not work when we set the publisher
> type to Enhanced Fan-Out (EFO). We tried different connector versions, but
> the behaviour is the same. I am wondering if the proxy settings are
> ignored for the EFO type. I am looking forward to your
> feedback/recommendations.
>
> Flink version: 1.3.5
> Java version: 11
>
> Here is the error log:
>
> 2022-01-17 18:59:20,707 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure cause:
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: a367945-consumer-stream-dit
>     at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
>     at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
>     at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>     Suppressed: java.lang.NullPointerException
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Network is unreachable: kinesis.us-east-1.amazonaws.com/3.227.250.203:443
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
>     at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
>     at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
>     ... 9 more
>
> Thanks
> -Saravan
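
For anyone picking up the proxy support discussed in this thread, here is a minimal sketch of the first half of such a change: reading a proxy host and port out of the consumer `Properties` and validating them before they are handed to the AWS SDK v2 client builder. This is not the connector's code. The class name `ProxySettingsSketch` and the property keys `example.proxy.host` and `example.proxy.port` are hypothetical and chosen only for illustration; the real keys and the wiring into the SDK v2 builder would be decided in the Jira and PR.

```java
import java.util.Optional;
import java.util.Properties;

/** Illustrative only: reads and validates proxy settings from consumer properties. */
final class ProxySettingsSketch {

    // Hypothetical property keys for this sketch; NOT the connector's real keys.
    public static final String PROXY_HOST_KEY = "example.proxy.host";
    public static final String PROXY_PORT_KEY = "example.proxy.port";

    /** Immutable host/port pair. */
    public static final class ProxySettings {
        private final String host;
        private final int port;

        ProxySettings(String host, int port) {
            this.host = host;
            this.port = port;
        }

        public String host() { return host; }
        public int port() { return port; }
    }

    /**
     * Returns the proxy settings if a host is configured, or empty if no proxy is requested.
     * Throws IllegalArgumentException when a host is set without a valid port.
     */
    public static Optional<ProxySettings> fromProperties(Properties config) {
        String host = config.getProperty(PROXY_HOST_KEY);
        if (host == null || host.trim().isEmpty()) {
            return Optional.empty(); // no proxy requested, keep the direct connection
        }
        String rawPort = config.getProperty(PROXY_PORT_KEY);
        if (rawPort == null) {
            throw new IllegalArgumentException(
                    PROXY_PORT_KEY + " must be set when " + PROXY_HOST_KEY + " is set");
        }
        int port;
        try {
            port = Integer.parseInt(rawPort.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(PROXY_PORT_KEY + " is not a number: " + rawPort, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException(PROXY_PORT_KEY + " out of range: " + port);
        }
        return Optional.of(new ProxySettings(host.trim(), port));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(PROXY_HOST_KEY, "proxy.example.internal");
        config.setProperty(PROXY_PORT_KEY, "8080");

        // In a real change, this is where the values would be passed to the
        // AWS SDK v2 HTTP client builder instead of being printed.
        fromProperties(config).ifPresent(p ->
                System.out.println("proxy=" + p.host() + ":" + p.port()));
    }
}
```

Returning an empty `Optional` when no host is set keeps the current behaviour for users who are not behind a proxy, while a host without a usable port fails fast at configuration time rather than surfacing later as a "Network is unreachable" error.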