Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
So there is no way to do a countWindow(100) and preserve data locality?

My use case is this: augment a data stream with new fields from DynamoDB
lookup. DynamoDB allows batch get's of up to 100 records, so I am trying to
collect 100 records before making that call. I have no other reason to do a
repartitioning, so I am hoping to avoid incurring the cost of shipping all
the data across the network to do this. 

If I use countWindowAll, I am limited to parallelism = 1, so all data gets
repartitioned twice. And if I use keyBy().countWindow(), then it gets
repartitioned by key. So in both cases I lose locality.

Am I missing any other options?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13981.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic
multiplier of the subtask index:

  RichMapFunction> keyByMap = new
RichMapFunction>() {
  public Tuple2 map(String value) {
int indexOfCounter = Math.abs(value.hashCode()) % 4;
int key = (( getRuntimeContext().getIndexOfThisSubtask() +
1)  * (indexOfCounter + 1)) - 1;
counters.get(key).add(1);
return new Tuple2<>(key, value);
}
};

With this idea, if there are 12 subtasks, then subtask 0 would create 4
keys: 0, 12, 24, and 36.

The big advantage of your idea was that it would keep the data local. Is
this still true with my example here (where I'm applying a function to the
subtask index)? That is, if a each partition is generating a unique set of
keys (unique to that subtask), will it optimize to keep that set of keys
local for the next downstream subtask?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13978.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Hi Fabian - 
I've tried this idea of creating a KeyedStream based on
getRuntimeContext().getIndexOfThisSubtask(). However, not all target
subtasks are receiving records.

All subtasks have a parallelism of 12, so I have 12 source subtasks and 12
target subtasks. I've confirmed that the call to getIndexOfThisSubtask is
evenly distributed between 0 and 11. However, 4 out of the 12 target
subtasks (the subtasks after the hash) are no receiving any data. This means
it's not actually keeping all the data local, because at least 4 of the 12
partitions could be getting sent to different TaskManagers.

Do I need to do a .partitionCustom to ensure even/local distribution?

Thanks,
Edward



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13971.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Classloader issue with UDF's in DataStreamSource

2017-08-28 Thread Edward
I need help debugging a problem with using user defined functions in my
DataStreamSource code.

Here's the behavior:
The first time I upload my jar to the Flink cluster and submit the job, it
runs fine.
For any subsequent runs of the same job, it's giving me a NoClassDefFound
error on one of my UDF classes.
If I restart the Flink cluster, this it will again work, but only the first
time I submit the job.

I am using a customized KafkaAvroDeserializer where the reader schema is
different from the writer schema (and where that reader schema in a
generated Avro class in which is included in my uploaded jar file). If I
change my code to use the standard KafkaAvroDeserializer (i.e. no UDF's in
the DataStreamSource), it works fine, even though there are UDF's used in
other steps of my job, so the problem seems specific to DataStreamSource
step.

Why would the classloader here not have access to all classes in my uploaded
jar file, while the classloader used in subsequent steps does have access to
that jar file? Why would it work fine the first time I upload the jar via
the Flink Dashboard, but not on subsequent executions?

Here's the exception I'm seeing:








--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Classloader-issue-with-UDF-s-in-DataStreamSource-tp15192.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Classloader issue with UDF's in DataStreamSource

2017-08-30 Thread Edward
In case anyone else runs into this, here's what I discovered:

For whatever reason, the classloader used by
org.apache.flink.api.java.typeutils.TypeExtractor did not have access to the
classes in my udf.jar file. However, if I changed my
KeyedDeserializationSchema implementation to use standard Avro classes (i.e.
GenericRecord rather than a SpecificRecord), the classloader didn't require
any of the generated Avro classes in udf.jar during the ExecutionGraph
stage.

At execution time, my deserializer forced the returned GenericRecord into
the my custom Avro SpecificRecord class, which was available to the
classloader at this point. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: REST api: how to upload jar?

2017-12-08 Thread Edward
Has anyone successfully uploaded to the REST API using command line tools
(i.e. curl)? If so, please post an example.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: specify user name when connecting to hdfs

2017-12-07 Thread Edward
I have the same question. 
I am setting fs.hdfs.hadoopconf to the location of a Hadoop config. However,
when I start a job, I get an error message that it's trying to connect to
the HDFS directory as user "flink":

Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
Permission denied: user=flink, access=EXECUTE,
inode="/user/site":site:hadoop:drwx--
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkTraverse(DefaultAuthorizationProvider.java:206)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:158)
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3495)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3478)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:3465)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkTraverse(FSNamesystem.java:6596)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4377)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4355)

I have seen in other threads on this list where people mention setting up
the impersonate user in core-site.xml, but I've been unable to determine the
correct setting.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: REST api: how to upload jar?

2017-12-11 Thread Edward
Thanks for the response, Shailesh. However, when I try with python, I get the
same error as when I attempted this with cURL:


That is, if I tell python (or cURL) that my jar file is at
/path/to/jar/file.jar, the file path it uses on the server side includes
that entire path. And if I try the script with no path (i.e. run the script
in the folder where file.jar exists), it uploads an empty file named
file.jar.  The endpoint at file/upload seems to be take the form-data
element "jarfile" and use the fully qualified path when trying to save the
jar file on the server side.

Here is my equivalent attempt using cURL, which gives the same
FileNoFoundException as above:




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: REST api: how to upload jar?

2017-12-11 Thread Edward
Let me try that again -- it didn't seem to render my commands correctly:

Thanks for the response, Shailesh. However, when I try with python, I get
the 
same error as when I attempted this with cURL: 

$ python uploadJar.py
java.io.FileNotFoundException:
/tmp/flink-web-4bed7801-fa5e-4e5e-abf1-3fa13ba1f528/438eaac1-7647-4716-8d8d-f95acd8129b2_/path/to/jar/file.jar
(No such file or directory)

That is, if I tell python (or cURL) that my jar file is at 
/path/to/jar/file.jar, the file path it uses on the server side includes 
that entire path in the target file name. And if I try the script with no
path (i.e. run the script 
in the folder where file.jar exists), it uploads an empty file named 
file.jar.  The endpoint at file/upload seems to be take the form-data 
element "jarfile" and use the fully qualified path when trying to save the 
jar file on the server side. 

Here is my equivalent attempt using cURL, which gives the same 
FileNoFoundException as above: 

curl 'http://localhost:8081/jars/upload' -H 'Content-Type:
multipart/form-data; boundary=Boundary' --data-binary
$'--Boundary\r\nContent-Disposition: form-data; name="jarfile";
filename="/path/to/jar/file.jar"\r\nContent-Type:
application/java-archive\r\n\r\n\r\n--Boundary--\r\n' 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Upgrade to 1.4.0 - Kryo/Avro issue

2018-01-19 Thread Edward
We're attempting to upgrade our 1.3.2 cluster and jobs to 1.4.0. When
submitting jobs to the 1.4.0 Kafka cluster, they fail with a Kryo
registration error. 

My jobs are consuming from Kafka topics with messages in Avro format. The
avro schemas are registered with a Confluent avro schema registry. For
ingestion, we've been using the KafkaDeserializerWrapper class from this
pull request: https://github.com/apache/flink/pull/2705

In the pom.xml, I added a new dependency for flink-avro, and upgraded all
other maven dependencies to version 1.4.0

Here's the error:


Here are the dependencies:





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-24 Thread Edward
Thanks, Stephan. You're correct that there was in fact no shading issue in
the official flink-dist_2.11-1.4.0.jar. We are using the jar in the flink
docker image, but I mis-spoke when I said ObjectMapper appeared there
unshaded. It turned out the issue was really a version conflict in our job's
uber-jar file between jackson-core and jackson-databind, which I was able to
resolve.

Regarding the Verify Error, please post back here when you have a patch or a
link to a pull request which we can track.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Edward
I'm having an issue where off-heap memory is growing unchecked until I get
OOM exceptions.

I was hoping that upgrading to 1.4 would solve these, since the child-first
classloader is supposed to resolve issues with Avro classes cached in a
different classloader (which prevents the classloaders from being garbage
collected).

However, after upgrading, we are still seeing an off-heap memory leak. I
think I may have isolated the issue to the JmxReporter class used for
collecting Kafka metrics.

Here are the details of what I'm seeing:
Our cluster is running in kubernetes, using the latest flink:1.4 docker
image. We are using the default classloading order (child first).

If I resubmit my job repeatedly, the ClassLoaders from the previous job
submissions don't get cleaned up, and the non-heap memory slowly grows until
the task manager runs out of memory. 

I can see all of the un-deleted classloaders if I run "sudo -u flink jmap
-clstats " (the output is below). This list of dead classloaders
continues to grow every time I kill and resubmit a new Flink job.  In all,
it lists 3200 dead class loaders. I'm only going to upload the ones which
show more than 2K of used memory.

finding class loader instances ..done.
computing per loader stat ..done.
please wait.. computing liveness.liveness analysis may be inaccurate ...
class_loaderclasses bytes   parent_loader   alive?  type
0x807302a0  7522122130760x804c58c0  dead
sun/misc/Launcher$AppClassLoader@0x0001f070
0x8eb0  36996021535 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x94200190  36936016807 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x9e7bc6c8  36966001081 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xa9d80008  35845530412 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xf4103650  35815527354 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x901801f8  35815527354 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x942637c0  32315121176 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x96c2ec00  32315119662 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x8f60  32255116241 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x92700d48  32285112270 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
 25484424440 nulllive
0x96b77190  22343634602 0x807302a0  dead
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98

Next I took a heap dump:
sudo -u flink jmap -dump:format=b,file=/tmp/HeapDump.hprof 

Then, using Eclipse Memory Analyzer, I followed the steps from this blog
post:
http://java.jiderhamn.se/2011/12/11/classloader-leaks-i-how-to-find-classloader-leaks-with-eclipse-memory-analyser-mat/

The result of looking for strong references to classes in a dead classloader
is this tree:

Class Name  
   
| Shallow Heap | Retained Heap
---
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader
@ 0x94200190|   88 |   616,992
'-  class
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0x94250cb0 

|0 | 0
   '-  org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @
0xbae537e8   |   24 |   328
  '- object com.sun.jmx.mbeanserver.NamedObject @ 0xbace01e0
   
|   24 |24
 '- value java.util.HashMap$Node @ 0xbace0110   
   
|   32 |   232
'- [247] java.util.HashMap$Node[512] @ 0xfa0d08c0   

Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-09 Thread Edward
I applied the change in the pull request associated with that Kafka bug, and
unfortunately it didn't resolve anything. It doesn't unregister any
additional MBeans which are created by Kafka's JmxRepository -- it is just a
fix to the remove mbeans from Kafka's cache of mbeans (i.e. it is doing
cleanup within the job's  ChildFirst classloader, not the bootstrap/App
classloader where the strong reference exists).



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-07 Thread Edward
We are using FlinkKafkaConsumer011 and FlinkKafkaProducer011, but we also
experienced the same behavior with FlinkKafkaConsumer010 and
FlinkKafkaProducer010.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-22 Thread Edward
(resubmission of a previous post, since the stack trace didn't show up last
time)

We're attempting to upgrade our 1.3.2 cluster and jobs to 1.4.0. When 
submitting jobs to the 1.4.0 Kafka cluster, they fail with a Kryo 
registration error. 

My jobs are consuming from Kafka topics with messages in Avro format. The 
avro schemas are registered with a Confluent avro schema registry. For 
ingestion, we've been using the KafkaDeserializerWrapper class from this 
pull request: https://github.com/apache/flink/pull/2705

In the pom.xml, I added a new dependency for flink-avro, and upgraded all 
other maven dependencies to version 1.4.0 

Here's the error: 

java.lang.VerifyError: Bad type on operand stack
  Exception Details:
Location:


org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V
@23: invokespecial
  Reason:
Type
'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
(current frame, stack[7]) is not assignable to
'com/esotericsoftware/kryo/Serializer'
  Current Frame:
bci: @23
flags: { }
locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils',
'java/util/LinkedHashMap' }
stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6,
uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12,
'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList'
}
  Bytecode:
