Re: Re: Flink UI operator data display stuck on loading...

2024-01-25 Thread Xuyang
Hi, 
If you manually curl the endpoint of the problematic metric, does the returned data look normal? And did you find anything unusual in the JM log?
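
For example, a quick manual check against the REST endpoint could look like
this (a sketch; the host and IDs are placeholders, and 8081 is the default REST port):

curl "http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics"

This lists the metric names available for that vertex; appending
?get=<metric-name> fetches the values. If the call hangs or errors while other
endpoints respond fine, that points at the metrics handler rather than the UI.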



--

Best!
Xuyang





On 2024-01-26 11:51:53, "阿华田" wrote:
>I have checked all of those; everything is normal.
>
>
>阿华田
>a15733178...@163.com
>
>
>On 2024-01-23 21:57, Feng Jin wrote:
>You can try the following approaches to pin down the cause:
>
>
>1. Open the browser developer tools and check whether a request to some
>endpoint is stuck.
>
>2. Check the JobManager's GC behavior, e.g. whether it is running frequent
>full GCs.
>
>3. Check the JobManager logs for missing resource files or disk problems
>that could make the web UI unreachable.
>
>
>Best,
>Feng
>
>
>On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:
>
>
>
>As shown in the screenshot below, the job processes data normally and its status is normal, but the Flink UI stays stuck on loading. Only a few jobs behave this way; the rest are fine. Could this be caused by a class conflict in some metrics package?
>阿华田
>a15733178...@163.com
>
>
>
>


Re: Flink UI operator data display stuck on loading...

2024-01-25 Thread 阿华田
I have checked all of those; everything is normal.


阿华田
a15733178...@163.com


On 2024-01-23 21:57, Feng Jin wrote:
You can try the following approaches to pin down the cause:


1. Open the browser developer tools and check whether a request to some
endpoint is stuck.

2. Check the JobManager's GC behavior, e.g. whether it is running frequent
full GCs (see the example below).

3. Check the JobManager logs for missing resource files or disk problems
that could make the web UI unreachable.
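
For item 2, if shell access to the JobManager host is not convenient, its JVM
metrics are also exposed over REST. A sketch, assuming the default REST port
8081; the collector name in the metric depends on the JVM and GC in use:

curl "http://<jobmanager-host>:8081/jobmanager/metrics?get=Status.JVM.GarbageCollector.<Collector>.Time"

A value that climbs quickly between two calls suggests heavy GC pressure.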


Best,
Feng


On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:



As shown in the screenshot below, the job processes data normally and its status is normal, but the Flink UI stays stuck on loading. Only a few jobs behave this way; the rest are fine. Could this be caused by a class conflict in some metrics package?
阿华田
a15733178...@163.com






Re: [ANNOUNCE] Apache Flink 1.18.1 released

2024-01-25 Thread Jing Ge via user
Hi folks,

The bug has been fixed and PR at docker-library/official-images has been
merged. The official images are available now.

Best regards,
Jing

On Mon, Jan 22, 2024 at 11:39 AM Jing Ge  wrote:

> Hi folks,
>
> I am still working on the official images because of the issue
> https://issues.apache.org/jira/browse/FLINK-34165. Images under
> apache/flink are available.
>
> Best regards,
> Jing
>
> On Sun, Jan 21, 2024 at 11:06 PM Jing Ge  wrote:
>
>> Thanks Leonard for the feedback! Also thanks @Jark Wu @Chesnay
>> Schepler and each and every one who worked closely
>> with me on this release. We made it together!
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jan 21, 2024 at 9:25 AM Leonard Xu  wrote:
>>
>>> Thanks Jing for driving the release, nice work!
>>>
>>> Thanks to all who were involved in this release!
>>>
>>> Best,
>>> Leonard
>>>
>>> > On Jan 20, 2024, at 12:01 AM, Jing Ge wrote:
>>> >
>>> > The Apache Flink community is very happy to announce the release of
>>> Apache
>>> > Flink 1.18.1, which is the first bugfix release for the Apache Flink
>>> 1.18
>>> > series.
>>> >
>>> > Apache Flink® is an open-source stream processing framework for
>>> > distributed, high-performing, always-available, and accurate data
>>> streaming
>>> > applications.
>>> >
>>> > The release is available for download at:
>>> > https://flink.apache.org/downloads.html
>>> >
>>> > Please check out the release blog post for an overview of the
>>> improvements
>>> > for this bugfix release:
>>> >
>>> https://flink.apache.org/2024/01/19/apache-flink-1.18.1-release-announcement/
>>> >
>>> > Please note: Users that have state compression should not migrate to
>>> 1.18.1
>>> > (nor 1.18.0) due to a critical bug that could lead to data loss. Please
>>> > refer to FLINK-34063 for more information.
>>> >
>>> > The full release notes are available in Jira:
>>> >
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353640
>>> >
>>> > We would like to thank all contributors of the Apache Flink community
>>> who
>>> > made this release possible! Special thanks to @Qingsheng Ren @Leonard
>>> Xu
>>> > @Xintong Song @Matthias Pohl @Martijn Visser for the support during
>>> this
>>> > release.
>>> >
>>> > A Jira task series based on the Flink release wiki has been created for
>>> > 1.18.1 release. Tasks that need to be done by PMC have been explicitly
>>> > created separately. It will be convenient for the release manager to
>>> reach
>>> > out to PMC for those tasks. Any future patch release could consider
>>> cloning
>>> > it and follow the standard release process.
>>> > https://issues.apache.org/jira/browse/FLINK-33824
>>> >
>>> > Feel free to reach out to the release managers (or respond to this
>>> thread)
>>> > with feedback on the release process. Our goal is to constantly
>>> improve the
>>> > release process. Feedback on what could be improved or things that
>>> didn't
>>> > go so well are appreciated.
>>> >
>>> > Regards,
>>> > Jing
>>>
>>>




RE: Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Jiabao Sun
Hi Tauseef,

We cannot directly write POJO types into Elasticsearch.
You can try serializing the TopologyDTO into a JSON string with a library like
Jackson before writing it.

// Imports assume the Elasticsearch 7.x high-level REST client.
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public static void main(String[] args) throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(HttpHost.create("http://127.0.0.1:9200")))) {
        TopologyDTO data = new TopologyDTO();

        IndexRequest request = Requests.indexRequest()
                .index("topology")
                .id(data.getUuid()) // here uuid is String
                // serialize the POJO to a JSON string before indexing
                .source(new ObjectMapper().writeValueAsString(data),
                        XContentType.JSON);

        client.index(request, RequestOptions.DEFAULT);
    }
}
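
Applied to your job, the same idea is to serialize inside createIndexRequest
instead of putting the raw POJO into a Map. A sketch reusing the names from
your quoted snippet (assumes com.fasterxml.jackson.core.JsonProcessingException
is imported as well; a single shared ObjectMapper is fine since it is
thread-safe once configured):

private static final ObjectMapper MAPPER = new ObjectMapper();

private static IndexRequest createIndexRequest(TopologyDTO data) {
    try {
        return Requests.indexRequest()
                .index("topology")
                .id(data.getUuid()) // here uuid is String
                // a JSON string instead of a Map containing the raw POJO
                .source(MAPPER.writeValueAsString(data), XContentType.JSON);
    } catch (JsonProcessingException e) {
        // surface serialization failures rather than dropping records silently
        throw new RuntimeException("Failed to serialize TopologyDTO", e);
    }
}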

Best,
Jiabao


On 2024/01/25 13:00:58 Tauseef Janvekar wrote:
> Hi Team,
> 
> We get the below error message when we try to add an Elasticsearch sink:
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
> unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
> 
> The code written for this is here:
> 
> workflow(filterItems(openTelSrc)).sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
>     .setBulkFlushMaxActions(1)
>     .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
>     .setConnectionPassword("password").setConnectionUsername("elastic")
>     .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element)))
>     .build())
>     .name("topology_sink");
>
> private static IndexRequest createIndexRequest(TopologyDTO data) {
>     Map<String, Object> json = new HashMap<>();
>     json.put("data", data);
>     return Requests.indexRequest()
>         .index("topology")
>         .id(data.getUuid()) // here uuid is String
>         .source(json);
> }
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Tauseef
> 

RE: Re: RE: how to get flink accumulated sink record count

2024-01-25 Thread Jiabao Sun
Hi Enric,

Could you kindly provide more specific details about where you would like to
capture the metric?
Additionally, if it's convenient for you, could you please share some code
examples?
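
If what you need is to count records inside your own task code, a minimal
sketch with a user-defined counter could look like the following (the class
and metric names here are illustrative, not from any existing job):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMap<T> extends RichMapFunction<T, T> {

    private transient Counter recordsOut;

    @Override
    public void open(Configuration parameters) {
        // Registered under the operator's metric group, so it shows up in the
        // web UI and can be queried via reporters or the REST API.
        recordsOut = getRuntimeContext().getMetricGroup().counter("myRecordsOut");
    }

    @Override
    public T map(T value) {
        recordsOut.inc();
        return value;
    }
}

Flink also maintains the built-in numRecordsOut counter per operator, which is
what the web UI displays.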

Best,
Jiabao


On 2024/01/25 10:43:30 Enric Ott wrote:
> Thanks, Jiabao, but what I mean is capturing the metric in Flink tasks.
> 
> 
> 
> 
> ------ Original Message ------
> From: "Jiabao Sun"  Sent: Thursday, January 25, 2024, 3:11 PM
> To: "user"  Subject: RE: how to get flink accumulated sink record count
> 
> 
> 
> 
> 
> I guess getting the metrics[1] might be helpful for you. 
> You can query the numRecordsOut metric via a Metrics Reporter[2] or the REST API[3].
> 
> Best,
> Jiabao
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> [3] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics
> 
> On 2024/01/25 06:54:36 Enric Ott wrote:
>  Hi Team,
>  I was wondering how to get the flink accumulated sink record count (just like
> the flink UI displays); any help would be appreciated.

Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Tauseef Janvekar
Hi Team,

We get the below error message when we try to add an Elasticsearch sink:
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 23 more
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 27 more
Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.

The code written for this is here:

workflow(filterItems(openTelSrc)).sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
    .setBulkFlushMaxActions(1)
    .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
    .setConnectionPassword("password").setConnectionUsername("elastic")
    .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element)))
    .build())
    .name("topology_sink");

private static IndexRequest createIndexRequest(TopologyDTO data) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", data);
    return Requests.indexRequest()
        .index("topology")
        .id(data.getUuid()) // here uuid is String
        .source(json);
}

Any help would be greatly appreciated.

Thanks,
Tauseef


Re: Long execution of SQL query to Kafka + Hive (q77 TPC-DS)

2024-01-25 Thread Ron liu
Hi,

Can you share the q77 execution plan, and check in the Flink UI which operator
takes the longest?
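
For the plan, something like the following should print it. This is a sketch
reusing the tEnv and query constant from your own snippet below:

// Print the optimized plan for q77; Tpcds100.Q77_WITH_KAFKA is the same SQL
// string the job executes.
System.out.println(tEnv.explainSql(Tpcds100.Q77_WITH_KAFKA));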

Best
Ron

Вова Фролов wrote on Wednesday, January 24, 2024 at 09:09:

> Hello,
>
> I am executing a heterogeneous SQL query (part of the data is in Hive
> and part in Kafka; the query uses the 100 GB TPC-DS benchmark data set) in
> BatchMode. However, the execution time is excessively long, taking
> approximately 11 minutes to complete, although the same query against Hive
> alone (without Kafka) completes in 12 seconds.
>
> How can I speed up execution heterogeneous SQL query to Kafka + Hive?
>
> Versions of components in use:
>
> · Apache Flink: 1.17.1
>
> · Kafka: 3.2.3
>
> · Hive: 3.1.2.3.4.5.1-1
>
> Flink job code:
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
>
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
>
>
> Hive catalog:
>
> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE, PATH_TO_CONF,
> HiveVersionInfo.getVersion());
>
> tEnv.registerCatalog("hive", catalog);
>
> tEnv.useCatalog("hive");
>
>
>
>
>
> Creating tables with Kafka connector:
>
>
>
> public static final String *CREATE_STORE_SALES *= "CREATE TEMPORARY TABLE
> store_sales_kafka(\n" +
> "  ss_sold_date_sk INTEGER,\n" +
>
> // here are 21 columns
>
> "  ss_net_profit DOUBLE\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'key.format' = 'avro',\n" +
> "   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
> "   'properties.group.id' = 'store_sales_group',\n" +
> "   'scan.startup.mode' = 'earliest-offset',\n" +
>
> "   'scan.bounded.mode' = 'latest-offset',\n" +
> "   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, 
> xyz4:9092, xyz5:9092',\n" +
> "   'topic' = 'storeSales100',\n" +
>
> "'value.format' = 'avro',\n" +
>
> "'value.fields-include' = 'EXCEPT_KEY'\n" +
>
>
> "   );";
>
>
>
> Q77 with Flink
>
> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>
> for (Row row : res) {
>     System.out.println(row);
> }
>
>
>
> Kafka settings (the Kafka cluster consists of 6 topics, one per table, each
> with 512 partitions and replication factor 3):
>
> · num.network.threads=12
>
> · num.io.threads=10
>
> · socket.send.buffer.bytes=2097152
>
> · socket.request.max.bytes=1073741824
>
> The cluster consists of 5 machines, and each has:
>
> · 2 CPU x86-64, 20 cores, 40 threads, 2200 MHz base frequency, 3600
> MHz max turbo frequency; 40 cores, 80 threads total on each machine.
>
> · RAM 768 GB, up to 640 GB is available for Flink.
>
> · 2 network cards, 10 Gigabit each
>
> · 10 HDD, 5.5 TB
>
> Kind regards,
>
> Vladimir
>


Re: RE: how to get flink accumulated sink record count

2024-01-25 Thread Enric Ott
Thanks, Jiabao, but what I mean is capturing the metric in Flink tasks.




------ Original Message ------
From: "Jiabao Sun"

I guess getting the metrics[1] might be helpful for you.
You can query the numRecordsOut metric via a Metrics Reporter[2] or the REST API[3].

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
[3] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics

On 2024/01/25 06:54:36 Enric Ott wrote:
 Hi Team,
 I was wondering how to get the flink accumulated sink record count (just like
the flink UI displays); any help would be appreciated.