Re: How can Flink SQL parse JSON string data?

2023-11-22 Thread jinzhuguang
Flink SQL is better suited to structured data, so it depends on whether the number of fields in your body_data is fixed. If it is, you can declare the source and sink schemas as tables.
For example:

SourceT: (
    uuid STRING,
    body_data ARRAY<ROW<fild1 STRING, fild2 STRING>>
)

SinkT: (
    result ARRAY<ROW<uuid STRING, body_data STRING, fild1 STRING, fild2 STRING>>
)

INSERT INTO SinkT (result)
SELECT ARRAY[
    ROW(uuid, null, body_data[1].fild1, body_data[1].fild2),
    ROW(uuid, null, body_data[2].fild1, body_data[2].fild2)
] AS result
FROM SourceT
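If the field list is not fixed, or you want the invalid-format case to degrade to nulls, another option is a small Python UDTF that parses the string itself. A rough PyFlink sketch (table and field names are taken from the example below and otherwise assumed, not a drop-in solution):

import json

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udtf


@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def parse_body(body_data):
    # Emit one (fild1, fild2) pair per array element; emit nothing on malformed input.
    try:
        for item in json.loads(body_data):
            yield item.get("fild1"), item.get("fild2")
    except (TypeError, ValueError):
        return


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("parse_body", parse_body)

source = t_env.from_elements(
    [("u-1", '[{"fild1":"1231","fild2":"2341"},{"fild1":"abc","fild2":"cdf"}]'),
     ("u-2", 'abc')],
    DataTypes.ROW([DataTypes.FIELD("uuid", DataTypes.STRING()),
                   DataTypes.FIELD("body_data", DataTypes.STRING())]))
t_env.create_temporary_view("SourceT", source)

# LEFT JOIN ... ON TRUE keeps rows whose body_data could not be parsed (fields become NULL).
t_env.execute_sql("""
    SELECT uuid, fild1, fild2
    FROM SourceT
    LEFT JOIN LATERAL TABLE(parse_body(body_data)) AS T(fild1, fild2) ON TRUE
""").print()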

Hope this helps.

> On 22 Nov 2023, at 20:54, casel.chen wrote:
> 
> Input:
> 
> {
>   "uuid": "",
>   "body_data": "[{\"fild1\":\"1231\",\"fild2\":\"2341\"},{\"fild1\":\"abc\",\"fild2\":\"cdf\"}]"
> }
> 
> 
> Output:
> 
> [
>   {
>     "uuid": "",
>     "body_data": null,
>     "body_data.fild1": "123",
>     "body_data.fild2": "234"
>   },
>   {
>     "uuid": "",
>     "body_data": null,
>     "body_data.fild1": "abc",
>     "body_data.fild2": "cdf"
>   }
> ]
> 
> 
> When the format is invalid
> 
> Input:
> 
> {
>   "uuid": "",
>   "body_data": "abc"
> }
> 
> Output:
> 
> {
>   "uuid": "",
>   "body_data": "abc",
>   "body_data.fild1": null,
>   "body_data.fild2": null
> }



Confluent Kafka connection error

2023-11-22 Thread Tauseef Janvekar
Dear Team,

We are facing the issue below while connecting to Confluent Kafka.
Can someone please help here?

2023-11-23 06:09:36,989 INFO  org.apache.flink.runtime.executiongraph.
ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1/1)
(496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from SCHEDULED to DEPLOYING.
2023-11-23 06:09:36,994 INFO  org.apache.flink.runtime.executiongraph.
ExecutionGraph   [] - Deploying Source: src_source -> Sink: Print to Std.
Out (1/1) (attempt #0) with attempt id
496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0
and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to flink-taskmanager:6122-
23f057 @ flink-taskmanager.flink.svc.cluster.local (dataPort=46589) with
allocation id 80fe79389102bd305dd87a00247413eb
2023-11-23 06:09:37,011 INFO
 org.apache.kafka.common.security.authenticator.AbstractLogin [] -
Successfully logged in.
2023-11-23 06:09:37,109 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration 'key.deserializer' was
supplied but isn't a known config.
2023-11-23 06:09:37,109 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration 'value.deserializer'
was supplied but isn't a known config.
2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration 'client.id.prefix' was
supplied but isn't a known config.
2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration '
partition.discovery.interval.ms' was supplied but isn't a known config.
2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration
'commit.offsets.on.checkpoint' was supplied but isn't a known config.
2023-11-23 06:09:37,110 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration 'enable.auto.commit'
was supplied but isn't a known config.
2023-11-23 06:09:37,111 WARN  org.apache.kafka.clients.admin.
AdminClientConfig [] - The configuration 'auto.offset.reset'
was supplied but isn't a known config.
2023-11-23 06:09:37,113 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka version: 3.2.2
2023-11-23 06:09:37,114 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka commitId: 38c22ad893fb6cf5
2023-11-23 06:09:37,114 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka startTimeMs: 1700719777111
2023-11-23 06:09:37,117 INFO
 org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
[] - Starting the KafkaSourceEnumerator for consumer group null without
periodic partition discovery.
2023-11-23 06:09:37,199 INFO  org.apache.flink.runtime.executiongraph.
ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1/1)
(496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from DEPLOYING to INITIALIZING.
2023-11-23 06:09:37,302 INFO  org.apache.flink.runtime.source.coordinator.
SourceCoordinator [] - Source Source: src_source registering reader for
parallel task 0 (#0) @ flink-taskmanager
2023-11-23 06:09:37,313 INFO  org.apache.flink.runtime.executiongraph.
ExecutionGraph   [] - Source: src_source -> Sink: Print to Std. Out (1/1)
(496f859d5379cd751a3fc473625125f3_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from INITIALIZING to RUNNING.
2023-11-23 06:09:38,713 INFO
 org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
[] - Discovered new partitions: [aiops-3, aiops-2, aiops-1, aiops-0, aiops-5,
aiops-4]
2023-11-23 06:09:38,719 INFO
 org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
[] - Assigning splits to readers {0=[[Partition: aiops-1, StartingOffset: -1,
StoppingOffset: -9223372036854775808], [Partition: aiops-2, StartingOffset:
-1, StoppingOffset: -9223372036854775808], [Partition: aiops-0,
StartingOffset: -1, StoppingOffset: -9223372036854775808], [Partition:
aiops-4, StartingOffset: -1, StoppingOffset: -9223372036854775808], [
Partition: aiops-3, StartingOffset: -1, StoppingOffset: -9223372036854775808],
[Partition: aiops-5, StartingOffset: -1, StoppingOffset: -
9223372036854775808]]}
2023-11-23 06:09:57,651 INFO  akka.remote.transport.ProtocolStateActor
[] - No response from remote for outbound association.
Associate timed out after [2 ms].
2023-11-23 06:09:57,651 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink-metrics@flink-taskmanager:33837] has failed, address is
now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink-metrics@flink-taskmanager:33837]] Caused by: [No response
from remote for outbound association. Associate timed out after [2 ms].]
2023-11-23 06:09:57,668 WARN  akka.remote.transport.netty.NettyTransport
[] - Remote connection to [null] failed 

Re: [PyFlink] Collect multiple elements in CoProcessFunction

2023-11-22 Thread Alexander Fedulov
Hi David,

Thanks for the confirmation. Let's fix the docs:

https://github.com/apache/flink/pull/23776

Thanks,
Alex
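
For anyone who lands on this thread later, here is a minimal, self-contained sketch (a hypothetical job, not taken from the docs) of emitting multiple elements from a PyFlink process function by yielding them; the same pattern applies to CoProcessFunction.process_element1/process_element2:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction


class Duplicate(ProcessFunction):
    def process_element(self, value, ctx):
        # Each yield emits one element downstream; there is no Collector argument.
        yield value
        yield value * 10


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([1, 2, 3], type_info=Types.INT()) \
    .process(Duplicate(), output_type=Types.INT()) \
    .print()
env.execute("pyflink-yield-demo")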

On Sun, 19 Nov 2023 at 01:55, David Anderson  wrote:

> Hi, Alex!
>
> Yes, in PyFlink the various flatmap and process functions are implemented
> as generator functions, so they use yield to emit results.
>
> David
>
> On Tue, Nov 7, 2023 at 1:16 PM Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
>> Java ProcessFunction API defines a clear way to collect data via the
>> Collector object.
>>
>> PyFlink documentation also refers to the Collector [1], but it is not
>> being passed to the function and is also nowhere to be found in the pyflink
>> source code.
>> How can multiple elements be collected? Is "yield" the designated way to
>> achieve this?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/api/python/pyflink.datastream.html#pyflink.datastream.CoProcessFunction.process_element1
>>
>> Best,
>> Alex
>>
>


Re: Is RestClusterClient recommended?

2023-11-22 Thread Chesnay Schepler
Don't use the RestClusterClient; you can generate a client from the 
OpenAPI spec instead (see the docs).
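
For direct use without a generated client, the REST endpoints are plain HTTP; a rough Python sketch (the host and port are assumptions for a default standalone JobManager):

import json
import urllib.request

BASE_URL = "http://localhost:8081"  # JobManager REST endpoint (assumed)


def list_jobs():
    # GET /jobs returns the id and state of every job known to the cluster.
    with urllib.request.urlopen(f"{BASE_URL}/jobs") as resp:
        return json.load(resp)["jobs"]


for job in list_jobs():
    print(job["id"], job["status"])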


On 16/11/2023 20:36, Adrian Alexandru Vasiliu wrote:

Hello,

For programmatic use of the Flink REST API from Java, which of these 
options would be the best choice?

 1. Direct use via a REST client
 2. The RestClusterClient API

RestClusterClient is appealing, because it embeds a bunch of code that 
we wouldn't need to write and maintain.


But I also see reasons to stay away from it:

 1. Its javadoc can be found by searching the web (versioned per version
    of Flink / flink-clients), but I didn't find user documentation.
    https://nightlies.apache.org/flink/flink-docs-stable/ doesn't seem to
    mention RestClusterClient.
 2. Its API isn't marked @Public nor @PublicEvolving.
 3. In 2019, Till Rohrmann (Flink PMC member) wrote: "Flink's cluster
    REST API has been designed to work with any REST client. The
    RestClusterClient which comes with the flink-clients module is
    intended for internal usage."
    https://stackoverflow.com/a/56127387/1723384
 4. If at a later time we would bring authentication to the Flink REST
    API, say via an nginx proxy side-car, RestClusterClient wouldn't know
    how to deal with it.
Am I missing something?
Would the community nowadays recommend using RestClusterClient, at 
least in situations without authentication?


Thanks,
Adrian
Unless otherwise stated above:

Compagnie IBM France
Siège Social : 17, avenue de l'Europe, 92275 Bois-Colombes Cedex
RCS Nanterre 552 118 465
Forme Sociale : S.A.S.
Capital Social : 664 069 390,60 €
SIRET : 552 118 465 03644 - Code NAF 6203Z


Re:Re: Unsubscribe

2023-11-22 Thread 李国辉

Unsubscribe




--
Sent from my NetEase Mail mobile client



- Original Message -
From: "Junrui Lee" 
To: user-zh@flink.apache.org
Sent: Wed, 22 Nov 2023 10:19:32 +0800
Subject: Re: 退订

Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.

Best,
Junrui

万建国 <1097498...@qq.com.invalid> wrote on Wed, 22 Nov 2023 at 10:10:

> Unsubscribe


Error while trying to connect to Kafka from Flink runner

2023-11-22 Thread Поротиков Станислав Вячеславович via user
Hello!
I am trying to run a Beam pipeline in a local docker-compose environment on top of 
Flink. I wrote my own Dockerfile for the Flink jobmanager and taskmanager.
I need to connect to a secure Kafka cluster through Kerberos.
Dockerfile for my-image-apache-beam/flink:1.16-java11:
FROM flink:1.16-java11

# python SDK
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/

# java SDK
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/

COPY krb5.conf /etc/

My docker-compose.yml:
version: "2.2"
services:
  jobmanager:
image: my-image-apache-beam/flink:1.16-java11
ports:
  - "8081:8081"
volumes:
  - artifacts:/tmp/beam-artifact-staging
command: jobmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager

  taskmanager:
image: registry.kontur.host/srs/apache-beam/flink:1.16-java11
depends_on:
  - jobmanager
command: taskmanager
ports:
  - "8100-8200:8100-8200"
volumes:
  - artifacts:/tmp/beam-artifact-staging
scale: 1
extra_hosts:
  - "host.docker.internal:host-gateway"
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 2Gb

  beam_job_server:
image: apache/beam_flink1.16_job_server
command: --flink-master=jobmanager --job-host=0.0.0.0
ports:
  - "8097:8097"
  - "8098:8098"
  - "8099:8099"
volumes:
  - artifacts:/tmp/beam-artifact-staging

  python-worker-harness:
image: "apache/beam_python3.10_sdk"
command: "-worker_pool"
ports:
  - "5:5"
volumes:
  - artifacts:/tmp/beam-artifact-staging


volumes:
artifacts:
And eventually my pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

import os
import logging


job_server = "localhost"


pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:5"
]

kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}"
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)

    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')

    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config': f'com.sun.security.auth.module.Krb5LoginModule required debug=true principal={sasl_kerberos_principal} useTicketCache=true;',
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest'}

    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (pipeline
                   | 'Read topic from Kafka' >> ReadFromKafka(
                       consumer_config=source_config,
                       topics=[source_topic],
                       expansion_service=kafka_process_expansion_service)
                   | 'Write topic to Kafka' >> WriteToKafka(
                       producer_config=source_config,
                       topic=sink_topic,
                       expansion_service=kafka_process_expansion_service)
                   )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

But I got stuck with the error below:
INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 
1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id 
.
2023-11-22 12:52:29,065 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 

[ANNOUNCE] Apache Flink Kubernetes Operator 1.7.0 released

2023-11-22 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.7.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Standalone autoscaler module
 - Improved autoscaler metric tracking
 - Savepoint triggering improvements
 - Java 17 & 21 support

Release blogpost:
https://flink.apache.org/2023/11/22/apache-flink-kubernetes-operator-1.7.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353462

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora



Re:Re:Re: Batch lookup join support in Flink SQL

2023-11-22 Thread Xuyang
Hi, casel.
This isn't supported at the moment, and I don't see an existing issue asking for it either; you could open a feature request in Jira to see what the community thinks.


Ways you can achieve the same thing today:
1. Three lookup joins + union + a UDAF (a sketch of the lookup-join part follows below).
2. A UDF that does the lookups row by row, with a cache to improve performance.
3. Fork the community connector, modify it, and repackage it.
4. ...
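
For the concrete example quoted below, the lookup-join part of option 1 could look roughly like this PyFlink sketch; the JDBC connection, table names and the proc_time attribute are assumptions, and user_dim has to be backed by a lookup-capable connector such as JDBC:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Dimension table; the JDBC url and table name are placeholders.
t_env.execute_sql("""
    CREATE TEMPORARY TABLE user_dim (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/demo',
        'table-name' = 'user'
    )
""")

# Fact stream with a processing-time attribute for the lookup join.
t_env.execute_sql("""
    CREATE TEMPORARY TABLE trans (
        id BIGINT,
        creator_id BIGINT,
        approver_id BIGINT,
        deployer_id BIGINT,
        proc_time AS PROCTIME()
    ) WITH ('connector' = 'datagen')
""")

# Look up the same dimension table once per key column.
t_env.execute_sql("""
    SELECT t.id,
           c.name AS creator_name,
           a.name AS approver_name,
           d.name AS deployer_name
    FROM trans AS t
    LEFT JOIN user_dim FOR SYSTEM_TIME AS OF t.proc_time AS c ON t.creator_id = c.id
    LEFT JOIN user_dim FOR SYSTEM_TIME AS OF t.proc_time AS a ON t.approver_id = a.id
    LEFT JOIN user_dim FOR SYSTEM_TIME AS OF t.proc_time AS d ON t.deployer_id = d.id
""").print()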



--

Best!
Xuyang





At 2023-11-22 20:44:47, "casel.chen" wrote:
>There is a dimension table user with columns id and name
>id  | name
>-----
>1   | zhangsan
>2   | lisi
>3   | wangwu
>
>
>Now a transaction record arrives in real time
>id  | creator_id  | approver_id  | deployer_id
>-----
>1   | 1           | 2            | 3
>
>
>The goal is to look up the user dimension table and return each user's name
>id  | creator_name  | approver_name  | deployer_name
>-----
>1   | zhangsan      | lisi           | wangwu
>
>
>How can this be implemented with Flink SQL?
>
>
>At 2023-11-22 12:37:10, "Xuyang" wrote:
>>Hi, casel.
>>Could you describe the "batch lookup join" in a bit more detail? It looks like
>>a single lookup join with k1=v1 and k2=v2 and k3=v3 in the join condition would already cover it.
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>>At 2023-11-22 11:55:11, "casel.chen" wrote:
>>>A row carries three keys to look up: key1, key2 and key3
>>>
>>>
>>>id key1 key2 key3
>>>The goal is a batch lookup that returns a single row: id value1 value2 value3
>>>
>>>
>>>As far as I can tell, none of the current lookup connectors (including the jdbc connector) support batch queries, so the only option is to first turn the columns into rows, look each row up separately, and then turn the rows back into columns, as shown below
>>>id key1 key2 key3
>>>first turn the columns into rows
>>>id key1
>>>id key2
>>>id key3
>>>
>>>after the individual lookup joins we get
>>>id value1
>>>id value2
>>>id value3
>>>finally turn the rows back into a single row
>>>
>>>id value1 value2 value3
>>>
>>>
>>>The only approach I can think of is a UDTF + UDAF, but it isn't generic. Does the Flink community plan to support this natively?


How can Flink SQL parse JSON string data?

2023-11-22 Thread casel.chen
Input:

{
  "uuid": "",
  "body_data": "[{\"fild1\":\"1231\",\"fild2\":\"2341\"},{\"fild1\":\"abc\",\"fild2\":\"cdf\"}]"
}


Output:

[
  {
    "uuid": "",
    "body_data": null,
    "body_data.fild1": "123",
    "body_data.fild2": "234"
  },
  {
    "uuid": "",
    "body_data": null,
    "body_data.fild1": "abc",
    "body_data.fild2": "cdf"
  }
]


When the format is invalid


Input:

{
  "uuid": "",
  "body_data": "abc"
}

Output:

{
  "uuid": "",
  "body_data": "abc",
  "body_data.fild1": null,
  "body_data.fild2": null
}

Re:Re: Batch lookup join support in Flink SQL

2023-11-22 Thread casel.chen
There is a dimension table user with columns id and name
id  | name
-----
1   | zhangsan
2   | lisi
3   | wangwu


Now a transaction record arrives in real time
id  | creator_id  | approver_id  | deployer_id
-----
1   | 1           | 2            | 3


The goal is to look up the user dimension table and return each user's name
id  | creator_name  | approver_name  | deployer_name
-----
1   | zhangsan      | lisi           | wangwu



How can this be implemented with Flink SQL?














At 2023-11-22 12:37:10, "Xuyang" wrote:
>Hi, casel.
>Could you describe the "batch lookup join" in a bit more detail? It looks like
>a single lookup join with k1=v1 and k2=v2 and k3=v3 in the join condition would already cover it.
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>At 2023-11-22 11:55:11, "casel.chen" wrote:
>>A row carries three keys to look up: key1, key2 and key3
>>
>>
>>id key1 key2 key3
>>The goal is a batch lookup that returns a single row: id value1 value2 value3
>>
>>
>>As far as I can tell, none of the current lookup connectors (including the jdbc connector) support batch queries, so the only option is to first turn the columns into rows, look each row up separately, and then turn the rows back into columns, as shown below
>>id key1 key2 key3
>>first turn the columns into rows
>>id key1
>>id key2
>>id key3
>>
>>after the individual lookup joins we get
>>id value1
>>id value2
>>id value3
>>finally turn the rows back into a single row
>>
>>id value1 value2 value3
>>
>>
>>The only approach I can think of is a UDTF + UDAF, but it isn't generic. Does the Flink community plan to support this natively?


Unsubscribe

2023-11-22 Thread 新鹏
Unsubscribe

Re: Unsubscribe

2023-11-22 Thread 新鹏
Unsubscribe

















At 2023-10-04 10:06:45, "1"  wrote:
>