0x000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
0x010: bb00 0659 b700 0eb7 000f b700 10b6 0011
0x020: 57b1   

at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at
org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.(KryoSerializer.java:119)
at
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at
org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
at
org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Here are the dependencies: 




org.apache.flink
flink-java
1.4



org.apache.flink
flink-streaming-java_2.11
1.4



org.apache.flink
flink-avro
1.4



org.apache.flink
flink-clients_2.11
1.4



org.apache.flink
flink-connector-filesystem_2.11
1.4



org.apache.flink
flink-connector-kafka-0.11_2.11
1.4



org.apache.flink
flink-metrics-statsd
1.4


   
io.confluent
kafka-avro-serializer
3.3.1










--
Sent from: 

Re: NoClassDefFoundError of a Avro class after cancel then resubmit the same job

2018-01-22 Thread Edward
Yes, we've seen this issue as well, though it usually takes many more
resubmits before the error pops up. Interestingly, of the 7 jobs we run (all
of which use different Avro schemas), we only see this issue on 1 of them.
Once the NoClassDefFoundError crops up though, it is necessary to recreate
the task managers.

There's a whole page on the Flink documentation on debugging classloading,
and Avro is mentioned several times on that page:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html

It seems that (in 1.3 at least) each submitted job has its own classloader,
and its own instance of the Avro class definitions. However, the Avro class
cache will keep references to the Avro classes from classloaders for the
previous cancelled jobs. That said, we haven't been able to find a solution
to this error yet. Flink 1.4 would be worth a try because the of the changes
to the default classloading behaviour (child-first is the new default, not
parent-first).





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-22 Thread Edward
Also, I'm not sure if this would cause the uninitialized error, but I did
notice that in the maven dependency tree there are 2 different versions of
kyro listed as Flink dependencies:
 flink-java 1.4 requires kyro 2.24, but flink-streaming-java_2.11 requires
kyro 2.21:

[INFO] +- org.apache.flink:flink-java:jar:1.4.0:compile
[INFO] |  +- org.apache.flink:flink-core:jar:1.4.0:compile
[INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile

[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.4.0:compile
[INFO] |  +- (org.apache.flink:flink-core:jar:1.4.0:compile - omitted for
duplicate)
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.4.0:compile
[INFO] |  |  +- com.twitter:chill_2.11:jar:0.7.4:compile
[INFO] |  |  |  +- com.twitter:chill-java:jar:0.7.4:compile
[INFO] |  |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile -
omitted for conflict with 2.24.0)
[INFO] |  |  |  \- (com.esotericsoftware.kryo:kryo:jar:2.21:compile -
omitted for conflict with 2.24.0)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-23 Thread Edward
Thanks for the follow-up Stephan.

I have been running this job from a built jar file which was submitted to an
existing Flink 1.4 cluster, not from within the IDE. Interestingly, I am now
getting the same error when any of the following 3 conditions are true:
1. I run the job on a local cluster from within my IDE
2. I run the job on a cluster where "classloader.resolve-order:
parent-first"
3. I build the uber jar file without including flink-java,
flink-streaming-java and flink-clients (I changed those to "provided" as you
suggested, so they aren't in my jar)

If any of those 3 cases are true, I get a new NoClassDefFoundError. This
error is caused because com.fasterxml.jackson.databind.ObjectMapper is
present in flink-dist_2.11-1.4.0.jar, but
com.fasterxml.jackson.databind.SerializationConfig is not (only the shaded
version:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationConfig)

java.lang.NoClassDefFoundError: Could not initialize class
com.fasterxml.jackson.databind.SerializationConfig
at
com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:558)
at
com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:474)
at
com.mycom.datapipeline.common.client.UmsCientFactory.getUserMappingServiceClient(UmsCientFactory.java:31)
at
com.mycom.datapipeline.flink.udf.UserLookupFunctionBase.open(UserLookupFunctionBase.java:78)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

I understand why this is happening in the case of the parent-first
classloader, but I can't understand why it's happening when I exclude
flink-java from my job's uber jar file -- in that 2nd case, I would expect
the job's child classloader to be used, which contains both of those
fasterxml classes.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka offset auto-commit stops after timeout

2018-03-06 Thread Edward
Thanks for the reply, Nico.

I've been testing with OffsetCommitMode.ON_CHECKPOINTS, and I can confirm
that this fixes the issue -- even if a single commit time out when
communicating with Kafka, subsequent offset commits are still successful.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Edward
I read through this thread and didn't see any resolution to the slow
checkpoint issue (just that someone resolved their backpressure issue).

We are experiencing the same problem: 
- When there is no backpressure, checkpoints take less than 100ms
- When there is high backpressure, checkpoints take anywhere from 5 minutes
to 25 minutes.

This is preventing us from using the checkpointing feature at all, since
periodic backpressure is unavoidable.

We are experiencing this when running on Flink 1.4.0.
We are retaining only a single checkpoint, and the size of retained
checkpoint is less than 250KB, so there's not a lot of state.
   state.backend: jobmanager
   state.backend.async: true
   state.backend.fs.checkpointdir: hdfs://checkpoints
   state.checkpoints.num-retained: 1
   max concurrent checkpoints: 1
   checkpointing mode: AT_LEAST_ONCE

One other data point: if I rewrite the job to allow chaining all steps (i.e.
same parallelism on all steps, so they fit in 1 task slot), the checkpoints
are still slow under backpressure, but are an order of magnitude faster --
they take about 60 seconds rather than 15 minutes.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Edward
Thanks for the update Piotr.

The reason it prevents us from using checkpoints is this:
We are relying on the checkpoints to trigger commit of Kafka offsets for our
source (kafka consumers).
When there is no backpressure this works fine. When there is backpressure,
checkpoints fail because they take too long, and our Kafka offsets are never
committed to Kafka brokers (as we just learned the hard way).

Normally there is no backpressure in our jobs, but when there is some
outage, then the jobs do experience 
backpressure when catching up. And when you're already trying to recover
from an incident, that is not the ideal time for kafka offsets commits to
stop working.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Kafka offset auto-commit stops after timeout

2018-03-05 Thread Edward
We have noticed that the Kafka offset auto-commit functionality seems to stop
working after it encounters a timeout. It appears in the logs like this:

2018-03-04 07:02:54,779 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
the coordinator kafka06:9092 (id: 2147483641 rack: null) dead for group
consumergroup01
2018-03-04 07:02:54,780 WARN 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
Auto-commit of offsets {topic01-24=OffsetAndMetadata{offset=153237895,
metadata=''}} failed for group consumergroup01: Offset commit failed with a
retriable exception. You should retry committing offsets. The underlying
error was: The request timed out.

After this message is logged, no more offsets are committed by the job until
it is restarted (and if the flink process ends abnormally, the offsets never
get committed).

This is using Flink 1.4.0 which uses kafka-clients 0.11.0.2. We are using
the default kafka client settings for enable.auto.commit (true) and
auto.commit.interval.ms (5000). We are not using Flink checkpointing, so the
kafka client offset commit mode is OffsetCommitMode.KAFKA_PERIODIC (not
OffsetCommitMode.ON_CHECKPOINTS).

I'm wondering if others have encountered this?

And if so, does enabling checkpointing resolve the issue, because
Kafka09Fetcher.doCommitInternalOffsetsToKafka is called from the Flink code?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Python UDF's in DataStream API?

2020-09-16 Thread Edward
Is there any plan to allow Python UDF's within the DataStream API (as opposed
to an entire job defined in Python)? FLIP-130
  
discusses Python support for the DataStream API, but it's not clear whether
this will include the ability to have a single Python UDF (similar to 
FLIP-58

 
, but for DataSteam API, not Table API)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

HA Standalone Cluster configuration

2017-06-21 Thread Edward Buck
Questions about standalone cluster configuration:

  1.  Is it considered bad practice to have standby JobManagers co-located on 
the same machines as TaskManagers?
  2.  Is it considered bad practice to have zookeeper installed on the same 
machines as the JobManager leader and standby machines? (the docs say "In 
production setups, it is recommended to manage your own ZooKeeper 
installation.", but I'm assuming it's still okay to co-locate ZK on with 
JobManager?)
  3.  In another thread, I read that the rule of thumb for 
taskmanager.numberOfTaskSlots = number of cores. Doesn't this ignore cases 
where threads have a high proportion of idle time (i.e. waiting on an I/O 
call)? If the total number of task slot limits my degree of parallelism, but 
most parallel copies of a subtask are idle at any given time, it seems that I 
would want to have # of task slots equal to some multiple of the number of 
cores.

Thanks,
Edward


Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

2018-05-04 Thread Edward Rojas
Hello all,

We have a kafka consumer listening to a topic pattern "topic-*" with a
partition discovery interval.
We eventually add new topics and this is working perfectly, the consumer
discover the new topics (and partitions) and listen to them.

But we also remove topics eventually and in this case the consumer is not
updated. The consumer continue listen to the removed partitions *forever*
and we get logs like:


2018-05-04 11:32:11,462 WARN  org.apache.kafka.clients.NetworkClient
   
- Error while fetching metadata with correlation id 1154 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:11,965 WARN  org.apache.kafka.clients.NetworkClient
   
- Error while fetching metadata with correlation id 1156 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,468 WARN  org.apache.kafka.clients.NetworkClient
   
- Error while fetching metadata with correlation id 1158 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,970 WARN  org.apache.kafka.clients.NetworkClient
   
- Error while fetching metadata with correlation id 1160 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:13,473 WARN  org.apache.kafka.clients.NetworkClient
   
- Error while fetching metadata with correlation id 1162 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
...

This requests continue *forever* and the logs are shown several times per
second hiding other possible problems and it's using resources that could be
freed for other processing.

I think the partition discovery mechanism should be modified to take into
account not only new partitions but also removing no longer available
partitions.

What do you think ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Edward Rojas
I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
this issue.

Every time I try to use the flink CLI with the -m option to specify the
jobmanager address, the CLI get stuck on "Waiting for response..." and  I
get the following error on the Jobmanager:

WARN  akka.remote.transport.netty.NettyTransport- Remote
connection to [/x.x.x.x:] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 1195725860 - discarded

I get the error even when I run it locally and try something like "flink
list -m localhost:6123". But "flink list" works as expected. 

I'm using the version from the "release-1.5" branch.

I tested on the tag release 1.5.0-rc1 and it's working as expected.

I did a /git bisect/ and it seems like the commit introducing the regression
is  47909f4
<https://github.com/apache/flink/commit/47909f466b9c9ee1f4caf94e9f6862a21b628817>
  

I created a JIRA ticket for this:
https://issues.apache.org/jira/browse/FLINK-9255.

Do you have any thoughts about it ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Edward Rojas
I forgot to add an example of the execution:

$ ./bin/flink list -r
Waiting for response...
-- Running/Restarting Jobs ---
17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob
(CANCELED)
17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob
(RUNNING)

Note that from the title it's supposed to return only Running or Restarting
Jobs.

I'm using this response on a script that updates a job by canceling with
savepoint... I just want to know if I have ti update my script :)

Thanks in advance



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Edward Rojas
Hello all,

On Flink 1.5, the CLI returns the CANCELED jobs when requesting only the
running job by using the -r flag...
is this an intended behavior ?

On 1.4 CANCELED jobs does not appear when running this command.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Hi,

We are having a similar problem when trying to use Flink 1.4.0 with IBM
Object Storage for reading and writing data. 

We followed
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.

We put the flink-s3-fs-hadoop jar from the opt/ folder to the lib/ folder
and we added the configuration on the flink-config.yaml:

s3.access-key: 
s3.secret-key: 
s3.endpoint: s3.us-south.objectstorage.softlayer.net 

With this we can read from IBM Object Storage without any problem when using
env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write. 
We are using a kafka consumer to read from the bus, we're making some
processing and after saving  some data on Object Storage.

When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
The file is created but only when the job finish (or we stop it). But we
need to save the data without stopping the job, so we are trying to use a 
Sink.

