Hey Saravanan,

Please read the contribution guide [1]. It is a good idea to review the
code style guidelines [2] to reduce PR churn for nits.

If you can, please raise a Jira ticket and mention me, and I will assign it
to you.
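
As a starting point, here is a rough sketch of the config-reading half of
such a change: pulling a proxy host and port out of the consumer Properties
and validating them. The class name and both property keys are hypothetical,
made up for illustration, and are not existing connector constants. The
parsed values would still need to be passed to the AWS SDK v2 HTTP client
builder used for EFO, which is left out here.

```java
import java.util.Optional;
import java.util.Properties;

/** Sketch only: the property keys below are hypothetical, not real connector keys. */
final class ProxySettingsSketch {

    // Hypothetical keys, chosen for illustration.
    static final String PROXY_HOST_KEY = "example.efo.proxy.host";
    static final String PROXY_PORT_KEY = "example.efo.proxy.port";

    /** Immutable holder for the parsed values. */
    static final class ProxySettings {
        final String host;
        final int port;

        ProxySettings(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Returns the proxy settings if a host is configured, or empty otherwise.
     * Throws IllegalArgumentException if the host is set but the port is
     * missing, not a number, or outside 1..65535.
     */
    static Optional<ProxySettings> fromProperties(Properties config) {
        String host = config.getProperty(PROXY_HOST_KEY);
        if (host == null || host.trim().isEmpty()) {
            return Optional.empty(); // no proxy requested
        }
        String rawPort = config.getProperty(PROXY_PORT_KEY);
        if (rawPort == null) {
            throw new IllegalArgumentException(
                    PROXY_PORT_KEY + " is required when " + PROXY_HOST_KEY + " is set");
        }
        int port;
        try {
            port = Integer.parseInt(rawPort.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid proxy port: " + rawPort, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Proxy port out of range: " + port);
        }
        return Optional.of(new ProxySettings(host.trim(), port));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(PROXY_HOST_KEY, "proxy.example.com");
        config.setProperty(PROXY_PORT_KEY, "8080");
        fromProperties(config).ifPresent(p -> System.out.println(p.host + ":" + p.port));
    }
}
```

Returning an empty Optional when no host is set keeps the current behaviour
for jobs that do not run behind a proxy.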

[1] https://flink.apache.org/contributing/how-to-contribute.html
[2] https://flink.apache.org/contributing/code-style-and-quality-preamble.html

Thanks,

On Sun, Jan 23, 2022 at 6:20 PM Gnanamoorthy, Saravanan <
saravanan.gnanamoor...@fmr.com> wrote:

> Hi Danny,
>
> I should be able to make the contribution to add proxy support. Please let
> me know the contribution process.
>
>
>
> Thanks
>
> -Saravan
>
>
>
> *From: *Danny Cranmer <dannycran...@apache.org>
> *Date: *Wednesday, January 19, 2022 at 3:10 AM
> *To: *Gnanamoorthy, Saravanan <saravanan.gnanamoor...@fmr.com>
> *Cc: *user@flink.apache.org <user@flink.apache.org>
> *Subject: *Re: Flink Kinesis connector - EFO connection error with http
> proxy settings
>
>
>
>
> Hello Saravanan,
>
>
>
> Yes, you are correct. EFO uses AWS SDK v2, and the client builder in the
> connector does not set any proxy configuration [1]. The polling (non-EFO)
> mechanism uses AWS SDK v1, which has a more general configuration
> deserialiser, so the proxy is configurable there. I do not believe there
> is a workaround for this without modifying the connector.
>
>
>
> If you are in a position to make a contribution to add support, we would
> appreciate this. Otherwise I can take this one. Please let me know your
> thoughts.
>
>
>
> [1] https://github.com/apache/flink/blob/release-1.13/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java#L113
>
>
>
> Thanks,
>
> Danny Cranmer.
>
>
>
> On Tue, Jan 18, 2022 at 12:52 AM Gnanamoorthy, Saravanan <
> saravanan.gnanamoor...@fmr.com> wrote:
>
> Hello,
>
> We are using the Flink Kinesis connector to process streaming data from
> Kinesis. The application runs behind a proxy. After configuring the
> proxyhost and proxyport settings, the connector works with the default
> publisher type (Polling), but it does not work when we set the publisher
> type to Enhanced Fan-Out (EFO). We tried different connector versions,
> but the behaviour is the same. I am wondering if the proxy settings are
> ignored for the EFO type. I am looking forward to your
> feedback/recommendations.
>
>
>
> Flink version: 1.3.5
>
> Java version: 11
>
>
>
> Here is the error log:
>
> 2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure cause:
> org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException: Error registering stream: a367945-consumer-stream-dit
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)
>         at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)
>         at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>         Suppressed: java.lang.NullPointerException
>                 at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
>                 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>                 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)
>                 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>                 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>                 at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Network is unreachable: kinesis.us-east-1.amazonaws.com/3.227.250.203:443
>         at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)
>         at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)
>         at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)
>         at org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)
>         ... 9 more
>
>
>
>
>
>
>
> Thanks
>
> -Saravan
>
>
