Flink OpenSearch Connector - Avro JSON to JSON

2023-11-20 Thread Praveen Chandna via user
Hello Team

Please help answer the queries below.


  1.  OpenSearch supports writing data in JSON format, but Flink's default 
data format is Avro JSON. What is the best practice for writing data to 
OpenSearch using the Flink OpenSearch connector? Do we need to manually 
convert Avro JSON to JSON, or does Flink have built-in support for writing 
JSON to OpenSearch?
  2.  How can I write existing complex nested JSON data into OpenSearch 
using the Flink connector, rather than writing each key/value (or 
field/value) pair?
  3.  The OpenSearch Java REST client supports writing a Java object 
directly to OpenSearch using the code below.



IndexRequest indexRequest =
    new IndexRequest.Builder().index(index).document(indexData).build();



https://opensearch.org/docs/latest/clients/java/
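For reference, the Flink OpenSearch connector's sink takes an emitter that adds index requests, and an index request can carry a whole pre-serialized JSON document (e.g. something like source(jsonString, XContentType.JSON)), so a nested object can be converted to JSON once rather than indexed field by field. As a dependency-free illustration of that conversion step (in a real job a library such as Jackson would do this; the class and method names here are purely illustrative):

```java
import java.util.*;
import java.util.stream.Collectors;

// Illustrative only: shows the idea of serializing a whole nested structure
// once, instead of emitting individual field/value pairs to the sink.
public class JsonSketch {

    static String toJson(Object v) {
        if (v == null) return "null";
        if (v instanceof String) {
            return "\"" + ((String) v).replace("\"", "\\\"") + "\"";
        }
        if (v instanceof Map) {
            return ((Map<?, ?>) v).entrySet().stream()
                    .map(e -> "\"" + e.getKey() + "\":" + toJson(e.getValue()))
                    .collect(Collectors.joining(",", "{", "}"));
        }
        if (v instanceof List) {
            return ((List<?>) v).stream()
                    .map(JsonSketch::toJson)
                    .collect(Collectors.joining(",", "[", "]"));
        }
        return v.toString(); // numbers and booleans
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("user", "praveen");
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Delhi");
        doc.put("address", address);
        doc.put("tags", Arrays.asList("a", "b"));
        // The resulting string can be handed to the sink as one document.
        System.out.println(toJson(doc));
        // prints {"user":"praveen","address":{"city":"Delhi"},"tags":["a","b"]}
    }
}
```

Inside the connector's emitter you would then pass this JSON string (or the Map itself) to the index request, so no per-field handling is needed.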

Thanks!

Regards,
Praveen Chandna



Re: Flink Kubernetes operator keeps reporting REST client timeout.

2023-11-20 Thread Xiaolong Wang
It seems the operator did not get restarted automatically after the
ConfigMap was changed. After a rollout restart, the exception disappeared.
Never mind this issue. Thanks.
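For readers hitting the same symptom: the fix described above amounts to forcing the operator Deployment to restart so it re-reads the changed ConfigMap. A hypothetical sketch (the deployment name and namespace are assumptions and depend on how the operator was installed):

```shell
# Restart the operator pods so the updated ConfigMap is picked up
kubectl rollout restart deployment/flink-kubernetes-operator -n flink-operator

# Wait until the restarted pods are ready
kubectl rollout status deployment/flink-kubernetes-operator -n flink-operator
```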

On Tue, Nov 21, 2023 at 11:31 AM Xiaolong Wang wrote:

> Hi,
>
> Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
> use Flink 1.18. After that, the operator kept reporting the following
> exception:
>
> 2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
>> ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully
>> reconciled, nothing to do...
>>
>> 2023-11-21 03:26:50,727 o.a.f.r.r.RestClient   [WARN
>> ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint
>> shutdown failed.
>>
>> java.util.concurrent.TimeoutException
>>
>> at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown
>> Source)
>>
>> at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>>
>> at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
>>
>> at
>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
>>
>> at
>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
>>
>> at
>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
>>
>> at
>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>>
>> at
>> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>>
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> Source)
>>
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> Source)
>>
>> at java.base/java.lang.Thread.run(Unknown Source)
>>
>
> I tried increasing the REST timeout parameter
> "job.autoscaler.flink.rest-client.timeout"
> to 60 s, but it did not resolve the issue.
>
> Could you help check this out? Thanks in advance.
>


Flink Kubernetes operator keeps reporting REST client timeout.

2023-11-20 Thread Xiaolong Wang
Hi,

Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
use Flink 1.18. After that, the operator kept reporting the following
exception:

2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
> ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully
> reconciled, nothing to do...
>
> 2023-11-21 03:26:50,727 o.a.f.r.r.RestClient   [WARN
> ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint
> shutdown failed.
>
> java.util.concurrent.TimeoutException
>
> at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown
> Source)
>
> at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>
> at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
>
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
>
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
>
> at
> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
>
> at
> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
>
> at
> org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
>
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
>
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>
> at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>
> at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>
> at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>
> at java.base/java.lang.Thread.run(Unknown Source)
>

I tried increasing the REST timeout parameter
"job.autoscaler.flink.rest-client.timeout"
to 60 s, but it did not resolve the issue.

Could you help check this out? Thanks in advance.


Handling Errors and Message Replay in Flink

2023-11-20 Thread mejri houssem
Hello Flink community,

We are currently working on a Flink job that consumes messages from
RabbitMQ, with checkpointing configured to at-least-once mode.

In our job, we make external API requests to retrieve information. If the
external API is down or a timeout occurs, we currently throw an exception
to avoid acknowledging the message in RabbitMQ, since we aim to replay it.
However, this approach causes all tasks to be redeployed.

I'm reaching out to ask whether there are alternative solutions in Flink
that avoid throwing an exception. We are interested in a way to instruct
Flink not to acknowledge a message if a problem occurred during its
processing.
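One pattern sometimes suggested for this situation is to retry the external call inside the operator with a bounded backoff (or, for asynchronous I/O, to use Flink's AsyncDataStream retry support in recent versions), so that a transient API outage does not fail the job and trigger a full restart. A minimal, dependency-free sketch of such a helper — all names are illustrative, not a Flink API:

```java
import java.util.concurrent.Callable;

public class BoundedRetry {
    // Invokes the call up to maxAttempts times, sleeping backoffMs between
    // failures. Only rethrows (failing the Flink task and forcing replay)
    // after all attempts are exhausted.
    public static <T> T withRetry(Callable<T> call, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated flaky API: fails twice, then succeeds.
        String result = withRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("API down");
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints ok after 3 attempts
    }
}
```

Wrapping the external request this way keeps transient failures local to the operator; only a persistent outage propagates an exception and lets the job restart from the last checkpoint for replay.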

Any suggestions would be appreciated.

Best regards,


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-20 Thread Alexis Sarda-Espinosa
Hi Trystan, I'm actually not very familiar with the operator's internals,
but I'd guess that limitation is in Flink itself - application mode is a
feature from core Flink, the operator just configures it based on the CRDs
it defines. Maybe one of the maintainers can confirm.

Regards,
Alexis.

On Mon, 20 Nov 2023, 19:25 Trystan,  wrote:

> Thanks Alexis, I can give that a try. However, that seems less than ideal
> from the user's perspective.
>
> Is there a technical reason why the operator can't support this
> combination of modes? I'd really like to just let the system do its thing
> rather than build a complicated two-jar approach.
>
> Thanks,
> Trystan
>
> On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Trystan,
>>
>> I imagine you can create two jars: one containing only the class with the
>> main method, and the other a fat jar with everything else for your job. If
>> you create a custom image where your fat jar is placed under
>> /opt/flink/lib/, then I think it would "just work" when specifying the
>> main-method jar in jarURI.
>>
>> Nevertheless, even though Flink shadows a lot of the libraries they use
>> internally, I suppose you could still end up with dependency conflicts, so
>> you would probably have some added complexity managing what's bundled in
>> your fat jar.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, Nov 16, 2023 at 19:42, Trystan wrote:
>>
>>> Is it possible to avoid dynamic classloading when using the operator
>>> with a native kubernetes application deployment?
>>>
>>> If I put the job jar into /opt/flink/lib, then there are two possible
>>> outcomes:
>>>
>>>1. If I point jarURI to the jar, I get linkage errors (presumably
>>>because the classes have already been loaded by both the AppClassLoader
>>>and the FlinkUserCodeClassLoader).
>>>2. If I do not include jarURI, the operator pods encounter a
>>>NullPointerException. The docs state this is optional, but that appears
>>>to pertain only to standalone mode.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>>> jarURI (apparently only for standalone deployments).
>>>
>>> Are there any additional configurations (configs, jar locations, etc)
>>> that are needed to avoid dynamic classloading in this case?
>>>
>>


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-20 Thread Trystan
Thanks Alexis, I can give that a try. However, that seems less than ideal
from the user's perspective.

Is there a technical reason why the operator can't support this combination
of modes? I'd really like to just let the system do its thing rather than
build a complicated two-jar approach.

Thanks,
Trystan

On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Trystan,
>
> I imagine you can create two jars: one containing only the class with the
> main method, and the other a fat jar with everything else for your job. If
> you create a custom image where your fat jar is placed under
> /opt/flink/lib/, then I think it would "just work" when specifying the
> main-method jar in jarURI.
>
> Nevertheless, even though Flink shadows a lot of the libraries they use
> internally, I suppose you could still end up with dependency conflicts, so
> you would probably have some added complexity managing what's bundled in
> your fat jar.
>
> Regards,
> Alexis.
>
> On Thu, Nov 16, 2023 at 19:42, Trystan wrote:
>
>> Is it possible to avoid dynamic classloading when using the operator with
>> a native kubernetes application deployment?
>>
>> If I put the job jar into /opt/flink/lib, then there are two possible
>> outcomes:
>>
>>1. If I point jarURI to the jar, I get linkage errors (presumably
>>because the classes have already been loaded by both the AppClassLoader
>>and the FlinkUserCodeClassLoader).
>>2. If I do not include jarURI, the operator pods encounter a
>>NullPointerException. The docs state this is optional, but that appears
>>to pertain only to standalone mode.
>>
>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>> jarURI (apparently only for standalone deployments).
>>
>> Are there any additional configurations (configs, jar locations, etc)
>> that are needed to avoid dynamic classloading in this case?
>>
>
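To make the two-jar layout from this thread concrete, the deployment could look roughly like the following FlinkDeployment fragment. This is a hypothetical sketch: the image name, paths, and entry class are placeholders, and whether the thin-jar workaround behaves this way in native application mode is exactly what is in question here:

```yaml
spec:
  # Custom image whose Dockerfile copies the fat job jar into /opt/flink/lib/
  image: my-registry/my-flink-job:1.0.0
  mode: native
  job:
    # Thin jar containing only the main class; everything else loads from
    # the parent classpath, avoiding dynamic (child-first) classloading.
    jarURI: local:///opt/flink/usrlib/entrypoint-thin.jar
    entryClass: com.example.EntryPoint
```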


Re: How can a Flink SQL job support a configuration stream?

2023-11-20 Thread Yu Chen
Hi casel,

We do something similar in production: you could implement a UDTF that
listens to the Apollo configuration and decides, based on that
configuration, whether to filter the data.

Best,
Yu Chen


> On 2023-11-20 21:05, Xuyang wrote:
> 
> Hi,
> Could the "configuration dimension table" be replaced with a stream table?
> With Flink CDC, changes to the configuration table would be captured as a
> change stream and joined with the upstream business data downstream.
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> On 2023-11-20 19:24:47, "casel.chen" wrote:
>> I have a Flink SQL job whose filter condition on customer_id is determined
>> by user configuration, similar to the Apollo config center. Can this be
>> implemented by defining a configuration dimension table, with a TTL set to
>> periodically fetch the latest configuration?
>> 
>> 
>> create table customer_conf_tbl (
>> customer_id STRING
>> ) with (
>> 'connector' = 'apollo',
>> 'other properties' 
>> );
>> select * from biz_table where customer_id in (select 
>> string_split(customer_id, ',') from customer_conf_tbl)
>> 
>> 
>> And if configuration updates need to take effect on the running SQL job in 
>> real time, how could that be implemented?



Re: How can a Flink SQL job support a configuration stream?

2023-11-20 Thread Xuyang
Hi,
Could the "configuration dimension table" be replaced with a stream table?
With Flink CDC, changes to the configuration table would be captured as a
change stream and joined with the upstream business data downstream.




--

Best!
Xuyang





On 2023-11-20 19:24:47, "casel.chen" wrote:
>I have a Flink SQL job whose filter condition on customer_id is determined
>by user configuration, similar to the Apollo config center. Can this be
>implemented by defining a configuration dimension table, with a TTL set to
>periodically fetch the latest configuration?
>
>
>create table customer_conf_tbl (
>  customer_id STRING
>) with (
>  'connector' = 'apollo',
>  'other properties' 
>);
>select * from biz_table where customer_id in (select string_split(customer_id, 
>',') from customer_conf_tbl)
>
>
>And if configuration updates need to take effect on the running SQL job in 
>real time, how could that be implemented?


How can a Flink SQL job support a configuration stream?

2023-11-20 Thread casel.chen
I have a Flink SQL job whose filter condition on customer_id is determined
by user configuration, similar to the Apollo config center. Can this be
implemented by defining a configuration dimension table, with a TTL set to
periodically fetch the latest configuration?


create table customer_conf_tbl (
  customer_id STRING
) with (
  'connector' = 'apollo',
  'other properties' 
);
select * from biz_table where customer_id in (select string_split(customer_id, 
',') from customer_conf_tbl)


And if configuration updates need to take effect on the running SQL job in 
real time, how could that be implemented?
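One concrete shape of the "configuration dimension table with a TTL" idea is a lookup join against the config table with a cache TTL, so the filter set is refreshed periodically. The following is a hypothetical sketch: the connector choice, the lookup-cache option names (which vary by connector and Flink version), and the proc_time attribute are all assumptions:

```sql
-- biz_table needs a processing-time attribute for the lookup join
CREATE TABLE biz_table (
  customer_id STRING,
  payload STRING,
  proc_time AS PROCTIME()
) WITH (
  -- source connector options omitted
);

-- Configuration table backed by a lookup-capable connector (e.g. JDBC),
-- cached with a TTL so updated configs are picked up within a minute
CREATE TABLE customer_conf_tbl (
  customer_id STRING
) WITH (
  'connector' = 'jdbc',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1 min'
);

-- Keep only rows whose customer_id currently appears in the config table
SELECT b.*
FROM biz_table AS b
JOIN customer_conf_tbl FOR SYSTEM_TIME AS OF b.proc_time AS c
  ON b.customer_id = c.customer_id;
```

For truly real-time updates (rather than TTL-bounded staleness), the CDC-based stream join or the Apollo-listening UDTF suggested in this thread would be needed instead.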

Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-20 Thread Jing Ge via user
Hi Junrui,

Thanks for the clarification. On the one hand, adding more methods flat
into the RuntimeContext will increase the effort for users of
RuntimeContext, but that impact is limited and acceptable. On the other
hand, the bigger impact is that users who want to focus on the execution
config will have to find the needle in a haystack.

I just shared my thoughts to help look at the issue from different angles,
and I am open to opinions from other contributors. Please feel free to
proceed if there are no other objections.

Best regards,
Jing
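The "immutable user-facing abstraction" discussed in this thread can be illustrated with a small read-only facade: instead of handing user functions the mutable ExecutionConfig, expose only the needed settings behind an interface with no setters. This is a hypothetical sketch, not the FLIP's actual proposal; all names and the chosen settings are illustrative:

```java
// Hypothetical sketch (not a Flink API): a read-only view of execution
// settings, so user functions can read but never mutate them.
interface ReadableExecutionProperties {
    boolean isObjectReuseEnabled();
    long getAutoWatermarkInterval();
}

// Immutable implementation: values are captured once at construction time.
final class FrozenExecutionProperties implements ReadableExecutionProperties {
    private final boolean objectReuse;
    private final long autoWatermarkInterval;

    FrozenExecutionProperties(boolean objectReuse, long autoWatermarkInterval) {
        this.objectReuse = objectReuse;
        this.autoWatermarkInterval = autoWatermarkInterval;
    }

    @Override
    public boolean isObjectReuseEnabled() { return objectReuse; }

    @Override
    public long getAutoWatermarkInterval() { return autoWatermarkInterval; }
}

public class FacadeDemo {
    public static void main(String[] args) {
        ReadableExecutionProperties props = new FrozenExecutionProperties(true, 200L);
        // User code can read settings, but there are no setters to call.
        System.out.println(props.isObjectReuseEnabled() + " " + props.getAutoWatermarkInterval());
        // prints true 200
    }
}
```

Such a facade addresses both concerns raised here: it hides internal-only methods, and it makes the exposed configuration immutable during job execution.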

On Mon, Nov 20, 2023 at 6:50 AM Junrui Lee  wrote:

> Hi Jing,
>
> Thank you for your feedback. I understand your concerns regarding putting
> all methods into the RuntimeContext flat.
>
> I would like to share some of my thoughts on this matter.
> Firstly, this FLIP only proposes the addition of three additional methods,
> which should not impose too much extra burden on users. Secondly, I agree
> that it is important to make it clearer for users to use the
> RuntimeContext. However, reorganizing the RuntimeContext to achieve this
> requires further discussion. We should focus on a more specific and unified
> reorganization of the RuntimeContext interface in future work, rather than
> implementing a temporary solution now. Therefore, I prefer not to add a
> separate abstraction layer for these three methods in this FLIP.
>
> Please feel free to share any further thoughts.
>
> Best regards,
> Junrui
>
> Jing Ge wrote on Mon, Nov 20, 2023 at 05:46:
>
>> Hi Junrui,
>>
>> Thanks for bringing this to our attention. First of all, it makes sense
>> to deprecate RuntimeContext#getExecutionConfig.
>>
>> Afaic, this is an issue of how we design API with clean concepts/aspects.
>> There are two issues mentioned in the FLIP:
>>
>> 1. Lack of a user-facing abstraction - we just exposed ExecutionConfig,
>> which mixes methods intended for users with methods that should only be
>> used internally.
>> 2. mutable vs immutable - do we want users to be able to modify configs
>> during job execution?
>>
>> An immutable user-facing abstraction design can solve both issues. All
>> execution related configs are still consolidated into the abstraction class
>> and easy to access. This is another design decision: flat vs. hierarchical.
>> Current FLIP removed the execution config abstraction and put all methods
>> into RuntimeContext flat, which will end up with more than 30 methods
>> offered flat by the RuntimeContext. I am not sure if this could help users
>> find the right method in the context of execution config better than
>> before.
>>
>> I might miss something and look forward to your thoughts. Thanks!
>>
>> Best regards,
>> Jing
>>
>> On Sat, Nov 18, 2023 at 11:21 AM Junrui Lee  wrote:
>>
>>> Hello Wencong,
>>>
>>> Thank you for your valuable feedback and suggestions. I want to clarify
>>> that reviewing existing methods in the ExecutionConfig is not directly
>>> related to the proposal in this FLIP. The main focus of this FLIP is to
>>> deprecate the specific method RuntimeContext#getExecutionConfig(). I
>>> believe it is important to keep the scope of this FLIP limited. However,
>>> your suggestion can certainly be considered as a separate FLIP in the
>>> future.
>>>
>>> Best regards,
>>> Junrui
>>>
>>> Wencong Liu wrote on Fri, Nov 17, 2023 at 22:08:
>>>
 Hello Junrui,


 Thanks for the effort. I agree with the proposal to deprecate the
 getExecutionConfig() method in the RuntimeContext class. Exposing
 the complex ExecutionConfig to user-defined functions can lead to
 unnecessary complexity and risks.


 I also have a suggestion. We could consider reviewing the existing
 methods in ExecutionConfig. If there are methods that are defined
 in ExecutionConfig but currently have no callers, we could consider
 annotating them as @Internal or directly removing them. Since
 users are no longer able to access and invoke these methods,
 it would be beneficial to clean up the codebase.


 +1 (non-binding).


 Best,
 Wencong


 At 2023-11-15 16:51:15, "Junrui Lee"  wrote:
 >Hi all,
 >
 >I'd like to start a discussion of FLIP-391: Deprecate
 >RuntimeContext#getExecutionConfig[1].
 >
 >Currently, the Flink RuntimeContext is important for connecting user
 >functions to the underlying runtime details. It provides users with
 >necessary runtime information during job execution.
 >However, the current implementation of the Flink RuntimeContext exposes
 >the ExecutionConfig to users, resulting in two issues:
 >Firstly, the ExecutionConfig contains much unrelated information that
 >can confuse users and complicate management.
 >Secondly, exposing the ExecutionConfig allows users to modify it during
 >job execution, which can cause