But when using a BucketingSink, we get the error: 
java.io.IOException: No FileSystem for scheme: s3
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)


Do you have any idea how could we make it work using Sink?

Thanks,
Regards,

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: S3 for state backend in Flink 1.4.0

2018-02-01 Thread Edward Rojas
Hi Hayden,

It seems like a good alternative. But I see it's intended to work with
spark, did you manage to get it working with Flink ?

I some tests but I get several errors when trying to create a file, either
for checkpointing or saving data. 

Thanks in advance,
Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Thanks Aljoscha.  That makes sense. 
Do you have a more specific date for the changes on BucketingSink and/or the
PR to be released ?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Hi Aljoscha,

Thinking a little bit more about this, although IBM Object storage is
compatible with Amazon's S3, it's not an eventually consistent file system,
but rather immediately consistent. 

So we won't need the support for eventually consistent FS for our use case
to work, but we would only need that the BucketingSink uses the Flink
FileSystem abstraction instead of directly using the Hadoop FileSystem
abstraction.

Is this something that could be released earlier ?  Or do you have any idea
how we could achieve it ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Secured communication with Zookeeper without Kerberos

2018-02-20 Thread Edward Rojas
Hi,

I'm setting up a Flink cluster on kubernetes with High Availability using
Zookeeper. It's working well so far without the security configuration for
zookeeper.

I need to have secured communication between Flink and zookeeper but I'd
like to avoid the need to setup a Kerberos server only for this.

Is there a way to configure secured communication between Flink and
Zookeeper without using kerberos ? With SSL for example ?

Thanks in advance!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


After OutOfMemoryError State can not be readed

2018-09-06 Thread Edward Rojas
Hello all,

We are running Flink 1.5.3 on Kubernetes with RocksDB as statebackend. 
When performing some load testing we got an /OutOfMemoryError: native memory
exhausted/, causing the job to fail and be restarted.

After the Taskmanager is restarted, the job is recovered from a Checkpoint,
but it seems that there is a problem when trying to access the state. We got
the error from the *onTimer* function of a *onProcessingTime*.

It would be possible that the OOM error could have caused to checkpoint a
corrupted state?

We get Exceptions like:

TimerException{java.lang.RuntimeException: Error while retrieving data from
RocksDB.}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
at java.util.concurrent.FutureTask.run(FutureTask.java:277)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:811)
Caused by: java.lang.RuntimeException: Error while retrieving data from
RocksDB.
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
... 7 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:208)
at java.io.DataInputStream.readUTF(DataInputStream.java:618)
at java.io.DataInputStream.readUTF(DataInputStream.java:573)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
... 12 more


Thanks in advance for any help




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Migration to Flip6 Kubernetes

2018-03-15 Thread Edward Rojas
Hello,

Currently I have a Flink 1.4 cluster running on kubernetes based on the
configuration describe on
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
with additional config for HA with Zookeeper.

With this I have several Taskmanagers, a single Jobmanager and I create a
container for each job to perform the Job submission and manage Job updates
with savepoints.


I'm looking into what would be needed to migrate to the new architecture on
FLIP6 as we are planning to use Flink 1.5 once it's ready. 

If I understand correctly from
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
and the current code on master:

* Taskmanagers would continue the same, i.e they will execute the
taskmanager.sh start-foreground  script, which with the flip6 mode activated
will execute the new taskexecutor.TaskManagerRunner.

* We will have now one Job Manager per Job which is really good; but I don't
fully understand how this would be started.
 
I notice that the jobmanager.sh with flip6 mode activated will execute
entrypoint.StandaloneSessionClusterEntrypoint but I don't see how we could
pass the job jar and parameters (?)

So I think the other possibility to start the job would be via the /flink
run/ command with maybe an option to tell that we are creating a job with
job manager or would be this the default behaviour ?

Or would be this the role of the JobMaster ? I didn't take a look to its
code but it's mentioned on the flip6 page. (however I don't see an
entrypoint from the scripts (?))

Could you help me to understand how this is expected to be done ?


* Also I'm not sure to understand whether it would be better to have a
ResourceManager per job or a single ResourceManager per cluster, as in the
page is stated that there is a ResourceManager for
Self-contained-single-job, but it seems to me that it needs to have the
information about all JobManagers and TaskManagers (?)


Thanks in advance for the help you could provide.

I'm interested in using Flip6 on kubernetes when it will be ready, so I
could help with some testing if needed. 

--
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Migration to Flip6 Kubernetes

2018-03-21 Thread Edward Rojas
Hi Till,

Thanks for the information. We are using the session cluster and is working
quite good, but we would like to benefit from the new approach of per-job
mode in order to have a better control over the jobs that are running on the
cluster. 

I took a look to the YarnJobClusterEntrypoint and I see now how this planned
to be done, but if I understand correctly, in the current state there is not
possible to start a Job cluster on kubernetes as there is only concrete
implementation for Yarn and mesos?

The objective being to have a Flink cluster running on per-job mode and able
to execute several self-contained jobs, I imagine the idea would be also to
have a Kubernetes specific implementation of the ResourceManager that would
be initialized along the TaskManagers and would be listening for the
"self-contained jobs" to join, assign resources and start the execution of
the specific job, each one with its own JobManager?

Is my understanding correct? 
Is the per-job mode on kubernetes planned to be included on 1.5 ?

Regards,
Edward




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


BucketingSink vs StreamingFileSink

2018-11-16 Thread Edward Rojas
Hello,
We are currently using Flink 1.5 and we use the BucketingSink to save the
result of job processing to HDFS.
The data is in JSON format and we store one object per line in the resulting
files. 

We are planning to upgrade to Flink 1.6 and we see that there is this new
StreamingFileSink,  from the description it looks very similar to
BucketingSink when using Row-encoded Output Format, my question is, should
we consider to move to StreamingFileSink?

I would like to better understand what are the suggested use cases for each
of the two options now (?)

We are also considering to additionally output the data in Parquet format
for data scientists (to be stored in HDFS as well), for this I see some
utils to work with StreamingFileSink, so I guess for this case it's
recommended to use that option(?).
Is it possible to use the Parquet writers even when the schema of the data
may evolve ?

Thanks in advance for your help.
(Sorry if I put too many questions in the same message)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to migrate Kafka Producer ?

2018-12-18 Thread Edward Rojas
Hi,

I'm planning to migrate from kafka connector 0.11 to the new universal kafka
connector 1.0.0+ but I'm having some troubles.

The kafka consumer seems to be compatible but when trying to migrate the
kafka producer I get an incompatibility error for the state migration. 
It looks like the producer uses a list state of type
"NextTransactionalIdHint", but this class is specific for each Producer
(FlinkKafkaProducer011.NextTransactionalIdHint  vs
FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
compatible.


I would like to know what is the recommended way to perform this kind of
migration without losing the state ?

Thanks in advance,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to migrate Kafka Producer ?

2019-01-07 Thread Edward Rojas
Hi Piotr,

Thank you for looking into this. 
Do you have an idea when next version (1.7.2) will be available ?

Also, could you validate / invalidate the approach I proposed in the
previous comment ?


Edward Rojas wrote
> Regarding the kafka producer I am just updating the job with the new
> connector and removing the previous one and upgrading the job by using a
> savepoint and the --allowNonRestoredState. 
> So far my tests with this option are successful.

Is there any risk of using this approach and just ignore the state of the
previous Producer ?


Thanks again for your help.

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Edward Rojas
Hello,

For upgrading jobs between Flink versions I follow the guide in the doc
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version

It states that we should always use savepoints for this procedure, I
followed it and it works perfectly. 

I just would like to know if there is a reason why is not advised to use
checkpoints for this procedure.

Say for example that the job has externalized checkpoints with
RETAIN_ON_CANCELLATION policy, one could cancel the job before the upgrade
and use the retained checkpoint to restart the job from it once the Flink
cluster is upgraded... or maybe I'm missing something ?

I performed some tests and we are able to upgrade using checkpoint, by
passing the checkpoint path in the "flink run -s" parameter.

Could you help to clarify if this is advised (and supported) or we should
stick to the use of savepoints for this kind of manipulations ?


Thanks in advance for your help.

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: state access causing segmentation fault

2020-10-12 Thread Colletta, Edward
Thanks Arvid,

I added static to ExecQueue and this did fix the problem.  I tested without 
static on RingBufferExec because it seems that if ExecQueue is static nested, 
there should be no reference to the MyKeyedProcessFunction object as 
RingBufferExec is an inner class of ExecQueue.

However, I did that just for the test.  For my prod code, going forward,  I am 
following flink’s rules for POJO types, adding static to any inner class,  and 
checking for any POJO warnings in the logs.


From: Arvid Heise 
Sent: Sunday, October 11, 2020 3:46 PM
To: Colletta, Edward 
Cc: Dawid Wysakowicz ; user@flink.apache.org
Subject: Re: state access causing segmentation fault

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

could you try adding the static keyword to ExecQueue and RingBufferExec? As is 
they hold a reference to the MyKeyedProcessFunction, which has unforeseen 
consequences.

On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:
Tried to attach tar file but it got blocked.   Resending with files attached 
individually.


Ok, have minimal reproducible example.   Attaching a tar file of the job that 
crashed.

The crash has nothing to do with the number of state variables.  But it does 
seem to be caused by using a type for the state variable that is a class nested 
in the KeyedProcessFunction.

Reduced to a single state variable.  The type of the state variable was a class 
(ExecQueue) defined in class implementing KeyedProcessFunction.  Moving the 
ExecQueue definition to its own file fixed the problem.



The attached example always crashes  the taskManager in 30 seconds to 5 minutes.



MyKeyedProcessFunction.java  and also cut and pasted here:



package crash;



import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.common.typeinfo.TypeHint;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;

import 
org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;

import org.apache.flink.util.Collector;



public class MyKeyedProcessFunction extends KeyedProcessFunction {

private static final Logger LOG = 
LoggerFactory.getLogger(MyKeyedProcessFunction.class);

public TypeInformation leftTypeInfo;

public transient ValueState leftState;



public int initQueueSize;

public long emitFrequencyMs;



public MyKeyedProcessFunction() {

initQueueSize = 10;

emitFrequencyMs = 1;

}



@Override

public void open(Configuration conf) {

leftTypeInfo = TypeInformation.of(new TypeHint(){});

leftState = getRuntimeContext().getState(

new ValueStateDescriptor<>("left", leftTypeInfo, null));

}



@Override

public void processElement(Exec leftIn, Context ctx, Collector out) {

try {

ExecQueue eq = leftState.value();

if (eq == null) {

eq = new ExecQueue(10);


ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + emitFrequencyMs);

}

leftState.update(eq);

}

catch (Exception e) {

LOG.error("Exception in processElement1. Key: " + 
ctx.getCurrentKey() + ". " + e + ". trace = " );

for (java.lang.StackTraceElement s:e.getStackTrace())

LOG.error(s.toString());



}

}





@Override

public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) {

try {

ExecQueue eq = leftState.value();


ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + emitFrequencyMs);

}

catch ( Exception e) {

LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". 
" + e + ". trace = " );

for (java.lang.StackTraceElement s:e.getStackTrace())

LOG.error(s.toString());

}

}

public class ExecQueue {

public RingBufferExec queue;

public ExecQueue (){}

public ExecQueue (int initSize) {

queue = new RingBufferExec(initSize);

}



public class RingBufferExec {

public Integer size;

public Integer count;

public RingBufferExec(){ }

public RingBufferExec(int sizeIn){

size = sizeIn;

count = 0;

}

}

}

}


From: Dawid Wysakowicz mailto:dwysakow...@apache.org>>
Sen

RE: state access causing segmentation fault

2020-10-10 Thread Colletta, Edward
Tried to attach tar file but it got blocked.   Resending with files attached 
individually.


Ok, have minimal reproducible example.   Attaching a tar file of the job that 
crashed.

The crash has nothing to do with the number of state variables.  But it does 
seem to be caused by using a type for the state variable that is a class nested 
in the KeyedProcessFunction.

