Multiple Kafka Sources for a Data Pipeline

2023-07-05 Thread Yogesh Rao
Hi,

Wanted to know if multiple Kafka sources are supported in a data pipeline
within Flink.

I am looking at a scenario where data transformation and enrichment need to
be done once a message from each of the two sources has been received for a
common identifier.

I coded the logic and it appears to work; however, I see a warning trace like
the one below, which makes me wonder whether this is really designed to be
supported. I understand it's just the JMX registration that has failed, and it
does not affect the actual execution of the business logic.

10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser
 [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=xyz-processing642706448-5
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
~[?:?]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
~[?:?]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
~[?:?]
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
~[?:?]
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
~[?:?]
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627)
~[kafka-clients-3.2.3.jar:?]
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160)
~[flink-connector-kafka-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196)
~[flink-connector-base-1.17.1.jar:1.17.1]
at
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107)
~[flink-connector-base-1.17.1.jar:1.17.1]

I looked at the implementation as well and found that not enough attributes
are used to make the MBean id unique.
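
For reference, here is a minimal sketch of the kind of pipeline I mean (broker
address, topic names and the enrichment step are placeholders); if I read the
connector code correctly, giving each source its own client id prefix via
KafkaSourceBuilder#setClientIdPrefix should also keep the app-info MBean ids
distinct:

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoKafkaSourcesJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> sourceA = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")               // placeholder broker
                .setTopics("topic-a")                             // placeholder topic
                .setGroupId("xyz-processing")
                .setClientIdPrefix("xyz-processing-a")            // distinct prefix per source
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSource<String> sourceB = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic-b")
                .setGroupId("xyz-processing")
                .setClientIdPrefix("xyz-processing-b")            // avoids identical app-info ids
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> a = env.fromSource(sourceA, WatermarkStrategy.noWatermarks(), "source-a");
        DataStream<String> b = env.fromSource(sourceB, WatermarkStrategy.noWatermarks(), "source-b");

        // The real job connects the two streams, keys them by the common identifier and
        // enriches in a KeyedCoProcessFunction; printing keeps this sketch self-contained.
        a.print();
        b.print();

        env.execute("two-kafka-sources");
    }
}
```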

Please let me know if this is a bug; I can raise a JIRA and perhaps even
contribute a fix.

Regards,
-Yogesh


Re: Exception when using the Kafka source on Flink 1.17.1

2023-07-05 Thread yh z
Hi, aiden. This looks like a class conflict. According to the official documentation, you
should not need to include flink-core or flink-connector-base when using the Kafka connector
(https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/).
If you need these two jars for other reasons, you can run `mvn dependency:tree` to check
where "org/apache/kafka/clients/consumer/ConsumerRecord" is loaded a second time, and
exclude the copy that does not come from flink-connector-kafka.

aiden <18765295...@163.com> wrote on Tue, Jul 4, 2023 at 14:23:

> hi
>
> When using the Kafka source with version 1.17.1, I hit the following exception:
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously
> initiated loading for a different type with name
> "org/apache/kafka/clients/consumer/ConsumerRecord"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.getDeclaredMethod(Class.java:2128)
> at
> java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
> at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
> at
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:471)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
> ... 20 more
> Below is part of my POM:
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-core</artifactId>
>   <version>1.17.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka</artifactId>
>   <version>1.17.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-base</artifactId>
>   <version>1.17.1</version>
> </dependency>
>
>
> It looks like a classloader exception. What do I need to change?
>


Re: FilterPushDown and parallelism for stream-table joins in Flink 1.16

2023-07-05 Thread yh z
Hi, Chai Kelun. Your filter condition contains your custom UDF, so it does not satisfy the
requirements for filter push-down: to the optimizer a UDF is non-deterministic, so it cannot
extract a condition from it that could be pushed down. If you want push-down to happen, you
can try extracting a deterministic condition, e.g. product.id > 10. Also, Flink does support
broadcast hash join; if you want to control the join type of two specific tables, you can
use a join hint to set the join type to broadcast.
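
For example, a rough sketch of what the rewritten query could look like (the numeric
thresholds are placeholders standing in for the values in your query; the hint syntax
follows the Flink SQL join-hints documentation):

```
// Sketch only: assumes a StreamTableEnvironment `tEnv` with logClient, product and MyUDF
// already registered, as in the original question. 100 stands in for the xxx threshold.
tEnv.executeSql(
    "SELECT /*+ BROADCAST(B) */ A.id, A.name, B.name "
        + "FROM logClient AS A, product AS B "
        + "WHERE B.id > 10 "                        // deterministic, can be pushed into `product`
        + "  AND MyUDF(B.value, A.price) < 100");   // stays in the join/filter operator
```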

Chai Kelun wrote on Mon, Jul 3, 2023 at 17:58:

> There is a Kafka stream table logClient(id int, name string, price double) and a
> customConnector dimension table product(id int, name string, value double) that implements
> SupportsFilterPushDown. I have implemented a custom function MyUDF(double d1, double d2)
> for custom logic, and the connector supports pushing this operator down.
> In the Stream-Table Join scenario, the SQL below does not push the operator down; instead,
> all operators are pulled up to the Join node through a TableScan. Is there an option to
> enable push-down? (Similar to a nested-loop join, pushing the computation down to the
> product table's data source.)
> SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE
> MyUDF(B.value, A.price) < xxx;
> In addition, both Kafka and the customConnector support parallelism. For the join we would
> like to use BROADCAST mode, i.e. compute against the product table on every partition of
> the logClient stream table, but it seems that Flink's stream-table join currently only
> supports the SINGLETON and HASH[KEY] distribution modes (the switch case at
> StreamExecExchange.java line 106). Will the community consider supporting more
> distributionTypes in the future?
>
> Many thanks!


Re: Query around Rocksdb

2023-07-05 Thread Yanfei Lei
Hi neha,

1. You can add the path of jemalloc to the LD_LIBRARY_PATH of YARN [1],
and here is a blog post about "RocksDB Memory Usage" [2].
2. The default value of cleanupInRocksdbCompactFilter is 1000 [3];
another value can be set according to the TPS of the job. The
value of `state.backend.rocksdb.metrics.num-running-compactions` [4] may
be affected by the sampling frequency of the metrics. Is the value of
`state.backend.rocksdb.metrics.compaction-read-bytes` also zero?

[1] 
https://data-flair.training/forums/topic/how-to-include-native-libraries-in-yarn-jobs/
[2] https://shopify.engineering/optimizing-apache-flink-applications-tips
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#cleanup-during-rocksdb-compaction
[4] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
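
For point 2, a minimal sketch of making the compaction-filter cleanup explicit on the
TTL config from your snippet (it reuses your longerLookbackHours variable; 1000 is just
the default query-time-after-num-entries value):

```
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Sketch only: same TTL settings as in the quoted code, plus an explicit
// cleanupInRocksdbCompactFilter. A different value than 1000 can be chosen
// depending on the TPS of the job.
StateTtlConfig cityTtlConfig = StateTtlConfig
        .newBuilder(Time.hours(longerLookbackHours))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(1000)
        .build();
```
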
neha goyal wrote on Tue, Jul 4, 2023 at 17:39:

>
> Hello Yanfei and Shammon,
>
> I have two additional questions. The links mentioned in the reply talk about 
> using jemalloc in a Docker image, but I am using Yarn on AWS EMR. How can I 
> use jemalloc with Yarn? Any references you can provide would be greatly 
> appreciated.
>
> StateTtlConfig cityTtlConfig = StateTtlConfig
> 
> .newBuilder(org.apache.flink.api.common.time.Time.hours(longerLookbackHours))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build();
> MapStateDescriptor<String, Integer> cityListDescritpor = new
> MapStateDescriptor<>("cityList", String.class, Integer.class);
> cityListDescritpor.enableTimeToLive(cityTtlConfig);
>
>
> Secondly, I have applied TTL to my state, and I rely on RocksDB's automated 
> compaction process to clear expired events. However, according to the RocksDB 
> metrics provided by Flink, the compaction process is not occurring[attached 
> the metrics screenshot], and there is a constant increase in the savepoint 
> size. Do you suggest adding cleanupInRocksdbCompactFilter(1000) as well? What 
> will be the impact of this configuration?
>
> On Tue, Jul 4, 2023 at 8:13 AM Yanfei Lei  wrote:
>>
>> Hi neha,
>>
>> Due to the limitation of RocksDB, we cannot create a
>> strict-capacity-limit LRUCache which shared among rocksDB instance(s),
>> FLINK-15532[1] is created to track this.
>> BTW, have you set TTL for this job [2]? TTL can help control the state size.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15532
>> [2]https://issues.apache.org/jira/browse/FLINK-31089
>>
>> Shammon FY  于2023年7月4日周二 09:08写道:
>> >
>> > Hi neha,
>> >
>> > Which Flink version are you using? We have also encountered the issue of 
>> > continuous growth of off-heap memory in the TM of a session cluster 
>> > before; the reason is that memory fragments cannot be reused, as in 
>> > issue [1]. You can check the memory allocator and try to use jemalloc 
>> > instead; refer to docs [2] and [3].
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19125
>> > [2] 
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
>> > [3] 
>> > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>> >>
>> >> Hello,
>> >>
>> >> I am trying to debug the unbounded memory consumption by the Flink 
>> >> process. The heap size of the process remains the same. The size of the 
>> >> RSS of the process keeps on increasing. I suspect it might be because of 
>> >> RocksDB.
>> >>
>> >> We have the default value of state.backend.rocksdb.memory.managed, which is 
>> >> true. Can anyone confirm whether, with this config, RocksDB can still consume 
>> >> unbounded native memory?
>> >>
>> >> If yes, what metrics can I check to confirm the issue? Any help would be 
>> >> appreciated.
>>
>>
>>
>> --
>> Best,
>> Yanfei



--
Best,
Yanfei


Re: how to get blackhole connector jar

2023-07-05 Thread liu ron
hi longfeng,

I think you should check which module contains the BlackHole connector code;
then you can place that module's jar in the Flink lib directory.
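
As a side note, here is a quick sketch of how the blackhole sink is declared once the
factory class is on the classpath (table name and schema are just examples; per the
quoted reply below, BlackHoleTableSinkFactory ships in flink-table-api-java-bridge):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlackholeSinkExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'blackhole' is resolved through BlackHoleTableSinkFactory; it simply discards rows.
        tEnv.executeSql(
            "CREATE TABLE sink_blackhole (f0 STRING) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("INSERT INTO sink_blackhole VALUES ('test')");
    }
}
```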

Best,
Ron

Hang Ruan wrote on Wed, Jun 28, 2023 at 16:48:

> Hi, longfeng,
>
> I checked the blackhole connector documentation [1]; the blackhole connector is
> a built-in connector.
> If you cannot find this connector in your Flink distribution, you could check
> the `flink-table-api-java-bridge` jar for the
> `BlackHoleTableSinkFactory` [2].
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/blackhole.html
> [2]
> https://github.com/apache/flink/blob/release-1.12/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
>
longfeng Xu wrote on Wed, Jun 28, 2023 at 16:09:
>
>> hi guys,
>>   when using Alibaba Flink (based on Flink 1.12) to run Nexmark's
>> query0, it failed because blackhole is not a supported sink connector.
>>  So how can I upload a connector-blackhole, like the Nexmark connector? Where can I
>> download it?
>>
>> thanks
>>
>


Re: SQL-gateway Failed to Run

2023-07-05 Thread liu ron
Hi, Xiaolong

As Shammon says, I think you should check the exception info of the Flink cluster
first to confirm the root cause.

Best,
Ron

Shammon FY wrote on Tue, Jul 4, 2023 at 16:44:

> Hi Xiaolong,
>
> I think you may need to check the error log in the flink cluster to find
> out the root cause.
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 3:38 PM Xiaolong Wang 
> wrote:
>
>> The flink web ui is fine until I run the Hive query. After that the flink
>> deployment is down and the web UI is not accessible.
>>
>> On Tue, Jul 4, 2023 at 9:13 AM Shammon FY  wrote:
>>
>>> Hi Xiaolong,
>>>
>>> From the exception it seems that the flink session cluster is not
>>> running properly. Can you visit the flink web ui and everything is ok?
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang <
>>> xiaolong.w...@smartnews.com> wrote:
>>>
 Hi,
 I've tested the Flink SQL-gateway to run some simple Hive queries but
 met some exceptions.


 Environment Description:
 Run on : Kubernetes
 Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
 Steps to run:
 1. Apply a `flinkdeployment` of flink session cluster to flink operator
 ```
 apiVersion: flink.apache.org/v1beta1
 kind: FlinkDeployment
 metadata:
   name: flink-session-cluster-example
   namespace: xxx
 spec:
   image: xxx/flink:1.17-sql-gateway-dev
   flinkVersion: v1_17
   flinkConfiguration:
 taskmanager.numberOfTaskSlots: "2"
 pipeline.max-parallelism: "1000"
 state.backend.type: rocksdb
 state.backend.incremental: "true"
 state.checkpoints.dir: xxx
 execution.checkpointing.interval: 1m
 execution.checkpointing.timeout: 30m
 high-availability:
 org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 high-availability.storageDir: xxx
 akka.framesize: 20971520b
 execution.checkpointing.externalized-checkpoint-retention:
 RETAIN_ON_CANCELLATION
 taskmanager.memory.managed.fraction: "0.2"
 kubernetes.hadoop.conf.config-map.name: xxx
   serviceAccount: default
   podTemplate:
 apiVersion: v1
 kind: Pod
 metadata:
   name: pod-template
 spec:
   serviceAccount: default
   jobManager:
 resource:
   memory: "2048m"
   cpu: 1
   taskManager:
 resource:
   memory: "4096m"
   cpu: 1
 ```
 This image has been built with a `hadoop dependency` , an existing
 `hadoop configmap`.

 2. Login to the job-manager pod and run the followings
 `./bin/sql-gateway.sh start-foreground
 -Dsql-gateway.endpoint.type=hiveserver2
 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`

 3. Start a beeline and connect to the SQL gateway then run a simple
 Hive query
 `select count(1) from simple_demo_output where dt = '2021-08-14';`

 4.The SQL gateway goes wrong with the following logs:
 ```

 2023-07-03 06:27:11,078 INFO  
 org.apache.flink.client.program.rest.RestClusterClient
   [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).

 2023-07-03 06:27:15,092 INFO  
 org.apache.flink.client.program.rest.RestClusterClient
   [] - Successfully submitted job 'collect'
 (4c99c40392cb935d3df94891655d2ce5) to '
 http://flink-session-cluster-example-rest.realtime-streaming:8081'.

 2023-07-03 06:27:15,879 ERROR
 org.apache.flink.table.gateway.service.operation.OperationManager [] -
 Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.

 org.apache.flink.table.api.TableException: Failed to execute sql

 at
 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
 ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

 at
 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
 ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

 at
 org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
 ~[flink-sql-gateway-1.17.1.jar:1.17.1]

 at
 org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
 ~[flink-sql-gateway-1.17.1.jar:1.17.1]

 at
 org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
 ~[flink-sql-gateway-1.17.1.jar:1.17.1]

 at
 org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
 ~[flink-sql-gateway-1.17.1.jar:1.17.1]

 at
 

Re: Unsubscribe

2023-07-05 Thread liu ron
Hi,

As Hang says, please send an email to user-unsubscr...@flink.apache.org if
you want to unsubscribe from user@flink.apache.org.

Best,
Ron

Ragini Manjaiah wrote on Wed, Jul 5, 2023 at 13:36:

> Unsubscribe
>
> On Tue, Jul 4, 2023 at 1:33 PM Bauddhik Anand  wrote:
>
>> Unsubscribe
>>
>


Re: Using HybridSource

2023-07-05 Thread Andrew Otto
Hm. I wonder if you could implement a custom Deserializer that wraps both
the CSV and Protobuf deserializer, and conditionally chooses which one to
use. As long as the final TypeInformation returned by the Source is the
same in either case, I think it should work?

> Kafka comes from protobuf while the CSV is a POJO though both have the
same fields
This could be the hard part. I think no matter what you do, you'll have to
make the TypeInformation the HybridSource uses be exactly the same in either
case. Maybe you could convert your Protobuf to the POJO, or vice versa?
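
Something along these lines, maybe (untested sketch; Trade and TradeProto are made-up
stand-ins for your POJO and protobuf classes):

```
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Sketch only: the Kafka side parses protobuf and immediately maps it to the same POJO
// that the CSV FileSource produces, so both legs of the HybridSource expose identical
// TypeInformation.
public class ProtoToPojoDeserializer implements DeserializationSchema<Trade> {

    @Override
    public Trade deserialize(byte[] message) throws IOException {
        TradeProto proto = TradeProto.parseFrom(message);   // generated protobuf parser
        return new Trade(proto.getId(), proto.getName(), proto.getPrice());
    }

    @Override
    public boolean isEndOfStream(Trade nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Trade> getProducedType() {
        return TypeInformation.of(Trade.class);   // must match what the CSV source produces
    }
}
```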




On Wed, Jul 5, 2023 at 10:19 AM Oscar Perez via user 
wrote:

> and this is our case Alexander, it is the same data schema but different
> data format. Kafka comes from protobuf while the CSV is a POJO though both
> have the same fields. IMHO, the design of HybridSource is very limited and
> you have to do nasty workarounds if you want to combine from cold storage
> (CSV, S3) and kafka if the expectations differ a bit from the most common
> use case (e.g. using protobuf)
>
> Regards,
> Oscar
>
> On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
>> I do not think that trying to "squash" two different data types into one
>> just to use HybridSource is the right thing to do here. HybridSource is
>> primarily intended for use cases that need to read the same data from
>> different sources. A typical example: read events from "cold storage" in S3
>> up to a specific point and switch over to "live" data in Kafka.
>> Since you are already using the low-level API, you can either
>> manually pull the data in inside of the open() function, or stream it into
>> the local state using KeyedCoProcessFunction or
>> KeyedBroadcastProcessFunction (depending on the size of the lookup state).
>>
>> This video should get you covered:
>> https://www.youtube.com/watch?v=cJS18iKLUIY
>>
>> Best,
>> Alex
>>
>>
>> On Wed, 5 Jul 2023 at 07:29, Péter Váry 
>> wrote:
>>
>>> Was it a conscious decision that HybridSource only accept Sources, and
>>> does not allow mapping functions applied to them before combining them?
>>>
>>> On Tue, Jul 4, 2023, 23:53 Ken Krugler 
>>> wrote:
>>>
 Hi Oscar,

 Couldn’t you have both the Kafka and File sources return an Either<POJO from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
 a MapFunction to convert to the unified/correct type?

 — Ken


 On Jul 4, 2023, at 12:13 PM, Oscar Perez via user <
 user@flink.apache.org> wrote:

 Hei,
 1) We populate state based on this CSV data and do business logic based
 on this state and events coming from other unrelated streams.
 2) We are using low level process function in order to process this
 future hybrid source

 Regardless of the aforementioned points, please note that the main
 challenge is to combine in a hybridsource CSV and kafka topic that return
 different datatypes so I dont know how my answers relate to the original
 problem tbh. Regards,
 Oscar

 On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
 alexander.fedu...@gmail.com> wrote:

> @Oscar
> 1. How do you plan to use that CSV data? Is it needed for lookup from
> the "main" stream?
> 2. Which API are you using? DataStream/SQL/Table or low level
> ProcessFunction?
>
> Best,
> Alex
>
>
> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <
> user@flink.apache.org> wrote:
>
>> ok, but is it? As I said, both sources have different data types. In
>> the example here:
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> We are using both sources as returning string but in our case, one
>> source would return a protobuf event while the other would return a pojo.
>> How can we make the 2 sources share the same datatype so that we can
>> successfully use hybrid source?
>>
>> Regards,
>> Oscar
>>
>> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov 
>> wrote:
>>
>>> Hi Oscar,
>>>
>>> You could use connected streams and put your file into a special
>>> Kafka topic before starting such a job:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>>> But this may require more work and the event ordering (which is
>>> shuffled) in the connected streams is probably not what you are looking 
>>> for.
>>>
>>> I think HybridSource is the right solution.
>>>
>>> Best regards,
>>> Alexey
>>>
>>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>>> user@flink.apache.org> wrote:
>>>
 Hei, We want to bootstrap some data from a CSV file before reading
 from a kafka topic that has a retention period of 7 days.

 We believe the best tool for that would be the 

Re: Using HybridSource

2023-07-05 Thread Oscar Perez via user
And this is our case, Alexander: it is the same data schema but a different
data format. Kafka comes from protobuf while the CSV is a POJO, though both
have the same fields. IMHO, the design of HybridSource is very limited, and
you have to do nasty workarounds if you want to combine cold storage
(CSV, S3) and Kafka when the expectations differ a bit from the most common
use case (e.g. using protobuf).

Regards,
Oscar

On Wed, 5 Jul 2023 at 12:53, Alexander Fedulov 
wrote:

> I do not think that trying to "squash" two different data types into one
> just to use HybridSource is the right thing to do here. HybridSource is
> primarily intended for use cases that need to read the same data from
> different sources. A typical example: read events from "cold storage" in S3
> up to a specific point and switch over to "live" data in Kafka.
> Since you are already using the low-level API, you can either
> manually pull the data in inside of the open() function, or stream it into
> the local state using KeyedCoProcessFunction or
> KeyedBroadcastProcessFunction (depending on the size of the lookup state).
>
> This video should get you covered:
> https://www.youtube.com/watch?v=cJS18iKLUIY
>
> Best,
> Alex
>
>
> On Wed, 5 Jul 2023 at 07:29, Péter Váry 
> wrote:
>
>> Was it a conscious decision that HybridSource only accept Sources, and
>> does not allow mapping functions applied to them before combining them?
>>
>> On Tue, Jul 4, 2023, 23:53 Ken Krugler 
>> wrote:
>>
>>> Hi Oscar,
>>>
>>> Couldn’t you have both the Kafka and File sources return an Either<POJO from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
>>> a MapFunction to convert to the unified/correct type?
>>>
>>> — Ken
>>>
>>>
>>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user 
>>> wrote:
>>>
>>> Hei,
>>> 1) We populate state based on this CSV data and do business logic based
>>> on this state and events coming from other unrelated streams.
>>> 2) We are using low level process function in order to process this
>>> future hybrid source
>>>
>>> Regardless of the aforementioned points, please note that the main
>>> challenge is to combine in a hybridsource CSV and kafka topic that return
>>> different datatypes so I dont know how my answers relate to the original
>>> problem tbh. Regards,
>>> Oscar
>>>
>>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
>>> alexander.fedu...@gmail.com> wrote:
>>>
 @Oscar
 1. How do you plan to use that CSV data? Is it needed for lookup from
 the "main" stream?
 2. Which API are you using? DataStream/SQL/Table or low level
 ProcessFunction?

 Best,
 Alex


 On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user <
 user@flink.apache.org> wrote:

> ok, but is it? As I said, both sources have different data types. In
> the example here:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>
> We are using both sources as returning string but in our case, one
> source would return a protobuf event while the other would return a pojo.
> How can we make the 2 sources share the same datatype so that we can
> successfully use hybrid source?
>
> Regards,
> Oscar
>
> On Tue, 4 Jul 2023 at 12:04, Alexey Novakov 
> wrote:
>
>> Hi Oscar,
>>
>> You could use connected streams and put your file into a special
>> Kafka topic before starting such a job:
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
>> But this may require more work and the event ordering (which is
>> shuffled) in the connected streams is probably not what you are looking 
>> for.
>>
>> I think HybridSource is the right solution.
>>
>> Best regards,
>> Alexey
>>
>> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
>> user@flink.apache.org> wrote:
>>
>>> Hei, We want to bootstrap some data from a CSV file before reading
>>> from a kafka topic that has a retention period of 7 days.
>>>
>>> We believe the best tool for that would be the HybridSource but the
>>> problem we are facing is that both datasources are of different nature. 
>>> The
>>> KafkaSource returns a protobuf event while the CSV is a POJO with just 3
>>> fields.
>>>
>>> We could hack the kafkasource implementation and then in the
>>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>>> seems rather hackish. Is there a way more elegant to unify both 
>>> datatypes
>>> from both sources using Hybrid Source?
>>>
>>> thanks
>>> Oscar
>>>
>>
>>> --
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink, Pinot, Solr, Elasticsearch
>>>
>>>
>>>
>>>


Re: Using HybridSource

2023-07-05 Thread Alexander Fedulov
I do not think that trying to "squash" two different data types into one
just to use HybridSource is the right thing to do here. HybridSource is
primarily intended for use cases that need to read the same data from
different sources. A typical example: read events from "cold storage" in S3
up to a specific point and switch over to "live" data in Kafka.
Since you are already using the low-level API, you can either manually pull
the data in inside of the open() function, or stream it into the local
state using KeyedCoProcessFunction or KeyedBroadcastProcessFunction
(depending on the size of the lookup state).
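
As a rough illustration of the first option (file path and the plain String records are
placeholders; a real job would use its own POJO and key extraction):

```
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: the bootstrap CSV is loaded once per task in open(), so the Kafka stream
// stays the only Flink source and no HybridSource is needed.
public class BootstrapEnricher extends RichFlatMapFunction<String, String> {

    private transient Map<String, String> bootstrapById;

    @Override
    public void open(Configuration parameters) throws Exception {
        bootstrapById = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader("/data/bootstrap.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                bootstrapById.put(line.split(",")[0], line);   // first column = common identifier
            }
        }
    }

    @Override
    public void flatMap(String kafkaEvent, Collector<String> out) {
        String id = kafkaEvent.split(",")[0];                  // placeholder key extraction
        out.collect(kafkaEvent + "|" + bootstrapById.getOrDefault(id, ""));
    }
}
```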

This video should get you covered:
https://www.youtube.com/watch?v=cJS18iKLUIY

Best,
Alex


On Wed, 5 Jul 2023 at 07:29, Péter Váry  wrote:

> Was it a conscious decision that HybridSource only accept Sources, and
> does not allow mapping functions applied to them before combining them?
>
> On Tue, Jul 4, 2023, 23:53 Ken Krugler 
> wrote:
>
>> Hi Oscar,
>>
>> Couldn’t you have both the Kafka and File sources return an Either<POJO from CSV File, Protobuf from Kafka>, and then (after the HybridSource) use
>> a MapFunction to convert to the unified/correct type?
>>
>> — Ken
>>
>>
>> On Jul 4, 2023, at 12:13 PM, Oscar Perez via user 
>> wrote:
>>
>> Hei,
>> 1) We populate state based on this CSV data and do business logic based
>> on this state and events coming from other unrelated streams.
>> 2) We are using low level process function in order to process this
>> future hybrid source
>>
>> Regardless of the aforementioned points, please note that the main
>> challenge is to combine in a hybridsource CSV and kafka topic that return
>> different datatypes so I dont know how my answers relate to the original
>> problem tbh. Regards,
>> Oscar
>>
>> On Tue, 4 Jul 2023 at 20:53, Alexander Fedulov <
>> alexander.fedu...@gmail.com> wrote:
>>
>>> @Oscar
>>> 1. How do you plan to use that CSV data? Is it needed for lookup from
>>> the "main" stream?
>>> 2. Which API are you using? DataStream/SQL/Table or low level
>>> ProcessFunction?
>>>
>>> Best,
>>> Alex
>>>
>>>
>>> On Tue, 4 Jul 2023 at 11:14, Oscar Perez via user 
>>> wrote:
>>>
 ok, but is it? As I said, both sources have different data types. In
 the example here:


 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/

 We are using both sources as returning string but in our case, one
 source would return a protobuf event while the other would return a pojo.
 How can we make the 2 sources share the same datatype so that we can
 successfully use hybrid source?

 Regards,
 Oscar

 On Tue, 4 Jul 2023 at 12:04, Alexey Novakov 
 wrote:

> Hi Oscar,
>
> You could use connected streams and put your file into a special Kafka
> topic before starting such a job:
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#connect
> But this may require more work and the event ordering (which is
> shuffled) in the connected streams is probably not what you are looking 
> for.
>
> I think HybridSource is the right solution.
>
> Best regards,
> Alexey
>
> On Mon, Jul 3, 2023 at 3:44 PM Oscar Perez via user <
> user@flink.apache.org> wrote:
>
>> Hei, We want to bootstrap some data from a CSV file before reading
>> from a kafka topic that has a retention period of 7 days.
>>
>> We believe the best tool for that would be the HybridSource but the
>> problem we are facing is that both datasources are of different nature. 
>> The
>> KafkaSource returns a protobuf event while the CSV is a POJO with just 3
>> fields.
>>
>> We could hack the kafkasource implementation and then in the
>> valuedeserializer do the mapping from protobuf to the CSV POJO but that
>> seems rather hackish. Is there a way more elegant to unify both datatypes
>> from both sources using Hybrid Source?
>>
>> thanks
>> Oscar
>>
>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>


Re: How to use the Flink SQL Gateway with Flink on native K8s

2023-07-05 Thread Shammon FY
Hi,

Our approach is: after the Flink cluster is started, we start the SQL Gateway on other
nodes (in a pod or as a standalone process) and have it connect to the Flink cluster
remotely through the cluster address. This way the SQL Gateway deployment is completely
separated from the Flink cluster.

Best,
Shammon FY


On Tue, Jul 4, 2023 at 10:52 AM chaojianok  wrote:

> Hi all, I have a question.
>
> I have deployed Flink on a K8s cluster in native Kubernetes mode, and now I need to use
> the Flink SQL Gateway with this cluster. Does anyone have a good solution?
> The current approach is to go into the pod, start the SQL Gateway, and then create a
> flink-sql-gateway service in K8s, so that the SQL Gateway can be accessed through this
> service. But this approach has a problem: during deployment you have to enter the pod to
> start the service, which is bad for automated deployment. The commands are listed below;
> please take a look and see whether there is a better solution that avoids this problem.
>
> 1. Create the Flink cluster
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.service-account=flink-service-account \
> -Dkubernetes.rest-service.exposed.type=NodePort
>
> 2. Enter the pod and start the SQL Gateway service via ./bin/sql-gateway.sh start
> -Dsql-gateway.endpoint.rest.address=localhost, then exit the pod
>
> 3. Create the flink-sql-gateway service
> kubectl expose deployment flink-cluster --type=NodePort --port=8083
> --name=flink-sql-gateway -n flink
>


Re: Re: PartitionNotFoundException restart loop

2023-07-05 Thread Shammon FY
Hi,

If you want to increase the retry time for partition requests, you can adjust the
configuration option `taskmanager.network.request-backoff.max`, which defaults to 10
seconds; see [1] for the details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions

Best,
Shammon FY

On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> From the earlier logs, after the restart it loads and processes the checkpoint data from
> HDFS (around 100 MB), which seems to take a while, plus reconnecting to Kafka to consume.
> Can the number or the duration of the downstream timeout retries be configured?
>
> From: Shammon FY
> Sent: 2023-07-04 10:12
> To: user-zh
> Subject: Re: PartitionNotFoundException restart loop
> Hi,
>
> A PartitionNotFoundException usually means that a downstream task sent a partition
> request to an upstream task, but the upstream task had not been deployed successfully
> yet. Normally the downstream task retries and reports the exception after the timeout.
> You can check whether there are other exception logs and find out why the upstream task
> was not deployed successfully.
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> >
> > Exception log content:
> >
> > 2023-07-03 20:30:15,164 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> > Sink 3 (2/45)
> > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > Partition
> >
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> > 3093 not found.
> > at org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
> >
> >
> >
> > From: zhan...@eastcom-sw.com
> > Sent: 2023-07-04 09:25
> > To: user-zh
> > Subject: PartitionNotFoundException restart loop
> > Hi, I have two jobs with fairly high traffic (300 million / 600 million per day). After
> > running normally for about 5-6 days after startup, they hit a PartitionNotFoundException
> > and then keep restarting in a loop.
> >
> > After adding the following parameters to flink-conf.yaml, they likewise report the
> > PartitionNotFoundException in a loop after 6 days and keep restarting.
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> > taskmanager.network.max-num-tcp-connections: 16
> >
> > The current version is 1.17.1; the same jobs and data never had this problem on 1.14.4.
> > Is there any way to solve this?
> >
> >
>


Cleanup of changelog and materialized state is not working when job restarts.

2023-07-05 Thread Dongwoo Kim
Hi all,
While working on a streaming application built with Flink, I have found some
issues and want to ask for advice.
First, our application's key configurations are as follows.

flink version: 1.17.0
state.backend: "rocksdb"
state.backend.incremental: "true"
state.backend.changelog.enabled: "true"
state.backend.changelog.storage: "filesystem"
state.backend.changelog.periodic-materialize.interval: "10m"

Ordinarily, cleanup of the changelog and materialized state works fine.
Materialization leads to the deletion of the previous changelog, and when
compaction happens the old state files are discarded soon afterwards.
Consequently, the total number of files generated by our application remains
stable and consistent.
However, when the job restarts due to a checkpoint failure or any other
reason, neither the changelog nor the materialized state files are cleaned up,
leaving an excessive number of files in our checkpoint storage (HDFS).
I was wondering whether this is a known issue or expected behavior.
In order to address this issue, I am considering periodically deleting the
changelog files and the materialized state files (the files in the taskowned/
directory) based on their modification time. However, I wanted to ask whether
this approach is safe.

Thanks in advance.

Best regards,
dongwoo