RE: AvroRowDeserializationSchema

2022-04-27 Thread lan tran
Hi Dian,

Sorry for missing your mail. If I follow your suggestion and Flink somehow crashes and we have to restart the service, does the Flink job know the offset from which it should resume reading from Kafka?

On Tuesday, April 26, 2022 at 7:54 AM, Dian Fu wrote:

Hi Quynh,

The same code in my last reply showed how to set the UID for the source operator generated using the Table API. I meant that you could first create a source using the Table API, then convert it to a DataStream and set the UID for the source operator using the code above, and then perform operations with the DataStream API.

Regards,
Dian

On Mon, Apr 25, 2022 at 9:27 PM lan tran wrote:

Hi Dian,

Thanks again for the fast response. Following your suggestion, we can set the UID only for the DataStream part (by converting from Table to DataStream). However, in the first phase, which collects the data from Kafka (in Debezium format), the UID cannot be set since we are using the Table API (the UID is auto-generated). Therefore, if something crashes or we need to revert using a savepoint, we cannot do it for that first phase because we cannot set its UID. So how can we revert it? That is why we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job, so that we can use a savepoint for the whole flow.

Best,
Quynh

On Monday, April 25, 2022 at 7:46 PM, Dian Fu wrote:

Hi Quynh,

You could try the following code (it may be a little hacky):

def set_uid_for_source(ds: DataStream, uid: str):
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)

Besides, could you describe your use case a bit and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that sources with these formats send UPDATE messages to downstream operators.

Regards,
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran wrote:

Yeah, I already tried that way. However, if we do not use DataStream from the start, we cannot use savepoints, because according to the docs the UID is generated automatically when using the Table API (SQL API), which means we cannot revert if the system crashes.

Best,
Quynh

On Monday, April 25, 2022 at 11:04 AM, Dian Fu wrote:

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in the Python DataStream API. Taking a further look at the Java implementations of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the result type is RowData instead of Row, so it would not be that easy to support them directly in the Python DataStream API. However, conversion between the Table API and the DataStream API is supported [1]. Could you first create a Table which consumes data from Kafka and then convert it to a DataStream?

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu wrote:

Yes, we should support them. For now, if you want to use them, you could create them in your own project. You could refer to AvroRowDeserializationSchema [1] as an example. It should not be complicated, as it is simply a wrapper of the Java implementation.

Regards,
Dian

[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran wrote:

Thank you, Dian, much appreciated. However, I have another question related to this: in the current version, or in any future update, does the DataStream API in PyFlink support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema? Looking at the documentation, it seems they are not supported yet.

Best,
Quynh

On Friday, April 22, 2022 at 9:36 PM, Dian Fu wrote:

Hi Quynh,

I have added an example of how to use AvroRowDeserializationSchema in the Python DataStream API in [1]. Please take a look and see if it helps.

Regards,
Dian

[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu wrote:

Hi Quynh,

Could you show some sample code on how you use it?

Regards,
Dian

On Fri, Apr 22,
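A minimal, self-contained PyFlink sketch of the pattern discussed in this thread: create the Kafka source with the Table API, convert it to a DataStream, and pin a UID on the generated source transformation using the helper Dian posted above. The table definition (topic, servers, columns, plain JSON format) is illustrative only; a Debezium-format table is a changelog source and would likely need to_changelog_stream() instead of to_data_stream(). The helper relies on PyFlink internals, so treat the whole snippet as a sketch rather than a supported API.

from pyflink.datastream import StreamExecutionEnvironment, DataStream
from pyflink.table import StreamTableEnvironment


def set_uid_for_source(ds: DataStream, uid: str):
    # Helper from earlier in this thread: walk down the transformation graph
    # to the source transformation and set an explicit UID on it.
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Illustrative source table; the connector options are placeholders.
t_env.execute_sql("""
    CREATE TABLE orders (
        id STRING,
        amount INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

ds = t_env.to_data_stream(t_env.from_path("orders"))
set_uid_for_source(ds, "kafka-orders-source")

# Continue with the DataStream API as usual.
ds.map(lambda row: row).print()
env.execute("table-to-datastream-with-uid")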

Flink team staffing

2022-04-27 Thread Wei Liu
Hi everyone,

I've been thinking about running some production-critical applications
using Flink. The scale is small, to begin with (thousands of events per
second), but we do want to keep the uptime as high as possible.

What does a typical team around this type of system look like? We have a
couple of Big Data engineers and a couple of MLE engineers. Are we in good shape?
Would love to hear your thoughts.

-Wei



Re: Kafka Sink Error in sending Row Type data in Flink-(version 1.14.4)

2022-04-27 Thread Dian Fu
Hi Harshit,

I have already replied to you in an earlier thread [1] about the same
question; it seems you may have missed it. Please double-check whether that
reply is helpful for you.

Regards,
Dian

[1] https://lists.apache.org/thread/cm6r569spq67249dxw57q8lxh0mk3f7y
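For reference, a likely cause of the "[B cannot be cast to org.apache.flink.types.Row" error quoted below is that the map() call does not declare an output_type: without it, PyFlink passes the mapped records downstream as pickled byte[] rather than as Row objects, which the CsvRowSerializationSchema in the Kafka sink cannot handle. A minimal sketch of that fix, under this assumption (ds, update_tel and kafka_producer refer to the code in the quoted message):

type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])

# Declaring the output type keeps the records as Rows on the Java side, so the
# CSV serialization schema of the Kafka sink receives Row objects instead of
# pickled bytes.
ds = ds.map(update_tel, output_type=type_info)
ds.add_sink(kafka_producer)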


On Wed, Apr 27, 2022 at 6:57 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
>
>
> I am new to pyflink and request for your support in issue I am facing with
> Pyflink. I am using Pyflink version 1.14.4 & using reference code from
> pyflink github.
>
>
>
> I am getting following error .
>
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
>
> status = StatusCode.CANCELLED
>
> details = "Multiplexer hanging up"
>
> debug_error_string =
> "{"created":"@1651051695.10400","description":"Error received from peer
> ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer
> hanging up","grpc_status":1}"
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> Caused by: java.lang.ClassCastException: [B cannot be cast to
> org.apache.flink.types.Row
>
>
>
> Below is my code for reference..
>
>
>
> import json
>
> import logging
>
> import os
>
> import sys
>
>
>
> from pyflink.common import Types, JsonRowSerializationSchema, Row,
> CsvRowSerializationSchema
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink.datastream.connectors import FlinkKafkaProducer
>
> import math
>
>
>
>
>
> def show(ds, env):
>
> ds.print()
>
> env.execute()
>
>
>
>
>
> def basic_operations():
>
> env = StreamExecutionEnvironment.get_execution_environment()
>
> env.set_parallelism(1)
>
> kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>  'flink-sql-connector-kafka_2.11-1.14.4.jar')
>
> env.add_jars("file:///{}".format(kafka_jar))
>
>
>
> ds = env.from_collection(
>
> collection=[
>
> ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000),
> ('user1', 4, 25000), ('user2', 5, 7000),
>
> ('user3', 8, 7000),
>
> ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000),
> ('user1', 17, 2), ('user2', 18, 4),
>
> ('user3', 20, 1), ('user1', 21, 2000), ('user2', 22,
> 4000), ('user3', 33, 1000), ('user1', 34, 25000),
>
> ('user2', 35, 7000), ('user3', 38, 7000)
>
> ],
>
> type_info=Types.ROW_NAMED(["id", "info", "value"],
> [Types.STRING(), Types.INT(), Types.INT()])
>
> )
>
> ds1 = ds.map(lambda x: x)
>
> ds1.print()
>
>
>
> def update_tel(data):
>
> # parse the json
>
> test_data = data.info
>
> test_data += data.value
>
> res = Row('x', 'y')
>
> #return Types.ROW(data.id, test_data)
>
> return res(data.id, test_data)
>
>
>
> # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
>
> ds = ds.map(update_tel)
>
> ds.print()
>
> # ds = ds.map(lambda x: type(x))
>
> # ds.print()
>
> # ds = ds.map(lambda x: Row([x]),
> output_type=Types.ROW([Types.STRING(), Types.INT()]))
>
> # ds.print()
>
>
>
> type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
>
> serialization_schema =
> CsvRowSerializationSchema.Builder(type_info).build()
>
> kafka_producer = FlinkKafkaProducer(
>
> topic='testing',
>
> serialization_schema=serialization_schema,
>
> producer_config={'bootstrap.servers': 'localhost:9093', 'group.id':
> 'test_group'}
>
> )
>
>
>
> ds.add_sink(kafka_producer)
>
>
>
> env.execute('basic_operations')
>
>
>
> if __name__ == '__main__':
>
> logging.basicConfig(stream=sys.stdout, level=logging.INFO,
> format="%(message)s")
>
>
>
> basic_operations()
>
>
>
>
>


Kafka Sink Error in sending Row Type data in Flink-(version 1.14.4)

2022-04-27 Thread harshit.varsh...@iktara.ai
Dear Team,

I am new to PyFlink and would appreciate your support with an issue I am facing.
I am using PyFlink version 1.14.4 and reference code from the PyFlink GitHub
repository.

I am getting the following error.

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:

status = StatusCode.CANCELLED

details = "Multiplexer hanging up"

debug_error_string =
"{"created":"@1651051695.10400","description":"Error received from peer
ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer hanging up","grpc_status":1}"

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

Caused by: java.lang.ClassCastException: [B cannot be cast to
org.apache.flink.types.Row

 

Below is my code for reference.

 

import json
import logging
import os
import sys
import math

from pyflink.common import Types, JsonRowSerializationSchema, Row, CsvRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer


def show(ds, env):
    ds.print()
    env.execute()


def basic_operations():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))

    ds = env.from_collection(
        collection=[
            ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000),
            ('user1', 4, 25000), ('user2', 5, 7000), ('user3', 8, 7000),
            ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000),
            ('user1', 17, 2), ('user2', 18, 4), ('user3', 20, 1),
            ('user1', 21, 2000), ('user2', 22, 4000), ('user3', 33, 1000),
            ('user1', 34, 25000), ('user2', 35, 7000), ('user3', 38, 7000)
        ],
        type_info=Types.ROW_NAMED(["id", "info", "value"],
                                  [Types.STRING(), Types.INT(), Types.INT()])
    )
    ds1 = ds.map(lambda x: x)
    ds1.print()

    def update_tel(data):
        # parse the json
        test_data = data.info
        test_data += data.value
        res = Row('x', 'y')
        # return Types.ROW(data.id, test_data)
        return res(data.id, test_data)

    # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
    ds = ds.map(update_tel)
    ds.print()
    # ds = ds.map(lambda x: type(x))
    # ds.print()
    # ds = ds.map(lambda x: Row([x]), output_type=Types.ROW([Types.STRING(), Types.INT()]))
    # ds.print()

    type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
    serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='testing',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9093',
                         'group.id': 'test_group'}
    )

    ds.add_sink(kafka_producer)

    env.execute('basic_operations')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                        format="%(message)s")

    basic_operations()

 

 



Re: Help: no log file (jobmanager.log) when submitting a job with execution target yarn-per-job via the SQL Client

2022-04-27 Thread ruiyun wan
I also just noticed that YarnLogConfigUtil has this piece of code:
public static String getLoggingYarnCommand(final Configuration configuration) {
    checkNotNull(configuration);

    final String logConfigFilePath =
            configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath == null) {
        return "";
    }

    String logCommand = getLog4jCommand(logConfigFilePath);
    if (logCommand.isEmpty()) {
        logCommand = getLogBackCommand(logConfigFilePath);
    }
    return logCommand;
}
However, YarnConfigOptionsInternal does not seem to allow users to set it directly;
it only worked after adding single quotes. That should solve the problem.
Thanks.
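To put the workaround in one place: in the SQL Client the internal option apparently has to be quoted, along the lines of the following (paths and version taken from this thread; whether quoting is required may depend on the Flink version):

-- inside sql-client.sh, before submitting the job
SET 'execution.target' = 'yarn-per-job';
SET '$internal.deployment.config-dir' = '/opt/flink-1.14.3/conf';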

Qishang wrote on Wed, Apr 27, 2022 at 17:35:

> Hi.
> It looks like the code does not set this.
> Set it manually with:
> set $internal.deployment.config-dir=/opt/flink-1.14.3/conf
>
> As for the call chain, here is a rough trace. Below is how it ends up with
> YarnJobClusterExecutorFactory -> YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor
>
> getExecutorFactory:58, DefaultExecutorServiceLoader
> (org.apache.flink.core.execution)
> executeAsync:2032, StreamExecutionEnvironment
> (org.apache.flink.streaming.api.environment)
> executeAsync:95, DefaultExecutor
> (org.apache.flink.table.planner.delegation)
> executeQueryOperation:811, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> executeInternal:1274, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> lambda$executeOperation$3:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> wrapClassLoader:88, ExecutionContext
> (org.apache.flink.table.client.gateway.context)
> executeOperation:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> executeQuery:231, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> callSelect:532, CliClient (org.apache.flink.table.client.cli)
> callOperation:423, CliClient (org.apache.flink.table.client.cli)
> lambda$executeStatement$1:332, CliClient
> (org.apache.flink.table.client.cli)
> executeStatement:325, CliClient (org.apache.flink.table.client.cli)
> executeInteractive:297, CliClient (org.apache.flink.table.client.cli)
> executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli)
> openCli:151, SqlClient (org.apache.flink.table.client)
> start:95, SqlClient (org.apache.flink.table.client)
> startClient:187, SqlClient (org.apache.flink.table.client)
> main:161, SqlClient (org.apache.flink.table.client)
>
> ruiyun wan wrote on Wed, Apr 27, 2022 at 14:51:
>
> > The log configuration must be there, because a cluster created with yarn-session.sh
> > does produce jobmanager.log. I can trace a call path from the yarn-session.sh entry
> > class (org.apache.flink.yarn.cli.FlinkYarnSessionCli) to YarnClusterDescriptor.
> > [image: image.png]
> > However, I could not find a call path from the sql-client.sh entry class
> > (org.apache.flink.table.client.SqlClient) to YarnClusterDescriptor. The two are not
> > in the same package.
> >
> > Qishang wrote on Wed, Apr 27, 2022 at 13:46:
> >
> >> Hi.
> >> Please check whether log4j.properties exists under the conf directory.
> >>
> >> It should be generated here:
> >>
> >>
> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
> >>
> >>
> >> ruiyun wan wrote on Tue, Apr 26, 2022 at 14:41:
> >>
> >> > Flink version: 1.13
> >> > Problem: when starting yarn-per-job via sql-client.sh (execution.target =
> >> > yarn-per-job), the launch_container.sh generated on the YARN side starts
> >> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint without the -Dlog.file
> >> > and -Dlog4j.configuration properties in its arguments, so there is no
> >> > jobmanager.log file. What needs to be set so that the launch_container.sh
> >> > generated by YARN includes these parameters?
> >> >
> >>
> >
>


Re: Help: no log file (jobmanager.log) when submitting a job with execution target yarn-per-job via the SQL Client

2022-04-27 Thread Qishang
Hi.
FYI : https://issues.apache.org/jira/browse/FLINK-25435

Qishang wrote on Wed, Apr 27, 2022 at 17:28:

> Hi.
> It looks like the code does not set this.
> Set it manually with:
> set $internal.deployment.config-dir=/opt/flink-1.14.3/conf
>
> As for the call chain, here is a rough trace. Below is how it ends up with
> YarnJobClusterExecutorFactory -> YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor
>
> getExecutorFactory:58, DefaultExecutorServiceLoader
> (org.apache.flink.core.execution)
> executeAsync:2032, StreamExecutionEnvironment
> (org.apache.flink.streaming.api.environment)
> executeAsync:95, DefaultExecutor
> (org.apache.flink.table.planner.delegation)
> executeQueryOperation:811, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> executeInternal:1274, TableEnvironmentImpl
> (org.apache.flink.table.api.internal)
> lambda$executeOperation$3:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> wrapClassLoader:88, ExecutionContext
> (org.apache.flink.table.client.gateway.context)
> executeOperation:209, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> executeQuery:231, LocalExecutor
> (org.apache.flink.table.client.gateway.local)
> callSelect:532, CliClient (org.apache.flink.table.client.cli)
> callOperation:423, CliClient (org.apache.flink.table.client.cli)
> lambda$executeStatement$1:332, CliClient
> (org.apache.flink.table.client.cli)
> executeStatement:325, CliClient (org.apache.flink.table.client.cli)
> executeInteractive:297, CliClient (org.apache.flink.table.client.cli)
> executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli)
> openCli:151, SqlClient (org.apache.flink.table.client)
> start:95, SqlClient (org.apache.flink.table.client)
> startClient:187, SqlClient (org.apache.flink.table.client)
> main:161, SqlClient (org.apache.flink.table.client)
>
> ruiyun wan wrote on Wed, Apr 27, 2022 at 14:51:
>
>> The log configuration must be there, because a cluster created with yarn-session.sh
>> does produce jobmanager.log. I can trace a call path from the yarn-session.sh entry
>> class (org.apache.flink.yarn.cli.FlinkYarnSessionCli) to YarnClusterDescriptor.
>> [image: image.png]
>> However, I could not find a call path from the sql-client.sh entry class
>> (org.apache.flink.table.client.SqlClient) to YarnClusterDescriptor. The two are not
>> in the same package.
>>
>> Qishang wrote on Wed, Apr 27, 2022 at 13:46:
>>
>>> Hi.
>>> Please check whether log4j.properties exists under the conf directory.
>>>
>>> It should be generated here:
>>>
>>> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
>>>
>>>
>>> ruiyun wan wrote on Tue, Apr 26, 2022 at 14:41:
>>>
>>> > Flink version: 1.13
>>> > Problem: when starting yarn-per-job via sql-client.sh (execution.target =
>>> > yarn-per-job), the launch_container.sh generated on the YARN side starts
>>> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint without the -Dlog.file
>>> > and -Dlog4j.configuration properties in its arguments, so there is no
>>> > jobmanager.log file. What needs to be set so that the launch_container.sh
>>> > generated by YARN includes these parameters?
>>> >
>>>
>>


Re: Help: no log file (jobmanager.log) when submitting a job with execution target yarn-per-job via the SQL Client

2022-04-27 Thread Qishang
Hi.
It looks like the code does not set this.
Set it manually with:
set $internal.deployment.config-dir=/opt/flink-1.14.3/conf

As for the call chain, here is a rough trace. Below is how it ends up with
YarnJobClusterExecutorFactory -> YarnJobClusterExecutor -> YarnClusterClientFactory -> YarnClusterDescriptor

getExecutorFactory:58, DefaultExecutorServiceLoader
(org.apache.flink.core.execution)
executeAsync:2032, StreamExecutionEnvironment
(org.apache.flink.streaming.api.environment)
executeAsync:95, DefaultExecutor (org.apache.flink.table.planner.delegation)
executeQueryOperation:811, TableEnvironmentImpl
(org.apache.flink.table.api.internal)
executeInternal:1274, TableEnvironmentImpl
(org.apache.flink.table.api.internal)
lambda$executeOperation$3:209, LocalExecutor
(org.apache.flink.table.client.gateway.local)
wrapClassLoader:88, ExecutionContext
(org.apache.flink.table.client.gateway.context)
executeOperation:209, LocalExecutor
(org.apache.flink.table.client.gateway.local)
executeQuery:231, LocalExecutor
(org.apache.flink.table.client.gateway.local)
callSelect:532, CliClient (org.apache.flink.table.client.cli)
callOperation:423, CliClient (org.apache.flink.table.client.cli)
lambda$executeStatement$1:332, CliClient (org.apache.flink.table.client.cli)
executeStatement:325, CliClient (org.apache.flink.table.client.cli)
executeInteractive:297, CliClient (org.apache.flink.table.client.cli)
executeInInteractiveMode:221, CliClient (org.apache.flink.table.client.cli)
openCli:151, SqlClient (org.apache.flink.table.client)
start:95, SqlClient (org.apache.flink.table.client)
startClient:187, SqlClient (org.apache.flink.table.client)
main:161, SqlClient (org.apache.flink.table.client)

ruiyun wan wrote on Wed, Apr 27, 2022 at 14:51:

> The log configuration must be there, because a cluster created with yarn-session.sh
> does produce jobmanager.log. I can trace a call path from the yarn-session.sh entry
> class (org.apache.flink.yarn.cli.FlinkYarnSessionCli) to YarnClusterDescriptor.
> [image: image.png]
> However, I could not find a call path from the sql-client.sh entry class
> (org.apache.flink.table.client.SqlClient) to YarnClusterDescriptor. The two are not
> in the same package.
>
> Qishang wrote on Wed, Apr 27, 2022 at 13:46:
>
>> Hi.
>> Please check whether log4j.properties exists under the conf directory.
>>
>> It should be generated here:
>>
>> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
>>
>>
>> ruiyun wan wrote on Tue, Apr 26, 2022 at 14:41:
>>
>> > Flink version: 1.13
>> > Problem: when starting yarn-per-job via sql-client.sh (execution.target =
>> > yarn-per-job), the launch_container.sh generated on the YARN side starts
>> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint without the -Dlog.file
>> > and -Dlog4j.configuration properties in its arguments, so there is no
>> > jobmanager.log file. What needs to be set so that the launch_container.sh
>> > generated by YARN includes these parameters?
>> >
>>
>


Re: How to debug Metaspace exception?

2022-04-27 Thread Chesnay Schepler

You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks for a
class first, specifically whether we first load from /lib or the user-jar.
It does not allow you to load something from the user-jar in the parent
classloader. That's just not how it works.


It must be in /lib.
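To make the two options concrete (the Ignite and SQL Server package names and paths are assumptions based on the rest of this thread, not a verified setup):

# Option 1: ship the driver jars with the cluster, e.g.
#   cp ignite-core-*.jar mssql-jdbc-*.jar $FLINK_HOME/lib/
# and mark them as provided/compileOnly in the job build.

# Option 2 (flink-conf.yaml): this only changes the lookup order for classes
# that exist both in /lib and in the user-jar; it cannot load a class that is
# only in the user-jar through the parent classloader.
classloader.parent-first-patterns.additional: org.apache.ignite.;com.microsoft.sqlserver.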

On 27/04/2022 04:59, John Smith wrote:
Hi Chesnay, as per the docs...
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in the task manager lib folder or use
classloader.parent-first-patterns.additional.

I prefer the latter: the dependency stays with the user-jar
and not on the task manager.


On Tue, Apr 26, 2022 at 9:52 PM John Smith  wrote:

Ok so I should put the Apache Ignite and my Microsoft drivers in
the lib folders of my task managers?

And then in my job jar only include them as compile-time
dependencies?
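If the drivers end up in the task managers' /lib, the job build would typically stop bundling them, for example via Maven's provided scope (coordinates and versions below are illustrative; Gradle's compileOnly achieves the same):

<!-- compiled against, but not packaged into the job jar -->
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.12.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <version>10.2.0.jre11</version>
    <scope>provided</scope>
</dependency>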


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the JDBC driver into /lib instead. Setting only
the parent-first pattern shouldn't affect anything.
That is only relevant if something is both in /lib and the
user-jar, telling Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I don't
think I'm getting "java.lang.OutOfMemoryError: Metaspace" any
more.

Or it's too early to tell.

Though now, the task managers are shutting down due to some
other failures.

So maybe the task manager was running out of Metaspace because
tasks were failing and reloading often. But now maybe it's
just shutting down cleanly.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or can I put something in the config to treat org.apache.ignite.
classes as parent-first?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse MAT and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all phantom/weak/soft
references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So I'm guessing anything JDBC-based. Should I copy
them into the task manager lib folder and make the
dependencies compile-only in my jobs?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko
 wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on profiling, as
well as classloading).

On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler
 wrote:

We have a very rough "guide" in the wiki
(it's just the specific steps I took to debug
another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use MAT [1]
to analyze the dump file. Check whether there are
too many loaded classes.

[1] https://www.eclipse.org/mat/


On Apr 18, 2022, at 9:55 PM, John Smith
wrote:

Hi, can anyone help with this? I never
looked at a dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith
 wrote:

Hi, so I have a dump file. What do I
look for?

On Thu, Mar 31, 2022 at 3:28 PM John
Smith  wrote:

Ok, so if there's a leak and I
manually stop the job and restart
it from the UI multiple times, I
won't see the issue because
the classes are unloaded correctly?


  

Re: Help: no log file (jobmanager.log) when submitting a job with execution target yarn-per-job via the SQL Client

2022-04-27 Thread ruiyun wan
The log configuration must be there, because a cluster created with yarn-session.sh
does produce jobmanager.log. I can trace a call path from the yarn-session.sh entry
class (org.apache.flink.yarn.cli.FlinkYarnSessionCli) to YarnClusterDescriptor.
[image: image.png]
However, I could not find a call path from the sql-client.sh entry class
(org.apache.flink.table.client.SqlClient) to YarnClusterDescriptor. The two are not
in the same package.

Qishang wrote on Wed, Apr 27, 2022 at 13:46:

> Hi.
> Please check whether log4j.properties exists under the conf directory.
>
> It should be generated here:
>
> https://github.com/apache/flink/blob/release-1.13/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1699
>
>
> ruiyun wan wrote on Tue, Apr 26, 2022 at 14:41:
>
> > Flink version: 1.13
> > Problem: when starting yarn-per-job via sql-client.sh (execution.target =
> > yarn-per-job), the launch_container.sh generated on the YARN side starts
> > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint without the -Dlog.file
> > and -Dlog4j.configuration properties in its arguments, so there is no
> > jobmanager.log file. What needs to be set so that the launch_container.sh
> > generated by YARN includes these parameters?
> >
>