Re: Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
Yes, the customer generator is setting the event timestamp correctly, as shown
below. I debugged and found that the events are treated as late and therefore
never processed: the this.isWindowLate(actualWindow) check in the window
operator causes every event except the first to be skipped as late. I am not
able to figure out where exactly the issue is.
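
For reference, a simplified sketch of the lateness condition the window operator applies (a sketch only, not the actual Flink source): an element is dropped once the watermark has passed the end of its window plus the allowed lateness.

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Simplified sketch of the event-time lateness check applied per element:
// the element is skipped when the watermark has already passed the end of
// its (session) window plus the configured allowed lateness.
public class LatenessCheckSketch {
    static boolean isWindowLate(TimeWindow window, long currentWatermarkMs, long allowedLatenessMs) {
        return window.maxTimestamp() + allowedLatenessMs <= currentWatermarkMs;
    }
}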

I have removed the evictor because I don't think I need it yet.

stream looks like

customerStream
    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
    .keyBy((KeySelector) Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .trigger(new EventTimeTrigger())
    .process(new CustomAggregateFunction());


Customer generator looks like:

while (isRunning) {
    Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
    System.out.println("Writing customer: " + c);
    sourceContext.collect(c);
    //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
    Thread.sleep(1000);
    counter++;
    if (counter % 11 == 0) {
        System.out.println("Sleeping for 10 seconds");
        Thread.sleep(10_000);
    }
}


Custom Watermark generator has this:

.
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.

trigger looks like:

--


@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
                               TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        LOGGER.info("Max timestamp for customer: " + customer.getIdentifier()
                + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(
                customer.getLocalTime().plusSeconds(8).toNanoOfDay());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    //if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
    //    triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
    //    return TriggerResult.CONTINUE;
    //}

    return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
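
As a side note, Flink's event-time timestamps, watermarks, and timers are expected to be epoch milliseconds, while the generator and trigger above mix in LocalTime and toNanoOfDay(). For comparison, a minimal sketch that keeps everything in epoch milliseconds; the getEventTimeMillis() accessor is hypothetical and not part of the Customer class shown in this thread:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch only: assumes Customer exposes its event time as epoch milliseconds.
WatermarkStrategy<Customer> strategy =
        WatermarkStrategy.<Customer>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((customer, previousTimestamp) -> customer.getEventTimeMillis());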





On Thu, May 6, 2021 at 12:02 PM Arvid Heise  wrote:

> Hi,
>
> Is your CustomerGenerator setting the event timestamp correctly? Are your
> evictors evicting too early?
>
> You can try to add some debug output into the watermark assigner and see
> if it's indeed progressing as expected.
>
> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:
>
>> This seems to be working fine in processing time but doesn't work in
>> event time. Is there an issue with the way the water mark is defined or do
>> we need to set up timers?
>>
>> Please advise.
>>
>>
>> WORKING:
>>
>> customerStream
>> .keyBy((KeySelector) Customer::getIdentifier)
>> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>> .process(new CustomAggregateFunction());
>>
>>
>> NOT WORKING:
>>
>> customerStream
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(EventTimeTrigger.create())
>> .evictor(new CustomerEvictor())
>> .process(new CustomAggregateFunction())
>> .print();
>>
>>
>> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>>
>>> Adding the code for CustomWatermarkGenerator
>>>
>>> .
>>> @Override
>>> public void onEvent(Customer customer, long l, WatermarkOutput 
>>> watermarkOutput) {
>>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>>> customer.getEventTime()  );
>>> }
>>>
>>> @Override
>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>
>>> }
>>> .
>>>
>>>
>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>>>
 Hi,

 Bit of background, I have a stream of customers who have purchased some
 product, reading these transactions on a KAFKA topic. I want to aggregate
 the number of products the customer has purchased in a particular duration
 ( say 10 seconds ) and write to a sink.

 I am using session windows to achieve the above.


Re: Session Windows - not working as expected

2021-05-05 Thread Arvid Heise
Hi,

Is your CustomerGenerator setting the event timestamp correctly? Are your
evictors evicting too early?

You can try to add some debug output into the watermark assigner and see if
it's indeed progressing as expected.
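
For example (a minimal sketch only, following the generator code quoted in this thread), the periodic emit could log the watermark before emitting it:

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    // Debug output: print the watermark that is about to be emitted so its
    // progress can be observed (assumes currentMaxTimestamp is epoch millis).
    System.out.println("Emitting watermark: " + currentMaxTimestamp
            + " (" + java.time.Instant.ofEpochMilli(currentMaxTimestamp) + ")");
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}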

On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:

> This seems to be working fine in processing time but doesn't work in event
> time. Is there an issue with the way the water mark is defined or do we
> need to set up timers?
>
> Please advise.
>
>
> WORKING:
>
> customerStream
> .keyBy((KeySelector) Customer::getIdentifier)
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
> .process(new CustomAggregateFunction());
>
>
> NOT WORKING:
>
> customerStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
> WaterMarkAssigner()))
> .keyBy(Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(EventTimeTrigger.create())
> .evictor(new CustomerEvictor())
> .process(new CustomAggregateFunction())
> .print();
>
>
> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>
>> Adding the code for CustomWatermarkGenerator
>>
>> .
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput 
>> watermarkOutput) {
>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>> customer.getEventTime()  );
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .
>>
>>
>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>>
>>> Hi,
>>>
>>> Bit of background, I have a stream of customers who have purchased some
>>> product, reading these transactions on a KAFKA topic. I want to aggregate
>>> the number of products the customer has purchased in a particular duration
>>> ( say 10 seconds ) and write to a sink.
>>>
>>> I am using session windows to achieve the above.
>>>
>>> For test purposes, i have mocked  up a customer stream and executed
>>> session windows like below.
>>>
>>> StreamExecutionEnvironment environment = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream customerStream = environment.addSource( new 
>>> CustomerGenerator() );
>>>
>>> customerStream
>>> 
>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>>> WaterMarkAssigner()))
>>> .keyBy(Customer::getIdentifier)
>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>> .trigger(EventTimeTrigger.create())
>>> .evictor(new CustomerEvictor())
>>> .process(new CustomAggregateFunction())
>>> .print();
>>>
>>> My watermark assigner looks like:
>>>
>>> public class WaterMarkAssigner implements WatermarkStrategy {
>>> static final Logger logger = 
>>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>
>>> @Override
>>> public WatermarkGenerator 
>>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>> return new CustomWatermarkGenerator();
>>> }
>>> }
>>>
>>> I notice that the evictor, and aggregation functions are getting called 
>>> only once for the first customer in the stream.
>>>
>>> The data stream is generating customers at 1 seconds interval and there are 
>>> 5 customer keys for which it's generating transactions.
>>>
>>> Am I doing something wrong with the above?
>>>
>>> I want to be able to capture the event on each transaction getting added 
>>> and removed from the window so that I can perform the aggregation.
>>>
>>> please advise.
>>>
>>>
>>>
>>>
>>>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Yun Tang
Thanks for Dawid and Guowei's great work, and thanks to everyone involved in this release.

Best
Yun Tang

From: Xintong Song 
Sent: Thursday, May 6, 2021 12:08
To: user ; dev 
Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released

Thanks Dawid & Guowei as the release managers, and everyone who has
contributed to this release.


Thank you~

Xintong Song



On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:

> Thanks Dawid & Guowei for the great work, thanks everyone involved.
>
> Best,
> Leonard
>
> On 5 May 2021, at 17:12, Theo Diefenthal wrote:
>
> Thanks for managing the release. +1. I like the focus on improving
> operations with this version.
>
> --
> *From: *"Matthias Pohl" 
> *To: *"Etienne Chauchot" 
> *CC: *"dev" , "Dawid Wysakowicz" <
> dwysakow...@apache.org>, "user" ,
> annou...@apache.org
> *Sent: *Tuesday, 4 May 2021 21:53:31
> *Subject: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Yes, thanks for managing the release, Dawid & Guowei! +1
>
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Congrats to everyone involved !
>>
>> Best
>>
>> Etienne
>> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Guowei & Dawid
>>
>>
>
>


[Avro] Re: TypeSerializer Example

2021-05-05 Thread Sandeep khanzode
Hi,

Is there a working example somewhere that I can refer to for writing Avro entities 
to Flink state, as well as Avro serialization in the KafkaConsumer/Producer?

I tried to use Avro entities directly, but beyond Apache Avro 1.7.7 there is an issue 
in that the generated entities have a serialVersionUID. So when I tried to test schema 
evolution by adding a member, Java serialization failed because the two generated 
classes' serialVersionUIDs do not match, i.e. the one stored in the state and the one 
of the class that now has the new member variable.

Is there any configuration that overrides this?

Request you to please provide some references of samples. Thanks. 
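
For reference, a minimal sketch of declaring keyed state with explicit Avro type information, so that Flink's AvroSerializer (which supports schema evolution) is used instead of Java serialization. MyAvroRecord is a placeholder for an Avro-generated SpecificRecord class, not something from this thread:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

// Sketch only: forces the Avro serializer for the value type instead of
// falling back to Kryo/Java serialization of the generated class.
MapStateDescriptor<String, MyAvroRecord> descriptor =
        new MapStateDescriptor<>(
                "records",
                BasicTypeInfo.STRING_TYPE_INFO,
                new AvroTypeInfo<>(MyAvroRecord.class));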

Thanks,
Sandeep 


> On 30-Apr-2021, at 5:00 PM, Timo Walther  wrote:
> 
> I also found these pages:
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> 
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
> 
> I hope this helps.
> 
> Regards,
> Timo
> 
> 
> On 30.04.21 13:20, Sandeep khanzode wrote:
>> Hi Timo,
>> Thanks! I will take a look at the links.
>> Can you please share if you have any simple (or complex) example of Avro 
>> state data structures?
>> Thanks,
>> Sandeep
>>> On 30-Apr-2021, at 4:46 PM, Timo Walther  wrote:
>>> 
>>> Hi Sandeep,
>>> 
>>> did you have a chance to look at this documentation page?
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/custom_serialization.html
>>> 
>>> The interfaces might not be easy to implement but are very powerful to 
>>> address compatibility issues. You can also look into Flink serializers for 
>>> some examples:
>>> 
>>> https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime
>>> 
>>> Esp:
>>> 
>>> https://github.com/apache/flink/blob/89c6c03660a88a648bbd13b4e6696124fe46d013/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L599
>>> 
>>> For the POJO logic.
>>> 
>>> By the way, usually we recommend Avro for state data structures if schema 
>>> evolution is a topic.
>>> 
>>> Regards,
>>> Timo
>>> 
>>> 
>>> 
>>> On 29.04.21 18:10, Sandeep khanzode wrote:
 Hello,
 Is there a working example of a TypeSerializer for a Java type stored in 
 the State?
 My requirement is that I should be able to store the Java POJO entity in 
 the MapState. The state is backed by RocksDBBackend.
 If I update the entity with a new member variable, I am unable to 
 deserialise the state into the new entity.
 I checked this link.
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
  
 
 It does mention that the POJO type is special based on the rules. Does 
 that mean that I can add or remove member variables for the POJO? I have 
 been unable to get it to work.
 Thanks,
 Sandeep
>>> 
> 



Read kafka offsets from checkpoint - state processor

2021-05-05 Thread bat man
Hi Users,

Is there a way in Flink 1.9 to read the checkpointed data using the
State Processor API?
The docs [1] say: when reading operator state, users specify the operator
uid, the state name, and the type information.

What is the type for the Kafka operator that needs to be specified while
reading the state?

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
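
For reference, a minimal sketch of reading the Kafka source's union list state with the State Processor API. The uid, the state name "topic-partition-offset-states", and the Tuple2<KafkaTopicPartition, Long> element type are assumptions based on the connector's internals and should be verified against your connector version; deserialization may also require the connector's own serializer rather than the type information shown here:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class ReadKafkaOffsets {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // args[0]: path to the checkpoint/savepoint, e.g. "hdfs:///checkpoints/chk-42"
        ExistingSavepoint savepoint = Savepoint.load(env, args[0], new MemoryStateBackend());

        // "kafka-source" is the uid assumed to be set on the FlinkKafkaConsumer source.
        DataSet<Tuple2<KafkaTopicPartition, Long>> offsets =
                savepoint.readUnionState(
                        "kafka-source",
                        "topic-partition-offset-states",
                        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

        offsets.print();
    }
}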

Thanks,
Hemant


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Xintong Song
Thanks Dawid & Guowei as the release managers, and everyone who has
contributed to this release.


Thank you~

Xintong Song



On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:

> Thanks Dawid & Guowei for the great work, thanks everyone involved.
>
> Best,
> Leonard
>
> On 5 May 2021, at 17:12, Theo Diefenthal wrote:
>
> Thanks for managing the release. +1. I like the focus on improving
> operations with this version.
>
> --
> *From: *"Matthias Pohl" 
> *To: *"Etienne Chauchot" 
> *CC: *"dev" , "Dawid Wysakowicz" <
> dwysakow...@apache.org>, "user" ,
> annou...@apache.org
> *Sent: *Tuesday, 4 May 2021 21:53:31
> *Subject: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Yes, thanks for managing the release, Dawid & Guowei! +1
>
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Congrats to everyone involved !
>>
>> Best
>>
>> Etienne
>> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Guowei & Dawid
>>
>>
>
>


Re: savepoint command in code

2021-05-05 Thread Yun Tang
Hi,

You could trigger a savepoint via the REST API [1], or refer to SavepointITCase [2] to 
see how to trigger a savepoint in test code.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints
[2] 
https://github.com/apache/flink/blob/c688bf3c83e72155ccf5d04fe397b7c0a1274fd1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L438
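
For reference, a minimal sketch of triggering a savepoint through the REST API [1] from Java; the endpoint address, job id, and target directory are placeholders passed as arguments:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        String restEndpoint = args[0]; // e.g. "http://localhost:8081"
        String jobId = args[1];        // id of the running job
        String targetDir = args[2];    // e.g. "file:///tmp/savepoints"

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restEndpoint + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"" + targetDir + "\", \"cancel-job\": false}"))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

        // The response contains a trigger id; completion can then be polled under
        // /jobs/<jobid>/savepoints/<triggerid>.
        System.out.println(response.body());
    }
}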

Best
Yun Tang

From: Abdullah bin Omar 
Sent: Tuesday, May 4, 2021 11:50
To: user@flink.apache.org 
Subject: savepoint command in code

Hello,

I am trying to use the savepoint command (./bin/flink savepoint jobid) from code instead 
of running it manually in the terminal. The job id can be obtained using getjobid(). The 
problem is defining the path to ./bin/flink: it cannot be referenced as a directory 
(probably because it is a Unix executable file).

Is there a way to define the path (./bin/flink) in the code? Or is there any function to 
trigger a savepoint from code instead of the manual command?

Thank you




Re: How to tell between a local mode run vs. remote mode run?

2021-05-05 Thread Yik San Chan
Hi Xingbo,

Thank you!

On Thu, May 6, 2021 at 10:01 AM Xingbo Huang  wrote:

> Hi Yik San,
> You can check whether the execution environment used is
> `LocalStreamEnvironment` and you can get the class object corresponding to
> the corresponding java object through py4j in PyFlink. You can take a look
> at the example I wrote below, I hope it will help you
> ```
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from py4j.java_gateway import get_java_class
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create(
> env, environment_settings=EnvironmentSettings.new_instance()
> .in_streaming_mode().use_blink_planner().build())
> gateway = get_gateway()
>
> # get the execution environment class
> env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()
>
> # get the LocalStreamEnvironment class
> local_stream_environment_class = get_java_class(
>
> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
> print(env_class == local_stream_environment_class)
>
>
> if __name__ == '__main__':
> test()
>
> ```
>
Yik San Chan  wrote on Wed, May 5, 2021 at 12:04 PM:
>
>> Hi,
>>
>> According to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/
>>
>> > When executing jobs in mini cluster(e.g. when executing jobs in IDE)
>> ... please remember to explicitly wait for the job execution to finish as
>> these APIs are asynchronous.
>>
>> I hope my program will be able to run in both local mode as well as in
>> remote mode. Therefore I hope to do something like:
>>
>> ```python
>> result = ...
>> if local_mode:
>>   result.wait()
>> else:
>>   result
>> ```
>>
>> Is there a way to tell if the program is run under local mode vs. remote
>> mode?
>>
>> Best,
>> Yik San
>>
>


Re: Define rowtime on intermediate table field

2021-05-05 Thread Yun Gao
Hi Sumeet,

I think you might first convert the table back to a DataStream [1],
then define the timestamp and watermark with 
`assignTimestampsAndWatermarks(...)`, 
and then convert it back to a table [2].

Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#convert-a-table-into-a-datastream
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion
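
For reference, a minimal sketch of that round trip; field names such as "ts" and "col_b" are illustrative assumptions about the intermediate table, and tableEnv/intermediateTable are assumed to exist:

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Sketch only: assumes the intermediate table has columns (ts BIGINT, col_b STRING)
// where "ts" is the epoch-millisecond timestamp extracted by the table function.
DataStream<Row> rows = tableEnv.toAppendStream(intermediateTable, Row.class);

DataStream<Row> withTimestamps = rows.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((row, previous) -> (Long) row.getField(0)));

// Re-register as a table and declare a rowtime attribute derived from the stream timestamps.
Table withRowtime = tableEnv.fromDataStream(
        withTimestamps, $("ts"), $("col_b"), $("rowtime").rowtime());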



 --Original Mail --
Sender:Sumeet Malhotra 
Send Date:Tue May 4 16:32:10 2021
Recipients:user 
Subject:Define rowtime on intermediate table field

Hi,

My use case involves reading raw data records from Kafka and processing them. 
The records are coming from a database, where a periodic job reads new rows, 
packages them into a single JSON object (as described below) and writes the 
entire record to Kafka.

{
    'id': 'some_id',
    'key_a': 'value_a',
    'key_b': 'value_b',
    'result': {
        'columns': [
            'col_a',
            'col_b',
            'col_c',
            'col_d'
        ],
        'rows': [
            ['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'],
            ['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'],
            ['2021-05-04T05:23:13.953610Z', '655363', '2', '562'],
            ...
            ...
        ]
    }
}

As can be seen, the row time is actually embedded in the `result` object.

What I'm doing at the moment is to run this data through a user defined table 
function, which parses the `result` object as a string, and emits multiple rows 
that include the timestamp field. This is working fine.

In the next step, I would want to perform windowing on this transformed data. 
That requires defining the event time attribute along with the watermark. As I 
understand, this can be done either during the initial table DDL definition or 
during conversion to a datastream.

Since I extract the timestamp value only after reading from Kafka, how can I 
define an event time attribute on the intermediate table that's basically a 
result of the user defined table function?

The only solution I can think of at the moment, is to write the intermediate 
table back to Kafka, and then create a new consumer that reads from Kafka, 
where I can define the event time attribute as part of its DDL. This most 
likely won't be good performance wise. I'm looking at any other way, I can 
define event time on results of my user defined table function?

Thanks in advance,
Sumeet



Re: Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2021-05-05 Thread Yun Gao
Hi Ragini,

How did you submit your job? The exception here is most likely caused by
`flink-clients` not being included in the classpath on the client side.
If the job is submitted via the Flink CLI, namely `flink run -c  xx.jar`,
it is included by default; if the job is submitted programmatically, then
`flink-clients` must be included as a dependency.
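
For reference, the dependency that usually needs to be on the client classpath when submitting programmatically; the version below matches the 1.11.3 distribution mentioned in this thread, and the Scala suffix should be adjusted to your build:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>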

Best,
Yun


--
Sender:Ragini Manjaiah
Date:2021/05/04 17:25:30
Recipient:user
Theme:Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory 
found to execute the application.

Hi Team,
I am trying to submit a Flink job of version 1.11.3. The actual application is 
developed in Flink 1.8.1.
Since the Hadoop cluster is Apache 3.2.0, I downloaded Flink 1.11.3 
(flink-1.11.3-bin-scala_2.11.tgz) and tried to submit the job.
While submitting, I am facing the exception mentioned below. I have set the HADOOP 
parameters:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`

Are there any changes I need to make in the pom file to overcome this?


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: No ExecutorFactory found to execute the application.
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute 
the application.
 at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
 at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Yang Wang
It seems that you are using NodePort to expose the rest service. If you
only want to access the Flink UI/REST inside the K8s cluster,
then I would suggest setting "kubernetes.rest-service.exposed.type" to
"ClusterIP", because we use the K8s master node to
construct the JobManager rest endpoint when using NodePort. Sometimes it is
not accessible due to a firewall.

Best,
Yang

Robert Metzger  wrote on Thu, May 6, 2021 at 2:08 AM:

> Okay, it appears to have resolved 10.43.0.1:30081 as the address of the
> JobManager. Most likely, the container can not access this address. Can you
> validate this from within the container?
>
> If I understand the Flink documentation correctly, you should be able to
> manually specify rest.address, rest.port for the JobManager address. If
> you can manually figure out an address to the JobManager service, and pass
> it to Flink, the submission should work.
>
> On Wed, May 5, 2021 at 7:15 PM Robert Cullen 
> wrote:
>
>> Thanks for the reply. Here is an updated exception with DEBUG on. It
>> appears to be timing out:
>>
>> 2021-05-05 16:56:19,700 DEBUG 
>> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
>> namespace of Kubernetes client to cmdaa
>> 2021-05-05 16:56:19,700 DEBUG 
>> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
>> max concurrent requests of Kubernetes client to 64
>> 2021-05-05 16:56:20,176 INFO  
>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>> http://10.43.0.1:30081
>> 2021-05-05 16:56:20,239 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - Waiting for response...
>> 2021-05-05 17:02:09,605 ERROR org.apache.flink.client.cli.CliFrontend
>>   [] - Error while running the command.
>> org.apache.flink.util.FlinkException: Failed to retrieve job list.
>> at 
>> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449) 
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430) 
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427) 
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060) 
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>  [flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
>> [flink-dist_2.12-1.13.0.jar:1.13.0]
>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>> Could not complete the operation. Number of retries has been exhausted.
>> at 
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>  ~[?:1.8.0_292]
>> at 
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>  ~[?:1.8.0_292]
>> at 
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>  ~[?:1.8.0_292]
>> at 
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>>  ~[?:1.8.0_292]
>> at 
>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at 
>

Re: Protobuf support with Flink SQL and Kafka Connector

2021-05-05 Thread Jark Wu
Hi Shipeng,

Matthias is correct. FLINK-18202 should address this topic. There is
already a pull request there which is in good shape. You can also download
the PR and build the format jar yourself, and then it should work with
Flink 1.12.

Best,
Jark

On Mon, 3 May 2021 at 21:41, Matthias Pohl  wrote:

> Hi Shipeng,
> it looks like there is an open Jira issue FLINK-18202 [1] addressing this
> topic. You might want to follow up on that one. I'm adding Timo and Jark to
> this thread. They might have more insights.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-18202
>
> On Sat, May 1, 2021 at 2:00 AM Fuyao Li  wrote:
>
>> Hello Shipeng,
>>
>>
>>
>> I am not an expert in Flink, just want to share some of my thoughts.
>> Maybe others can give you better ideas.
>>
>> I think there is no directly available Protobuf support for Flink SQL.
>> However, you can write a user-defined format to support it [1].
>>
>> If you use DataStream API, you can leverage Kryo Serializer to serialize
>> and deserialize with Protobuf format. [2]. There is an out-of-box
>> integration for Protobuf here. You will need to convert it to Flink SQL
>> after data ingestion.
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#user-defined-sources-sinks
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>>
>>
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>>
>>
>> *From: *Shipeng Xie 
>> *Date: *Friday, April 30, 2021 at 14:58
>> *To: *user@flink.apache.org 
>> *Subject: *[External] : Protobuf support with Flink SQL and Kafka
>> Connector
>>
>> Hi,
>>
>>
>>
>> In
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/
>> ,
>> it does not mention protobuf format. Does Flink SQL support protobuf
>> format? If not, is there any plan to support it in the near future?
>>
>> Thanks!
>>
>
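
As a side note on the Kryo route mentioned in [2] above, a minimal sketch of registering a Protobuf-generated class with Kryo. It assumes the com.twitter:chill-protobuf dependency is on the classpath, and MyProtoMessage is a placeholder for a generated message class:

import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: lets Kryo (de)serialize the Protobuf type in the DataStream API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);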


Re: How to tell between a local mode run vs. remote mode run?

2021-05-05 Thread Xingbo Huang
Hi Yik San,
You can check whether the execution environment used is
`LocalStreamEnvironment` and you can get the class object corresponding to
the corresponding java object through py4j in PyFlink. You can take a look
at the example I wrote below, I hope it will help you
```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from py4j.java_gateway import get_java_class


def test():
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(
env, environment_settings=EnvironmentSettings.new_instance()
.in_streaming_mode().use_blink_planner().build())
gateway = get_gateway()

# get the execution environment class
env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()

# get the LocalStreamEnvironment class
local_stream_environment_class = get_java_class(

gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
print(env_class == local_stream_environment_class)


if __name__ == '__main__':
test()

```

Yik San Chan  wrote on Wed, May 5, 2021 at 12:04 PM:

> Hi,
>
> According to
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/
>
> > When executing jobs in mini cluster(e.g. when executing jobs in IDE)
> ... please remember to explicitly wait for the job execution to finish as
> these APIs are asynchronous.
>
> I hope my program will be able to run in both local mode as well as in
> remote mode. Therefore I hope to do something like:
>
> ```python
> result = ...
> if local_mode:
>   result.wait()
> else:
>   result
> ```
>
> Is there a way to tell if the program is run under local mode vs. remote
> mode?
>
> Best,
> Yik San
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Leonard Xu
Thanks Dawid & Guowei for the great work, thanks everyone involved. 

Best,
Leonard

> On 5 May 2021, at 17:12, Theo Diefenthal wrote:
> 
> Thanks for managing the release. +1. I like the focus on improving operations 
> with this version. 
> 
> From: "Matthias Pohl" 
> To: "Etienne Chauchot" 
> CC: "dev" , "Dawid Wysakowicz" 
> , "user" , annou...@apache.org
> Sent: Tuesday, 4 May 2021 21:53:31
> Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released
> 
> Yes, thanks for managing the release, Dawid & Guowei! +1
> 
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot  > wrote:
> Congrats to everyone involved !
> 
> Best
> 
> Etienne
> 
> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.13.0.
>  
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
>  
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html 
> 
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>  
> 
>  
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>  
> Regards,
> Guowei & Dawid
> 



Re: Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
This seems to be working fine in processing time but doesn't work in event
time. Is there an issue with the way the water mark is defined or do we
need to set up timers?

Please advise.


WORKING:

customerStream
.keyBy((KeySelector) Customer::getIdentifier)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());


NOT WORKING:

customerStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new
WaterMarkAssigner()))
.keyBy(Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(EventTimeTrigger.create())
.evictor(new CustomerEvictor())
.process(new CustomAggregateFunction())
.print();


On Thu, May 6, 2021 at 1:53 AM Sam  wrote:

> Adding the code for CustomWatermarkGenerator
>
> .
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput 
> watermarkOutput) {
> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
> customer.getEventTime()  );
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>
> }
> .
>
>
> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>
>> Hi,
>>
>> Bit of background, I have a stream of customers who have purchased some
>> product, reading these transactions on a KAFKA topic. I want to aggregate
>> the number of products the customer has purchased in a particular duration
>> ( say 10 seconds ) and write to a sink.
>>
>> I am using session windows to achieve the above.
>>
>> For test purposes, i have mocked  up a customer stream and executed
>> session windows like below.
>>
>> StreamExecutionEnvironment environment = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream customerStream = environment.addSource( new 
>> CustomerGenerator() );
>>
>> customerStream
>> 
>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(EventTimeTrigger.create())
>> .evictor(new CustomerEvictor())
>> .process(new CustomAggregateFunction())
>> .print();
>>
>> My watermark assigner looks like:
>>
>> public class WaterMarkAssigner implements WatermarkStrategy {
>> static final Logger logger = 
>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>
>> @Override
>> public WatermarkGenerator 
>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>> return new CustomWatermarkGenerator();
>> }
>> }
>>
>> I notice that the evictor, and aggregation functions are getting called only 
>> once for the first customer in the stream.
>>
>> The data stream is generating customers at 1 seconds interval and there are 
>> 5 customer keys for which it's generating transactions.
>>
>> Am I doing something wrong with the above?
>>
>> I want to be able to capture the event on each transaction getting added and 
>> removed from the window so that I can perform the aggregation.
>>
>> please advise.
>>
>>
>>
>>
>>


Re: Session Windows - not working as expected

2021-05-05 Thread Sam
Adding the code for CustomWatermarkGenerator

.
@Override
public void onEvent(Customer customer, long l, WatermarkOutput
watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp,
customer.getEventTime()  );
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));

}
.


On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:

> Hi,
>
> Bit of background, I have a stream of customers who have purchased some
> product, reading these transactions on a KAFKA topic. I want to aggregate
> the number of products the customer has purchased in a particular duration
> ( say 10 seconds ) and write to a sink.
>
> I am using session windows to achieve the above.
>
> For test purposes, i have mocked  up a customer stream and executed
> session windows like below.
>
> StreamExecutionEnvironment environment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream customerStream = environment.addSource( new 
> CustomerGenerator() );
>
> customerStream
> 
> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
> WaterMarkAssigner()))
> .keyBy(Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(EventTimeTrigger.create())
> .evictor(new CustomerEvictor())
> .process(new CustomAggregateFunction())
> .print();
>
> My watermark assigner looks like:
>
> public class WaterMarkAssigner implements WatermarkStrategy {
> static final Logger logger = 
> LoggerFactory.getLogger(WaterMarkAssigner.class);
>
> @Override
> public WatermarkGenerator 
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
> return new CustomWatermarkGenerator();
> }
> }
>
> I notice that the evictor, and aggregation functions are getting called only 
> once for the first customer in the stream.
>
> The data stream is generating customers at 1 seconds interval and there are 5 
> customer keys for which it's generating transactions.
>
> Am I doing something wrong with the above?
>
> I want to be able to capture the event on each transaction getting added and 
> removed from the window so that I can perform the aggregation.
>
> please advise.
>
>
>
>
>


Re: Flink Event specific window

2021-05-05 Thread Swagat Mishra
Hi Arvid,

I sent a separate mail titled - Session Windows - not working as expected (
to the user community )

All other details are here if you need, closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Thu, May 6, 2021 at 1:50 AM Swagat Mishra  wrote:

> Hi Arvid,
>
> I sent a separate mail titled - Session Windows - not working as expected
>
> closing this thread.
>
> Please have a look when you have a few minutes, much appreciated.
>
> Regards,
> Swagat
>
>
> On Wed, May 5, 2021 at 7:24 PM Swagat Mishra  wrote:
>
>> Hi Arvid,
>>
>> Tried a small POC to reproduce the behaviour, somehow dont see the
>> process function getting called, am I doing something wrong?
>>
>> customerStream
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>> .process(new CustomAggregateFunction())
>> .print();
>>
>> the process function looks like below
>>
>> public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>>
>> @Override
>> public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
>> System.out.println("in aggregation");
>> }
>> }
>>
>> the customer generator
>>
>> public class CustomerGenerator implements SourceFunction<Customer> {
>>
>> volatile boolean isRunning = true;
>>
>> private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>>
>> @Override
>> public void run(SourceContext<Customer> sourceContext) throws Exception {
>> int counter = 1;
>>
>> while (isRunning) {
>> Customer c = new Customer(CUSTOMER_KEY[counter % 5], 
>> LocalTime.now(), 1000);
>> System.out.println("Writing customer: " + c);
>> sourceContext.collect(c);
>> Thread.sleep(1000);
>> counter++;
>> }
>> }
>>
>> @Override
>> public void cancel() {
>> isRunning = false;
>> }
>> }
>>
>>
>> Customer object
>>
>> public class Customer {
>> private String identifier;
>> private LocalTime eventTime;
>> private double amount;
>>
>> public Customer(String identifier, LocalTime eventTime, double amount) {
>> this.identifier = identifier;
>> this.amount = amount;
>> this.eventTime = eventTime;
>> }
>>
>> public String getIdentifier() {
>> return identifier;
>> }
>>
>> public LocalTime getEventTime() {
>> return eventTime;
>> }
>>
>> public double getAmount() {
>> return amount;
>> }
>>
>> @Override
>> public String toString() {
>> return "Customer{" +
>> "identifier='" + identifier + '\'' +
>> ", eventTime=" + eventTime +
>> ", amount=" + amount +
>> '}';
>> }
>> }
>>
>>
>>
>> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise  wrote:
>>
>>> Hi Swagat,
>>>
>>> 1. Where the data primarily resides depends on the chosen state backend
>>> [1]. In most cases, it's written to some file with a memory cache. It's
>>> possible to query the state [2] but not with SQL. In fact, it's so basic
>>> that we decided to drop the feature in the future to make room for a more
>>> sophisticated solution based around replicating the state to an external
>>> queryable form but there is nothing specific yet.
>>> 2. It would help if you (re)read the section about state persistence.
>>> [3] Basically, the state is updated on every write access of the process
>>> function. Flink creates a checkpoint of the state periodically and can
>>> recover from these checkpoint. It's also possible to look into these
>>> checkpoint with the state processor API [4].
>>> 3. It's embedded. See above to what happens on failure.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>>> [4]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra 
>>> wrote:
>>>
 Hi Arvid,

 On 2 - I was referring to stateful functions as an alternative to
 windows, but in this particular use case, its not fitting in exactly I
 think, though a solution can be built around it.

 On the overall approach here what's the right way to use Flink SQL:

 Every event has the transaction time which I am using as event time to 
 assign WatermarkStrategy
 KeyBy - customerId
 SlidingEventTimeWindows of 1 hr
 then process all elements using ProcessWindowFunction

 Extending above..

 For the session window, taking the above exampl

Re: Flink Event specific window

2021-05-05 Thread Swagat Mishra
Hi Arvid,

I sent a separate mail titled - Session Windows - not working as expected

closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat


On Wed, May 5, 2021 at 7:24 PM Swagat Mishra  wrote:

> Hi Arvid,
>
> Tried a small POC to reproduce the behaviour, somehow dont see the process
> function getting called, am I doing something wrong?
>
> customerStream
> .keyBy(Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
> .process(new CustomAggregateFunction())
> .print();
>
> the process function looks like below
>
> public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>
> @Override
> public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
> System.out.println("in aggregation");
> }
> }
>
> the customer generator
>
> public class CustomerGenerator implements SourceFunction<Customer> {
>
> volatile boolean isRunning = true;
>
> private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>
> @Override
> public void run(SourceContext<Customer> sourceContext) throws Exception {
> int counter = 1;
>
> while (isRunning) {
> Customer c = new Customer(CUSTOMER_KEY[counter % 5], 
> LocalTime.now(), 1000);
> System.out.println("Writing customer: " + c);
> sourceContext.collect(c);
> Thread.sleep(1000);
> counter++;
> }
> }
>
> @Override
> public void cancel() {
> isRunning = false;
> }
> }
>
>
> Customer object
>
> public class Customer {
> private String identifier;
> private LocalTime eventTime;
> private double amount;
>
> public Customer(String identifier, LocalTime eventTime, double amount) {
> this.identifier = identifier;
> this.amount = amount;
> this.eventTime = eventTime;
> }
>
> public String getIdentifier() {
> return identifier;
> }
>
> public LocalTime getEventTime() {
> return eventTime;
> }
>
> public double getAmount() {
> return amount;
> }
>
> @Override
> public String toString() {
> return "Customer{" +
> "identifier='" + identifier + '\'' +
> ", eventTime=" + eventTime +
> ", amount=" + amount +
> '}';
> }
> }
>
>
>
> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise  wrote:
>
>> Hi Swagat,
>>
>> 1. Where the data primarily resides depends on the chosen state backend
>> [1]. In most cases, it's written to some file with a memory cache. It's
>> possible to query the state [2] but not with SQL. In fact, it's so basic
>> that we decided to drop the feature in the future to make room for a more
>> sophisticated solution based around replicating the state to an external
>> queryable form but there is nothing specific yet.
>> 2. It would help if you (re)read the section about state persistence. [3]
>> Basically, the state is updated on every write access of the process
>> function. Flink creates a checkpoint of the state periodically and can
>> recover from these checkpoint. It's also possible to look into these
>> checkpoint with the state processor API [4].
>> 3. It's embedded. See above to what happens on failure.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>> [4]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra 
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> On 2 - I was referring to stateful functions as an alternative to
>>> windows, but in this particular use case, its not fitting in exactly I
>>> think, though a solution can be built around it.
>>>
>>> On the overall approach here what's the right way to use Flink SQL:
>>>
>>> Every event has the transaction time which I am using as event time to 
>>> assign WatermarkStrategy
>>> KeyBy - customerId
>>> SlidingEventTimeWindows of 1 hr
>>> then process all elements using ProcessWindowFunction
>>>
>>> Extending above..
>>>
>>> For the session window, taking the above example , reiterated below:
>>>
>>> Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 
>>> am.
>>> Customer2 has done 1 transaction one at 10:00 am
>>> Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.
>>>
>>> 1 hour window:
>>> 9:30AM - 10:30 AM : Customer 2
>>> 10:30 AM - 11:30 AM : Customer 1, Customer 3
>>> 11:30 AM - 12:30 PM : Customer 3
>>>
>>> Questions - how do we access the state?
>>>
>>>1. Will the proces

Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread vishalovercome
Yes. While back-pressure would eventually ensure high throughput, hand-tuning
parallelism became necessary because a job with high source parallelism
would immediately bring down our internal services - not giving Flink enough time
to adjust the in-rate. Plus, running all operators at such a high
scale would waste resources, even with operator chaining in
place. 

That's why I think more toggles are needed to make current auto-scaling
truly shine.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
Hi,

Bit of background, I have a stream of customers who have purchased some
product, reading these transactions on a KAFKA topic. I want to aggregate
the number of products the customer has purchased in a particular duration
( say 10 seconds ) and write to a sink.

I am using session windows to achieve the above.

For test purposes, I have mocked up a customer stream and executed session
windows like below.

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());

customerStream
    //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
    .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
    .keyBy(Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(EventTimeTrigger.create())
    .evictor(new CustomerEvictor())
    .process(new CustomAggregateFunction())
    .print();

My watermark assigner looks like:

public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
    static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);

    @Override
    public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new CustomWatermarkGenerator();
    }
}

I notice that the evictor and aggregation functions are getting
called only once, for the first customer in the stream.

The data stream is generating customers at 1-second intervals, and
there are 5 customer keys for which it's generating transactions.

Am I doing something wrong with the above?

I want to be able to capture the event on each transaction getting
added and removed from the window so that I can perform the
aggregation.

please advise.


Re: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time,

2021-05-05 Thread Robert Metzger
Hi Ragini,

Since this exception is coming from the HBase client, I assume the issue
has nothing to do with Flink directly.
I would recommend carefully studying the HBase client configuration
parameters, and maybe setting up a simple Java application that "hammers" data into
HBase at a maximum rate to understand the impact of different combinations
of configuration parameters.
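
For illustration, a rough sketch of such a standalone "hammer"; the table name, column family, and row count are placeholders, not something from this thread:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseHammer {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator =
                     connection.getBufferedMutator(TableName.valueOf("test_table"))) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 1_000_000; i++) {
                Put put = new Put(Bytes.toBytes("row-" + i));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value-" + i));
                mutator.mutate(put); // buffered; flushed in batches by the client
            }
            mutator.flush();
            System.out.println("Wrote 1,000,000 puts in "
                    + (System.currentTimeMillis() - start) + " ms");
        }
    }
}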

Best,
Robert

On Tue, May 4, 2021 at 5:05 AM Ragini Manjaiah 
wrote:

> Hi ,
> One of my Flink applications needs to get and put records from HBase for
> every event while processing in real time. When there are fewer events, the
> application processes without any issues. When the number of events
> increases, we start hitting the exception mentioned below. Can these
> exceptions bring down the throughput and start to build lag? What are the
> parameters we can tune on the HBase/Flink side to overcome this exception? We
> are seeing 7000 hits/sec as the minimum to HBase when load is normal. The
> HBase table has 3 region servers.
>
>
> org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 
> action: IOException: 1 time,
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1817)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:190)
>   at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1434)
>   at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1018)
>   at org......xx(xxx.java:202)
>   at 
> org......xxx.xxx(xxx.java:144)
>   at 
> org......xxx.(x.java:30)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Robert Metzger
Okay, it appears to have resolved 10.43.0.1:30081 as the address of the
JobManager. Most likely, the container can not access this address. Can you
validate this from within the container?

If I understand the Flink documentation correctly, you should be able to
manually specify rest.address, rest.port for the JobManager address. If you
can manually figure out an address to the JobManager service, and pass it
to Flink, the submission should work.
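
For example, in the client's flink-conf.yaml (the hostname below is only an assumption about a Service that resolves to the JobManager from inside the pod; substitute whatever address actually works there):

# rest.address / rest.port tell the CLI where to reach the JobManager REST endpoint
rest.address: flink-jobmanager.cmdaa.svc.cluster.local
rest.port: 8081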

On Wed, May 5, 2021 at 7:15 PM Robert Cullen  wrote:

> Thanks for the reply. Here is an updated exception with DEBUG on. It
> appears to be timing out:
>
> 2021-05-05 16:56:19,700 DEBUG 
> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
> namespace of Kubernetes client to cmdaa
> 2021-05-05 16:56:19,700 DEBUG 
> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
> max concurrent requests of Kubernetes client to 64
> 2021-05-05 16:56:20,176 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
> http://10.43.0.1:30081
> 2021-05-05 16:56:20,239 INFO  org.apache.flink.client.cli.CliFrontend 
>  [] - Waiting for response...
> 2021-05-05 17:02:09,605 ERROR org.apache.flink.client.cli.CliFrontend 
>  [] - Error while running the command.
> org.apache.flink.util.FlinkException: Failed to retrieve job list.
> at 
> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>  [flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
> [flink-dist_2.12-1.13.0.jar:1.13.0]
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>  ~[?:1.8.0_292]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>  ~[?:1.8.0_292]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>  ~[?:1.8.0_292]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>  ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:263)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4

Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread Ken Krugler
Hi Vishal,

WRT “bring down our internal services” - a common pattern with making requests 
to external services is to measure latency, and throttle (delay) requests in 
response to increased latency.

You’ll see this discussed frequently on web crawling forums as an auto-tuning 
approach.

Typically there’s a steady increase in latency as load on the service increases.

The trick is throttling soon enough before you hit the “elbow” where a service 
effectively falls over.
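As a rough illustration of that idea (not code from this thread; the Request/Response types, callService(), and the thresholds are all hypothetical), an operator can keep a small adaptive delay that grows with observed latency:

import org.apache.flink.api.common.functions.RichMapFunction;

public class ThrottledEnricher extends RichMapFunction<Request, Response> {
    private long delayMs = 0;

    @Override
    public Response map(Request request) throws Exception {
        if (delayMs > 0) {
            Thread.sleep(delayMs);                    // back off before hitting the service
        }
        long start = System.currentTimeMillis();
        Response response = callService(request);     // hypothetical blocking call to the service
        long latency = System.currentTimeMillis() - start;

        if (latency > 500) {
            delayMs = Math.min(delayMs + 50, 2_000);  // latency rising: throttle harder
        } else {
            delayMs = Math.max(delayMs - 10, 0);      // service healthy: decay the delay
        }
        return response;
    }

    private Response callService(Request request) {
        return new Response();                        // placeholder for the real request
    }
}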

— Ken



> On May 5, 2021, at 9:08 AM, vishalovercome  wrote:
> 
> Yes. While back-pressure would eventually ensure high throughput, hand tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adjust the in-rate. Plus running all operators at such a high
> scale would result in wastage of resources, even with operator chaining in
> place. 
> 
> That's why I think more toggles are needed to make current auto-scaling
> truly shine.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread David Anderson
Well, I was thinking you could have avoided overwhelming your internal
services by using something like Flink's async i/o operator, tuned to limit
the total number of concurrent requests. That way the pipeline could have
uniform parallelism without overwhelming those services, and then you'd
rely on backpressure to throttle the sources. I'm not saying that would be
better -- it's arguably worse to have constant backpressure.
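For context, a minimal sketch of what "tuned to limit the total number of concurrent requests" can look like; the async function and the numbers are hypothetical:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Response> enriched = AsyncDataStream.unorderedWait(
        requests,                     // DataStream<Request>
        new MyAsyncServiceClient(),   // hypothetical AsyncFunction<Request, Response>
        30, TimeUnit.SECONDS,         // timeout per request
        100);                         // capacity: at most 100 in-flight requests per subtask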

But this point I don't understand:

> running all operators at such a high scale would result in wastage of
resources, even with operator chaining in place.

Don't you have the same number of slots, each with the same resources,
either way? Plus, you have to do more ser/de, and more networking?

On Wed, May 5, 2021 at 6:08 PM vishalovercome  wrote:

> Yes. While back-pressure would eventually ensure high throughput, hand
> tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adjust the in-rate. Plus running all operators at such a high
> scale would result in wastage of resources, even with operator chaining in
> place.
>
> That's why I think more toggles are needed to make current auto-scaling
> truly shine.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Robert Cullen
Thanks for the reply. Here is an updated exception with DEBUG on. It
appears to be timing out:

2021-05-05 16:56:19,700 DEBUG
org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] -
Setting namespace of Kubernetes client to cmdaa
2021-05-05 16:56:19,700 DEBUG
org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] -
Setting max concurrent requests of Kubernetes client to 64
2021-05-05 16:56:20,176 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] -
Retrieve flink cluster flink-jobmanager successfully, JobManager Web
Interface: http://10.43.0.1:30081
2021-05-05 16:56:20,239 INFO  org.apache.flink.client.cli.CliFrontend
[] - Waiting for response...
2021-05-05 17:02:09,605 ERROR org.apache.flink.client.cli.CliFrontend
[] - Error while running the command.
org.apache.flink.util.FlinkException: Failed to retrieve job list.
at 
org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
[flink-dist_2.12-1.13.0.jar:1.13.0]
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been
exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_292]
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:263)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEv

Re: NPE when aggregate window.

2021-05-05 Thread Arvid Heise
Hi,

I'm assuming it's just a workaround for changing fields: the string
representation happens to be stable while the underlying values change.

It's best practice to use completely immutable types. If you have similar
issues, you should double-check that nothing can be changed in your data
type, or make defensive copies before changing it.
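For illustration, a minimal sketch of such an immutable type (the field names are made up) whose hashCode cannot drift, which makes the toString() workaround unnecessary:

public final class CustomerKey {
    private final String id;
    private final String region;

    public CustomerKey(String id, String region) {
        this.id = id;
        this.region = region;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomerKey)) return false;
        CustomerKey that = (CustomerKey) o;
        return id.equals(that.id) && region.equals(that.region);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(id, region);  // stable, because the fields never change
    }
}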

On Wed, May 5, 2021 at 4:02 PM tuk  wrote:

> Can someone provide a bit more explanation of why replacing
> com.google.common.base.Objects.hashCode with
> toString().hashCode() makes it work?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: NPE when aggregate window.

2021-05-05 Thread tuk
Can someone provide a bit more explanation of why replacing
com.google.common.base.Objects.hashCode with
toString().hashCode() makes it work?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[NOTICE] Flink 1.12.3 artifacts for Scala 2.12 were built against Scala 2.11

2021-05-05 Thread Chesnay Schepler

To all Scala 2.12 users,

Due to a mistake during the release process of Flink 1.12.3, jars 
intended to be built against Scala 2.12 were actually built against 
Scala 2.11.
This affects all jars published to Maven Central; the convenience 
binaries are not affected.


Scala 2.12 users are advised to not upgrade to Flink 1.12.3, and instead 
wait for 1.12.4, which we will try to release as soon as possible.


Regards,
Chesnay


Re: remote task manager netty exception

2021-05-05 Thread Roman Khachatryan
Hi,

> Could it be somehow partition info isn't up to date on TM when job is 
> restarting?
Partition info should be up to date or become so eventually - but this
is assuming that JM is able to detect the failure.

The latter may not be the case, as Sihan You wrote previously:
> The strange thing is that only 23 of the TM are complaining about the 
> connection issue.
> When this exception occurs, the TM they are complaining about is still up and 
> live.

So it looks like JM to TM network links are OK, but some TM-TM links are down.
Do you have an option to check connection from TM to TM during this
restart loop?

Also it makes sense to check heartbeat interval and timeout [1] so
that JM can detect TM failure quickly enough.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-interval
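For reference, those options live in flink-conf.yaml; the values below are just the defaults, shown for illustration:

heartbeat.interval: 10000   # ms between heartbeat requests (default)
heartbeat.timeout: 50000    # ms without a heartbeat before the TM is considered lost (default)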

Regards,
Roman


On Tue, May 4, 2021 at 10:17 PM Yichen Liu  wrote:
>
> Chime in here since I work with Sihan.
>
> Roman, there aren't many logs beyond this WARN; in fact it should be ERROR 
> since it fails our job and the job has to restart.
>
> Here is a fresh new example of "Sending the partition request to 'null' 
> failed." exception. The only log we see before exception was:
>
> timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing 
> Info Operator -> (Filter, Filter) (55/80)#12", 
> class="org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", 
> method="cleanInstanceBasePath(line:462)", message="Closed RocksDB State 
> Backend. Cleaning up RocksDB working directory 
> /tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job__op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."
>
> timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing 
> Info Operator -> (Filter, Filter) (55/80)#12", 
> class="org.apache.flink.runtime.taskmanager.Task", 
> method="transitionState(line:1033)", message="Latest Billing Info Operator -> 
> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched from 
> RUNNING to FAILED." 
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> Sending the partition request to 'null' failed. at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>
> but this looks more like a consequence than cause of exception.
>
> Note this seems to be pretty consistent when one of our TMs gets lost. Could 
> it be that somehow the partition info isn't up to date on the TM when the job is restarting?
>
> Also note that we have a pretty huge state, each TM has around 130GB state, 
> TMs have a setting of 10GB memory and 2700m CPU (in k8s unit).
>
> On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> I see that JM and TM failures are different (from TM, it's actually a
>> warning). Could you please share the ERROR message from TM?
>>
>> Have you tried increasing taskmanager.network.retries [1]?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 30, 2021 at 11:55 PM Sihan You  wrote:
>> >
>> > Hi,
>> >
>> > We are experiencing a netty issue with our Flink cluster, whose cause we 
>> > couldn't figure out.
>> >
>> > Below is the stack trace of exceptions from TM's and JM's perspectives.  
>> > we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of 
>> > the TM are complaining about the connection issue. When this exception 
>> > occurs, the TM they are complaining about is still up and live. this will 
>> > cause our job to be stuck in the restart loop for a couple of hours then 
>> > back to normal.
>> >
>> > We are using HDFS as the state backend and the checkpoint dir.
>> > the application is running in our own data center and in Kubernetes as a 
>> > standalone job.
>> >
>> >
>> > ## Job Graph
>> >
>> > the job graph is like this.
>> > source 1.1 (5 parallelism).  ->
>> >   union ->
>> > source 1.2 (80 parallelism) ->
>> > 
>> > connect -> sink
>> > source 2.1 (5 parallelism).  ->
>> >   union ->
>> > source 2.2 (80 parallelism) ->
>> >
>> >
>> > ## JM's Stacktrace
>> >
>> > ```
>> > message="PLI Deduplicate Operator (60/80) 
>> > (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 
>> > 100.98.115.117:6122-924d20 @ 100.98.115.117 
>> > (dataPort=41245).

Re: Presence of Jars in Flink reg security

2021-05-05 Thread Chesnay Schepler
One of these (plexus-utils) is, afaik, used by Maven, so the scanner is 
potentially scanning the wrong thing. Or you are scanning all 
dependencies downloaded during the build of Flink, including everything 
used by the various plugins of the build process and Maven itself.
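One way to check whether these artifacts actually end up in the project's own dependency tree (the include patterns simply mirror the three reported libraries):

mvn dependency:tree -Dincludes=xerces:xercesImpl,org.apache.struts:struts-core,org.codehaus.plexus:plexus-utils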


On 5/5/2021 11:08 AM, Till Rohrmann wrote:

Hi Prasanna,

in the latest Flink version (1.13.0) I couldn't find these dependencies.
Which version of Flink are you looking at? What you could check is whether
one of these dependencies is contained in one of Flink's shaded
dependencies [1].

[1] https://github.com/apache/flink-shaded

Cheers,
Till

On Tue, May 4, 2021 at 3:00 PM Prasanna kumar 
wrote:


Hi Flinksters,

Our repo, which is a Maven-based Java (Flink) project, went through an SCA scan using 
the WhiteSource tool, and the following HIGH-severity issues were reported. The 
target vulnerable jars are not found when we build the dependency tree of the 
project.

Could anyone let us know if Flink uses these anywhere?

+----------------------+----------+
| Library              | Severity |
+======================+==========+
| xercesImpl-2.9.1.jar | HIGH     |
+----------------------+----------+
- Artifact ID: xercesImpl
- Group ID: xerces
- Library Version: 2.9.1
- Library Path: 
/var/lib/jenkins/workspace/branch/latest/?/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: XMLscanner.java in Apache Xerces2 Java Parser before 
2.12.0, as used in the Java Runtime Environment (JRE) in IBM Java 5.0 before 
5.0 SR16-FP3, 6 before 6 SR14, 6.0.1 before 6.0.1 SR6, and 7 before 7 SR5 as 
well as Oracle Java SE 7u40 and earlier, Java SE 6u60 and earlier, Java SE 
5.0u51 and earlier, JRockit R28.2.8 and earlier, JRockit R27.7.6 and earlier, 
Java SE Embedded 7u40 and earlier, and possibly other products allows remote 
attackers to cause a denial of service via vectors related to XML attribute 
names.
- Suggested Fix: Upgrade to version xerces:xercesImpl:Xerces-J_2_12_0


+-----------------------+----------+
| Library               | Severity |
+=======================+==========+
| struts-core-1.3.8.jar | HIGH     |
+-----------------------+----------+
- Artifact ID: struts-core
- Group ID: org.apache.struts
- Library Version: 1.3.8
- Library Path: 
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/apache/struts/struts-core/1.3.8/struts-core-1.3.8.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: ActionServlet.java in Apache Struts 1 1.x through 1.3.10 
does not properly restrict the Validator configuration, which allows remote 
attackers to conduct cross-site scripting (XSS) attacks or cause a denial of 
service via crafted input, a related issue to CVE-2015-0899.
- Suggested Fix: Replace or update the following file: 
ActionServlet.java

+----------------------+----------+
| Library              | Severity |
+======================+==========+
| plexus-utils-3.0.jar | HIGH     |
+----------------------+----------+
- Artifact ID: plexus-utils
- Group ID: org.codehaus.plexus
- Library Version: 3.0
- Library Path: 
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: Security vulnerability found in plexus-utils before 
3.0.24. XML injection found in XmlWriterUtil.java.
- Suggested Fix: Upgrade to version 3.0.24

Thanks,

Prasanna.






Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Robert Metzger
Hi,
can you check the client log in the "log/" directory?
The Flink client will try to access the K8s API server to retrieve the
endpoint of the jobmanager. For that, the pod needs to have permissions
(through a service account) to make such calls to K8s. My hope is that the
logs or previous messages are giving an indication into what Flink is
trying to do.
Can you also try running on DEBUG log level? (should be the
log4j-cli.properties file).
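As a sketch, assuming the log4j2-style properties files that ship with recent Flink versions, the change would be roughly:

# conf/log4j-cli.properties
rootLogger.level = DEBUG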



On Tue, May 4, 2021 at 3:17 PM Robert Cullen  wrote:

> I have a flink cluster running in kubernetes, just the basic installation
> with one JobManager and two TaskManagers. I want to interact with it via
> command line from a separate container ie:
>
> root@flink-client:/opt/flink# ./bin/flink list --target 
> kubernetes-application -Dkubernetes.cluster-id=job-manager
>
> How do you interact in the same kubernetes instance via CLI (Not from the
> desktop)?  This is the exception:
>
> 
>  The program finished with the following exception:
>
> java.lang.RuntimeException: 
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
> the rest endpoint of job-manager
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:103)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:67)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1001)
> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
> not get the rest endpoint of job-manager
> ... 9 more
> root@flink-client:/opt/flink#
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Question about state processor data outputs

2021-05-05 Thread Chen-Che Huang
Hi Robert,

Due to the performance issue with the state processor, I would probably like to 
give up on it and am trying StreamingFileSink in a streaming manner instead. 
However, I need to store the files on GCS, and I encountered the error 
below. It looks like Flink doesn't support GCS for StreamingFileSink yet 
(https://issues.apache.org/jira/browse/FLINK-11838). If you know any solution 
to this issue, please let me know. Thanks.
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS
at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)

Best regards,
Chen-Che

On 2021/04/16 06:53:37, Robert Metzger  wrote: 
> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
> 
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
> 
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.
> 
> 
> On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang  wrote:
> 
> > Hi Robert,
> >
> > Thanks for your code. It's really helpful!
> >
> > However, with the readKeyedState API of the state processor, we get a DataSet
> > for our data instead of a DataStream, and it seems the DataSet doesn't support
> > StreamingFileSink (no addSink method like DataStream). If not, I need to
> > transform the DataSet to a DataStream. I'm not sure it's doable based on
> > https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> > If it's doable, then I'll be able to solve our problem with applying
> > streamfilesink to the transformed dataset.
> >
> > Best wishes,
> > Chen-Che Huang
> >
> > On 2021/04/15 19:23:43, Robert Metzger  wrote:
> > > Hey Chen-Che Huang,
> > >
> > > I guess the StreamingFileSink is what you are looking for. It is
> > documented
> > > here:
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > I drafted a short example (that is not production ready), which does
> > > roughly what you are asking for:
> > > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> > >
> > > Hope this helps!
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > We're going to use state processor to make our keyedstate data to be
> > > > written to different files based on the keys. More specifically, we
> > want
> > > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> > where
> > > > the value with the same key is stored in the same file. In each file,
> > the
> > > > data may be stored as follows. As far as I know, I need to implement
> > my own
> > > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> > to
> > > > meet the requirement. However, I wonder is there a native way to
> > achieve
> > > > this without implementing my own Sink because using official solution
> > is
> > > > usually more efficient and reliable than doing it by myself.  Many
> > thanks
> > > > for any comment.
> > > >
> > > > key1.txt
> > > > key1 value11
> > > > key1 value21
> > > > key1 value31
> > > >
> > > > key2.txt
> > > > key2 value21
> > > > key2 value22
> > > > key2 value23
> > > >
> > > > Best wishes,
> > > > Chen-Che Huang
> > > >
> > >
> >
> 
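For the per-key output files discussed above, a hedged sketch (the KeyedRecord POJO and its getters are hypothetical): a StreamingFileSink with a custom BucketAssigner writes one directory of part files per key rather than a single keyN.txt, which may be close enough:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

StreamingFileSink<KeyedRecord> sink = StreamingFileSink
    .forRowFormat(new Path("hdfs:///output"), new SimpleStringEncoder<KeyedRecord>("UTF-8"))
    .withBucketAssigner(new BucketAssigner<KeyedRecord, String>() {
        @Override
        public String getBucketId(KeyedRecord element, Context context) {
            return element.getKey();  // one bucket (directory) per key, e.g. .../key1/part-0-0
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    })
    .build();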


Re: PyFlink: Split input table stream using filter()

2021-05-05 Thread Dian Fu
Hi Sumeet,

Yes, this approach also works in Table API.

Could you share which API you use to execute the job? For jobs with multiple 
sinks, StatementSet should be used. You could refer to [1] for more details on 
this.

Regards,
Dian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/intro_to_table_api/#emit-results-to-multiple-sink-tables
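For example, a rough sketch with made-up sink names and placeholder filter expressions (not code from this thread):

from pyflink.table.expressions import col

input_table = t_env.from_path('TableName')

statement_set = t_env.create_statement_set()
statement_set.add_insert('Sink1', input_table.filter(col('status') == 'OK').select(col('id'), col('value')))
statement_set.add_insert('Sink2', input_table.filter(col('status') != 'OK').select(col('id'), col('value')))
statement_set.execute().wait()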

> On May 5, 2021, at 5:51 PM, Sumeet Malhotra  wrote:
> 
> Hi,
> 
> I would like to split streamed data from Kafka into 2 streams based on some 
> filter criteria, using PyFlink Table API. As described here [1], a way to do 
> this is to use .filter() which should split the stream for parallel 
> processing.
> 
> Does this approach work in Table API as well? I'm doing the following, but 
> control never reaches the second stream.
> 
> input = t_env.from_path('TableName')
> stream1 = input.filter().select(...)...
> stream2 = input.filter().select(...)...
> 
> When I execute this, I only see the first stream getting processed. Control 
> never reaches stream2. I have set parallelism to 2.
> 
> Am I missing something? Or is this only supported in Datastreams?
> 
> Thanks in advance,
> Sumeet
> 
> [1]: 
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>  
> 
> 



PyFlink: Split input table stream using filter()

2021-05-05 Thread Sumeet Malhotra
Hi,

I would like to split streamed data from Kafka into 2 streams based on some
filter criteria, using PyFlink Table API. As described here [1], a way to
do this is to use .filter() which should split the stream for parallel
processing.

Does this approach work in Table API as well? I'm doing the following, but
control never reaches the second stream.

input = t_env.from_path('TableName')
stream1 = input.filter().select(...)...
stream2 = input.filter().select(...)...

When I execute this, I only see the first stream getting processed. Control
never reaches stream2. I have set parallelism to 2.

Am I missing something? Or is this only supported in Datastreams?

Thanks in advance,
Sumeet

[1]:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream


Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Sumeet Malhotra
Thanks Dian. Yes, I hadn't looked at the 1.13.0 documentation earlier.

On Wed, May 5, 2021 at 1:46 PM Dian Fu  wrote:

> Hi Sumeet,
>
> This feature is supported in 1.13.0 which was just released and so there
> is no documentation about it in 1.12.
>
> Regards,
> Dian
>
> On May 4, 2021, at 2:09 AM, Sumeet Malhotra  wrote:
>
> Hi,
>
> Is keyed state [1] supported by PyFlink yet? I can see some code for it in
> the Flink master branch, but there's no mention of it in the 1.12 Python
> documentation.
>
> Thanks,
> Sumeet
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
>
>
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-05 Thread Theo Diefenthal
Thanks for managing the release. +1. I like the focus on improving operations 
with this version. 


From: "Matthias Pohl" 
To: "Etienne Chauchot" 
CC: "dev" , "Dawid Wysakowicz" , 
"user" , annou...@apache.org 
Sent: Tuesday, May 4, 2021, 21:53:31 
Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released 

Yes, thanks for managing the release, Dawid & Guowei! +1 

On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot <echauc...@apache.org> wrote: 





Congrats to everyone involved ! 

Best 

Etienne 
On 03/05/2021 15:38, Dawid Wysakowicz wrote: 


The Apache Flink community is very happy to announce the release of Apache 
Flink 1.13.0. 
Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications. 
The release is available for download at: 
https://flink.apache.org/downloads.html 
Please check out the release blog post for an overview of the improvements for 
this bugfix release: 
https://flink.apache.org/news/2021/05/03/release-1.13.0.html 
The full release notes are available in Jira: 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287 
We would like to thank all contributors of the Apache Flink community who made 
this release possible! 
Regards, 
Guowei & Dawid 








Re: Presence of Jars in Flink reg security

2021-05-05 Thread Till Rohrmann
Hi Prasanna,

in the latest Flink version (1.13.0) I couldn't find these dependencies.
Which version of Flink are you looking at? What you could check is whether
one of these dependencies is contained in one of Flink's shaded
dependencies [1].

[1] https://github.com/apache/flink-shaded

Cheers,
Till

On Tue, May 4, 2021 at 3:00 PM Prasanna kumar 
wrote:

> Hi Flinksters,
>
> Our repo, which is a Maven-based Java (Flink) project, went through an SCA scan 
> using the WhiteSource tool, and the following HIGH-severity issues were reported. 
> The target vulnerable jars are not found when we build the dependency tree of 
> the project.
> 
> Could anyone let us know if Flink uses these anywhere?
>
> +----------------------+----------+
> | Library              | Severity |
> +======================+==========+
> | xercesImpl-2.9.1.jar | HIGH     |
> +----------------------+----------+
>   - Artifact ID: xercesImpl
>   - Group ID: xerces
>   - Library Version: 2.9.1
>   - Library Path: 
> /var/lib/jenkins/workspace/branch/latest/?/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: XMLscanner.java in Apache Xerces2 Java Parser before 
> 2.12.0, as used in the Java Runtime Environment (JRE) in IBM Java 5.0 before 
> 5.0 SR16-FP3, 6 before 6 SR14, 6.0.1 before 6.0.1 SR6, and 7 before 7 SR5 as 
> well as Oracle Java SE 7u40 and earlier, Java SE 6u60 and earlier, Java SE 
> 5.0u51 and earlier, JRockit R28.2.8 and earlier, JRockit R27.7.6 and earlier, 
> Java SE Embedded 7u40 and earlier, and possibly other products allows remote 
> attackers to cause a denial of service via vectors related to XML attribute 
> names.
>   - Suggested Fix: Upgrade to version xerces:xercesImpl:Xerces-J_2_12_0
>
>
> +-----------------------+----------+
> | Library               | Severity |
> +=======================+==========+
> | struts-core-1.3.8.jar | HIGH     |
> +-----------------------+----------+
>   - Artifact ID: struts-core
>   - Group ID: org.apache.struts
>   - Library Version: 1.3.8
>   - Library Path: 
> /var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/apache/struts/struts-core/1.3.8/struts-core-1.3.8.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: ActionServlet.java in Apache Struts 1 1.x through 1.3.10 
> does not properly restrict the Validator configuration, which allows remote 
> attackers to conduct cross-site scripting (XSS) attacks or cause a denial of 
> service via crafted input, a related issue to CVE-2015-0899.
>   - Suggested Fix: Replace or update the following file: 
> ActionServlet.java
>
> +----------------------+----------+
> | Library              | Severity |
> +======================+==========+
> | plexus-utils-3.0.jar | HIGH     |
> +----------------------+----------+
>   - Artifact ID: plexus-utils
>   - Group ID: org.codehaus.plexus
>   - Library Version: 3.0
>   - Library Path: 
> /var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: Security vulnerability found in plexus-utils before 
> 3.0.24. XML injection found in XmlWriterUtil.java.
>   - Suggested Fix: Upgrade to version 3.0.24
>
> Thanks,
>
> Prasanna.
>
>


Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Dian Fu
Hi Sumeet,

This feature is supported in 1.13.0 which was just released and so there is no 
documentation about it in 1.12.
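For anyone finding this later, a rough sketch along the lines of the 1.13 documentation; the (key, value) tuple layout and the type info are assumptions:

from pyflink.common.typeinfo import Types
from pyflink.datastream import FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class RunningSum(FlatMapFunction):

    def open(self, runtime_context: RuntimeContext):
        # one ValueState instance per key
        self.sum = runtime_context.get_state(ValueStateDescriptor("sum", Types.LONG()))

    def flat_map(self, value):
        current = self.sum.value() or 0
        current += value[1]
        self.sum.update(current)
        yield value[0], current

# usage: ds.key_by(lambda v: v[0]).flat_map(RunningSum())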

Regards,
Dian

> On May 4, 2021, at 2:09 AM, Sumeet Malhotra  wrote:
> 
> Hi,
> 
> Is keyed state [1] supported by PyFlink yet? I can see some code for it in 
> the Flink master branch, but there's no mention of it in the 1.12 Python 
> documentation.
> 
> Thanks,
> Sumeet
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
>  
> 
> 



Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread David Anderson
Interesting. So if I understand correctly, basically you limited the
parallelism of the sources in order to avoid running the job with constant
backpressure, and then scaled up the windows to maximize throughput.

On Tue, May 4, 2021 at 11:23 PM vishalovercome  wrote:

> In one of my jobs, windowing is the costliest operation while upstream and
> downstream operators are not as resource intensive. There's another
> operator
> in this job that communicates with internal services. This has high
> parallelism as well but not as much as that of the windowing operation.
> Running all operators with the same parallelism as the windowing operation
> would choke some of our internal services, as we'd be consuming from our
> source at a rate much higher than what our internal services can handle. Thus our
> sources, sinks, validation, monitoring related operators have very low
> parallelism while one has high parallelism and another has even higher
> parallelism.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
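For readers following along, a minimal illustration (the operator classes and the numbers are hypothetical) of giving each operator its own parallelism instead of one uniform value:

DataStream<Event> events = env
    .addSource(new MySource())
    .setParallelism(5);            // keep pressure on the internal services low

events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new MyAggregate())
    .setParallelism(80)            // heavy windowing work gets the most slots
    .addSink(new MySink())
    .setParallelism(5);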