Reduced to a single state variable.  The type of the state variable was a class 
(ExecQueue) defined in class implementing KeyedProcessFunction.  Moving the 
ExecQueue definition to its own file fixed the problem.



The attached example always crashes  the taskManager in 30 seconds to 5 minutes.



MyKeyedProcessFunction.java  and also cut and pasted here:



package crash;



import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.common.typeinfo.TypeHint;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;

import 
org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;

import org.apache.flink.util.Collector;



public class MyKeyedProcessFunction extends KeyedProcessFunction {

private static final Logger LOG = 
LoggerFactory.getLogger(MyKeyedProcessFunction.class);

public TypeInformation leftTypeInfo;

public transient ValueState leftState;



public int initQueueSize;

public long emitFrequencyMs;



public MyKeyedProcessFunction() {

initQueueSize = 10;

emitFrequencyMs = 1;

}



@Override

public void open(Configuration conf) {

leftTypeInfo = TypeInformation.of(new TypeHint(){});

leftState = getRuntimeContext().getState(

new ValueStateDescriptor<>("left", leftTypeInfo, null));

}



@Override

public void processElement(Exec leftIn, Context ctx, Collector out) {

try {

ExecQueue eq = leftState.value();

if (eq == null) {

eq = new ExecQueue(10);


ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + emitFrequencyMs);

}

leftState.update(eq);

}

catch (Exception e) {

LOG.error("Exception in processElement1. Key: " + 
ctx.getCurrentKey() + ". " + e + ". trace = " );

for (java.lang.StackTraceElement s:e.getStackTrace())

LOG.error(s.toString());



}

}





@Override

public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) {

try {

ExecQueue eq = leftState.value();


ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + emitFrequencyMs);

}

catch ( Exception e) {

LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". 
" + e + ". trace = " );

for (java.lang.StackTraceElement s:e.getStackTrace())

LOG.error(s.toString());

}

}

public class ExecQueue {

public RingBufferExec queue;

public ExecQueue (){}

public ExecQueue (int initSize) {

queue = new RingBufferExec(initSize);

}



public class RingBufferExec {

public Integer size;

public Integer count;

public RingBufferExec(){ }

public RingBufferExec(int sizeIn){

size = sizeIn;

count = 0;

}

}

}

}


From: Dawid Wysakowicz mailto:dwysakow...@apache.org>>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: state access causing segmentation fault


Hi,

It should be absolutely fine to use multiple state objects. I am not aware of 
any limits to that. A minimal, reproducible example would definitely be 
helpful. For those kind of exceptions, I'd look into the serializers you use. 
Other than that I cannot think of an obvious reason for that kind of exceptions.

Best,

Dawid
On 08/10/2020 12:12, Colletta, Edward wrote:
Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on EC2 
instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing 
the flink task manager.  The seems to be caused by using 3 State variables in 
the operator.  The crash happens consistently after some load is processed.
This is the second time I have encountered this.   The first time I had 3 
ValueState variables, this time I had 2 ValueState variabl

state access causing segmentation fault

2020-10-08 Thread Colletta, Edward
Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on EC2 
instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing 
the flink task manager.  The seems to be caused by using 3 State variables in 
the operator.  The crash happens consistently after some load is processed.
This is the second time I have encountered this.   The first time I had 3 
ValueState variables, this time I had 2 ValueState variables and a MapState 
variable.  Both times the error was alleviated by removing one of the state 
variables.
This time I replaced the 2 valueState variables with a Tuple2 of the types of 
the individual variables.   I can try to put together a minimal example, but I 
was wondering if anyone has encountered this problem.

Are there any documented limits of the number of state variables 1 operator can 
use?

For background the reason I use multiple state variables is the operator is 
processing 2 types of inputs, Left and Right.  When Left is received it is put 
it into a PriorityQueue. When the Right type is received I put that into a ring 
buffer.
I replaced the PriorityQueue with a queue of Ids and MapState to hold the 
elements.  So I have Left stored in a queue ValueState variable and MapState 
variable, and Right is stored in the ring buffer ValueState variable.




RE: RE: checkpointing seems to be throttled.

2020-12-24 Thread Colletta, Edward
FYI, this was an EFS issue.  I originally dismissed EFS being the issue because 
the Percent I/O limit metric  was very low.  But I later noticed the throughput 
utilization was very high.  We increased the provisioned throughput and the 
checkpoint times are greatly reduced.

From: Colletta, Edward
Sent: Monday, December 21, 2020 12:32 PM
To: Yun Gao ; user@flink.apache.org
Subject: RE: RE: checkpointing seems to be throttled.

Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao mailto:yungao...@aliyun.com>>
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

   Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code 
it requires a path parameter and the path would be the state.checkpoint.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting   state.backend: filesystem in 
config file and enable checkpoint, it indeed threw an exception said

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there are no backpressure, I think it might be helpful 
to see the time decompostion for the checkpoint in the checkpoint history page 
in WEB UI to see which phase takes too long time.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao mailto:yungao...@aliyun.com>>, 
user@flink.apache.or

checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.   This does not seem to be 
related to EFS limits/throttling .  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?

Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?

For now we plan on setting

execution.checkpointing.timeout:
 60 min

execution.checkpointing.tolerable-failed-checkpoints:12

execution.checkpointing.unaligned
  true
and also explicitly set
state.checkpoints.dir



RE: RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao 
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

   Are you setting the FSStateBackend via code or flink-conf.yaml ? If via code 
it requires a path parameter and the path would be the state.checkpoint.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting   state.backend: filesystem in 
config file and enable checkpoint, it indeed threw an exception said

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there are no backpressure, I think it might be helpful 
to see the time decompostion for the checkpoint in the checkpoint history page 
in WEB UI to see which phase takes too long time.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao mailto:yungao...@aliyun.com>>, 
user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject:RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao mailto:yungao...@aliyun.com>>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward mailto:edward.colle...@fmr.com>>; 
user@flink.apache.org<mailto:user@flink

RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao 
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
so because except for the default heap statebackend, other statebackends should 
throws exception if the state.checkpoint.dir is not set. Since heap 
statebackend stores all the snapshots in the JM's memory, it could not be 
recovered after JM failover, which makes it not suitable for production usage. 
Therefore, if used in production env then it might better to switch to 
statebackend like rocksdb.

   For the checkpoint timeout, AFAIK there should be no large changes after 
1.9.2. There may be different issues for checkpoint timeout, and one possible 
one might be there are back-pressure due to some operator could not process its 
records  in time, which would block the checkpoints. I think you might check 
the back-pressure [1] first, and if there is indeed back pressure, then you 
might try unaligned checkpoints or solve the back pressure by increasing the 
parallelism of slow operators.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



--Original Mail --
Sender:Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject:checkpointing seems to be throttled.
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.   This does not seem to be 
related to EFS limits/throttling .  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?

Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?

For now we plan on setting

execution.checkpointing.timeout<https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout>:
 60 min

execution.checkpointing.tolerable-failed-checkpoints<https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-tolerable-failed-checkpoints>:12

execution.checkpointing.unaligned<https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-unaligned>
  true
and also explicitly set
state.checkpoints.dir



RE: a couple of memory questions

2020-11-05 Thread Colletta, Edward
Thanks you for the response.   We do see the heap actually shrink after 
starting new jobs.


From: Matthias Pohl 
Sent: Thursday, November 5, 2020 8:20 AM
To: Colletta, Edward 
Cc: user@flink.apache.org
Subject: Re: a couple of memory questions

This email is from an external source - exercise caution regarding links and 
attachments.

Hello Edward,
please find my answers within your message below:

On Wed, Nov 4, 2020 at 1:35 PM Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:
Using Flink 1.9.2 with FsStateBackend, Session cluster.


  1.  Does heap state get cleaned up when a job is cancelled?

We have jobs that we run on a daily basis.  We start each morning and cancel 
each evening.  We noticed that the process size does not seem to shrink.  We 
are looking at the resident size of the process with ps and also the USED 
column for Heap on the taskmanager page of the flink dashboard.
There is no explicit cleanup happening on the Flink side. The heap should be 
cleaned up when GC kicks in.

  1.  How can I examine the usage of Flink Managed Memory?

 The configuration documentation seems to indicate this is used for batch jobs, 
and we are only using the Streaming API.   I reduced 
taskmanager.memory.fraction to 0.3, but I think this is still reserving too 
much memory to an area we will not be using.
Unfortunately, I don't know of any way to monitor the managed memory for Flink 
1.9.2 as is. We're going to introduce new metrics for managed memory [1], 
network memory [2] and metaspace [3] in the upcoming release of Flink 1.12.0. 
This should make it easier to monitor these memory pools.

I hope that helps a bit.
Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14406
[2] https://issues.apache.org/jira/browse/FLINK-14422
[3] https://issues.apache.org/jira/browse/FLINK-19617


a couple of memory questions

2020-11-04 Thread Colletta, Edward
Using Flink 1.9.2 with FsStateBackend, Session cluster.


  1.  Does heap state get cleaned up when a job is cancelled?

We have jobs that we run on a daily basis.  We start each morning and cancel 
each evening.  We noticed that the process size does not seem to shrink.  We 
are looking at the resident size of the process with ps and also the USED 
column for Heap on the taskmanager page of the flink dashboard.

  1.  How can I examine the usage of Flink Managed Memory?

 The configuration documentation seems to indicate this is used for batch jobs, 
and we are only using the Streaming API.   I reduced 
taskmanager.memory.fraction to 0.3, but I think this is still reserving too 
much memory to an area we will not be using.




RE: Event trigger query

2021-02-02 Thread Colletta, Edward
You can use a tumbling processing time window with an offset of 13 hours + your 
time zone offset.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#tumbling-windows
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.html


From: Abhinav Sharma 
Sent: Tuesday, February 2, 2021 5:27 AM
To: user@flink.apache.org
Subject: Event trigger query

This email is from an external source - exercise caution regarding links and 
attachments.

Newbie question: How can I set triggers to stream which execute according to 
system time? Eg: I want to sum the elements of streams at 1PM everyday.


TaskManager crash. Zookeeper timeout

2021-01-27 Thread Colletta, Edward
Using flink 11.2 on java 11, session cluster with 16 jobs running on aws ecs 
instances.  Cluster has 3 JMs and 3 TMs, separate zookeeper cluster has 3 nodes.

One of our taskmanagers crashed today with what seems to be rooted in a 
zookeeper timeout.   We are wondering if there is any tuning that might cause 
this timeout.  Any help will be greatly appreciated.

The first sign of trouble in the log is the following:

2021-01-27 11:16:39,795 WARN  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036
2021-01-27 11:16:39,795 INFO  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036, closing socket connection and attempting reconnect
2021-01-27 11:16:39,897 INFO  
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
 [] - State change: SUSPENDED
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 7613291aea3f4892a0deed0e7036e229 with leader id 
8959b1fb00fdd4e3d28daade48204e1f lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 3230dacf7fa0b8b8f9fe1c77ebdde2bb with leader id 
bccda87aa8ab14f23e98a4b6d2bf4081 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 8f2ee940006ebb6d8f6d12e3db917da3 with leader id 
b72d64c2ec112d96cc3b93697d85478d lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job aaec26e3924e81c12bd5a6d71f6c0d77 with leader id 
8d91fefd14539d11d60a16e0e5cd45b1 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 2d5f912867ff70a58638aff51c7f6f33 with leader id 
b24724d3e03bee3486fdc5dc616b4a9c lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 29eb631a7a07aa6b2c0224972b9937bb with leader id 
8479de79b7eda73fca6593da93c04027 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job bc7688332e73f330f08c95428630b99e with leader id 
a541d5eb3b60d29afc3a16cab2f742e7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job a70b0023b705c39fa66f47f1a666b65d with leader id 
a0bfc94c9ff40689a7143396cafe4ac7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 4c929f573971b8520a76ee1dfe5c3e35 with leader id 
922675f382f87225300696bae21841cc lost leadership.
2021-01-27 11:16:39,970 WARN  

