Re: Flink, JSON, and JSONSchemas

2022-06-16 Thread Shengkai Fang
Hi.

> *1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

Could you explain what you expect? Do you mean you want to just register
a DataType that is able to bridge the received bytes to POJO objects? I
am not sure whether the current RAW type[1] in Flink Table is enough for
you.

*> 2. How can I use a DeserializationSchema to get a DataStream
or even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*

For the DeserializationSchema, you can refer to how the Kafka
connector[2] uses it; I think other sources should be able to use a
DeserializationSchema in a similar way.
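
A minimal sketch of what that could look like in a test, assuming the Java
DataStream API — the class and field names below are made up, and the schema
could be the (deprecated) JsonRowDeserializationSchema from flink-json or your
own copy of it:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class JsonTestStreams {

    /** Eagerly deserializes each JSON record and builds a bounded test stream. */
    public static DataStreamSource<Row> fromJson(
            StreamExecutionEnvironment env,
            DeserializationSchema<Row> schema,
            TypeInformation<Row> rowType,
            String... jsonRecords) throws Exception {

        List<Row> rows = new ArrayList<>();
        for (String json : jsonRecords) {
            // Depending on the schema implementation, schema.open(...) may be
            // needed before the first deserialize() call.
            rows.add(schema.deserialize(json.getBytes(StandardCharsets.UTF_8)));
        }
        // The explicit TypeInformation keeps Row from falling back to Kryo.
        return env.fromCollection(rows, rowType);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        TypeInformation<Row> rowType =
                Types.ROW_NAMED(new String[] {"id", "name"}, Types.INT, Types.STRING);
        DeserializationSchema<Row> schema =
                new JsonRowDeserializationSchema.Builder(rowType).build();

        fromJson(env, schema, rowType, "{\"id\": 1, \"name\": \"a\"}").print();
        env.execute("json-test");
    }
}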

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234



Andrew Otto wrote on Fri, Jun 17, 2022 at 02:26:

> At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
> Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
> to provide library level integration between our 'Event Platform' JSON data
> and Flink.  My main goal:
>
> *No case classes or POJOs.  *The JSONSchemas should be enough.
>
> I can actually do this pretty easily with the Table API. I can
> convert from JSONSchema to a DataType, and then create a table with that
> DataType and format('json').
>
> I'd like to be able to do the same for the DataStream API.  From what I
> can tell, to do this I should be using a Row as the record type.  I can
> also convert from JSONSchema to TypeInformation pretty easily, using the
> Types factory.
>
> While I can convert to and from the Table API to DataStream, it seems
> directly using DataStream of our JSON could be pretty useful, and would
> make it possible to use Flink without instantiating a
> StreamTableEnvironment or requiring a 'table planner'.  Also, to convert
> back up to the Table API from a DataStream, I need the explicit
> TypeInformation, which I need to manually construct.
>
> Ah but, JsonRowDeserializationSchema is deprecated.  Okay, fine, I can
> copy it into my code and modify it for my purposes.  But even if I do,
> I'm confused about something else:
>
> DeserializationSchema is not Table API specific (e.g. it can be used as
> the value deserializer in KafkaSource).  Row is also not Table API specific
> (although I know the main purpose is to bridge Table to DataStream API).
> However, it seems that constructing a Source using DeserializationSchema is
> not really that common?  KafkaSource uses it, but FileSource and
> env.fromElements don't?  I'm trying to write integration tests for this
> that use the DataStream API.
>
> *tl;dr questions:*
>
> *1. Is there some easy way to use deserialized JSON in DataStream without
> case classes or POJOs?*
>
> *2. How can I use a DeserializationSchema to get a DataStream or
> even DataStreamSource in a unit test from either a file or
> String[]/byte[] of serialized JSON?*
>
> Thank you!
>
>
>
>
>


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Dian Fu
>> This error generally occurs in jobs where there are transfers between
Table and datastream.
AFAIK, this issue should have already been fixed, see
https://issues.apache.org/jira/browse/FLINK-26920 and
https://issues.apache.org/jira/browse/FLINK-23133 for more details.

Regards,
Dian

On Fri, Jun 17, 2022 at 10:17 AM Xingbo Huang  wrote:

> Hi John,
>
> Because I can't see your code, I can only suggest some possible causes
> for this error:
> 1. This error generally occurs in jobs that convert between Table and
> DataStream. But given that you said you just used SQL + a Python UDF,
> this shouldn't be the case.
> 2. The default value of `taskmanager.memory.managed.consumer-weights` is
> `OPERATOR:70,STATE_BACKEND:70,PYTHON:30`, so in your case there is
> actually no need to set it to `PYTHON:30`.
> 3. In fact, for pure SQL + Python UDF jobs, unless you set the erroneous
> value `PYTHON:0` in `taskmanager.memory.managed.consumer-weights`, I
> really can't think of any situation where this problem would occur.
>
> Best,
> Xingbo
>
> John Tipper wrote on Thu, Jun 16, 2022 at 19:41:
>
>> Hi Xingbo,
>>
>> Yes, there are a number of temporary views being created, where each is
>> being created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit
>> calls to the Table and DataStream APIs.
>>
>> Is this a good pattern or are there caveats I should be aware of please?
>>
>> Many thanks,
>>
>> John
>>
>>
>> --
>> *From:* Xingbo Huang 
>> *Sent:* 16 June 2022 12:34
>> *To:* John Tipper 
>> *Cc:* user@flink.apache.org 
>> *Subject:* Re: The configured managed memory fraction for Python worker
>> process must be within (0, 1], was: %s
>>
>> Hi John,
>>
>> Does your job logic include conversion between Table and DataStream? For
>> example, methods such as `create_temporary_view(path: str, data_stream:
>> DataStream): -> Table`  are used.
>>
>> Best,
>> Xingbo
>>
>> John Tipper wrote on Thu, Jun 16, 2022 at 18:31:
>>
>> Hi Xingbo,
>>
>> I’m afraid I can’t share my code but Flink is 1.13. The main Flink code
>> is running inside Kinesis on AWS so I cannot change the version.
>>
>> Many thanks,
>>
>> John
>>
>> Sent from my iPhone
>>
>> On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:
>>
>> 
>> Hi John,
>>
>> Could you provide the code snippet and the version of pyflink you used?
>>
>> Best,
>> Xingbo
>>
>>
>> John Tipper wrote on Thu, Jun 16, 2022 at 17:05:
>>
>> Hi all,
>>
>> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
>> my code uses a Python UDF.  I can't share my code but the test case is
>> similar to the code here:
>> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>>   When
>> I have some simple SQL everything is fine. When I add a more complex query
>> I get an error, which looks like it's memory related.
>>
>> java.lang.IllegalArgumentException: The configured managed memory fraction
>> for Python worker process must be within (0, 1], was: %s. It may be because
>> the consumer type "Python" was missing or set to 0 for the config option
>> "taskmanager.memory.managed.consumer-weights".0.0
>>
>>
>>
>> In my test case setUp(), I try to set that value like this, but it seems
>> to have no effect:
>>
>> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
>> "PYTHON:30")
>>
>>
>> Am I not setting it correctly, or is there something else I need to do to
>> fix this error?
>>
>> Many thanks,
>>
>> John
>>
>>


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Because I can't see your code, I can only suggest some possible causes for
this error:
1. This error generally occurs in jobs that convert between Table and
DataStream. But given that you said you just used SQL + a Python UDF, this
shouldn't be the case.
2. The default value of `taskmanager.memory.managed.consumer-weights` is
`OPERATOR:70,STATE_BACKEND:70,PYTHON:30`, so in your case there is actually
no need to set it to `PYTHON:30`.
3. In fact, for pure SQL + Python UDF jobs, unless you set the erroneous
value `PYTHON:0` in `taskmanager.memory.managed.consumer-weights`, I really
can't think of any situation where this problem would occur.

Best,
Xingbo

John Tipper wrote on Thu, Jun 16, 2022 at 19:41:

> Hi Xingbo,
>
> Yes, there are a number of temporary views being created, where each is
> being created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit
> calls to the Table and DataStream APIs.
>
> Is this a good pattern or are there caveats I should be aware of please?
>
> Many thanks,
>
> John
>
>
> --
> *From:* Xingbo Huang 
> *Sent:* 16 June 2022 12:34
> *To:* John Tipper 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: The configured managed memory fraction for Python worker
> process must be within (0, 1], was: %s
>
> Hi John,
>
> Does your job logic include conversion between Table and DataStream? For
> example, methods such as `create_temporary_view(path: str, data_stream:
> DataStream): -> Table`  are used.
>
> Best,
> Xingbo
>
> John Tipper wrote on Thu, Jun 16, 2022 at 18:31:
>
> Hi Xingbo,
>
> I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is
> running inside Kinesis on AWS so I cannot change the version.
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:
>
> 
> Hi John,
>
> Could you provide the code snippet and the version of pyflink you used?
>
> Best,
> Xingbo
>
>
> John Tipper wrote on Thu, Jun 16, 2022 at 17:05:
>
> Hi all,
>
> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
> my code uses a Python UDF.  I can't share my code but the test case is
> similar to the code here:
> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>   When
> I have some simple SQL everything is fine. When I add a more complex query
> I get an error, which looks like it's memory related.
>
> java.lang.IllegalArgumentException: The configured managed memory fraction
> for Python worker process must be within (0, 1], was: %s. It may be because
> the consumer type "Python" was missing or set to 0 for the config option
> "taskmanager.memory.managed.consumer-weights".0.0
>
>
>
> In my test case setUp(), I try to set that value like this, but it seems
> to have no effect:
>
> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
> "PYTHON:30")
>
>
> Am I not setting it correctly, or is there something else I need to do to
> fix this error?
>
> Many thanks,
>
> John
>
>


Flink, JSON, and JSONSchemas

2022-06-16 Thread Andrew Otto
At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
to provide library level integration between our 'Event Platform' JSON data
and Flink.  My main goal:

*No case classes or POJOs.  *The JSONSchemas should be enough.

I can actually do this pretty easily with the Table API. I can convert from
JSONSchema to a DataType, and then create a table with that DataType and
format('json').

I'd like to be able to do the same for the DataStream API.  From what I can
tell, to do this I should be using a Row as the record type.  I can also
convert from JSONSchema to TypeInformation pretty easily, using the Types
factory.
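
For reference, a rough sketch of the kind of TypeInformation such a mapping
can produce with the Types factory — the field names and nesting here are made
up for illustration, not our actual schemas:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class EventTypeInfo {
    // Roughly what a JSONSchema with a nested "meta" object, a string array,
    // and an integer field might map to. Real code would walk the JSONSchema
    // recursively and build this up field by field.
    public static TypeInformation<Row> eventRowType() {
        return Types.ROW_NAMED(
                new String[] {"meta", "tags", "count"},
                Types.ROW_NAMED(new String[] {"id", "dt"}, Types.STRING, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING),
                Types.LONG);
    }
}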

While I can convert to and from the Table API to DataStream, it seems
directly using DataStream of our JSON could be pretty useful, and would make
it possible to use Flink without instantiating a StreamTableEnvironment or
requiring a 'table planner'.  Also, to convert back up to the Table API from
a DataStream, I need the explicit TypeInformation, which I need to manually
construct.

Ah but, JsonRowDeserializationSchema is deprecated.  Okay, fine, I can copy
it into my code and modify it for my purposes.  But even if I do, I'm
confused about something else:

DeserializationSchema is not Table API specific (e.g. it can be used as the
value deserializer in KafkaSource).  Row is also not Table API specific
(although I know the main purpose is to bridge Table to DataStream API).
However, it seems that constructing a Source using DeserializationSchema is
not really that common?  KafkaSource uses it, but FileSource and
env.fromElements don't?  I'm trying to write integration tests for this
that use the DataStream API.
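
(For reference, the KafkaSource path looks roughly like the sketch below — a
sketch only, with placeholder topic/server/group names, and the schema argument
being whatever DeserializationSchema<Row> comes out of the JSONSchema
conversion:)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class KafkaJsonStream {
    public static DataStream<Row> build(
            StreamExecutionEnvironment env, DeserializationSchema<Row> jsonSchema) {
        KafkaSource<Row> source = KafkaSource.<Row>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("events")                     // placeholder
                .setGroupId("json-demo")                 // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonSchema)    // the same schema as above
                .build();
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-json");
    }
}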

*tl;dr questions:*

*1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

*2. How can I use a DeserializationSchema to get a DataStream or
even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*

Thank you!


Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Peter Westermann
We run a standalone Flink cluster in session mode (but we usually only run one 
job per cluster; session mode just fits better with our deployment workflow 
than application mode).
We trigger hourly savepoints and also use savepoints to stop a job and then 
restart with a new version of the jar.
I haven’t seen any issue with the hourly savepoints (without stopping the job).
For these, I can see messages such as "Evicted result with trigger id
30f9457373eba7b9de1bdeaf591a6956 because its TTL of 300s has expired."
~5 minutes after savepoint completion.

When the stop-with-savepoint status lookup fails with "Exception occurred in
REST handler: There is no savepoint operation with
triggerId=cee5054245598efb42245b3046a6ae75",
I still see "Evicted result with trigger id cee5054245598efb42245b3046a6ae75
because its TTL of 300s has expired." ~5 minutes after savepoint completion.

The documentation for Flink 1.15 mentions a new feature:
"For (stop-with-)savepoint operations you can control this triggerId by setting
it in the body of the request that triggers the operation. This allows you to
safely* retry such operations without triggering multiple savepoints."

Could this have anything to do with the error I am seeing?



Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com


From: Chesnay Schepler 
Date: Thursday, June 16, 2022 at 11:32 AM
To: Peter Westermann , user@flink.apache.org 

Subject: Re: Sporadic issues with savepoint status lookup in Flink 1.15


ok that shouldn't happen. I couldn't find anything wrong in the code so far; 
will continue trying to reproduce it.

If this happens, does it persist indefinitely for a particular triggerId, or 
does it reappear later on again?
Are you only ever triggering a single savepoint for a given job?

Are you using session or application clusters?

On 16/06/2022 16:59, Peter Westermann wrote:
If it happens it happens immediately. Once we receive the triggerId from 
/jobs/:jobid/stop or /jobs/:jobid/savepoints we poll 
/jobs/:jobid/savepoints/:triggerid every second until the status is no longer 
IN_PROGRESS.

Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com


From: Chesnay Schepler 
Date: Thursday, June 16, 2022 at 10:55 AM
To: Peter Westermann 
, 
user@flink.apache.org 

Subject: Re: Sporadic issues with savepoint status lookup in Flink 1.15


There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed (default 5 
minutes; controlled by rest.async.store-duration)

Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:
We recently upgraded one of our Flink clusters to version 1.15.0 and are now 
seeing sporadic issues when stopping a job with a savepoint via the REST API. 
This happens for /jobs/:jobid/savepoints and /jobs/:jobid/stop:
The job finishes with a savepoint but the triggerId returned from the REST API 
seems to be invalid. Any lookups via /jobs/:jobid/savepoints/:triggerid fail 
with a 404 and the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: There is no 
savepoint operation with triggerId=cee5054245598efb42245b3046a6ae75 for job 
0995a9461f0178294ea71c9accbe750c


Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com







Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler
ok that shouldn't happen. I couldn't find anything wrong in the code so 
far; will continue trying to reproduce it.


If this happens, does it persist indefinitely for a particular 
triggerId, or does it reappear later on again?

Are you only ever triggering a single savepoint for a given job?

Are you using session or application clusters?

On 16/06/2022 16:59, Peter Westermann wrote:


If it happens it happens immediately. Once we receive the triggerId 
from */jobs/:jobid/stop *or*/jobs/:jobid/savepoints* we poll 
*/jobs/:jobid/savepoints/:triggerid *every second until the status is 
no longer IN_PROGRESS.


Peter Westermann

Analytics Software Architect

peter.westerm...@genesys.com

*From: *Chesnay Schepler 
*Date: *Thursday, June 16, 2022 at 10:55 AM
*To: *Peter Westermann , 
user@flink.apache.org 

*Subject: *Re: Sporadic issues with savepoint status lookup in Flink 1.15




There is an expected case where this might happen:

if too much time has elapsed since the savepoint was completed 
(default 5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:

We recently upgraded one of our Flink clusters to version 1.15.0
and are now seeing sporadic issues when stopping a job with a
savepoint via the REST API. This happens for
*/jobs/:jobid/savepoints *and*/jobs/:jobid/stop*:

The job finishes with a savepoint but the triggerId returned from
the REST API seems to be invalid. Any lookups via
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the
following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: There
is no savepoint operation with
triggerId=cee5054245598efb42245b3046a6ae75 for job
0995a9461f0178294ea71c9accbe750c

Peter Westermann

Analytics Software Architect

peter.westerm...@genesys.com



Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler

Are there any log messages from the CompletedOperationCache in the logs?

On 16/06/2022 16:54, Chesnay Schepler wrote:

There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed 
(default 5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:


We recently upgraded one of our Flink clusters to version 1.15.0 and 
are now seeing sporadic issues when stopping a job with a savepoint 
via the REST API. This happens for */jobs/:jobid/savepoints 
*and*/jobs/:jobid/stop*:


The job finishes with a savepoint but the triggerId returned from the 
REST API seems to be invalid. Any lookups via 
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the 
following error:


org.apache.flink.runtime.rest.handler.RestHandlerException: There is 
no savepoint operation with 
triggerId=cee5054245598efb42245b3046a6ae75 for job 
0995a9461f0178294ea71c9accbe750c


Peter Westermann

Analytics Software Architect

peter.westerm...@genesys.com





Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Peter Westermann
If it happens it happens immediately. Once we receive the triggerId from 
/jobs/:jobid/stop or /jobs/:jobid/savepoints we poll 
/jobs/:jobid/savepoints/:triggerid every second until the status is no longer 
IN_PROGRESS.

Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com


From: Chesnay Schepler 
Date: Thursday, June 16, 2022 at 10:55 AM
To: Peter Westermann , user@flink.apache.org 

Subject: Re: Sporadic issues with savepoint status lookup in Flink 1.15


There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed (default 5 
minutes; controlled by rest.async.store-duration)

Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:
We recently upgraded one of our Flink clusters to version 1.15.0 and are now 
seeing sporadic issues when stopping a job with a savepoint via the REST API. 
This happens for /jobs/:jobid/savepoints and /jobs/:jobid/stop:
The job finishes with a savepoint but the triggerId returned from the REST API 
seems to be invalid. Any lookups via /jobs/:jobid/savepoints/:triggerid fail 
with a 404 and the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: There is no 
savepoint operation with triggerId=cee5054245598efb42245b3046a6ae75 for job 
0995a9461f0178294ea71c9accbe750c


Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com





Re: Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Chesnay Schepler

There is an expected case where this might happen:
if too much time has elapsed since the savepoint was completed (default 
5 minutes; controlled by rest.async.store-duration)


Did this happen earlier than that?

On 16/06/2022 15:53, Peter Westermann wrote:


We recently upgraded one of our Flink clusters to version 1.15.0 and 
are now seeing sporadic issues when stopping a job with a savepoint 
via the REST API. This happens for */jobs/:jobid/savepoints 
*and*/jobs/:jobid/stop*:


The job finishes with a savepoint but the triggerId returned from the 
REST API seems to be invalid. Any lookups via 
*/jobs/:jobid/savepoints/:triggerid* fail with a 404 and the following 
error:


org.apache.flink.runtime.rest.handler.RestHandlerException: There is 
no savepoint operation with triggerId=cee5054245598efb42245b3046a6ae75 
for job 0995a9461f0178294ea71c9accbe750c


Peter Westermann

Analytics Software Architect

peter.westerm...@genesys.com



Re: flink 1.10.1 Flink UI: cancelled job stays in CANCELLING for a long time before it is finally cancelled

2022-06-16 Thread Xuyang
Hi,
You can check the JobManager and TaskManager logs; a DataStream operator whose
close() takes a long time can delay the cancellation.
On 2022-06-16 15:10:07, 沈保源 <757434...@qq.com.INVALID> wrote:
>flink 1.10.1 Flink UI: cancelling a job stays in CANCELLING for a long time
>before it is finally cancelled


Re: Flink Shaded dependencies and extending Flink APIs

2022-06-16 Thread Andrew Otto
Hi all thanks for the responses.

> Create a module let's say "wikimedia-event-utilities-shaded"
This actually doesn't help me, as wikimedia-event-utilities is used as an
API by non-Flink code too, so I don't want to use the shaded ObjectNode in
the API params.

> Another solution is that you can serialize then deserialize the "different"
ObjectNode
Haha, I thought of this too and then was like...no way, too crazy!
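
(For reference, that workaround would look something like the sketch below —
the class and method names are made up; it just re-parses the shaded node's
JSON text with a regular, non-shaded Jackson mapper, at the cost of a full
round trip:)

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public final class ShadedJacksonBridge {
    private static final ObjectMapper UNSHADED_MAPPER = new ObjectMapper();

    // 'shadedNode' is an instance of Flink's relocated
    // org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode.
    // Its toString() emits plain JSON, which the non-shaded mapper can re-parse.
    public static ObjectNode toUnshaded(Object shadedNode) throws IOException {
        return (ObjectNode) UNSHADED_MAPPER.readTree(shadedNode.toString());
    }
}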

> Both flink-shaded, any relocation pattern and JsonRowDataSerializationSchema
are Flink internals that users shouldn't use/rely on.
Yeah, in hindsight, I think the right solution is to make my own
SerializationSchema, even if that is mostly copy/pasting the internal Flink
one, rather than extending it.

I have another question around JSON and Flink, but I'll start a new thread
for that.

Thank you!




On Mon, Jun 13, 2022 at 7:17 AM Chesnay Schepler  wrote:

> Can we find a more robust way to support this?
>
> Both flink-shaded, any relocation pattern and
> JsonRowDataSerializationSchema are Flink internals that users shouldn't
> use/rely on.
>
> On 13/06/2022 12:26, Qingsheng Ren wrote:
> > Hi Andrew,
> >
> > This is indeed a tricky case since Flink doesn't provide non-shaded
> > JAR for flink-json. One hacky solution in my mind is like:
> >
> > 1. Create a module let's say "wikimedia-event-utilities-shaded" that
> > relocates Jackson in the same way and uses the same Jackson version as
> > flink-shaded-jackson
> > 2. Deploy the module to a local or remote Maven repository
> > 3. Let your custom format depend on the
> > "wikimedia-event-utilities-shaded" module, then all Jackson
> > dependencies are relocated in the same way.
> >
> > Another solution is that you can serialize then deserialize the
> > "different" ObjectNode to do the conversion but this sacrifices the
> > performance.
> >
> > Hope this could be helpful!
> >
> > Best regards,
> >
> > Qingsheng
> >
> > On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:
> >> Hi all,
> >>
> >> I'm working on an integration project trying to write some library code
> that will allow us at the Wikimedia Foundation to use Flink with our 'Event
> Platform'.  Specifically, I'm trying to write a reusable step near the end
> of a pipeline that will ensure our JSON events satisfy some criteria before
> producing them to Kafka.  Details here.
> >>
> >> I'm experimenting with writing my own custom format to do this.  But
> all I really need to do is override JsonRowDataSerializationSchema's
> serialize method and augment and validate the ObjectNode before it is
> serialized to byte[].
> >>
> >> I'm running into an issue where the ObjectNode that is used by Flink
> here is the shaded one:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
> whereas the WMF code I want to use to augment the ObjectNode is using a
> regular non shaded one.  I can't pass the shaded ObjectNode instance to a
> function that takes a non shaded one, and I can't cast the shaded
> ObjectNode to non shaded either.
> >>
> >> My Q is: is there a way to extend Flink APIs that use shaded
> dependencies?  I suppose I could copy/paste the whole of the "json" format
> code that I need into my project and just make it my own, but this feels
> quite obnoxious.
> >>
> >> Thank you!
> >> -Andrew Otto
> >>   Wikimedia Foundation
> >>
> >>
>
>


Sporadic issues with savepoint status lookup in Flink 1.15

2022-06-16 Thread Peter Westermann
We recently upgraded one of our Flink clusters to version 1.15.0 and are now 
seeing sporadic issues when stopping a job with a savepoint via the REST API. 
This happens for /jobs/:jobid/savepoints and /jobs/:jobid/stop:
The job finishes with a savepoint but the triggerId returned from the REST API 
seems to be invalid. Any lookups via /jobs/:jobid/savepoints/:triggerid fail 
with a 404 and the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: There is no 
savepoint operation with triggerId=cee5054245598efb42245b3046a6ae75 for job 
0995a9461f0178294ea71c9accbe750c


Peter Westermann
Analytics Software Architect
peter.westerm...@genesys.com



Re: flink 1.10.1 Flink UI: cancelled job stays in CANCELLING for a long time before it is finally cancelled

2022-06-16 Thread Weihua Hu
Hi,
I suggest checking the JobManager logs to see what state the job is in while
it is cancelling. Also check whether the Task uses a UDF and whether there is
any time-consuming work in the UDF's close().

Best,
Weihua


On Thu, Jun 16, 2022 at 3:11 PM 沈保源 <757434...@qq.com.invalid> wrote:

> flink 1.10.1 Flink UI: cancelling a job stays in CANCELLING for a long time before it is finally cancelled


Re: Question about the PyFlink development environment

2022-06-16 Thread Weihua Hu
Hi,

It looks like a missing-dependency problem. I suggest following the official
tutorial documentation to get a simple example running first:

Table API:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table_api_tutorial/
DataStreamAPI:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream_tutorial/

Best,
Weihua


On Wed, Jun 15, 2022 at 8:35 PM Xingbo Huang  wrote:

> Hi,
>
> You can run `pip install -r flink-python/dev/dev-requirements.txt` to install
> the dependencies required for the development environment.
>
> Best,
> Xingbo
>
张兴博 wrote on Wed, Jun 15, 2022 at 10:20:
>
> > Hello:
> > I am a user learning PyFlink. I want to develop with PyFlink on Ubuntu
> > 20.04, but when I run my code I get the following error:
> >
> > Traceback (most recent call last):
> >   File "/root/.py", line 6, in 
> > s_env = StreamExecutionEnvironment.get_execution_environment()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 805, in get_execution_environment
> > return StreamExecutionEnvironment(j_stream_exection_environment)
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 62, in __init__
> > self._open()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 973, in _open
> > startup_loopback_server()
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> > line 963, in startup_loopback_server
> > from pyflink.fn_execution.beam.beam_worker_pool_service import \
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
> > line 31, in 
> > from apache_beam.options.pipeline_options import DebugOptions
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/__init__.py",
> > line 96, in 
> > from apache_beam import io
> >   File
> > "/usr/local/lib/python3.8/dist-packages/apache_beam/io/__init__.py", line
> > 23, in 
> > from apache_beam.io.avroio import *
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/avroio.py",
> > line 63, in 
> > from apache_beam.io import filebasedsink
> >   File
> > "/usr/local/lib/python3.8/dist-packages/apache_beam/io/filebasedsink.py",
> > line 36, in 
> > from apache_beam.io import iobase
> >   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/iobase.py",
> > line 57, in 
> > from apache_beam.transforms import Impulse
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/__init__.py",
> > line 25, in 
> > from apache_beam.transforms.external import *
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/external.py",
> > line 45, in 
> > from apache_beam.runners import pipeline_context
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/pipeline_context.py",
> > line 51, in 
> > from apache_beam.transforms import environments
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/environments.py",
> > line 54, in 
> > from apache_beam.runners.portability.sdk_container_builder import
> > SdkContainerImageBuilder
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/sdk_container_builder.py",
> > line 44, in 
> > from apache_beam.internal.gcp.auth import get_service_credentials
> >   File
> >
> "/usr/local/lib/python3.8/dist-packages/apache_beam/internal/gcp/auth.py",
> > line 28, in 
> > from oauth2client.client import GoogleCredentials
> >   File "/usr/local/lib/python3.8/dist-packages/oauth2client/client.py",
> > line 39, in 
> > from oauth2client import transport
> >   File
> "/usr/local/lib/python3.8/dist-packages/oauth2client/transport.py",
> > line 17, in 
> > import httplib2
> > ModuleNotFoundError: No module named 'httplib2'
> >
> > From searching I found that httplib2 is no longer used in newer Python
> > versions? The name used now is http.client?
> > My Python version is 3.8.10 and the JDK is OpenJDK 11.0.15 (another
> > machine has Java 1.8).
> > I'd like to know what causes this and how I can solve it.
> >
> > Thank you very much for taking the time to answer my question!
> >
> > Sent from the Mail app for Windows 11
> >
> >
>


Re: flink 1.10.1 CEP: with followedBy, the same event matches pattern 1 multiple times

2022-06-16 Thread yue ma
Hi, could you post your pattern, input data, and output results so we can take a look?

沈保源 <757434...@qq.com.invalid> wrote on Thu, Jun 16, 2022 at 15:08:

> flink 1.10.1 CEP: using followedBy, the same event matches pattern 1 multiple
> times; I found this while debugging and don't know why.


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi Xingbo,

Yes, there are a number of temporary views being created, where each is being 
created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit calls to the 
Table and DataStream APIs.

Is this a good pattern or are there caveats I should be aware of please?

Many thanks,

John



From: Xingbo Huang 
Sent: 16 June 2022 12:34
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: The configured managed memory fraction for Python worker process 
must be within (0, 1], was: %s

Hi John,

Does your job logic include conversion between Table and DataStream? For 
example, methods such as `create_temporary_view(path: str, data_stream: 
DataStream): -> Table`  are used.

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, Jun 16, 2022 at 18:31:
Hi Xingbo,

I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is 
running inside Kinesis on AWS so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang <hxbks...@gmail.com> wrote:


Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper <john_tip...@hotmail.com> wrote on Thu, Jun 16, 2022 at 17:05:
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Does your job logic include conversion between Table and DataStream? For
example, methods such as `create_temporary_view(path: str, data_stream:
DataStream): -> Table`  are used.

Best,
Xingbo

John Tipper wrote on Thu, Jun 16, 2022 at 18:31:

> Hi Xingbo,
>
> I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is
> running inside Kinesis on AWS so I cannot change the version.
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:
>
> 
> Hi John,
>
> Could you provide the code snippet and the version of pyflink you used?
>
> Best,
> Xingbo
>
>
> John Tipper wrote on Thu, Jun 16, 2022 at 17:05:
>
>> Hi all,
>>
>> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
>> my code uses a Python UDF.  I can't share my code but the test case is
>> similar to the code here:
>> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>>   When
>> I have some simple SQL everything is fine. When I add a more complex query
>> I get an error, which looks like it's memory related.
>>
>> java.lang.IllegalArgumentException: The configured managed memory fraction
>> for Python worker process must be within (0, 1], was: %s. It may be because
>> the consumer type "Python" was missing or set to 0 for the config option
>> "taskmanager.memory.managed.consumer-weights".0.0
>>
>>
>>
>> In my test case setUp(), I try to set that value like this, but it seems
>> to have no effect:
>>
>> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
>> "PYTHON:30")
>>
>>
>> Am I not setting it correctly, or is there something else I need to do to
>> fix this error?
>>
>> Many thanks,
>>
>> John
>>
>>


Re: Flink Kubernetes Operator with K8S + Istio + mTLS - port definitions

2022-06-16 Thread Yang Wang
Could you please have a try with high availability enabled[1]?

If HA is enabled, the internal JobManager RPC service will not be created.
Instead, the TaskManager retrieves the JobManager address via the HA services
and connects to it via the pod IP.

[1].
https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-checkpoint-ha.yaml


Best,
Yang

Elisha, Moshe (Nokia - IL/Kfar Sava) wrote on Thu, Jun 16, 2022 at 15:24:

> Hello,
>
>
>
> We are launching Flink deployments using the Flink Kubernetes Operator
> on a Kubernetes cluster with Istio and mTLS enabled.
>
>
>
> We found that the TaskManager is unable to communicate with the JobManager
> on the jobmanager-rpc port:
>
>
>
> 2022-06-15 15:25:40,508 WARN  akka.remote.ReliableDeliverySupervisor
>   [] - Association with remote system
> [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]]
> Caused by: [The remote system explicitly disassociated (reason unknown).]
>
>
>
> The reason for the issue is that the JobManager service port definitions are
> not following the Istio guidelines
> https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/
> (see example below).
>
>
>
> We believe a change to the default port definitions is needed but for now,
> is there an immediate action we can take to work around the issue? Perhaps
> overriding the default port definitions somehow?
>
>
>
> Thanks.
>
>
>
>
>
> flink-kubernetes-operator 1.0.0
>
> Flink 1.14-java11
>
> Kubernetes v1.19.5
>
> Istio 1.7.6
>
>
>
>
>
> # k get service inference-results-to-analytics-engine -o yaml
>
> apiVersion: v1
>
> kind: Service
>
> metadata:
>
> ...
>
>   labels:
>
> app: inference-results-to-analytics-engine
>
> type: flink-native-kubernetes
>
>   name: inference-results-to-analytics-engine
>
> spec:
>
>   clusterIP: None
>
>   ports:
>
>   - name: jobmanager-rpc # should start with “tcp-“ or add "appProtocol"
> property
>
> port: 6123
>
> protocol: TCP
>
> targetPort: 6123
>
>   - name: blobserver # should start with "tcp-" or add "appProtocol"
> property
>
> port: 6124
>
> protocol: TCP
>
> targetPort: 6124
>
>   selector:
>
> app: inference-results-to-analytics-engine
>
> component: jobmanager
>
> type: flink-native-kubernetes
>
>   sessionAffinity: None
>
>   type: ClusterIP
>
> status:
>
>   loadBalancer: {}
>
>
>


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi Xingbo,

I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is 
running inside Kinesis on AWS so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:


Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper <john_tip...@hotmail.com> wrote on Thu, Jun 16, 2022 at 17:05:
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper wrote on Thu, Jun 16, 2022 at 17:05:

> Hi all,
>
> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
> my code uses a Python UDF.  I can't share my code but the test case is
> similar to the code here:
> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>   When
> I have some simple SQL everything is fine. When I add a more complex query
> I get an error, which looks like it's memory related.
>
> java.lang.IllegalArgumentException: The configured managed memory fraction
> for Python worker process must be within (0, 1], was: %s. It may be because
> the consumer type "Python" was missing or set to 0 for the config option
> "taskmanager.memory.managed.consumer-weights".0.0
>
>
>
> In my test case setUp(), I try to set that value like this, but it seems
> to have no effect:
>
> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
> "PYTHON:30")
>
>
> Am I not setting it correctly, or is there something else I need to do to
> fix this error?
>
> Many thanks,
>
> John
>
>


The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Flink Kubernetes Operator with K8S + Istio + mTLS - port definitions

2022-06-16 Thread Elisha, Moshe (Nokia - IL/Kfar Sava)
Hello,

We are launching Flink deployments using the Flink Kubernetes Operator on a
Kubernetes cluster with Istio and mTLS enabled.

We found that the TaskManager is unable to communicate with the JobManager on 
the jobmanager-rpc port:


2022-06-15 15:25:40,508 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123] has 
failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]] Caused 
by: [The remote system explicitly disassociated (reason unknown).]

The reason for the issue is that the JobManager service port definitions are 
not following the Istio guidelines 
https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/
 (see example below).

We believe a change to the default port definitions is needed but for now, is 
there an immediate action we can take to work around the issue? Perhaps 
overriding the default port definitions somehow?

Thanks.


flink-kubernetes-operator 1.0.0
Flink 1.14-java11
Kubernetes v1.19.5
Istio 1.7.6


# k get service inference-results-to-analytics-engine -o yaml
apiVersion: v1
kind: Service
metadata:
...
  labels:
app: inference-results-to-analytics-engine
type: flink-native-kubernetes
  name: inference-results-to-analytics-engine
spec:
  clusterIP: None
  ports:
  - name: jobmanager-rpc # should start with “tcp-“ or add "appProtocol" 
property
port: 6123
protocol: TCP
targetPort: 6123
  - name: blobserver # should start with "tcp-" or add "appProtocol" property
port: 6124
protocol: TCP
targetPort: 6124
  selector:
app: inference-results-to-analytics-engine
component: jobmanager
type: flink-native-kubernetes
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}



flink 1.10.1 Flink UI: cancelled job stays in CANCELLING for a long time before it is finally cancelled

2022-06-16 Thread 沈保源
flink 1.10.1 Flink UI: cancelling a job stays in CANCELLING for a long time before it is finally cancelled

flink 1.10.1 CEP: with followedBy, the same event matches pattern 1 multiple times

2022-06-16 Thread 沈保源
flink 1.10.1 CEP: using followedBy, the same event matches pattern 1 multiple
times; I found this while debugging and don't know why.