Re: Global/Shared objects

2023-08-10 Thread Hang Ruan
Hi, Kamal.

Each TaskManager is a JVM process, and each task slot is a thread of that
TaskManager; see [1] for more information.
Static fields can be shared among subtasks running in the same TaskManager.
Subtasks running in different TaskManagers cannot share static fields.

Best,
hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/flink-architecture/#task-slots-and-resources
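
As a rough illustration of this (a sketch, not code from the thread): a
hypothetical RichMapFunction whose static field is initialized once per
TaskManager JVM and is then visible to every subtask running in that
TaskManager, while each TaskManager keeps its own independent copy.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class SharedResourceMapper extends RichMapFunction<String, String> {

    // One instance per TaskManager JVM, shared by all subtasks in its slots.
    private static volatile ExpensiveClient sharedClient;

    @Override
    public void open(Configuration parameters) {
        // Double-checked locking so only the first subtask creates the client.
        if (sharedClient == null) {
            synchronized (SharedResourceMapper.class) {
                if (sharedClient == null) {
                    sharedClient = new ExpensiveClient();
                }
            }
        }
    }

    @Override
    public String map(String value) {
        return sharedClient.enrich(value);
    }

    // Hypothetical stand-in for whatever heavyweight object is being shared.
    static class ExpensiveClient {
        String enrich(String value) {
            return value;
        }
    }
}

Because the object is duplicated per TaskManager, it should be safe to create
more than once, and it must be accessed in a thread-safe way, since the
subtasks sharing it run as concurrent threads in the same JVM.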

Kamal Mittal via user  wrote on Fri, Aug 11, 2023 at 12:53:

> Hello,
>
>
>
> Is it possible to create global/shared objects like static which are
> shared among slots in a task manager?
>
>
>
> Is it ok to create such objects in flink?
>
>
>
> Rgds,
>
> Kamal
>


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Muazim Wani
Thank you so much for taking the time to provide me with such a detailed
response. Your assistance has been incredibly helpful in clarifying my
understanding!
Let me provide you with the exact scenario; I think there might be some
misunderstanding. All the streams are bounded and parallelism is set to 1,
and I am writing to one file only. *So the main use case is to provide support
for dynamic headers and footers with aggregated values.*
E.g. if my input is

Static Header
JohnDoe, 12
Alice, 21

My dynamic header is "Dynamic Header" and my dynamic footer is "Dynamic
footer". *These are added on top of the static headers that are already
present in the (bounded) DataStream.* The output would be like

Dynamic Header 33
Static Header
JohnDoe, 12
Alice, 21
Dynamic footer

In this particular case I am writing to one file only and have set my
parallelism to 1. I have one input DataStream, and on top of that one dynamic
header stream and one dynamic footer stream (*which contain some dynamic
params, such as an aggregated value over some field, e.g. salary*). *So I am
left with three transformed streams at the sink operator: 1) the dynamic header
stream with the aggregated value, 2) the input DataStream, and 3) the dynamic
footer stream with the aggregated value.*

I could have used Strings for both the dynamic headers and footers and emitted
the headers in the open() method and the footers in the close() method of
TextOutputFormat; that would have solved my use case. *But I get back a
DataStream (with only one value, i.e. the final sum) from the aggregation
(currently I am using a reduce function).* I am adding the headers to that
DataStream, and similarly for the footers. Now I am not able to merge the
streams while maintaining the order.

Below is my implementation of the reduce function:

public DataStream sum(
        SumFunction reduceFunction, DataStream stream) {

    DataStream inputRecordTransformedDataStream =
            stream.map(this::transform)
                  .returns((TypeInformation) Types.GENERIC(Number.class));

    return inputRecordTransformedDataStream
            .keyBy(value -> "key")
            .reduce(reduceFunction);
}


Below I am adding my headers to the sum stream:

public static DataStream getAggregatedStream(
        String header, DataStream sinkStream) {

    return sinkStream
            .keyBy(key -> "value")
            .flatMap(
                    (FlatMapFunction)
                            (value, out) -> out.collect(
                                    PojoClass.builder().data(header + value).build()))
            .returns(PojoClass.class);
}

The header stream is a bounded DataStream containing the dynamic header and the
aggregated value:
DataStream headerStream = addHeaderRows(sinkStream);

DataStream footerStream = addFooterRows(finalStream);

DataStream sinkStream;

Thanks a lot for your time and the advice.
Muazim Wani


On Fri, 11 Aug 2023 at 07:45, Hang Ruan  wrote:

> ps: Forget the link: Hybrid Source[1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>
> Hang Ruan  wrote on Fri, Aug 11, 2023 at 10:14:
>
>> Hi, Muazim.
>>
>> I think the Hybrid Source[1] may be helpful for your case.
>>
>> Best,
>> Hang
>>
>> Ken Krugler  wrote on Fri, Aug 11, 2023 at 04:18:
>>
>>> As (almost) always, the devil is in the details.
>>>
>>> You haven’t said, but I’m assuming you’re writing out multiple files,
>>> each with a different schema, as otherwise you could just leverage the
>>> existing Flink support for CSV.
>>>
>>> So then you could combine the header/footer streams (adding a flag for
>>> header vs. footer), and connect that to the row data stream, then use a
>>> KeyedCoProcessFunction (I’m assuming you can key by something that
>>> identifies which schema). You’d buffer the row data & footer (separately in
>>> state). You would also need to set up a timer to fire at the max watermark,
>>> to flush out pending rows/footer when all of the input data has been
>>> processed.
>>>
>>> After that function you could configure the sink to bucket by the target
>>> schema.
>>>
>>> — Ken
>>>
>>>
>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
>>>
>>> Thanks for the response!
>>> I have a specific use case where I am writing to a TextFile sink. I have
>>> a Bounded stream of header data and need  to merge it with another bounded
>>> stream. While writing the data to a text file the header data should be
>>> written before the original data(from another bounded stream). And also at
>>> last I have another stream of footers where I would repeat the same process.
>>> I tried keeping an identifier for all three streams and based on these
>>> identifiers I added the data in three different ListState
>>> using KeyedProcess functions. So for headers I directly emitted the value
>>> but for main data and footers if I store it in a state . The issue is
>>> Outside KeyedProcess I am not able to emit the data in order.
>>> Is there any way I can achieve the use case of ordering the dataStreams
>>> . I also tried with union but it seems it adds data arbitrarily in both
>>> streams .
>>> Thanks and regards
>>>
>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
>>> wrote:
>>>
 Hi Muazim,

 In Flink, a stream of data (unless bounded) is assumed to never end. […]

Global/Shared objects

2023-08-10 Thread Kamal Mittal via user
Hello,

Is it possible to create global/shared objects, like static objects, which are
shared among the slots in a task manager?

Is it OK to create such objects in Flink?

Rgds,
Kamal


State output from RichFlatMapFunction

2023-08-10 Thread Liu Join
When using RichFlatMapFunction for batch computation in Flink 1.17, how can the
state be written to the output data stream once the input data ends?
Thanks


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
ps: Forget the link: Hybrid Source[1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/

Hang Ruan  wrote on Fri, Aug 11, 2023 at 10:14:

> Hi, Muazim.
>
> I think the Hybrid Source[1] may be helpful for your case.
>
> Best,
> Hang
>
> Ken Krugler  wrote on Fri, Aug 11, 2023 at 04:18:
>
>> As (almost) always, the devil is in the details.
>>
>> You haven’t said, but I’m assuming you’re writing out multiple files,
>> each with a different schema, as otherwise you could just leverage the
>> existing Flink support for CSV.
>>
>> So then you could combine the header/footer streams (adding a flag for
>> header vs. footer), and connect that to the row data stream, then use a
>> KeyedCoProcessFunction (I’m assuming you can key by something that
>> identifies which schema). You’d buffer the row data & footer (separately in
>> state). You would also need to set up a timer to fire at the max watermark,
>> to flush out pending rows/footer when all of the input data has been
>> processed.
>>
>> After that function you could configure the sink to bucket by the target
>> schema.
>>
>> — Ken
>>
>>
>> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
>>
>> Thanks for the response!
>> I have a specific use case where I am writing to a TextFile sink. I have
>> a Bounded stream of header data and need  to merge it with another bounded
>> stream. While writing the data to a text file the header data should be
>> written before the original data(from another bounded stream). And also at
>> last I have another stream of footers where I would repeat the same process.
>> I tried keeping an identifier for all three streams and based on these
>> identifiers I added the data in three different ListState
>> using KeyedProcess functions. So for headers I directly emitted the value
>> but for main data and footers if I store it in a state . The issue is
>> Outside KeyedProcess I am not able to emit the data in order.
>> Is there any way I can achieve the use case of ordering the dataStreams
>> . I also tried with union but it seems it adds data arbitrarily in both
>> streams .
>> Thanks and regards
>>
>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
>> wrote:
>>
>>> Hi Muazim,
>>>
>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>
>>> So in your example below, this means stream 2 would NEVER be emitted,
>>> because stream 1 would never end (there is no time at which you know for
>>> sure that stream 1 is done).
>>>
>>> And this in turn means stream 2 would be buffered forever in state, thus
>>> growing unbounded.
>>>
>>> I would suggest elaborating on your use case.
>>>
>>> Regards,
>>>
>>> — Ken
>>>
>>>
>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
>>>
>>> Hi Team,
>>> I have a use case where I have two streams and want to join them in
>>> stateful manner .
>>> E.g data of stream 1 should be emitted before stream2.
>>> I tried to store the data in ListState in KeyedProcessFunction but I am
>>> not able to access state outside processElement().
>>> Is there any way I could achieve this?
>>> Thanks and regards
>>>
>>>
>>> --
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink, Pinot, Solr, Elasticsearch
>>>
>>>
>>>
>>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Hang Ruan
Hi, Muazim.

I think the Hybrid Source[1] may be helpful for your case.

Best,
Hang
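
As a rough sketch of this idea (the file paths and the use of three bounded
FileSources are assumptions for illustration, not code from the thread): a
HybridSource consumes its sources strictly one after another, which preserves
header -> body -> footer order.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Assumed bounded inputs; header, body, and footer live in separate directories.
        FileSource<String> header = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/header"))
                .build();
        FileSource<String> body = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/body"))
                .build();
        FileSource<String> footer = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/footer"))
                .build();

        // Sources are read in the order they are added: header, then body, then footer.
        HybridSource<String> ordered = HybridSource.builder(header)
                .addSource(body)
                .addSource(footer)
                .build();

        DataStream<String> lines =
                env.fromSource(ordered, WatermarkStrategy.noWatermarks(), "ordered-input");
        lines.print();
        env.execute("hybrid-source-sketch");
    }
}

Note that this only helps when each part is available as a Source; it does not
by itself cover the case where the header/footer values are computed by an
aggregation inside the same job.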

Ken Krugler  wrote on Fri, Aug 11, 2023 at 04:18:

> As (almost) always, the devil is in the details.
>
> You haven’t said, but I’m assuming you’re writing out multiple files, each
> with a different schema, as otherwise you could just leverage the existing
> Flink support for CSV.
>
> So then you could combine the header/footer streams (adding a flag for
> header vs. footer), and connect that to the row data stream, then use a
> KeyedCoProcessFunction (I’m assuming you can key by something that
> identifies which schema). You’d buffer the row data & footer (separately in
> state). You would also need to set up a timer to fire at the max watermark,
> to flush out pending rows/footer when all of the input data has been
> processed.
>
> After that function you could configure the sink to bucket by the target
> schema.
>
> — Ken
>
>
> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
>
> Thanks for the response!
> I have a specific use case where I am writing to a TextFile sink. I have a
> Bounded stream of header data and need  to merge it with another bounded
> stream. While writing the data to a text file the header data should be
> written before the original data(from another bounded stream). And also at
> last I have another stream of footers where I would repeat the same process.
> I tried keeping an identifier for all three streams and based on these
> identifiers I added the data in three different ListState
> using KeyedProcess functions. So for headers I directly emitted the value
> but for main data and footers if I store it in a state . The issue is
> Outside KeyedProcess I am not able to emit the data in order.
> Is there any way I can achieve the use case of ordering the dataStreams.
> I also tried with union but it seems it adds data arbitrarily in both
> streams .
> Thanks and regards
>
> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
> wrote:
>
>> Hi Muazim,
>>
>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>
>> So in your example below, this means stream 2 would NEVER be emitted,
>> because stream 1 would never end (there is no time at which you know for
>> sure that stream 1 is done).
>>
>> And this in turn means stream 2 would be buffered forever in state, thus
>> growing unbounded.
>>
>> I would suggest elaborating on your use case.
>>
>> Regards,
>>
>> — Ken
>>
>>
>> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
>>
>> Hi Team,
>> I have a use case where I have two streams and want to join them in
>> stateful manner .
>> E.g data of stream 1 should be emitted before stream2.
>> I tried to store the data in ListState in KeyedProcessFunction but I am
>> not able to access state outside processElement().
>> Is there any way I could achieve this?
>> Thanks and regards
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>>
>>
>>
>>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Re: Flink operator job restart

2023-08-10 Thread Chen Zhanghao
Hi Ethan,

You can refer to the K8s operator doc on how to do a stateful job upgrade:
Job Management | Apache Flink Kubernetes Operator.

Best,
Zhanghao Chen

From: Ethan T Yang 
Sent: August 10, 2023 12:27
To: user@flink.apache.org 
Subject: Flink operator job restart

Hi Flink users,

When using Flink operator, how to restart jobs from a checkpoint. I am used to 
the Flink Kubernetes native deployment, where I can run

flink run -s

How to achieve that in Flink operator

Thanks
Ivan


Re: Questions related to Autoscaler

2023-08-10 Thread Chen Zhanghao
Q1: If you use the operator to submit a standalone-mode job with reactive mode
enabled, KEDA should still work.

Q2: For Flink versions, 1.17 is recommended, but 1.15 is also okay if you
backport the necessary changes listed in Autoscaler | Apache Flink Kubernetes
Operator. For the Kubernetes Operator, the latest stable version is 1.5 (1.6 is
close but not officially released yet), so staying on 1.5 is fine.

Q3: The metrics monitored (as of v1.5) are: throughput, lag, and busy time. CPU
and memory are not considered. And yes, backlog-processing.lag-threshold is
related to Kafka consumer lag: when the job lag time is beyond this threshold,
the autoscaler will prevent any downscaling behavior.


Best,
Zhanghao Chen

From: Hou, Lijuan via user 
Sent: August 9, 2023 3:04
To: user@flink.apache.org 
Subject: Questions related to Autoscaler


Hi Flink team,



This is Lijuan. I am working on autoscaling for our Flink job. We are currently
using Flink 1.16.1 and Flink operator 1.5.0. I have some questions I need to
confirm with you.



1 - It seems that for a Flink job using the Flink operator, the only option to
realize autoscaling is to enable the Autoscaler feature, and KEDA won’t work,
right?



2 - I noticed from the documentation that we need to upgrade to Flink 1.17 to
use the Autoscaler. But I also noticed that the latest version of the Flink
operator is now 1.7.

Shall we upgrade from 1.5.0 to 1.7 to enable the Autoscaler?



3 - I have done a lot of searching, and I have also read the Autoscaler
Algorithm page, but I am still not sure about the list of metrics that are
observed automatically.

  *   Will it include CPU load, memory, throughput and Kafka consumer lag?
Could you please provide the whole list of monitored metrics?
  *   Is this config related to Kafka consumer lag?
kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold

Thanks a lot for the help!

Best,
Lijuan






Unsubscribe

2023-08-10 Thread 蔡荣
Unsubscribe

Unsubscribe

2023-08-10 Thread DannyLau
Unsubscribe

Local process and Docker have different behavior

2023-08-10 Thread Daniel Henneberger
Dear Apache Flink community,

When I run Flink locally in my test cases on my Mac, I observe different
behavior compared to running it in my Docker-backed build instance or using
the official docker-compose image. The processes complete as expected when
I run Flink in-process, but not always when I use Docker. I'm using the
DataStream file connector with the Table API. Specifically, I'm observing
these two behaviors:

1. The file source appears to not always read all of the files that are in
my source directory. There are 3 files that I'm reading in a directory. It
will sometimes skip one, seemingly at random.

2. When I do a join on the Kafka source, the process that does the join
will quickly stop emitting data and the file source process appears to end.
The file source process never emits a final watermark before it ends. It
works correctly when I use the file monitor and configure it to emit
watermarks at an interval.

I'm not sure if these two issues are related. Let me know what you think!

Thanks,
Daniel Henneberger


Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
As (almost) always, the devil is in the details.

You haven’t said, but I’m assuming you’re writing out multiple files, each with 
a different schema, as otherwise you could just leverage the existing Flink 
support for CSV.

So then you could combine the header/footer streams (adding a flag for header 
vs. footer), and connect that to the row data stream, then use a 
KeyedCoProcessFunction (I’m assuming you can key by something that identifies 
which schema). You’d buffer the row data & footer (separately in state). You 
would also need to set up a timer to fire at the max watermark, to flush out 
pending rows/footer when all of the input data has been processed.

After that function you could configure the sink to bucket by the target schema.

— Ken
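
A rough sketch of that buffering approach (not Ken's actual code; the Marker
wrapper, the isHeader flag, and keying by a schema id are assumptions for
illustration): headers are emitted immediately, rows and the footer are
buffered in keyed state, and an event-time timer registered near Long.MAX_VALUE
fires once the bounded inputs are exhausted and the watermark jumps to its
maximum.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class HeaderFooterAssembler
        extends KeyedCoProcessFunction<String, HeaderFooterAssembler.Marker, String, String> {

    private transient ValueState<String> footer;
    private transient ListState<String> bufferedRows;

    @Override
    public void open(Configuration parameters) {
        footer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("footer", String.class));
        bufferedRows = getRuntimeContext().getListState(
                new ListStateDescriptor<>("rows", String.class));
    }

    @Override
    public void processElement1(Marker marker, Context ctx, Collector<String> out) throws Exception {
        if (marker.isHeader) {
            out.collect(marker.text);      // headers can go out right away
        } else {
            footer.update(marker.text);    // footers wait until all rows are flushed
        }
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void processElement2(String row, Context ctx, Collector<String> out) throws Exception {
        bufferedRows.add(row);             // buffer rows until the input is exhausted
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Fires when the bounded inputs end and the watermark advances to Long.MAX_VALUE.
        Iterable<String> rows = bufferedRows.get();
        if (rows != null) {
            for (String row : rows) {
                out.collect(row);
            }
        }
        if (footer.value() != null) {
            out.collect(footer.value());
        }
    }

    /** Assumed wrapper for the combined header/footer stream. */
    public static class Marker {
        public boolean isHeader;
        public String text;
    }
}

In the job, the header/footer stream would be connected to the row stream, both
keyed by the same schema id, before applying this function; the sink can then
bucket by that schema.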


> On Aug 10, 2023, at 10:41 AM, Muazim Wani  wrote:
> 
> Thanks for the response!
> I have a specific use case where I am writing to a TextFile sink. I have a 
> Bounded stream of header data and need  to merge it with another bounded 
> stream. While writing the data to a text file the header data should be 
> written before the original data(from another bounded stream). And also at 
> last I have another stream of footers where I would repeat the same process.
> I tried keeping an identifier for all three streams and based on these 
> identifiers I added the data in three different ListState using KeyedProcess 
> functions. So for headers I directly emitted the value but for main data and 
> footers if I store it in a state . The issue is Outside KeyedProcess I am not 
> able to emit the data in order.
> Is there any way I can achieve the use case of ordering the dataStreams. I 
> also tried with union but it seems it adds data arbitrarily in both streams .
> Thanks and regards
> 
> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler,  > wrote:
>> Hi Muazim,
>> 
>> In Flink, a stream of data (unless bounded) is assumed to never end.
>> 
>> So in your example below, this means stream 2 would NEVER be emitted, 
>> because stream 1 would never end (there is no time at which you know for 
>> sure that stream 1 is done).
>> 
>> And this in turn means stream 2 would be buffered forever in state, thus 
>> growing unbounded.
>> 
>> I would suggest elaborating on your use case.
>> 
>> Regards,
>> 
>> — Ken
>> 
>> 
>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani >> > wrote:
>>> 
>>> Hi Team,
>>> I have a use case where I have two streams and want to join them in 
>>> stateful manner . 
>>> E.g data of stream 1 should be emitted before stream2.
>>> I tried to store the data in ListState in KeyedProcessFunction but I am not 
>>> able to access state outside processElement().
>>> Is there any way I could achieve this?
>>> Thanks and regards
>>> 
>> 
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com 
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Muazim Wani
Thanks for the response!
I have a specific use case where I am writing to a text file sink. I have a
bounded stream of header data and need to merge it with another bounded
stream. While writing the data to a text file, the header data should be
written before the original data (from the other bounded stream). At the end
I have another stream of footers, for which I would repeat the same process.
I tried keeping an identifier for all three streams, and based on these
identifiers I added the data to three different ListStates using
KeyedProcessFunctions. For the headers I directly emitted the values, but the
main data and footers I store in state. The issue is that outside the
KeyedProcessFunction I am not able to emit the data in order.
Is there any way I can achieve the use case of ordering the DataStreams?
I also tried union, but it seems to interleave the data from both streams
arbitrarily.
Thanks and regards

On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, 
wrote:

> Hi Muazim,
>
> In Flink, a stream of data (unless bounded) is assumed to never end.
>
> So in your example below, this means stream 2 would NEVER be emitted,
> because stream 1 would never end (there is no time at which you know for
> sure that stream 1 is done).
>
> And this in turn means stream 2 would be buffered forever in state, thus
> growing unbounded.
>
> I would suggest elaborating on your use case.
>
> Regards,
>
> — Ken
>
>
> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
>
> Hi Team,
> I have a use case where I have two streams and want to join them in
> stateful manner .
> E.g data of stream 1 should be emitted before stream2.
> I tried to store the data in ListState in KeyedProcessFunction but I am
> not able to access state outside processElement().
> Is there any way I could achieve this?
> Thanks and regards
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Questions related to Autoscaler

2023-08-10 Thread Hou, Lijuan via user
Hi Ron,

Thanks for the reply!

> 1 - It seems for flink job using flink operator to realize autoscaling, the 
> only option to realize autoscaling is to enable the Autoscaler feature, and 
> KEDA won’t work, right?

What does KEDA mean?

-> KEDA is a Kubernetes-based Event-Driven Autoscaler. I found some examples 
using Flink’s previous Reactive mode + KEDA to realize autoscaling. So if 
Autoscaler is enabled, is it still necessary to create KEDA resources? I think 
TaskManager instances are created and destroyed by the Flink JobManager now, 
and aren’t in a replication controller, so they can't be “scaled up” using 
traditional Kubernetes techniques like KEDA.
Could you please help confirm? Thank you!


 --

> 2 - I noticed from the document that we need to upgrade to flink version of 
> 1.17 to use Autoscaler. But I also noticed that the updated version for flink 
> operator is 1.7 now.
Shall we upgrade from 1.5.0 to 1.7 to enable Autoscaler?

I have checked the flink-kubernetes-operator project pom for the release-1.5
branch; the dependent Flink version is 1.16.1. So I recommend you update your
flink-kubernetes-operator to 1.6. The latest stable release is 1.6.
-> Thank you, so the Flink version that flink-kubernetes-operator 1.6 depends
on is 1.17? Our current Flink version is 1.16.1, so does it mean we need to:
1 – Update the Flink version to 1.17
2 – Update the Flink operator version to 1.6?
Could you please help confirm? Thank you!


 --

Oh I have another question from the email, please have a look:
3 – Could you please provide a list of metrics observed by Autoscaler 
automatically?

  *   Will it include CPU load, memory, throughput and kafka consumer lag?
  *   Is there any configuration related to Kafka consumer lag that we can set
up to scale the job by making the Autoscaler monitor it? Like some threshold?

Thanks a lot for the help!!

Best,
Lijuan



Re: In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Ken Krugler
Hi Muazim,

In Flink, a stream of data (unless bounded) is assumed to never end.

So in your example below, this means stream 2 would NEVER be emitted, because 
stream 1 would never end (there is no time at which you know for sure that 
stream 1 is done).

And this in turn means stream 2 would be buffered forever in state, thus 
growing unbounded.

I would suggest elaborating on your use case.

Regards,

— Ken


> On Aug 10, 2023, at 10:11 AM, Muazim Wani  wrote:
> 
> Hi Team,
> I have a use case where I have two streams and want to join them in stateful 
> manner . 
> E.g data of stream 1 should be emitted before stream2.
> I tried to store the data in ListState in KeyedProcessFunction but I am not 
> able to access state outside processElement().
> Is there any way I could achieve this?
> Thanks and regards
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





In Flink, is there a way to merge two streams in stateful manner

2023-08-10 Thread Muazim Wani
Hi Team,
I have a use case where I have two streams and want to join them in a
stateful manner.
E.g. data of stream 1 should be emitted before stream 2.
I tried to store the data in ListState in a KeyedProcessFunction, but I am not
able to access the state outside processElement().
Is there any way I could achieve this?
Thanks and regards


Strange problem calling a Python UDF from Java in Flink 1.17.1 [maximum recursion depth] [the Python function itself works]

2023-08-10 Thread 1
Hi,
I simplified my Python code. As soon as the udtf-decorated function calls an
external function such as agan_add_iig(), the recursion problem appears, even
though my agan_add_iig() implementation is trivial. Can Flink really not import
an externally defined custom function??

def agan_add_iig():
    return 2


@udtf(input_types=DataTypes.STRING(),
      result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                    DataTypes.STRING()])
def run(data_str):
    try:
        logger.info("input param is %s", data_str)
        data = [{'name': data_str}]
        start_time = time.time()

        agan_add_iig()
        end_time = time.time()

        print("time spent on input:", end_time - start_time)
        # extractEngine = init_info(data)
        return 'success', 'success', 'success', 'success'

    except Exception as e:
        err = e
        logger.error(e)
        return str(err), '', '', ''

Strange problem calling a Python UDF from Java in Flink 1.17.1 [maximum recursion depth] [the Python function itself works]

2023-08-10 Thread 1
Hi everyone,

Background [Flink 1.17.1]: I am on a Windows machine, running a local
single-node setup, and calling a custom Python UDF. Below is my Python code:

err = None


@udtf(input_types=DataTypes.STRING(),
      result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                    DataTypes.STRING()])
def run(data_str):
    try:
        logger.info("input param is ", data_str)
        data = [{'name': data_str}]
        extractEngine = init_info(data)
        for row in extractEngine.extract():
            return (row.content.get('content'),
                    list_tran_json(row.content.get('tok')),
                    list_tran_json(row.content.get('pos')),
                    list_tran_json(row.content.get('dep')))
    except Exception as e:
        err = e
        logger.error(e)
        return str(err), '', '', ''

This code defines a tokenization capability based on HanLP: it tokenizes the
incoming sentence, runs dependency parsing, and returns the results. I can
guarantee, and have verified, that this function works without any problem on
its own.
Now I wrap it as a UDF via @udtf and call it from Java. Here is the Java code:

@Test
@SneakyThrows
public void testTableMiningFunc() {
    registerTable();
    String registerSql = "CREATE TEMPORARY FUNCTION mining AS "
            + "'py_bian_func.mining_w.run' LANGUAGE PYTHON";
    tableEnv.executeSql(registerSql);
    String sql = "SELECT * from t_a001, LATERAL TABLE(mining(name)) as "
            + "alias(content, pos, top, des)";
    TableResult tableResult = tableEnv.executeSql(sql);
    tableResult.print();
}

The returned data is shown in a screenshot (not preserved here). I still
haven't figured out why it reports a recursion problem.


PS: A note on why the Python code captures the exception into a global
variable: I found earlier that if I don't catch the exception, the Flink
program hangs and the log shows:
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService - Logging client
hanged up. After that it is stuck.
I traced the operator graph and found that data is sent to the Python operator
but nothing comes out; the corresponding log is:
org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator - Closing streams for
instruction 1 and outbound data {fn/read/input:0=Byte size: 239, Element count:
4} and timers {}.


Now I suspect that something goes wrong while Flink loads the @udtf decorator,
so that my run method is never actually executed; I added logging in run, but I
see no log output at all on the console.
Since the official docs don't say much about this low-level loading logic, I
would like to ask how to handle this.


I debugged the PyFlink code with pydev/PyCharm and found that the run method in
java_gateway.py runs normally and data is sent normally, but execution never
reaches the run method of my custom UDF.


This has been bothering me for two days; any pointers would be greatly
appreciated!







Re: Flink operator job restart

2023-08-10 Thread Shammon FY
Hi Ethan:

You can restart jobs from a specified checkpoint directory as described in [1].
But generally we restart jobs from a savepoint; you can refer to [2] for more
information.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/

Best,
Shammon FY



On Thu, Aug 10, 2023 at 2:43 PM liu ron  wrote:

> Hi, Ivan
>
> You can refer to the fifth step (restoring the job) described in [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/upgrade/#2-upgrading-with-existing-flinkdeployments
>
> Best,
> Ron
>
> Ethan T Yang  wrote on Thu, Aug 10, 2023 at 12:28:
>
>> Hi Flink users,
>>
>> When using Flink operator, how to restart jobs from a checkpoint. I am
>> used to the Flink Kubernetes native deployment, where I can run
>>
>> flink run -s
>>
>> How to achieve that in Flink operator
>>
>> Thanks
>> Ivan
>
>


Re: Flink operator job restart

2023-08-10 Thread liu ron
Hi, Ivan

You can refer to the fifth step (restoring the job) described in [1].

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/upgrade/#2-upgrading-with-existing-flinkdeployments

Best,
Ron

Ethan T Yang  wrote on Thu, Aug 10, 2023 at 12:28:

> Hi Flink users,
>
> When using Flink operator, how to restart jobs from a checkpoint. I am
> used to the Flink Kubernetes native deployment, where I can run
>
> flink run -s
>
> How to achieve that in Flink operator
>
> Thanks
> Ivan