RE: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-01-30 Thread Colletta, Edward
“but I'm not aware of any similar issue reported since the upgrading”
For the record, we experienced this same error on Flink 1.11.2 this past week.

From: Xintong Song 
Sent: Friday, January 29, 2021 7:34 PM
To: user 
Subject: Re: Flink 1.11 job hit error "Job leader lost leadership" or 
"ResourceManager leader changed to new address null"

This email is from an external source - exercise caution regarding links and 
attachments.



Thank you~

Xintong Song


On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not aware of 
any similar issue reported since the upgrading.
I would suggest the following:
- Turn on the DEBUG log see if there's any valuable details
- Maybe try asking in the Apache Zookeeper community, see if this is a known 
issue.

Thank you~
Xintong Song



Thank you~

Xintong Song

On Sat, Jan 30, 2021 at 6:47 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
Hi, Xintong

Thanks for replying. Could it relate to the zk version? We are a platform team 
at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9 and 1.11 
jobs talking to the same ZK for JM HA. This problem only surfaced in 1.11 jobs. 
That's why we think it is related to version upgrade.

Best
Lu

On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
The ZK client side uses 15s connection timeout and 60s session timeout in 
Flink. There's nothing similar to a heartbeat interval configured, which I 
assume is up to ZK's internal implementation. These things have not changed in 
FLink since at least 2017.

If both ZK client and server complain about timeout, and there's no gc issue 
spotted, I would consider a network instability.


Thank you~

Xintong Song


On Fri, Jan 29, 2021 at 3:15 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
After checking the log I found the root cause is zk client timeout on TM:
```
2021-01-25 14:01:49,600 WARN 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client 
session timed out, have not heard from server in 40020ms for sessionid 
0x404f9ca531a5d6f
2021-01-25 14:01:49,610 INFO 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client 
session timed out, have not heard from server in 40020ms for sessionid 
0x404f9ca531a5d6f, closing socket connection and attempting reconnect
2021-01-25 14:01:49,711 INFO 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
 - State change: SUSPENDED
2021-01-25 14:01:49,711 WARN 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- JobManager for job 27ac39342913d29baac4cde13062c4a4 with leader id 
b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
2021-01-25 14:01:49,712 WARN 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- Close JobManager connection for job 27ac39342913d29baac4cde13062c4a4.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task - 
Attempting to fail task externally Sink: 
USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) 
(d5b5887e639874cb70d7fef939b957b7).
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task - Sink: 
USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) 
(d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 
27ac39342913d29baac4cde13062c4a4 lost the leadership.
```

I checked that TM gc log, no gc issues. it also shows client timeout in 
zookeeper server log. How frequently the zk client sync with server side in 
flink? The log says client doesn't heartbeat to server for 40s. Any help? 
thanks!

Best
Lu


On Thu, Dec 17, 2020 at 6:10 PM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
I'm not aware of any significant changes to the HA components between 1.9/1.11.
Would you mind sharing the complete jobmanager/taskmanager logs?


Thank you~

Xintong Song


On Fri, Dec 18, 2020 at 8:53 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
Hi, Xintong

Thanks for replying and your suggestion. I did check the ZK side but there is 
nothing interesting. The error message actually shows that only one TM thought 
JM lost leadership while others ran fine. Also, this happened only after we 
migrated from 1.9 to 1.11. Is it possible this is introduced by 1.11?

Best
Lu

On Wed, Dec 16, 2020 at 5:56 PM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
Hi Lu,

I assume you are using ZooKeeper as the HA service?

A common cause of unexpected leadership lost is the instability of HA service. 
E.g., if ZK does not receive heartbeat from 

Re: Writing to Avro from pyflink

2021-04-26 Thread Edward Yang
Hi Dian,

Thanks for trying it out, it ruled out a problem with the python code. I
double checked the jar path and only included the jar you referenced
without any luck. However, I tried creating a python 3.7 (had 3.8)
environment for pyflink and the code worked without any errors!


On Sun, Apr 25, 2021, 10:09 PM Dian Fu  wrote:

> Hi Eddie,
>
> I have tried your program with the following changes and it could execute
> successfully:
> - Replace `rf"
> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar”`
> with rf`"
> file:///Users/dianfu/code/src/apache/flink/flink-sql-avro-1.12.3.jar”`
> - Use flink-sql-avro-1.12.3.jar [1] instead of flink-sql-avro-1.12.2.jar
> as I encountered issue FLINK-21012 [2] which has been addressed in 1.12.3
>
> For your problem, I suspect if `
> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar` really exists. Could
> you double check that?
>
> [1]
> https://repository.apache.org/content/repositories/orgapacheflink-1419/org/apache/flink/flink-sql-avro/1.12.3/flink-sql-avro-1.12.3.jar
> [2] https://issues.apache.org/jira/browse/FLINK-21012
>
> Regards,
> Dian
>
> 2021年4月25日 下午11:56,Edward Yang  写道:
>
> Hi Dian,
>
> I tried your suggestion but had the same error message unfortunately. I
> also tried file:/ and file:// with the same error, not sure what's going
> on, I assume writing to avro works fine in java and scala?
>
> Eddie
>
> On Sat, Apr 24, 2021 at 10:03 PM Dian Fu  wrote:
>
>> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar
>> . Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try
>> again?
>>
>> Regards,
>> Dian
>>
>> 2021年4月24日 上午8:29,Edward Yang  写道:
>>
>> I've been trying to write to the avro format with pyflink 1.12.2 on
>> ubuntu, I've tested my code with an iterator writing to csv and everything
>> works as expected. Reading through the flink documentation I see that I
>> should add jar dependencies to work with avro. I downloaded three jar files
>> that I believe are required for avro like so:
>>
>> table_env\
>> .get_config()\
>> .get_configuration()\
>> .set_string(
>> "pipeline.jars",
>> rf"
>> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar
>> "
>> )
>>
>> I suspect I'm not loading the jar files correctly, but it's unclear what
>> I'm supposed to do as I'm not familiar with java and when I switch the sink
>> format to avro I get some unexpected errors:
>>
>> Py4JJavaError: An error occurred while calling o746.executeInsert.
>> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>>  at 
>> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>>  at 
>> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>>  at 
>> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>>  at 
>> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>>  at 
>> scala.collection.Traversabl

Re: Writing to Avro from pyflink

2021-04-25 Thread Edward Yang
Hi Dian,

I tried your suggestion but had the same error message unfortunately. I
also tried file:/ and file:// with the same error, not sure what's going
on, I assume writing to avro works fine in java and scala?

Eddie

On Sat, Apr 24, 2021 at 10:03 PM Dian Fu  wrote:

> I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar
> . Could you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try
> again?
>
> Regards,
> Dian
>
> 2021年4月24日 上午8:29,Edward Yang  写道:
>
> I've been trying to write to the avro format with pyflink 1.12.2 on
> ubuntu, I've tested my code with an iterator writing to csv and everything
> works as expected. Reading through the flink documentation I see that I
> should add jar dependencies to work with avro. I downloaded three jar files
> that I believe are required for avro like so:
>
> table_env\
> .get_config()\
> .get_configuration()\
> .set_string(
> "pipeline.jars",
> rf"
> file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar
> "
> )
>
> I suspect I'm not loading the jar files correctly, but it's unclear what
> I'm supposed to do as I'm not familiar with java and when I switch the sink
> format to avro I get some unexpected errors:
>
> Py4JJavaError: An error occurred while calling o746.executeInsert.
> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>   at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>   at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.

Writing to Avro from pyflink

2021-04-23 Thread Edward Yang
I've been trying to write to the avro format with pyflink 1.12.2 on ubuntu,
I've tested my code with an iterator writing to csv and everything works as
expected. Reading through the flink documentation I see that I should add
jar dependencies to work with avro. I downloaded three jar files that I
believe are required for avro like so:

table_env\
.get_config()\
.get_configuration()\
.set_string(
"pipeline.jars",

rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
)

I suspect I'm not loading the jar files correctly, but it's unclear what
I'm supposed to do as I'm not familiar with java and when I switch the sink
format to avro I get some unexpected errors:

Py4JJavaError: An error occurred while calling o746.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
at 
org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
at 
org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
at 
org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
at 

RE: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-03-27 Thread Colletta, Edward

FYI, we experience a similar error again, lost leadership but not due to 
timeout but a disconnect from zookeeper.  This time I examined logs for other 
errors related to zookeeper and found the kafka cluster that uses the same 
zookeeper also was disconnected.

We run on AWS and this seems to be AWS related.


From: Xintong Song 
Sent: Sunday, January 31, 2021 9:23 PM
To: user 
Subject: Re: Flink 1.11 job hit error "Job leader lost leadership" or 
"ResourceManager leader changed to new address null"

This email is from an external source - exercise caution regarding links and 
attachments.

Hi Colletta,

This error is kind of expected if the JobMaster / ResourceManager does not 
maintain a stable connection to the ZooKeeper service, which may be caused by 
network issues, GC pause, or unstable ZK service availability.

By "similar issue", what I meant is I'm not aware of any issue related to the 
upgrading of the ZK version that may cause the leadership loss.


Thank you~

Xintong Song


On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:
“but I'm not aware of any similar issue reported since the upgrading”
For the record, we experienced this same error on Flink 1.11.2 this past week.

From: Xintong Song mailto:tonysong...@gmail.com>>
Sent: Friday, January 29, 2021 7:34 PM
To: user mailto:user@flink.apache.org>>
Subject: Re: Flink 1.11 job hit error "Job leader lost leadership" or 
"ResourceManager leader changed to new address null"

This email is from an external source - exercise caution regarding links and 
attachments.



Thank you~

Xintong Song


On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not aware of 
any similar issue reported since the upgrading.
I would suggest the following:
- Turn on the DEBUG log see if there's any valuable details
- Maybe try asking in the Apache Zookeeper community, see if this is a known 
issue.

Thank you~
Xintong Song


Thank you~

Xintong Song

On Sat, Jan 30, 2021 at 6:47 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
Hi, Xintong

Thanks for replying. Could it relate to the zk version? We are a platform team 
at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9 and 1.11 
jobs talking to the same ZK for JM HA. This problem only surfaced in 1.11 jobs. 
That's why we think it is related to version upgrade.

Best
Lu

On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
mailto:tonysong...@gmail.com>> wrote:
The ZK client side uses 15s connection timeout and 60s session timeout in 
Flink. There's nothing similar to a heartbeat interval configured, which I 
assume is up to ZK's internal implementation. These things have not changed in 
FLink since at least 2017.

If both ZK client and server complain about timeout, and there's no gc issue 
spotted, I would consider a network instability.


Thank you~

Xintong Song


