Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-20 Thread Shannon Carey
In fact, I can see all my job jar blobs and some checkpoint & job graph files 
in my configured "recovery.zookeeper.storageDir"… however for some reason it 
didn't get restored when my new Flink cluster started up.


From: Shannon Carey
Date: Friday, January 20, 2017 at 9:14 PM
To: "user@flink.apache.org"
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

I recently added some better visibility into the metrics we're gathering from 
Flink. My Flink cluster died again due to the "Not enough free slots available 
to run the job" problem, and this time I can see that the number of registered 
task managers went down from 11 to 7, then waffled and only ever got back up to 
10 (one short of the requested amount) before dropping to 0 just before the 
cluster died. This would seem to explain why there weren't sufficient slots 
(given that we were probably using them all or nearly all)… The metric of 
"running jobs" went down from 5 to 3 during this time period as well. So the 
problem seems to be loss of taskmanagers due to errors (not yet sure what 
exactly as I have to delve into logs).

The other thing I have to figure out is restoring the jobs… I thought that HA 
would start the jobs back up again if Flink died & I re-launched it, but that 
doesn't appear to be the case.


From: Stephan Ewen
Date: Thursday, January 5, 2017 at 7:52 AM
To: >
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Another thought on the container failure:

in 1.1, the user code is loaded dynamically whenever a Task is started. That 
means that on every task restart the code is reloaded. For that to work properly, 
class unloading needs to happen, or the permgen will eventually overflow.

Class unloading can be prevented if the user functions leave references 
around as "GC roots", such as threads, or references in registries, etc.

In Flink 1.2, YARN will put the user code into the application classpath, so 
the code need not be reloaded on every restart. That should solve that issue.
To "simulate" that behavior in Flink 1.1, put your application code jars into 
the "lib" folder.

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin wrote:
Hi,

I've faced a similar issue recently. Hope sharing my findings will help. The 
problem can be split into 2 parts:

Source of container failures
The logs you provided indicate that YARN kills its containers for exceeding 
memory limits. The important point here is that memory limit = JVM heap memory + 
off-heap memory. So if off-heap memory usage is high, YARN may kill containers 
even though JVM heap consumption is fine. To address this, Flink reserves a 
share of container memory for off-heap memory. How much is reserved is 
controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min settings. By 
default, 25% of the requested container memory is reserved for off-heap. 
This seems to be a good starting point, but one should experiment and tune it 
to match their job's specifics.
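To make the arithmetic concrete, here is a small sketch of that cutoff rule (the 0.25 ratio is the default mentioned above; the 600 MB floor and the container sizes in the examples are illustrative values, not recommendations):

```java
// cutoff = max(yarn.heap-cutoff-min, containerMemory * yarn.heap-cutoff-ratio);
// the JVM heap gets whatever is left after the off-heap reservation.
public class HeapCutoff {
    static long jvmHeapMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        long cutoffMb = Math.max(cutoffMinMb, (long) (containerMb * cutoffRatio));
        return containerMb - cutoffMb;
    }
}
```

With a 4096 MB container and the 25% default, 1024 MB is set aside for off-heap and 3072 MB remains for the JVM heap; for small containers the minimum cutoff dominates.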

It's also worthwhile to figure out who consumes off-heap memory. Is it Flink 
managed memory moved off heap (taskmanager.memory.off-heap = true)? Is it some 
external library allocating something off heap? Is it your own code?

How Flink handles task manager failures
Whenever a task manager fails, the Flink jobmanager decides whether it should:
- reallocate the failed task manager container, or
- fail the application entirely
These decisions can be guided by certain configuration 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
 With default settings, job manager does reallocate task manager containers up 
to the point when N failures have been observed, where N is the number of 
requested task managers. After that the application is stopped.

According to the logs, you have a finite value for 
yarn.maximum-failed-containers (11, as I can see from the logs; this may be 
set by Flink if not provided explicitly). On the 12th container failure, the 
jobmanager gives up and the application stops. I'm not sure why it keeps 
reporting not enough slots after that point. In my experience this may happen 
when a job eats up all the available slots, so that after a container failure 
its tasks cannot be restarted in other (live) containers. But I believe once 
the decision to stop the application is made, there should not be any further 
attempts to restart the job, hence no logs like those. Hopefully, someone else 
will explain this to us :)

In my case I made the jobmanager restart containers infinitely by setting 
yarn.maximum-failed-containers = -1, so that a taskmanager failure never 
results in application death. Note this is unlikely a good choice for a batch job.

Re: Rapidly failing job eventually causes "Not enough free slots"

2017-01-20 Thread Shannon Carey
I recently added some better visibility into the metrics we're gathering from 
Flink. My Flink cluster died again due to the "Not enough free slots available 
to run the job" problem, and this time I can see that the number of registered 
task managers went down from 11 to 7, then waffled and only ever got back up to 
10 (one short of the requested amount) before dropping to 0 just before the 
cluster died. This would seem to explain why there weren't sufficient slots 
(given that we were probably using them all or nearly all)… The metric of 
"running jobs" went down from 5 to 3 during this time period as well. So the 
problem seems to be loss of taskmanagers due to errors (not yet sure what 
exactly as I have to delve into logs).

The other thing I have to figure out is restoring the jobs… I thought that HA 
would start the jobs back up again if Flink died & I re-launched it, but that 
doesn't appear to be the case.


From: Stephan Ewen
Date: Thursday, January 5, 2017 at 7:52 AM
To: >
Subject: Re: Rapidly failing job eventually causes "Not enough free slots"

Another thought on the container failure:

in 1.1, the user code is loaded dynamically whenever a Task is started. That 
means that on every task restart the code is reloaded. For that to work properly, 
class unloading needs to happen, or the permgen will eventually overflow.

Class unloading can be prevented if the user functions leave references 
around as "GC roots", such as threads, or references in registries, etc.

In Flink 1.2, YARN will put the user code into the application classpath, so 
the code need not be reloaded on every restart. That should solve that issue.
To "simulate" that behavior in Flink 1.1, put your application code jars into 
the "lib" folder.

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin wrote:
Hi,

I've faced a similar issue recently. Hope sharing my findings will help. The 
problem can be split into 2 parts:

Source of container failures
The logs you provided indicate that YARN kills its containers for exceeding 
memory limits. The important point here is that memory limit = JVM heap memory + 
off-heap memory. So if off-heap memory usage is high, YARN may kill containers 
even though JVM heap consumption is fine. To address this, Flink reserves a 
share of container memory for off-heap memory. How much is reserved is 
controlled by the yarn.heap-cutoff-ratio and yarn.heap-cutoff-min settings. By 
default, 25% of the requested container memory is reserved for off-heap. 
This seems to be a good starting point, but one should experiment and tune it 
to match their job's specifics.

It's also worthwhile to figure out who consumes off-heap memory. Is it Flink 
managed memory moved off heap (taskmanager.memory.off-heap = true)? Is it some 
external library allocating something off heap? Is it your own code?

How Flink handles task manager failures
Whenever a task manager fails, the Flink jobmanager decides whether it should:
- reallocate the failed task manager container, or
- fail the application entirely
These decisions can be guided by certain configuration 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn).
 With default settings, job manager does reallocate task manager containers up 
to the point when N failures have been observed, where N is the number of 
requested task managers. After that the application is stopped.

According to the logs, you have a finite value for 
yarn.maximum-failed-containers (11, as I can see from the logs; this may be 
set by Flink if not provided explicitly). On the 12th container failure, the 
jobmanager gives up and the application stops. I'm not sure why it keeps 
reporting not enough slots after that point. In my experience this may happen 
when a job eats up all the available slots, so that after a container failure 
its tasks cannot be restarted in other (live) containers. But I believe once 
the decision to stop the application is made, there should not be any further 
attempts to restart the job, hence no logs like those. Hopefully, someone else 
will explain this to us :)

In my case I made the jobmanager restart containers infinitely by setting 
yarn.maximum-failed-containers = -1, so that a taskmanager failure never 
results in application death. Note this is unlikely a good choice for a batch job.
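For reference, the flink-conf.yaml knobs discussed in this thread might look like this (a sketch with illustrative values, not recommendations; check the defaults for your Flink version):

```yaml
# Re-allocate failed TaskManager containers indefinitely; as noted above,
# this is unlikely to be a good choice for a batch job.
yarn.maximum-failed-containers: -1

# Share of container memory reserved for off-heap usage (default 0.25),
# and the minimum cutoff in MB (example value).
yarn.heap-cutoff-ratio: 0.25
yarn.heap-cutoff-min: 600
```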

Regards,
Yury

2017-01-05 3:21 GMT+03:00 Shannon Carey:
In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice and 
I'm wondering if anyone has some insight about it.

In both cases, we deployed a job that fails very frequently (within 15s-1m of 
launch). Eventually, the Flink cluster dies.

The sequence of events looks something like this:

  *   bad job is launched
  *   bad job fails & is restarted 

Re: Kryo Deserializer

2017-01-20 Thread 小多
Hi Biswajit,


You can follow this:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/best_practices.html#register-a-custom-serializer-for-your-flink-program


Best regards,
Duo

On Sat, Jan 21, 2017 at 9:15 AM, Biswajit Das  wrote:

> Hello,
>
> Having an issue with nested protobuf deserialization. I even tried
> registering the class with Kryo as below, but it seems to be of no help.
> One of the options left for me is to write a custom serializer or convert
> the byte array to a Dictionary object.
>
>
> val clazz = Class.forName("java.util.Collections$UnmodifiableCollection")
> env.getConfig.registerTypeWithKryoSerializer(clazz, classOf[UnmodifiableCollectionsSerializer])
>
>
> ---
> .ClickSchema$Click)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
> at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer
> 
>



-- 

Programmer, Geek...


Kryo Deserializer

2017-01-20 Thread Biswajit Das
Hello,

Having an issue with nested protobuf deserialization. I even tried
registering the class with Kryo as below, but it seems to be of no help.
One of the options left for me is to write a custom serializer or convert
the byte array to a Dictionary object.


val clazz = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.registerTypeWithKryoSerializer(clazz, classOf[UnmodifiableCollectionsSerializer])


---
.ClickSchema$Click)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:112)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:42)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:116)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:156)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSer



Re: Rolling sink parquet/Avro output

2017-01-20 Thread Biswajit Das
Thanks for the mail, Bruno!

On Wed, Jan 18, 2017 at 1:10 AM, Bruno Aranda  wrote:

> Sorry, something went wrong with the code for the Writer. Here it is again:
>
> import org.apache.avro.Schema
> import org.apache.flink.streaming.connectors.fs.Writer
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetWriter
> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>
> @SerialVersionUID(1L)
> class MyAvroParquetWriter[T](schema: String) extends Writer[T] {
>
>   @transient private var writer: ParquetWriter[T] = _
>
>   override def open(fs: FileSystem, path: Path): Unit = {
>     writer = AvroParquetWriter.builder[T](path)
>   .withSchema(new Schema.Parser().parse(schema))
>   .withCompressionCodec(CompressionCodecName.SNAPPY)
>   .build()
>   }
>
>   override def write(element: T): Unit = writer.write(element)
>
>   override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)
>
>   override def close(): Unit = writer.close()
>
>   override def getPos: Long = writer.getDataSize
>
>   override def flush(): Long = writer.getDataSize
>
> }
>
> Using this library as dependency: "org.apache.parquet" % "parquet-avro" %
> "1.8.1". We use this writer in a rolling sink and seems fine so far.
>
> Cheers,
>
> Bruno
>
> On Wed, 18 Jan 2017 at 09:09 elmosca  wrote:
>
>> Hi Biswajit,
>>
>> We use the following Writer for Parquet using Avro conversion (using
>> Scala):
>>
>> [the Writer code did not come through in the archive; Bruno resent it in
>> the message above]
>>
>> Using this library as dependency: "org.apache.parquet" % "parquet-avro" %
>> "1.8.1". We use this writer in a rolling sink and seems fine so far.
>>
>> Cheers,
>>
>> Bruno
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-tp11123p11127.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>


Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-20 Thread Greg Hogan
Hi Miguel,

The '--output print' option describes the values and also displays the
local clustering coefficient value.

You're running the undirected algorithm on a directed graph. In 1.2 there
is an option '--simplify true' that will add reverse edges and remove
duplicate edges and self-loops. Alternatively, it looks like you could
simply add reverse edges to your input file (with an optional ' | sort |
uniq' following):

$ cat edges.txt | awk ' { print $1, $2; print $2, $1 } '
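For example, on a hypothetical two-edge input file (with one pre-existing reverse edge), the full pipeline yields a symmetrized, de-duplicated edge list:

```shell
# Toy input: two directed edges plus one pre-existing reverse edge
printf '1 2\n2 3\n2 1\n' > /tmp/edges.txt

# Emit both directions of every edge, then de-duplicate
awk '{ print $1, $2; print $2, $1 }' /tmp/edges.txt | sort -n | uniq
# prints:
# 1 2
# 2 1
# 2 3
# 3 2
```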

The drivers are being reworked for 1.3 to better reuse code and options
which will better support additional drivers and algorithms and make
documentation simpler.

Greg

On Fri, Jan 20, 2017 at 2:06 PM, Vasiliki Kalavri  wrote:

> Hi Miguel,
>
> the LocalClusteringCoefficient algorithm returns a DataSet of type Result,
> which basically wraps a vertex id, its degree, and the number of triangles
> containing this vertex. The number 11 you see is indeed the degree of
> vertex 5113. The Result type contains the method
> getLocalClusteringCoefficientScore() which allows you to retrieve the
> clustering coefficient score for a vertex. The method simply divides the
> numbers of triangles by the number of potential edges between neighbors.
>
> I'm sorry that this is not clear in the docs. We should definitely
> improve them to explain what the output is and how to retrieve the actual
> clustering coefficient values. I have opened a JIRA for this [1].
>
> Cheers,
> -Vasia.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-5597
>
> On 20 January 2017 at 19:31, Miguel Coimbra 
> wrote:
>
>> Hello,
>>
>> In the documentation of the LocalClusteringCoefficient algorithm, it is
>> said:
>>
>>
>> *The local clustering coefficient measures the connectedness of each
>> vertex’s neighborhood. Scores range from 0.0 (no edges between neighbors)
>> to 1.0 (neighborhood is a clique).*
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#local-clustering-coefficient
>>
>> However, upon running the algorithm (undirected version), I obtained
>> values above 1.
>>
>> The result I got was this. As you can see, vertex 5113 has a score of 11:
>> (the input edges for the graph are shown further below - around *35
>> edges*):
>>
>> (4907,(1,0))
>> *(5113,(11,0))*
>> (6008,(0,0))
>> (6064,(1,0))
>> (6065,(1,0))
>> (6107,(0,0))
>> (6192,(0,0))
>> (6252,(1,0))
>> (6279,(1,0))
>> (6465,(1,0))
>> (6545,(0,0))
>> (6707,(1,0))
>> (6715,(1,0))
>> (6774,(0,0))
>> (7088,(0,0))
>> (7089,(1,0))
>> (7171,(0,0))
>> (7172,(1,0))
>> (7763,(0,0))
>> (7976,(1,0))
>> (8056,(1,0))
>> (9748,(1,0))
>> (10191,(1,0))
>> (10370,(1,0))
>> (10371,(1,0))
>> (14310,(1,0))
>> (16785,(1,0))
>> (19801,(1,0))
>> (26284,(1,0))
>> (26562,(0,0))
>> (31724,(1,0))
>> (32443,(1,0))
>> (32938,(0,0))
>> (33855,(1,0))
>> (37929,(0,0))
>>
>> This was from a small isolated test with these edges:
>>
>> 51136008
>> 51136774
>> 511332938
>> 51136545
>> 51137088
>> 511337929
>> 511326562
>> 51136107
>> 51137171
>> 51136192
>> 51137763
>> 97485113
>> 101915113
>> 60645113
>> 60655113
>> 62795113
>> 49075113
>> 64655113
>> 67075113
>> 70895113
>> 71725113
>> 143105113
>> 62525113
>> 338555113
>> 79765113
>> 262845113
>> 80565113
>> 103715113
>> 167855113
>> 198015113
>> 67155113
>> 317245113
>> 324435113
>> 103705113
>>
>> I am not sure what I may be doing wrong, but is there perhaps some form
>> of normalization lacking in my execution of:
>>
>> org.apache.flink.graph.library.clustering.undirected.LocalCl
>> usteringCoefficient.Result;
>> org.apache.flink.graph.library.clustering.undirected.LocalCl
>> usteringCoefficient;
>>
>> Am I supposed to divide all scores by the greatest score obtained by the
>> algorithm?
>>
>> Thank you very much!
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com 
>> Skype: miguel.e.coimbra
>>
>
>


Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-20 Thread Vasiliki Kalavri
Hi Miguel,

the LocalClusteringCoefficient algorithm returns a DataSet of type Result,
which basically wraps a vertex id, its degree, and the number of triangles
containing this vertex. The number 11 you see is indeed the degree of
vertex 5113. The Result type contains the method
getLocalClusteringCoefficientScore() which allows you to retrieve the
clustering coefficient score for a vertex. The method simply divides the
numbers of triangles by the number of potential edges between neighbors.
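In code, the division described above looks roughly like this for the undirected case, where d neighbors have d*(d-1)/2 potential edges among them (a sketch; the class and method names are made up, and this assumes the undirected formula):

```java
// score = triangles / (potential edges among the vertex's d neighbors)
public class LocalCC {
    static double score(long degree, long triangleCount) {
        long potentialEdges = degree * (degree - 1) / 2;
        // a vertex with fewer than two neighbors has no potential triangles
        return potentialEdges == 0 ? 0.0 : (double) triangleCount / potentialEdges;
    }
}
```

So the (11, 0) result for vertex 5113 above maps to a coefficient of 0.0: degree 11, but no triangles.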

I'm sorry that this is not clear in the docs. We should definitely
improve them to explain what the output is and how to retrieve the actual
clustering coefficient values. I have opened a JIRA for this [1].

Cheers,
-Vasia.

[1]: https://issues.apache.org/jira/browse/FLINK-5597

On 20 January 2017 at 19:31, Miguel Coimbra 
wrote:

> Hello,
>
> In the documentation of the LocalClusteringCoefficient algorithm, it is
> said:
>
>
> *The local clustering coefficient measures the connectedness of each
> vertex’s neighborhood. Scores range from 0.0 (no edges between neighbors)
> to 1.0 (neighborhood is a clique).*
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#local-clustering-coefficient
>
> However, upon running the algorithm (undirected version), I obtained
> values above 1.
>
> The result I got was this. As you can see, vertex 5113 has a score of 11:
> (the input edges for the graph are shown further below - around *35 edges*
> ):
>
> (4907,(1,0))
> *(5113,(11,0))*
> (6008,(0,0))
> (6064,(1,0))
> (6065,(1,0))
> (6107,(0,0))
> (6192,(0,0))
> (6252,(1,0))
> (6279,(1,0))
> (6465,(1,0))
> (6545,(0,0))
> (6707,(1,0))
> (6715,(1,0))
> (6774,(0,0))
> (7088,(0,0))
> (7089,(1,0))
> (7171,(0,0))
> (7172,(1,0))
> (7763,(0,0))
> (7976,(1,0))
> (8056,(1,0))
> (9748,(1,0))
> (10191,(1,0))
> (10370,(1,0))
> (10371,(1,0))
> (14310,(1,0))
> (16785,(1,0))
> (19801,(1,0))
> (26284,(1,0))
> (26562,(0,0))
> (31724,(1,0))
> (32443,(1,0))
> (32938,(0,0))
> (33855,(1,0))
> (37929,(0,0))
>
> This was from a small isolated test with these edges:
>
> 5113 6008
> 5113 6774
> 5113 32938
> 5113 6545
> 5113 7088
> 5113 37929
> 5113 26562
> 5113 6107
> 5113 7171
> 5113 6192
> 5113 7763
> 9748 5113
> 10191 5113
> 6064 5113
> 6065 5113
> 6279 5113
> 4907 5113
> 6465 5113
> 6707 5113
> 7089 5113
> 7172 5113
> 14310 5113
> 6252 5113
> 33855 5113
> 7976 5113
> 26284 5113
> 8056 5113
> 10371 5113
> 16785 5113
> 19801 5113
> 6715 5113
> 31724 5113
> 32443 5113
> 10370 5113
>
> I am not sure what I may be doing wrong, but is there perhaps some form of
> normalization lacking in my execution of:
>
> org.apache.flink.graph.library.clustering.undirected.
> LocalClusteringCoefficient.Result;
> org.apache.flink.graph.library.clustering.undirected.
> LocalClusteringCoefficient;
>
> Am I supposed to divide all scores by the greatest score obtained by the
> algorithm?
>
> Thank you very much!
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>


Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-20 Thread Miguel Coimbra
Hello,

In the documentation of the LocalClusteringCoefficient algorithm, it is
said:


*The local clustering coefficient measures the connectedness of each
vertex’s neighborhood. Scores range from 0.0 (no edges between neighbors) to
1.0 (neighborhood is a clique).*

https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#local-clustering-coefficient


However, upon running the algorithm (undirected version), I obtained values
above 1.

The result I got was this. As you can see, vertex 5113 has a score of 11:
(the input edges for the graph are shown further below - around *35 edges*):

(4907,(1,0))
*(5113,(11,0))*
(6008,(0,0))
(6064,(1,0))
(6065,(1,0))
(6107,(0,0))
(6192,(0,0))
(6252,(1,0))
(6279,(1,0))
(6465,(1,0))
(6545,(0,0))
(6707,(1,0))
(6715,(1,0))
(6774,(0,0))
(7088,(0,0))
(7089,(1,0))
(7171,(0,0))
(7172,(1,0))
(7763,(0,0))
(7976,(1,0))
(8056,(1,0))
(9748,(1,0))
(10191,(1,0))
(10370,(1,0))
(10371,(1,0))
(14310,(1,0))
(16785,(1,0))
(19801,(1,0))
(26284,(1,0))
(26562,(0,0))
(31724,(1,0))
(32443,(1,0))
(32938,(0,0))
(33855,(1,0))
(37929,(0,0))

This was from a small isolated test with these edges:

5113 6008
5113 6774
5113 32938
5113 6545
5113 7088
5113 37929
5113 26562
5113 6107
5113 7171
5113 6192
5113 7763
9748 5113
10191 5113
6064 5113
6065 5113
6279 5113
4907 5113
6465 5113
6707 5113
7089 5113
7172 5113
14310 5113
6252 5113
33855 5113
7976 5113
26284 5113
8056 5113
10371 5113
16785 5113
19801 5113
6715 5113
31724 5113
32443 5113
10370 5113

I am not sure what I may be doing wrong, but is there perhaps some form of
normalization lacking in my execution of:

org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient;

Am I supposed to divide all scores by the greatest score obtained by the
algorithm?

Thank you very much!

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
Fixing my accumulator did the trick. I should note that the JobManager did not 
fail when I ran this previously against Flink 1.1.3. Thanks for the help!

Dave


> On January 20, 2017 at 8:45 AM Dave Marion  wrote:
> 
> I do see that message in one of the task manager logs 20ms before the NPE 
> in the JobManager. Looking in that log, there is a 
> ConcurrentModificationException in TreeMap, which my accumulator uses. I'll 
> track this down, thanks for the pointer.
> 
> 
> > > On January 20, 2017 at 8:27 AM Stephan Ewen  
> wrote:
> > 
> > Hi!
> > 
> > My current assumption is that there is an accumulator that cannot 
> > be serialized. The SortedStringAccumulator looks fine at first glance, 
> > but are there other accumulators involved?
> > Do you see a message like that one in the log of one of the 
> > TaskManagers
> > 
> > "Failed to serialize accumulators for task."
> > 
> > with an exception stack trace?
> > 
> > 
> > Stephan
> > 
> > 
> > 
On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmar...@comcast.net> wrote:
> > > > > Stephan,
> > > 
> > > Thanks for looking at this. Could you elaborate on the 
> > > misbehavior in the accumulator? I'd like to fix it if it's incorrect.
> > > 
> > > Dave
> > > 
> > > 
> > > 
> > > > > > > On January 20, 2017 at 4:29 AM Stephan Ewen 
> > >  wrote:
> > > > 
> > > > Hi!
> > > > 
> > > > It seems that the accumulator behaves in a non-standard 
> > > > way, but the JobManager should also catch that (log a warning or debug 
> > > > message) and simply continue (not crash).
> > > > 
> > > > I'll try to add a patch that the JobManager tolerates 
> > > > these kinds of issues in the accumulators.
> > > > 
> > > > Stephan
> > > > 
> > > > 
> > > > On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion 
> > > >  wrote:
> > > > 
> > > > > > > > > 
> > > > > Noticed I didn't cc the user list.
> > > > > 
> > > > -- Original Message --
> > > > From: Dave Marion <dlmar...@comcast.net>
> > > > To: Ted Yu <yuzhih...@gmail.com>
> > > > Date: January 19, 2017 at 12:13 PM
> > > > Subject: Re: NPE in JobManager
> > > > 
> > > > 
> > > > That might take some time. Here are the top N lines, typed by
> > > > hand. If that is not enough, let me know and I will start the
> > > > process of getting the full stack trace.
> > > > 
> > > > 
> > > > NullPointerException
> > > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > > at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > > > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> > > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > > at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> > > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > > at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> > > > 
> > > > > > > > > On January 19, 2017 at 11:58 AM 
> > > > Ted Yu 

Re: Rate-limit processing

2017-01-20 Thread Till Rohrmann
Hi Florian,

any blocking of the user code thread is in general not a good idea
because the checkpointing happens under the very same lock which also
guards the user code invocation. Thus any checkpoint barrier arriving at
the operator has only the chance to trigger the checkpointing once the
blocking is over. Even worse, if the blocking happens in a downstream
operator (not a source), then this blocking could cause backpressure. Since
the checkpoint barriers flow with the events and are processed in order,
the backpressure will then also influence the checkpointing time.

So if you want to limit the rate, you should do it at the sources without
blocking the source thread. You could for example count how many elements
you've emitted in the past second and if it exceeds your maximum, then you
don't emit the next element to downstream operators until some time has
passed (this might end up in a busy loop but it allows the checkpointing to
claim the lock).
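A minimal sketch of that counting scheme, shown standalone rather than inside a Flink source (the class and method names are invented; in a real job, the loop calling tryAcquire() would live in the source's run() method, synchronizing on the checkpoint lock only while actually emitting):

```java
// Permits-per-second limiter that never sleeps: tryAcquire() just answers
// "may I emit now?", so the caller's loop keeps spinning and the checkpoint
// lock can be taken between attempts.
public class SimpleRateLimiter {
    private final int maxPerSecond;
    private long windowStartNanos;
    private int emittedInWindow;

    public SimpleRateLimiter(int maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
        this.windowStartNanos = System.nanoTime();
    }

    public boolean tryAcquire() {
        long now = System.nanoTime();
        if (now - windowStartNanos >= 1_000_000_000L) {
            windowStartNanos = now;   // start a fresh one-second window
            emittedInWindow = 0;
        }
        if (emittedInWindow < maxPerSecond) {
            emittedInWindow++;
            return true;
        }
        return false;                 // caller retries later, without blocking
    }
}
```

Because tryAcquire() returns immediately instead of sleeping, the caller can release the checkpoint lock between attempts, which avoids the stalled-checkpoint problem described above.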

Cheers,
Till

On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi,
>
> You might find this similar thread from the mailing list archive helpful :
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/throttled-stream-td6138.html.
>
> Best,
> Yassine
>
> 2017-01-20 10:53 GMT+01:00 Florian König :
>
>> Hi,
>>
>> I need to limit the rate of processing in a Flink stream application.
>> Specifically, the number of items processed in a .map() operation has to
>> stay under a certain maximum per second.
>>
>> At the moment, I have another .map() operation before the actual
>> processing, which just sleeps for a certain time (e.g., 250ms for a limit
>> of 4 requests / sec) and returns the item unchanged:
>>
>> …
>>
>> public T map(final T value) throws Exception {
>>     Thread.sleep(delay);
>>     return value;
>> }
>>
>> …
>>
>> This works as expected, but is a rather crude approach. Checkpointing the
>> job takes a very long time: minutes for a state of a few kB, which for
>> other jobs is done in a few milliseconds. I assume that letting the whole
>> thread sleep for most of the time interferes with the checkpointing - not
>> good!
>>
>> Would using a different synchronization mechanism (e.g.,
>> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) help to make
>> checkpointing work better?
>>
>> Or, preferably, is there a mechanism inside Flink that I can use to
>> accomplish the desired rate limiting? I haven’t found anything in the docs.
>>
>> Cheers,
>> Florian
>>
>
>


Re: Cluster failure after zookeeper glitch.

2017-01-20 Thread Till Rohrmann
Hi Andrew,

if the ZooKeeper cluster fails and Flink is not able to connect to a
functioning quorum again, then it will basically stop working because the
JobManagers are no longer able to elect a leader among them. The lost
leadership of the JobManager can be seen in the logs ("expected leader
session ID none"). The question now is whether any of your JobManagers was
able to regain leadership after the incident you've described. Could you
check the logs for that ("granted leadership")?

If Flink cannot reconnect to ZooKeeper and given that the ZooKeeper cluster
is up and running, then we have to investigate why this is the case. In
order to do that, the logs of the Flink cluster would be really helpful.

Cheers,
Till

On Fri, Jan 20, 2017 at 10:30 AM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> I would think that network problems between Flink and Zookeeper in HA mode
> could indeed lead to problems. Maybe Till (in CC) has a better idea of what
> is going on there.
>
> Am 19.01.2017 um 14:55 schrieb Andrew Ge Wu :
>
> Hi Stefan
>
> Yes we are running in HA mode with dedicated zookeeper cluster. As far as
> I can see it looks like a networking issue with zookeeper cluster.
> 2 out of 5 zookeeper reported something around the same time:
>
> *server1*
> 2017-01-19 11:52:13,044 [myid:1] - WARN  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:Follower@89] - Exception when following the leader
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:150)
> at java.net.SocketInputStream.read(SocketInputStream.java:121)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at org.apache.jute.BinaryInputArchive.readInt(
> BinaryInputArchive.java:63)
> at org.apache.zookeeper.server.quorum.QuorumPacket.
> deserialize(QuorumPacket.java:83)
> at org.apache.jute.BinaryInputArchive.readRecord(
> BinaryInputArchive.java:103)
> at org.apache.zookeeper.server.quorum.Learner.readPacket(
> Learner.java:153)
> at org.apache.zookeeper.server.quorum.Follower.followLeader(
> Follower.java:85)
> at org.apache.zookeeper.server.quorum.QuorumPeer.run(
> QuorumPeer.java:786)
> 2017-01-19 11:52:13,045 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:Follower@166] - shutdown called
> java.lang.Exception: shutdown Follower
> at org.apache.zookeeper.server.quorum.Follower.shutdown(
> Follower.java:166)
> at org.apache.zookeeper.server.quorum.QuorumPeer.run(
> QuorumPeer.java:790)
> 2017-01-19 11:52:13,045 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket connection for client /
> 172.27.163.227:51800 which had sessionid 0x159b505820a0009
> 2017-01-19 11:52:13,046 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket connection for client /
> 172.27.163.227:51798 which had sessionid 0x159b505820a0008
> 2017-01-19 11:52:13,046 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket connection for client
> /0:0:0:0:0:0:0:1:46891 which had sessionid 0x1537b32bbe100ad
> 2017-01-19 11:52:13,046 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket connection for client /
> 172.27.165.64:50075 which had sessionid 0x159b505820a000d
> 2017-01-19 11:52:13,046 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:FollowerZooKeeperServer@139] - Shutting down
> 2017-01-19 11:52:13,046 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:
> 0:0:0:0:2181:ZooKeeperServer@441] - shutting down
>
>
> *server2*
> 2017-01-19 11:52:13,061 [myid:2] - INFO  [WorkerReceiver[myid=2]:
> FastLeaderElection@597] - Notification: 1 (message format version), 1
> (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING (n.state), 1
> (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,082 [myid:2] - INFO  [WorkerReceiver[myid=2]:
> FastLeaderElection@597] - Notification: 1 (message format version), 4
> (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING (n.state), 4
> (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,083 [myid:2] - INFO  [WorkerReceiver[myid=2]:
> FastLeaderElection@597] - Notification: 1 (message format version), 4
> (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING (n.state), 1
> (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,284 [myid:2] - INFO  [WorkerReceiver[myid=2]:
> FastLeaderElection@597] - Notification: 1 (message format version), 4
> (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING (n.state), 4
> (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,285 [myid:2] - INFO 

Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
I do see that message in one of the task manager logs 20ms before the NPE in 
the JobManager. Looking in that log, there is a ConcurrentModificationException 
in TreeMap, which my accumulator uses. I'll track this down, thanks for the 
pointer.
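
For what it's worth, the TreeMap race described above can be sidestepped by keeping the sorted counts in a ConcurrentSkipListMap, whose iterators tolerate concurrent mutation, so a snapshot can be taken for serialization while the task thread keeps adding. The sketch below is a simplified stand-in with invented names; it does not implement Flink's Accumulator interface:

```java
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified stand-in for a sorted-string accumulator (names invented; this
// does NOT implement Flink's Accumulator interface). A TreeMap's iterators
// fail fast with ConcurrentModificationException when the task thread adds
// entries while another thread serializes the map; a ConcurrentSkipListMap
// keeps the sorted order but tolerates concurrent mutation and iteration.
public class SortedCounts {

    private final ConcurrentSkipListMap<String, Long> counts =
            new ConcurrentSkipListMap<>();

    /** Called from the task thread for every record. */
    public void add(String value) {
        counts.merge(value, 1L, Long::sum);  // atomic per-key increment
    }

    /** Sorted copy that is safe to hand to the serializer. */
    public NavigableMap<String, Long> snapshot() {
        return new ConcurrentSkipListMap<>(counts);
    }
}
```

Alternatively, synchronizing add() and the serialization path on the same monitor would also work, at the cost of contention.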


> On January 20, 2017 at 8:27 AM Stephan Ewen  wrote:
> 
> Hi!
> 
> My current assumption is that there is an accumulator that cannot be 
> serialized. The SortedStringAccumulator looks fine at a first glance, but are 
> there other accumulators involved?
> Do you see a message like that one in the log of one of the TaskManagers
> 
> "Failed to serialize accumulators for task."
> 
> with an exception stack trace?
> 
> 
> Stephan
> 
> 
> 
> On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion  wrote:
> 
> > Stephan,
> > 
> > Thanks for looking at this. Could you elaborate on the misbehavior 
> > in the accumulator? I'd like to fix it if it's incorrect.
> > 
> > Dave
> > 
> > 
> > 
> > On January 20, 2017 at 4:29 AM Stephan Ewen  wrote:
> > > 
> > > Hi!
> > > 
> > > It seems that the accumulator behaves in a non-standard way, 
> > > but the JobManager should also catch that (log a warning or debug 
> > > message) and simply continue (not crash).
> > > 
> > > I'll try to add a patch that the JobManager tolerates these 
> > > kinds of issues in the accumulators.
> > > 
> > > Stephan
> > > 
> > > 
> > > On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion  wrote:
> > > 
> > > >
> > > > Noticed I didn't cc the user list.
> > > >
> > > > -- Original Message --
> > > From: Dave Marion 
> > > To: Ted Yu 
> > > Date: January 19, 2017 at 12:13 PM
> > > Subject: Re: NPE in JobManager
> > > 
> > > 
> > > That might take some time. Here is a hand typed top N 
> > > lines. If that is not enough let me know and I will start the process of 
> > > getting the full stack trace.
> > > 
> > > 
> > > NullPointerException
> > > 
> > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> > > 
> > > 
> > > On January 19, 2017 at 11:58 AM Ted Yu  wrote:
> > > > 
> > > > Can you pastebin the complete stack trace for 
> > > > the NPE ?
> > > > 
> > > > Thanks
> > > > 
> > > > On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion  wrote:
> > > > 
> > > > > > > > > 
> > > > > I'm running 
> > > > > flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue 
> > > > > where after some period of time (measured in 1 - 3 hours) the 
> > > > > JobManager gets an NPE and shuts itself down. The failure is at 
> > > > > 

Re: Re: NPE in JobManager

2017-01-20 Thread Stephan Ewen
Hi!

My current assumption is that there is an accumulator that cannot be
serialized. The SortedStringAccumulator looks fine at a first glance, but
are there other accumulators involved?
Do you see a message like that one in the log of one of the TaskManagers

"Failed to serialize accumulators for task."

with an exception stack trace?


Stephan



On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion  wrote:

> Stephan,
>
> Thanks for looking at this. Could you elaborate on the misbehavior in the
> accumulator? I'd like to fix it if it's incorrect.
>
> Dave
>
>
> On January 20, 2017 at 4:29 AM Stephan Ewen  wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion  wrote:
>
>> Noticed I didn't cc the user list.
>>
>> -- Original Message --
>> From: Dave Marion 
>> To: Ted Yu 
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here is a hand typed top N lines. If that is
>> not enough let me know and I will start the process of getting the full
>> stack trace.
>>
>>
>> NullPointerException
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>>
>> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
>>
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu  wrote:
>>
>> Can you pastebin the complete stack trace for the NPE ?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion 
>> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
>> issue where after some period of time (measured in 1 - 3 hours) the
>> JobManager gets an NPE and shuts itself down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator[1], but can't tell from the JobManager code whether
>> the issue is in my Accumulator, or is a bug in the JobManager.
>>
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
>>
>>
>>
>
>
>


Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
Stephan,

Thanks for looking at this. Could you elaborate on the misbehavior in the 
accumulator? I'd like to fix it if it's incorrect.

Dave


> On January 20, 2017 at 4:29 AM Stephan Ewen  wrote:
> 
> Hi!
> 
> It seems that the accumulator behaves in a non-standard way, but the 
> JobManager should also catch that (log a warning or debug message) and simply 
> continue (not crash).
> 
> I'll try to add a patch that the JobManager tolerates these kinds of 
> issues in the accumulators.
> 
> Stephan
> 
> 
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion  wrote:
> 
> >
> > Noticed I didn't cc the user list.
> >
> > -- Original Message --
> From: Dave Marion 
> To: Ted Yu 
> Date: January 19, 2017 at 12:13 PM
> Subject: Re: NPE in JobManager
> 
> 
> That might take some time. Here is a hand typed top N lines. If 
> that is not enough let me know and I will start the process of getting the 
> full stack trace.
> 
> 
> NullPointerException
> 
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 
> 
> > On January 19, 2017 at 11:58 AM Ted Yu  wrote:
> > 
> > Can you pastebin the complete stack trace for the NPE ?
> > 
> > Thanks
> > 
> > On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion  wrote:
> > 
> > >
> > > I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and 
> > > I'm running into an issue where after some period of time (measured in 1 
> > > - 3 hours) the JobManager gets an NPE and shuts itself down. The failure 
> > > is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm 
> > > using a custom accumulator[1], but can't tell from the JobManager code 
> > > whether the issue is in my Accumulator, or is a bug in the JobManager.
> > > 
> > > 
> > > [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> > > 
> 




Re: Streaming file source?

2017-01-20 Thread Niels Basjes
Thanks!

This sounds really close to what I had in mind.
I'll use this first and see how far I get.

Niels

On Fri, Jan 20, 2017 at 11:27 AM, Stephan Ewen  wrote:

> Hi Niels!
>
> There is the Continuous File Monitoring Source, used via
>
> StreamExecutionEnvironment.readFile(FileInputFormat
> inputFormat, String filePath, FileProcessingMode watchType, long interval);
>
> This can be used to both continuously ingest from files, or to read files
> once.
>
> Kostas can probably comment more about whether and how you can make the
> file order deterministic.
>
> Stephan
>
>
> On Fri, Jan 20, 2017 at 11:20 AM, Niels Basjes  wrote:
>
>> Hi,
>>
>> For testing and optimizing a streaming application I want to have a "100%
>> accurate repeatable" substitute for a Kafka source.
>> I was thinking of creating a streaming source class that simply reads the
>> records from a (static unchanging) set of files.
>> Each file would then produce the data which (in the live situation) come
>> from a single Kafka partition.
>>
>> I hate reinventing the wheel, so I'm wondering: has something like this
>> already been built by someone?
>> If so, where can I find it?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-20 Thread Jonas
Hey Jamie,

It turns out you were right :) I wrote my own implementation of IPAddress
and then it worked.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-called-twice-Second-time-INetAddress-and-Array-Byte-are-empty-tp10907p11179.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Rate-limit processing

2017-01-20 Thread Yassine MARZOUGUI
Hi,

You might find this similar thread from the mailing list archive helpful :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html
.

Best,
Yassine

2017-01-20 10:53 GMT+01:00 Florian König :

> Hi,
>
> I need to limit the rate of processing in a Flink stream application.
> Specifically, the number of items processed in a .map() operation has to
> stay under a certain maximum per second.
>
> At the moment, I have another .map() operation before the actual
> processing, which just sleeps for a certain time (e.g., 250ms for a limit
> of 4 requests / sec) and returns the item unchanged:
>
> …
>
> public T map(final T value) throws Exception {
> Thread.sleep(delay);
> return value;
> }
>
> …
>
> This works as expected, but is a rather crude approach. Checkpointing the
> job takes a very long time: minutes for a state of a few kB, which for
> other jobs is done in a few milliseconds. I assume that letting the whole
> thread sleep for most of the time interferes with the checkpointing - not
> good!
>
> Would using a different synchronization mechanism (e.g.,
> https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) help to
> make checkpointing work better?
>
> Or, preferably, is there a mechanism inside Flink that I can use to
> accomplish the desired rate limiting? I haven’t found anything in the docs.
>
> Cheers,
> Florian
>


Re: Streaming file source?

2017-01-20 Thread Stephan Ewen
Hi Niels!

There is the Continuous File Monitoring Source, used via

StreamExecutionEnvironment.readFile(FileInputFormat
inputFormat, String filePath, FileProcessingMode watchType, long interval);

This can be used to both continuously ingest from files, or to read files
once.

Kostas can probably comment more about whether and how you can make the
file order deterministic.

Stephan


On Fri, Jan 20, 2017 at 11:20 AM, Niels Basjes  wrote:

> Hi,
>
> For testing and optimizing a streaming application I want to have a "100%
> accurate repeatable" substitute for a Kafka source.
> I was thinking of creating a streaming source class that simply reads the
> records from a (static unchanging) set of files.
> Each file would then produce the data which (in the live situation) come
> from a single Kafka partition.
>
> I hate reinventing the wheel, so I'm wondering: has something like this
> already been built by someone?
> If so, where can I find it?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Streaming file source?

2017-01-20 Thread Niels Basjes
Hi,

For testing and optimizing a streaming application I want to have a "100%
accurate repeatable" substitute for a Kafka source.
I was thinking of creating a streaming source class that simply reads the
records from a (static unchanging) set of files.
Each file would then produce the data which (in the live situation) come
from a single Kafka partition.

I hate reinventing the wheel, so I'm wondering: has something like this
already been built by someone?
If so, where can I find it?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
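
The partition-per-file idea can be sketched in plain Java (class and method names are invented for illustration; a real version would wrap this logic in a Flink source function):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (names invented for illustration): each file stands in
// for one Kafka partition, and reading the files in a fixed order yields the
// same record sequence on every run -- the "100% accurate repeatable"
// substitute described above. In a real job this would live in a source
// function rather than a static helper.
public class FileReplay {

    /** Reads the given "partition files" in order and returns all records. */
    public static List<String> replay(List<Path> partitionFiles) throws IOException {
        List<String> records = new ArrayList<>();
        for (Path file : partitionFiles) {             // deterministic file order
            records.addAll(Files.readAllLines(file));  // deterministic line order
        }
        return records;
    }
}
```

Interleaving records across files (to mimic multiple partitions being consumed concurrently) would need extra care, since that interleaving is exactly what is nondeterministic with a real Kafka source.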


Re: Re: NPE in JobManager

2017-01-20 Thread Stephan Ewen
I opened this issue: https://issues.apache.org/jira/browse/FLINK-5585

Assuming the bug is what I think it is (cannot be 100% sure from just the
small stack trace sample) it should be fixed soon...

On Fri, Jan 20, 2017 at 10:29 AM, Stephan Ewen  wrote:

> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion  wrote:
>
>> Noticed I didn't cc the user list.
>>
>> -- Original Message --
>> From: Dave Marion 
>> To: Ted Yu 
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here is a hand typed top N lines. If that is
>> not enough let me know and I will start the process of getting the full
>> stack trace.
>>
>>
>> NullPointerException
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>>
>> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
>>
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu  wrote:
>>
>> Can you pastebin the complete stack trace for the NPE ?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion 
>> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
>> issue where after some period of time (measured in 1 - 3 hours) the
>> JobManager gets an NPE and shuts itself down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator[1], but can't tell from the JobManager code whether
>> the issue is in my Accumulator, or is a bug in the JobManager.
>>
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
>>
>>
>>
>


Rate-limit processing

2017-01-20 Thread Florian König
Hi,

I need to limit the rate of processing in a Flink stream application. 
Specifically, the number of items processed in a .map() operation has to stay 
under a certain maximum per second.

At the moment, I have another .map() operation before the actual processing, 
which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / 
sec) and returns the item unchanged:

…

public T map(final T value) throws Exception {
Thread.sleep(delay);
return value;
}

…

This works as expected, but is a rather crude approach. Checkpointing the job 
takes a very long time: minutes for a state of a few kB, which for other jobs 
is done in a few milliseconds. I assume that letting the whole thread sleep for 
most of the time interferes with the checkpointing - not good!

Would using a different synchronization mechanism (e.g., 
https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
 help to make checkpointing work better?

Or, preferably, is there a mechanism inside Flink that I can use to accomplish 
the desired rate limiting? I haven’t found anything in the docs.

Cheers,
Florian


Re: Cluster failure after zookeeper glitch.

2017-01-20 Thread Stefan Richter
I would think that network problems between Flink and Zookeeper in HA mode 
could indeed lead to problems. Maybe Till (in CC) has a better idea of what is 
going on there.

> Am 19.01.2017 um 14:55 schrieb Andrew Ge Wu :
> 
> Hi Stefan
> 
> Yes we are running in HA mode with dedicated zookeeper cluster. As far as I 
> can see it looks like a networking issue with zookeeper cluster.
> 2 out of 5 zookeeper reported something around the same time:
> 
> server1
> 2017-01-19 11:52:13,044 [myid:1] - WARN  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@89] - Exception when 
> following the leader
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:150)
> at java.net.SocketInputStream.read(SocketInputStream.java:121)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at 
> org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
> at 
> org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:83)
> at 
> org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
> at 
> org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:153)
> at 
> org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:85)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)
> 2017-01-19 11:52:13,045 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:Follower@166] - shutdown called
> java.lang.Exception: shutdown Follower
> at 
> org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:166)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:790)
> 2017-01-19 11:52:13,045 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket 
> connection for client /172.27.163.227:51800 which had sessionid 
> 0x159b505820a0009
> 2017-01-19 11:52:13,046 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket 
> connection for client /172.27.163.227:51798 which had sessionid 
> 0x159b505820a0008
> 2017-01-19 11:52:13,046 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket 
> connection for client /0:0:0:0:0:0:0:1:46891 which had sessionid 
> 0x1537b32bbe100ad
> 2017-01-19 11:52:13,046 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:NIOServerCnxn@1007] - Closed socket 
> connection for client /172.27.165.64:50075 which had sessionid 
> 0x159b505820a000d
> 2017-01-19 11:52:13,046 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:FollowerZooKeeperServer@139] - 
> Shutting down
> 2017-01-19 11:52:13,046 [myid:1] - INFO  
> [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@441] - shutting down
> 
> 
> server2
> 2017-01-19 11:52:13,061 [myid:2] - INFO  
> [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
> format version), 1 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
> (n.state), 1 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,082 [myid:2] - INFO  
> [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
> format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
> (n.state), 4 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,083 [myid:2] - INFO  
> [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
> format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
> (n.state), 1 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,284 [myid:2] - INFO  
> [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
> format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
> (n.state), 4 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,285 [myid:2] - INFO  
> [WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
> format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
> (n.state), 1 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
> 2017-01-19 11:52:13,310 [myid:2] - INFO  
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - 
> Accepted socket connection from /172.27.163.227:39302
> 2017-01-19 11:52:13,311 [myid:2] - INFO  
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@861] - Client 
> attempting to renew session 0x159b505820a0009 at /172.27.163.227:39302
> 2017-01-19 11:52:13,312 [myid:2] - INFO  
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:Learner@108] - Revalidating 
> client: 0x159b505820a0009
> 2017-01-19 11:52:13,687 [myid:2] - INFO  
> 

Re: Re: NPE in JobManager

2017-01-20 Thread Stephan Ewen
Hi!

It seems that the accumulator behaves in a non-standard way, but the
JobManager should also catch that (log a warning or debug message) and
simply continue (not crash).

I'll try to add a patch that the JobManager tolerates these kinds of issues
in the accumulators.

Stephan


On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion  wrote:

> Noticed I didn't cc the user list.
>
> -- Original Message --
> From: Dave Marion 
> To: Ted Yu 
> Date: January 19, 2017 at 12:13 PM
> Subject: Re: NPE in JobManager
>
> That might take some time. Here is a hand typed top N lines. If that is
> not enough let me know and I will start the process of getting the full
> stack trace.
>
>
> NullPointerException
>
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>
> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>
>
> On January 19, 2017 at 11:58 AM Ted Yu  wrote:
>
> Can you pastebin the complete stack trace for the NPE ?
>
> Thanks
>
> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion  wrote:
>
> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
> issue where after some period of time (measured in 1 - 3 hours) the
> JobManager gets an NPE and shuts itself down. The failure is at
> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
> a custom accumulator[1], but can't tell from the JobManager code whether
> the issue is in my Accumulator, or is a bug in the JobManager.
>
>
> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
>
>
>