On Fri, Jan 29, 2021 at 3:15 AM Lu Niu 
mailto:qqib...@gmail.com>> wrote:
After checking the log I found the root cause is zk client timeout on TM:
```
2021-01-25 14:01:49,600 WARN 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client 
session timed out, have not heard from server in 40020ms for sessionid 
0x404f9ca531a5d6f
2021-01-25 14:01:49,610 INFO 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client 
session timed out, have not heard from server in 40020ms for sessionid 
0x404f9ca531a5d6f, closing socket connection and attempting reconnect
2021-01-25 14:01:49,711 INFO 
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
 - State change: SUSPENDED
2021-01-25 14:01:49,711 WARN 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- JobManager for job 27ac39342913d29baac4cde13062c4a4 with leader id 
b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
2021-01-25 14:01:49,712 WARN 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
- Close JobManager connection for job 27ac39342913d29baac4cde13062c4a4.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task - 
Attempting to fail task externally Sink: 
USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) 
(d5b5887e639874cb70d7fef939b957b7).
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task - Sink: 
USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) 
(d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager resp

uniqueness of name when constructing a StateDescriptor

2021-03-11 Thread Colletta, Edward
The documentation for ValueStateDescriptor documents the name parameter as - 
"name - The (unique) name for the state."
What is the scope of the uniqueness?  Unique within an RichFunction instance? 
Unique withing job? Unique within a session cluster?

I ask because I have several jobs that use a KeyedProcessFunction or 
RIchFlatMapFunction in a loop.   Something like
   for (int t:cfg.intervals)
stream
.process(new FuncA(t))
.flatMap(new FuncB())

Where FuncA has something like the following in its open method.
 state = getRuntimeContext().getState(new 
ValueStateDescriptor<>("funcAState", stateTypeInfo, null));

The code seems to work fine, but I am wondering If I need to pass something in 
the constructor to make the state name unique for each instance created in the 
loop.

Thanks,
Eddie Colletta


RE: uniqueness of name when constructing a StateDescriptor

2021-03-15 Thread Colletta, Edward
Thank you.   

-Original Message-
From: Tzu-Li (Gordon) Tai  
Sent: Monday, March 15, 2021 3:05 AM
To: user@flink.apache.org
Subject: Re: uniqueness of name when constructing a StateDescriptor

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.


Hi,

The scope is per individual operator, i.e. a single KeyedProcessFunction 
instance cannot have multiple registered state with the same name.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.11 FlinkKafkaConsumer not propagating watermarks

2021-04-14 Thread Edward Bingham
Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
Flink processors using Flink 1.12, and tried to get them working on Amazon
EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
went to downgrade, I found, inexplicably, that watermarks were no longer
propagating.

There is only one partition on the topic, and parallelism is set to 1. Is
there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on stackoverflow, but I figure the mailing list is
probably a better avenue for this question.

Thanks,
Ned


Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
  }, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=8640 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=86400 watermark=0] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=864000 watermark=77760] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=864 watermark=855360] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=8631360] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
  }, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;
import com.fasterxml.jackson.core.JsonProcessingException;import
com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;import java.io.InputStream;import
java.io.IOException;
import java.nio.file.Files;import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;import
java.util.concurrent.CompletableFuture;import
java.util.concurrent.TimeUnit;import java.util.Date;import
java.util.HashMap;import java.util.Map;import java.util.Properties;
import kafka.server.KafkaConfig;import kafka.server.KafkaServer;
import kafka.utils.MockTime;import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.flink.api.common.eventtime.TimestampAssigner;import
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;import
org.apache.flink.api.common.eventtime.Watermark;import
org.apache.flink.api.common.eventtime.WatermarkGenerator;import
org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;import
org.apache.flink.api.common.eventtime.WatermarkOutput;import
org.apache.flink.api.common.eventtime.WatermarkStrategy;import
org.apache.flink.api.common.JobExecutionResult;import

question on ValueState

2021-02-07 Thread Colletta, Edward
Using FsStateBackend.

I was under the impression that ValueState.value will serialize an object which 
is stored in the local state backend, copy the serialized object and 
deserializes it.  Likewise update() would do the same steps copying the object 
back to local state backend.And as a consequence, storing collections in 
ValueState is much less efficient than using ListState or MapState if possible.

However, I am looking at some code I wrote a while ago which made the 
assumption that the value() method just returned a reference to the object.  
The code only calls update() when creating the object if value() returns null.  
  Yet the code works, all changes to the object stored in state are visible the 
next time value() is called.   I have some sample code below.

Can someone clarify what really happens when value() is called?


   public void processElement(M in, Context ctx, Collector out) throws 
Exception {
MyWindow myWindow;
myWindow = windowState.value();
if (myWindow == null) {

ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);
myWindow = new MyWindow(0L, slide, windowSize);
windowState.update(myWindow);
myWindow.eq.add(0L);
}

myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + 
in.value);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {

ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);
MyWindow myWindow = windowState.value();
myWindow.slide(0L);
out.collect(myWindow.globalAccum);
}




RE: failures during job start

2021-08-20 Thread Colletta, Edward
Thanks, will try that.

From: Chesnay Schepler 
Sent: Friday, August 20, 2021 8:06 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: failures during job start

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

I don't think there are any metrics; logging-wise you will need to do some 
detective work.

We do know which tasks have started deployment by this message from the 
JobManager:
ExecutionGraph [] -  (/) () 
switched from SCHEDULED to DEPLOYING.

We also know which have completed deployment by this message from the 
JobManager:
ExecutionGraph [] -  (/) () 
switched from DEPLOYING to RUNNING.

So what I would do is pick the task that failed with the 
PartitionNotFoundException, then figure out from the application from which 
tasks it consumes data, then check which of these have not finished deployment.


On 19/08/2021 22:34, Colletta, Edward wrote:
Thanks you.   I am going to try the first option for now, but I do need to 
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?


From: Chesnay Schepler <mailto:ches...@apache.org>
Sent: Thursday, August 19, 2021 2:23 PM
To: Colletta, Edward <mailto:edward.colle...@fmr.com>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: failures during job start

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

This exception means that a task was deployed, but the task that produces the 
data it wants to consume was not available yet (even after waiting for a while).

Your case sounds similar to https://issues.apache.org/jira/browse/FLINK-9413, 
where this happens because the deployment of the producing task takes too long.

You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate why 
the deployment takes so long in the first place.

On 19/08/2021 07:15, Colletta, Edward wrote:
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers is low 
(below 30%)

The scenario we see

  1.  run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  2.  after 2 minutes some subtasks start transitioning to running.
  3.  after another 30 seconds failure occurs and job goes into Restarting state
  4.  after another minute, restart completes all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)










failures during job start

2021-08-18 Thread Colletta, Edward
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers is low 
(below 30%)

The scenario we see

  *   run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  *   after 2 minutes some subtasks start transitioning to running.
  *   after another 30 seconds failure occurs and job goes into Restarting state
  *   after another minute, restart completes all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






RE: failures during job start

2021-08-19 Thread Colletta, Edward
Thanks you.   I am going to try the first option for now, but I do need to 
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?


From: Chesnay Schepler 
Sent: Thursday, August 19, 2021 2:23 PM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: failures during job start

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

This exception means that a task was deployed, but the task that produces the 
data it wants to consume was not available yet (even after waiting for a while).

Your case sounds similar to https://issues.apache.org/jira/browse/FLINK-9413, 
where this happens because the deployment of the producing task takes too long.

You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate why 
the deployment takes so long in the first place.

On 19/08/2021 07:15, Colletta, Edward wrote:
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers is low 
(below 30%)

The scenario we see

  1.  run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  2.  after 2 minutes some subtasks start transitioning to running.
  3.  after another 30 seconds failure occurs and job goes into Restarting state
  4.  after another minute, restart completes all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)








question on jar compatibility - log4j related

2021-12-19 Thread Colletta, Edward
If have jar files built using flink version 11.2 in dependencies, and I upgrade 
my cluster to 11.6, is it safe to run the existing jars on the upgraded cluster 
or should I rebuild all jobs against 11.6?

Thanks,
Eddie Colletta


RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward

A general pattern for dynamically adding new aggregations could be something 
like this

BroadcastStream broadcastStream = 
aggregationInstructions
.broadcast(broadcastStateDescriptor);

DataStream 
streamReadyToAggregate = dataToAggregate
.connect(broadcastStream)
.process(new JoinFunction())
.flatMap(new AddAggregationKeyAndDescriptor)
.keyBy('aggregationKey')

Where

  *   aggregationInstructions is a stream describing which fields to aggregate 
by.  It might contain a List of the field names and another field which 
can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’
  *   JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.
  *   AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields

The flatMap means one message may be emitted several times with different 
values of aggregationKey so it may belong to multiple aggregations.



From: M Singh 
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng ; User-Flink 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Hi Caizhi:

Thanks for your reply.

I need to aggregate streams based on dynamic groupings.  All the groupings 
(keyBy) are not known upfront and can be added or removed after the streaming 
application is started and I don't want to restart the application/change the 
code.  So, I wanted to find out, what are the options to achieve this 
functionality.  Please let me know if you have any advice or recommendations.

Thanks

On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:


Hi!

Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.

By the way, why do you need this functionality? Could you elaborate more on 
your use case?

M Singh mailto:mans2si...@yahoo.com>> 于2022年1月22日周六 
21:32写道:
Hi Folks:

I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.

Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?

Thanks










RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward
You don’t have to add keyBy’s at runtime.  You change what is in the value of 
aggregationKey at run time
Some records may appear several times with different fields extracted to 
aggregationKey.  They dynamic building of the grouping is really done by the 
flatMap


From: M Singh 
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng ; User-Flink ; 
Colletta, Edward 
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Thanks Edward for your response.

The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).

I believe we can change the single keyBy in your example, but not add/remove 
them.

Please let me know if I have missed anything.



On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:





A general pattern for dynamically adding new aggregations could be something 
like this



BroadcastStream broadcastStream = 
aggregationInstructions

.broadcast(broadcastStateDescriptor);



DataStream 
streamReadyToAggregate = dataToAggregate

.connect(broadcastStream)

.process(new JoinFunction())

.flatMap(new AddAggregationKeyAndDescriptor)

.keyBy('aggregationKey')



Where

·aggregationInstructions is a stream describing which fields to 
aggregate by.  It might contain a List of the field names and another 
field which can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’

·JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.

·AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields



The flatMap means one message may be emitted several times with different 
values of aggregationKey so it may belong to multiple aggregations.







From: M Singh mailto:mans2si...@yahoo.com>>
Sent: Monday, January 24, 2022 9:52 PM
To: Caizhi Weng mailto:tsreape...@gmail.com>>; User-Flink 
mailto:user@flink.apache.org>>
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application



NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.



Hi Caizhi:



Thanks for your reply.



I need to aggregate streams based on dynamic groupings.  All the groupings 
(keyBy) are not known upfront and can be added or removed after the streaming 
application is started and I don't want to restart the application/change the 
code.  So, I wanted to find out, what are the options to achieve this 
functionality.  Please let me know if you have any advice or recommendations.



Thanks



On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:





Hi!



Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.



By the way, why do you need this functionality? Could you elaborate more on 
your use case?



M Singh mailto:mans2si...@yahoo.com>> 于2022年1月22日周六 
21:32写道:

Hi Folks:



I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.



Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?



Thanks


















RE: Apache Fink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-25 Thread Colletta, Edward
Here is some sample data which may help visualize how the aggregation is 
changed dynamically.
We start by aggregating by session and session+account by placing values into 
aggregationKey based on the fields in groupByFIelds.
Then we delete the session+account aggregation, and add an aggregation by 
account.
The way we are changing the aggregation dynamically is by using an indirect 
field to key by called aggregationKey which we add based on current broadcast 
state.
Note, this is for streaming jobs and aggregations starts fresh from the point 
at which a new groupByType is received.

aggregateInstruction
{groupByFields:[session],groupByType:bySession,action:add}
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:add}

dataToAggregate
{session:1,account:1,value:100}
{session:2,account:1,value:200}
{session:1,account:2,value:400}
{session:1,account:1,value:800}


streamReadyToAggregate
{session:1,account:1,value:100,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:100,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
{session:2,account:1,value:200,groupByFields:[session],groupByType:bySession,aggregationKey:'2'}}
{session:2,account:1,value:200,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'2|1'}}
{session:1,account:2,value:400,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:2,value:400,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|2'}}
{session:1,account:1,value:800,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}}
{session:1,account:1,value:800,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}}}

aggregateInstruction
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:delete}
{groupByFields:[account],groupByType:byAccount,action:add}

dataToAggregate
{session:3,account:1,value:1600}
{session:3,account:2,value:3200}

streamReadyToAggregate
{session:3,account:1,value:1600,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:1600,groupByFields:[account],groupByType:byAccount,aggregationKey:'1'}
{session:3,account:1,value:3200,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:3200,groupByFields:[account],groupByType:byAccount,aggregationKey:'2'}



From: Colletta, Edward 
Sent: Tuesday, January 25, 2022 1:29 PM
To: M Singh ; Caizhi Weng ; 
User-Flink 
Subject: RE: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

You don’t have to add keyBy’s at runtime.  You change what is in the value of 
aggregationKey at run time
Some records may appear several times with different fields extracted to 
aggregationKey.  They dynamic building of the grouping is really done by the 
flatMap


From: M Singh mailto:mans2si...@yahoo.com>>
Sent: Tuesday, January 25, 2022 1:12 PM
To: Caizhi Weng mailto:tsreape...@gmail.com>>; User-Flink 
mailto:user@flink.apache.org>>; Colletta, Edward 
mailto:edward.colle...@fmr.com>>
Subject: Re: Apache Fink - Adding/Removing KeyedStreams at run time without 
stopping the application

NOTICE: This email is from an external sender - do not click on links or 
attachments unless you recognize the sender and know the content is safe.

Thanks Edward for your response.

The problem I have is that I am not sure how to add or remove keyBy's at run 
time since the flink topology is based on that (as Caizhi mentioned).

I believe we can change the single keyBy in your example, but not add/remove 
them.

Please let me know if I have missed anything.



On Tuesday, January 25, 2022, 12:53:46 PM EST, Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:





A general pattern for dynamically adding new aggregations could be something 
like this



BroadcastStream broadcastStream = 
aggregationInstructions

.broadcast(broadcastStateDescriptor);



DataStream 
streamReadyToAggregate = dataToAggregate

.connect(broadcastStream)

.process(new JoinFunction())

.flatMap(new AddAggregationKeyAndDescriptor)

.keyBy('aggregationKey')



Where

·aggregationInstructions is a stream describing which fields to 
aggregate by.  It might contain a List of the field names and another 
field which can be used to describe what the aggregation is doing.   Example  
groupbyFields=[‘sourceIp’,’sourcePort’,’destIp’,’destPort’], groupingType = 
‘bySession’, action = ‘Add’ or  ‘Delete’

·JoinFunction is a KeyedBroadcastProcessFunction which adds the 
groupByFields and groupingType to each message in the dataToAggregate stream 
and possibly deletes groupings from state.

·AddAggregationKeyAndDescriptor is a FlatMapFunction which adds 
aggregationKey to the stream based on the value of groupByFields



The

Re: 退订

2023-07-26 Thread Edward Wang
退订

wang <24248...@163.com> 于2023年7月13日周四 07:34写道:

> 退订



-- 
Best Regards,

*Yaohua Wang  王耀华*
School of Software Technology, Xiamen University

Tel: (+86)187-0189-5935
E-mail: wangyaohua2...@gmail.com


退订

2023-07-26 Thread Edward Wang
*退订*


Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

2018-05-07 Thread Edward Alexander Rojas Clavijo
Hello,
I've being working on a fix for this, I posted more details on the JIRA
ticket.

Regards,
Edward

2018-05-07 5:51 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:

> Ah, correct, sorry for the incorrect link.
> Thanks Ted!
>
>
> On 7 May 2018 at 11:43:12 AM, Ted Yu (yuzhih...@gmail.com) wrote:
>
> It seems the correct JIRA should be FLINK-9303
> <https://issues.apache.org/jira/browse/FLINK-9303>
>
> On Sun, May 6, 2018 at 8:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Edward,
>>
>> Thanks for brining this up, and I think your suggestion makes sense.
>> The problem is that the Kafka consumer has no notion of "closed"
>> partitions
>> at the moment, so statically assigned partitions to the Kafka client is
>> never removed and is always continuously requested for records.
>>
>> For example, on the Kinesis consumer, there is a notion of closed shards,
>> and therefore is not an issue there.
>>
>> I've created a JIRA to track this:
>> https://issues.apache.org/jira/browse/FLINK-5720
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Edward Alexander Rojas Clavijo
Thank you

2018-04-27 14:55 GMT+02:00 Chesnay Schepler <ches...@apache.org>:

> I've responded in the JIRA.
>
>
> On 27.04.2018 14:26, Edward Rojas wrote:
>
>> I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
>> this issue.
>>
>> Every time I try to use the flink CLI with the -m option to specify the
>> jobmanager address, the CLI get stuck on "Waiting for response..." and  I
>> get the following error on the Jobmanager:
>>
>> WARN  akka.remote.transport.netty.NettyTransport-
>> Remote
>> connection to [/x.x.x.x:] failed with
>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.f
>> rame.TooLongFrameException:
>> Adjusted frame length exceeds 10485760: 1195725860 - discarded
>>
>> I get the error even when I run it locally and try something like "flink
>> list -m localhost:6123". But "flink list" works as expected.
>>
>> I'm using the version from the "release-1.5" branch.
>>
>> I tested on the tag release 1.5.0-rc1 and it's working as expected.
>>
>> I did a /git bisect/ and it seems like the commit introducing the
>> regression
>> is  47909f4
>> <https://github.com/apache/flink/commit/47909f466b9c9ee1f4ca
>> f94e9f6862a21b628817>
>>
>> I created a JIRA ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-9255.
>>
>> Do you have any thoughts about it ?
>>
>> Regards,
>> Edward
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>


-- 
*Edward Alexander Rojas Clavijo*



*Software EngineerHybrid CloudIBM France*


Re: After OutOfMemoryError State can not be readed

2018-09-07 Thread Edward Alexander Rojas Clavijo
Hi Stefan, Vino,
Thanks for your answers.

We are using full checkpointing, not incremental. We are using custom
serializers for the operators state classes, The serializers perform
encryption before writing and decrypt when reading. The serializer is
stateless.
We register the Serializers by using
env.getConfig()
  .registerTypeWithKryoSerializer(ProcessState.class,
ProcessStateSerializer.class);

In normal cases the Serialization works correctly, even after recovering
from a failure. We get this error only when taskmnager fails due to memory
problems.

Thanks again for your help,
Edward

El vie., 7 sept. 2018 a las 11:51, Stefan Richter (<
s.rich...@data-artisans.com>) escribió:

> Hi,
>
> what I can say is that any failures like OOMs should not corrupt
> checkpoint files, because only successfully completed checkpoints are used
> for recovery by the job manager. Just to get a bit more info, are you using
> full or incremental checkpoints? Unfortunately, it is a bit hard to say
> from the given information what the cause of the problem is. Typically,
> these problems have been observed when something was wrong with a
> serializer or a stateful serializer was used from multiple threads.
>
> Best,
> Stefan
>
> Am 07.09.2018 um 05:04 schrieb vino yang :
>
> Hi Edward,
>
> From this log: Caused by: java.io.EOFException, it seems that the state
> metadata file has been corrupted.
> But I can't confirm it, maybe Stefan knows more details, Ping him for you.
>
> Thanks, vino.
>
> Edward Rojas  于2018年9月7日周五 上午1:22写道:
>
>> Hello all,
>>
>> We are running Flink 1.5.3 on Kubernetes with RocksDB as statebackend.
>> When performing some load testing we got an /OutOfMemoryError: native
>> memory
>> exhausted/, causing the job to fail and be restarted.
>>
>> After the Taskmanager is restarted, the job is recovered from a
>> Checkpoint,
>> but it seems that there is a problem when trying to access the state. We
>> got
>> the error from the *onTimer* function of a *onProcessingTime*.
>>
>> It would be possible that the OOM error could have caused to checkpoint a
>> corrupted state?
>>
>> We get Exceptions like:
>>
>> TimerException{java.lang.RuntimeException: Error while retrieving data
>> from
>> RocksDB.}
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:277)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
>> at
>>
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>> at java.lang.Thread.run(Thread.java:811)
>> Caused by: java.lang.RuntimeException: Error while retrieving data from
>> RocksDB.
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
>> at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
>> at
>>
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
>> at
>>
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> ... 7 more
>> Caused by: java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:208)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:618)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:573)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>> at
>>
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
>> ... 12 more
>>
>>
>> Thanks in advance for any help
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>

-- 
*Edward Alexander Rojas Clavijo*



*Software EngineerHybrid CloudIBM France*


SSL config on Kubernetes - Dynamic IP

2018-03-27 Thread Edward Alexander Rojas Clavijo
Hi all,

Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
configuration based on https://ci.apache.org/projects/flink/flink-docs-
master/ops/security-ssl.html.

However, as the IP of the nodes are dynamic (from the nature of
kubernetes), we are using only the DNS which we can control using
kubernetes services. So we add to the Subject Alternative Name(SAN) the
flink-jobmanager DNS and also the DNS for the task managers
*.flink-taskmanager-svc (each task manager has a DNS in the form
flink-taskmanager-0.flink-taskmanager-svc).

Additionally we set the jobmanager.rpc.address property on all the nodes
and each task manager sets the taskmanager.host property, all matching the
ones on the certificate.

This is working well when using Job with Parallelism set to 1. The SSL
validations are good and the Jobmanager can communicate with Task manager
and vice versa.

But when we set the parallelism to more than 1 we have exceptions on the
SSL validation like this:

Caused by: java.security.cert.CertificateException: No subject alternative
names matching IP address 172.30.247.163 found
at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(
X509TrustManagerImpl.java:455)
at sun.security.ssl.X509TrustManagerImpl.checkIdentity(
X509TrustManagerImpl.java:436)
at sun.security.ssl.X509TrustManagerImpl.checkTrusted(
X509TrustManagerImpl.java:252)
at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(
X509TrustManagerImpl.java:136)
at sun.security.ssl.ClientHandshaker.serverCertificate(
ClientHandshaker.java:1601)
... 21 more


>From the logs I see the Jobmanager is correctly registering the
taskmanagers:

org.apache.flink.runtime.instance.InstanceManager   - Registered
TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-
flink-taskmanager-1.flink-taskmanager-svc.default.svc.
cluster.local:6122/user/taskmanager) as 1a3f59693cec8b3929ed8898edcc2700.
Current number of registered hosts is 3. Current number of alive task slots
is 6.

And also each taskmanager is correctly registered to use the hostname for
communication:

org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will use
hostname/address
'flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local'
(172.30.247.163) for communication.
...
akka.remote.Remoting   - Remoting started; listening on addresses
:[akka.ssl.tcp://flink@flink-taskmanager-1.flink-
taskmanager-svc.default.svc.cluster.local:6122]
...
org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
[server address: flink-taskmanager-1.flink-taskmanager-svc.default.svc.
cluster.local/172.30.247.163, server port: 6121, ssl enabled: true, memory
segment size (bytes): 32768, transport type: NIO, number of server threads:
2 (manual), number of client threads: 2 (manual), server connect backlog: 0
(use Netty's default), client connect timeout (sec): 120, send/receive
buffer size (bytes): 0 (use Netty's default)]
...
org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager data
connection information: bf4a9b50e57c99c17049adb66d65f685 @
flink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local
(dataPort=6121)



But even with that, it seems like the taskmanagers are using the IP
communicate between them and the SSL validation fails.

Do you know if it's possible to make the taskmanagers to use the hostname
to communicate instead of the IP ?
or
Do you have any advice to get the SSL configuration to work on this
environment ?

Thanks in advance.

Regards,
Edward


Re: SSL config on Kubernetes - Dynamic IP

2018-03-28 Thread Edward Alexander Rojas Clavijo
Hi Till,

I just created the JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-9103

I added the JobManager and TaskManager logs, Hope this helps to resolve the
issue.

Regards,
Edward

2018-03-27 17:48 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Edward,
>
> could you please file a JIRA issue for this problem. It might be as simple
> as that the TaskManager's network stack uses the IP instead of the hostname
> as you suggested. But we have to look into this to be sure. Also the logs
> of the JobManager as well as the TaskManagers could be helpful.
>
> Cheers,
> Till
>
> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif <cjo...@gmail.com>
> wrote:
>
>>
>> I suspect this relates to: https://issues.apache.org/
>> jira/browse/FLINK-5030
>>
>> For which there was a PR at some point but nothing has been done so far.
>> It seems the current code explicitly uses the IP vs Hostname for Netty SSL
>> configuration.
>>
>> Without that I'm really wondering how people are reasonably using SSL on
>> a Kubernetes Flink-based cluster as every time a pod is (re-started) it can
>> theoretically take a different IP? Or do I miss something?
>>
>> --
>> Christophe
>>
>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>> edward.roja...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
>>> configuration based on https://ci.apache.org/proje
>>> cts/flink/flink-docs-master/ops/security-ssl.html.
>>>
>>> However, as the IP of the nodes are dynamic (from the nature of
>>> kubernetes), we are using only the DNS which we can control using
>>> kubernetes services. So we add to the Subject Alternative Name(SAN) the
>>> flink-jobmanager DNS and also the DNS for the task managers
>>> *.flink-taskmanager-svc (each task manager has a DNS in the form
>>> flink-taskmanager-0.flink-taskmanager-svc).
>>>
>>> Additionally we set the jobmanager.rpc.address property on all the nodes
>>> and each task manager sets the taskmanager.host property, all matching the
>>> ones on the certificate.
>>>
>>> This is working well when using Job with Parallelism set to 1. The SSL
>>> validations are good and the Jobmanager can communicate with Task manager
>>> and vice versa.
>>>
>>> But when we set the parallelism to more than 1 we have exceptions on the
>>> SSL validation like this:
>>>
>>> Caused by: java.security.cert.CertificateException: No subject
>>> alternative names matching IP address 172.30.247.163 found
>>> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
>>> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>> tManagerImpl.java:455)
>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>> tManagerImpl.java:436)
>>> at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
>>> ManagerImpl.java:252)
>>> at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
>>> 9TrustManagerImpl.java:136)
>>> at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
>>> ndshaker.java:1601)
>>> ... 21 more
>>>
>>>
>>> From the logs I see the Jobmanager is correctly registering the
>>> taskmanagers:
>>>
>>> org.apache.flink.runtime.instance.InstanceManager   - Registered
>>> TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
>>> ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
>>> as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered hosts
>>> is 3. Current number of alive task slots is 6.
>>>
>>> And also each taskmanager is correctly registered to use the hostname
>>> for communication:
>>>
>>> org.apache.flink.runtime.taskmanager.TaskManager   - TaskManager will
>>> use hostname/address 'flink-taskmanager-1.flink-tas
>>> kmanager-svc.default.svc.cluster.local' (172.30.247.163) for
>>> communication.
>>> ...
>>> akka.remote.Remoting   - Remoting started; listening on addresses
>>> :[akka.ssl.tcp://flink@flink-taskmanager-1.flink-taskmanager
>>> -svc.default.svc.cluster.local:6122]
>>> ...
>>> org.apache.flink.runtime.io.network.netty.NettyConfig   - NettyConfig
>>> [server address: flink-taskmanager-1.flink-task
>>> manager-sv

Re: SSL config on Kubernetes - Dynamic IP

2018-03-29 Thread Edward Alexander Rojas Clavijo
Hi all,

I did some tests based on the PR Christophe mentioned above and by making a
change on the NettyClient to use CanonicalHostName instead of
HostNameAddress to identify the server, the SSL validation works!!

I created a PR with this change: https://github.com/apache/flink/pull/5789

Regards,
Edward

2018-03-28 17:22 GMT+02:00 Edward Alexander Rojas Clavijo <
edward.roja...@gmail.com>:

> Hi Till,
>
> I just created the JIRA ticket: https://issues.apache.org/
> jira/browse/FLINK-9103
>
> I added the JobManager and TaskManager logs, Hope this helps to resolve
> the issue.
>
> Regards,
> Edward
>
> 2018-03-27 17:48 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>
>> Hi Edward,
>>
>> could you please file a JIRA issue for this problem. It might be as
>> simple as that the TaskManager's network stack uses the IP instead of the
>> hostname as you suggested. But we have to look into this to be sure. Also
>> the logs of the JobManager as well as the TaskManagers could be helpful.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 27, 2018 at 5:17 PM, Christophe Jolif <cjo...@gmail.com>
>> wrote:
>>
>>>
>>> I suspect this relates to: https://issues.apache.org/
>>> jira/browse/FLINK-5030
>>>
>>> For which there was a PR at some point but nothing has been done so far.
>>> It seems the current code explicitly uses the IP vs Hostname for Netty SSL
>>> configuration.
>>>
>>> Without that I'm really wondering how people are reasonably using SSL on
>>> a Kubernetes Flink-based cluster as every time a pod is (re-started) it can
>>> theoretically take a different IP? Or do I miss something?
>>>
>>> --
>>> Christophe
>>>
>>> On Tue, Mar 27, 2018 at 3:24 PM, Edward Alexander Rojas Clavijo <
>>> edward.roja...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Currently I have a Flink 1.4 cluster running on kubernetes and with SSL
>>>> configuration based on https://ci.apache.org/proje
>>>> cts/flink/flink-docs-master/ops/security-ssl.html.
>>>>
>>>> However, as the IP of the nodes are dynamic (from the nature of
>>>> kubernetes), we are using only the DNS which we can control using
>>>> kubernetes services. So we add to the Subject Alternative Name(SAN) the
>>>> flink-jobmanager DNS and also the DNS for the task managers
>>>> *.flink-taskmanager-svc (each task manager has a DNS in the form
>>>> flink-taskmanager-0.flink-taskmanager-svc).
>>>>
>>>> Additionally we set the jobmanager.rpc.address property on all the
>>>> nodes and each task manager sets the taskmanager.host property, all
>>>> matching the ones on the certificate.
>>>>
>>>> This is working well when using Job with Parallelism set to 1. The SSL
>>>> validations are good and the Jobmanager can communicate with Task manager
>>>> and vice versa.
>>>>
>>>> But when we set the parallelism to more than 1 we have exceptions on
>>>> the SSL validation like this:
>>>>
>>>> Caused by: java.security.cert.CertificateException: No subject
>>>> alternative names matching IP address 172.30.247.163 found
>>>> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
>>>> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>>> tManagerImpl.java:455)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509Trus
>>>> tManagerImpl.java:436)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509Trust
>>>> ManagerImpl.java:252)
>>>> at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X50
>>>> 9TrustManagerImpl.java:136)
>>>> at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHa
>>>> ndshaker.java:1601)
>>>> ... 21 more
>>>>
>>>>
>>>> From the logs I see the Jobmanager is correctly registering the
>>>> taskmanagers:
>>>>
>>>> org.apache.flink.runtime.instance.InstanceManager   - Registered
>>>> TaskManager at flink-taskmanager-1 (akka.ssl.tcp://flink@taiga-fl
>>>> ink-taskmanager-1.flink-taskmanager-svc.default.svc.cluster.local:6122/user/taskmanager)
>>>> as 1a3f59693cec8b3929ed8898edcc2700. Current number of registered
>>>> hosts is 3. Current number of alive task slots is 6.
>>>

Re: BucketingSink vs StreamingFileSink

2018-11-21 Thread Edward Alexander Rojas Clavijo
Thank you very much for the information Andrey.

I'll try on my side to do the migration of what we have now and try to add
the sink with Parquet and I'll be back to you if I have more questions :)

Edward

El vie., 16 nov. 2018 a las 19:54, Andrey Zagrebin (<
and...@data-artisans.com>) escribió:

> Hi,
>
> StreamingFileSink is supposed to subsume BucketingSink which will be
> deprecated.
>
> StreamingFileSink fixes some issues of BucketingSink, especially with AWS
> s3
> and adds more flexibility with defining the rolling policy.
>
> StreamingFileSink does not support older hadoop versions at the moment,
> but there are ideas how to resolve this.
>
> You can have a look how to use StreamingFileSink with Parquet here [1].
>
> I also cc’ed Kostas, he might add more to this topic.
>
> Best,
> Andrey
>
> [1]
> https://github.com/apache/flink/blob/0b4947b6142f813d2f1e0e662d0fefdecca0e382/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>
> > On 16 Nov 2018, at 11:31, Edward Rojas  wrote:
> >
> > Hello,
> > We are currently using Flink 1.5 and we use the BucketingSink to save the
> > result of job processing to HDFS.
> > The data is in JSON format and we store one object per line in the
> resulting
> > files.
> >
> > We are planning to upgrade to Flink 1.6 and we see that there is this new
> > StreamingFileSink,  from the description it looks very similar to
> > BucketingSink when using Row-encoded Output Format, my question is,
> should
> > we consider to move to StreamingFileSink?
> >
> > I would like to better understand what are the suggested use cases for
> each
> > of the two options now (?)
> >
> > We are also considering to additionally output the data in Parquet format
> > for data scientists (to be stored in HDFS as well), for this I see some
> > utils to work with StreamingFileSink, so I guess for this case it's
> > recommended to use that option(?).
> > Is it possible to use the Parquet writers even when the schema of the
> data
> > may evolve ?
> >
> > Thanks in advance for your help.
> > (Sorry if I put too many questions in the same message)
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

-- 
*Edward Alexander Rojas Clavijo*



*Software EngineerHybrid CloudIBM France*


Re: Can checkpoints be used to migrate jobs between Flink versions ?

2019-01-09 Thread Edward Alexander Rojas Clavijo
Thanks very much for you rapid answer Stefan.

Regards,
Edward

El mié., 9 ene. 2019 a las 15:26, Stefan Richter ()
escribió:

> Hi,
>
> I would assume that this should currently work because the format of basic
> savepoints and checkpoints is the same right now. The restriction in the
> doc is probably there in case that the checkpoint format will diverge more
> in the future.
>
> Best,
> Stefan
>
> > On 9. Jan 2019, at 13:12, Edward Rojas  wrote:
> >
> > Hello,
> >
> > For upgrading jobs between Flink versions I follow the guide in the doc
> > here:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html#upgrading-the-flink-framework-version
> >
> > It states that we should always use savepoints for this procedure, I
> > followed it and it works perfectly.
> >
> > I just would like to know if there is a reason why is not advised to use
> > checkpoints for this procedure.
> >
> > Say for example that the job has externalized checkpoints with
> > RETAIN_ON_CANCELLATION policy, one could cancel the job before the
> upgrade
> > and use the retained checkpoint to restart the job from it once the Flink
> > cluster is upgraded... or maybe I'm missing something ?
> >
> > I performed some tests and we are able to upgrade using checkpoint, by
> > passing the checkpoint path in the "flink run -s" parameter.
> >
> > Could you help to clarify if this is advised (and supported) or we should
> > stick to the use of savepoints for this kind of manipulations ?
> >
> >
> > Thanks in advance for your help.
> >
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

-- 
*Edward Alexander Rojas Clavijo*



*Software EngineerHybrid CloudIBM France*


Re: How to migrate Kafka Producer ?

2018-12-19 Thread Edward Alexander Rojas Clavijo
Hi Dawid, Piotr,

I see that for the kafka consumer base there are some migration tests here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

As the kafka consumer state is managed on the FlinkKafkaConsumerBase class
I assumed this would cover also migration of connectors versions, but maybe
I'm missing something (?)

I performed some tests on my own and the migration of the kafka consumer
connector worked.


Regarding the kafka producer I am just updating the job with the new
connector and removing the previous one and upgrading the job by using a
savepoint and the --allowNonRestoredState.
So far my tests with this option are successful.

I appreciate any help here to clarify my understanding.

Regards,
Edward

El mié., 19 dic. 2018 a las 10:28, Dawid Wysakowicz ()
escribió:

> Hi Edward,
>
> AFAIK we do not support migrating state from one connector to another
> one, which is in fact the case for kafka 0.11 and the "universal" one.
>
> You might try to use the project bravo[1] to migrate the state manually,
> but unfortunately you have to understand the internals of both of the
> connectors. I pull also Piotr to the thread, maybe he can provide more
> straightforward workaround.
>
> Best,
>
> Dawid
>
> [1] https://github.com/king/bravo
>
> On 18/12/2018 14:33, Edward Rojas wrote:
> > Hi,
> >
> > I'm planning to migrate from kafka connector 0.11 to the new universal
> kafka
> > connector 1.0.0+ but I'm having some troubles.
> >
> > The kafka consumer seems to be compatible but when trying to migrate the
> > kafka producer I get an incompatibility error for the state migration.
> > It looks like the producer uses a list state of type
> > "NextTransactionalIdHint", but this class is specific for each Producer
> > (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> > FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are
> not
> > compatible.
> >
> >
> > I would like to know what is the recommended way to perform this kind of
> > migration without losing the state ?
> >
> > Thanks in advance,
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>