Re: Resource under-utilization when using RocksDb state backend [SOLVED]

2016-12-08 Thread Aljoscha Krettek
Great to hear!

On Fri, 9 Dec 2016 at 01:02 Cliff Resnick  wrote:

> It turns out that most of the time in RocksDBFoldingState was spent on
> serialization/deserialization. RocksDb read/write was performing well. By
> moving from Kryo to custom serialization we were able to increase
> throughput dramatically. Load is now where it should be.
>
> On Mon, Dec 5, 2016 at 1:15 PM, Robert Metzger 
> wrote:
>
> Another Flink user using RocksDB with large state on SSDs recently posted
> this video for optimizing the performance of RocksDB on SSDs:
> https://www.youtube.com/watch?v=pvUqbIeoPzM
> That could be relevant for you.
>
> For how long did you look at iotop? It could be that the IO access happens
> in bursts, depending on how data is cached.
>
> I'll also add Stefan Richter to the conversation; maybe he has some more
> ideas about what we can do here.
>
>
> On Mon, Dec 5, 2016 at 6:19 PM, Cliff Resnick  wrote:
>
> Hi Robert,
>
> We're following 1.2-SNAPSHOT, using event time. I have tried "iotop" and
> I usually see less than 1% IO. The most I've seen was a quick flash here
> or there of something substantial (e.g. 19%, 52%) then back to nothing. I
> also assumed we were disk-bound, but to use your metaphor I'm having
> trouble finding any smoke. However, I'm not very experienced in sussing out
> IO issues so perhaps there is something else I'm missing.
>
> I'll keep investigating. If I continue to come up empty then I guess my
> next steps may be to stage some independent tests directly against RocksDb.
>
> -Cliff
>
>
> On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger 
> wrote:
>
> Hi Cliff,
>
> which Flink version are you using?
> Are you using Eventtime or processing time windows?
>
> I suspect that your disks are "burning" (= your job is IO bound). Can you
> check with a tool like "iotop" how much disk IO Flink is producing?
> Then, I would set this number in relation to the theoretical maximum of
> your SSDs (a good rough estimate is to use dd for that).
>
> If you find that your disk bandwidth is saturated by Flink, you could look
> into tuning the RocksDB settings so that it uses more memory for caching.
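>
> For example, something along these lines when setting up the job (the values
> are only illustrative, and the classes come from
> org.apache.flink.contrib.streaming.state and org.rocksdb):
>
> RocksDBStateBackend backend =
>     new RocksDBStateBackend("hdfs:///flink/checkpoints"); // example URI
> // start from the SSD-tuned defaults shipped with Flink
> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
> // then give RocksDB more memory for write buffers (example values)
> backend.setOptions(new OptionsFactory() {
>     @Override
>     public DBOptions createDBOptions(DBOptions currentOptions) {
>         return currentOptions;
>     }
>     @Override
>     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
>         return currentOptions
>             .setWriteBufferSize(64 * 1024 * 1024)  // 64 MB memtables
>             .setMaxWriteBufferNumber(4);
>     }
> });
> env.setStateBackend(backend);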
>
> Regards,
> Robert
>
>
> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick  wrote:
>
> In tests comparing RocksDb to fs state backend we observe much lower
> throughput, around 10x slower. While the lowered throughput is expected,
> what's perplexing is that machine load is also very low with RocksDb,
> typically falling to < 25% CPU and negligible IO wait (around 0.1%). Our
> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
> each running a single TaskManager in YARN, with 6.5G allocated memory per
> TaskManager. The instances also have 2x40G attached SSDs which we have
> mapped to `taskmanager.tmp.dir`.
>
> With FS state and 4 slots per TM, we will easily max out with a load
> average around 5 or 6, so we actually need to throttle down the slots to 3.
> With RocksDb using the Flink SSD-configured options we see a load
> average at around 1. Also, load (and actual) throughput remain more or less
> constant no matter how many slots we use. The weak load is spread over all
> CPUs.
>
> Here is a sample top:
>
> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>  0.0%st
>
> Our pipeline uses tumbling windows, each with a ValueState keyed to a
> 3-tuple of one string and two ints. Each ValueState comprises a small set
> of tuples of around 5-7 fields each. The WindowFunction simply diffs against
> the set and updates state if there is a diff.
>
> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>
> -Cliff
>


Re: Parallelism and stateful mapping with Flink

2016-12-08 Thread Aljoscha Krettek
I commented on the issue with a way that should work.

On Fri, 9 Dec 2016 at 01:00 Chesnay Schepler  wrote:

> Done. https://issues.apache.org/jira/browse/FLINK-5299
>
> On 08.12.2016 16:50, Ufuk Celebi wrote:
> > Would you like to open an issue for this for starters Chesnay? Would be
> good to fix for the upcoming release even.
> >
> >
> > On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org)
> wrote:
> >> It would be neat if we could support arrays as keys directly; it should
> >> boil down to checking the key type and in case of an array injecting a
> >> KeySelector that calls Arrays.hashCode(array).
> >> This worked for me when I ran into the same issue while experimenting
> >> with some stuff.
> >>
> >> The batch API can use arrays as keys as well, so it's also a matter of
> >> consistency imo.
> >>
> >> Regards,
> >> Chesnay
> >>
> >> On 08.12.2016 16:23, Ufuk Celebi wrote:
> >>> @Aljoscha: I remember that someone else ran into this, too. Should we
> address arrays
> >> as keys specifically in the API? Prohibit? Document this?
> >>> – Ufuk
> >>>
> >>> On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com)
> wrote:
>  Sure!
> 
>  (Aside, it turns out that the issue was using an `Array[Byte]` as a
> key - byte arrays
> >> don’t
>  appear to have a stable hashCode. I’ll provide the skeleton for
> fullness, though.)
> 
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  env.setParallelism(Config.callAggregator.parallelism)
> 
>  env.addSource(kafkaSource)
>  .flatMap(transformToRecords(_))
>  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
>  .map(new StatefulAggregator())
>  .addSink(hbaseSink)
> 
> 
>  Again, wrapping my keyBy function in `new String()` has fixed my
> issue. Thanks!
> 
>  -a
> 
> 
> 
> > On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >
> > Hi,
> >
> > could you maybe provide the (minimal) code for the problematic job?
> Also, are you
> >> sure
>  that the keyBy is working on the correct key attribute?
> > Best,
> > Stefan
> >
> >> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>
> >> Hello,
> >>
> >> I’m trying to perform a stateful mapping of some objects coming in
> from Kafka in a
> >> parallelized
>  flink job (set on the job using env.setParallelism(3)). The data
> source is a kafka
> >> topic,
>  but the partitions aren’t meaningfully keyed for this operation (each
> kafka message
>  is flatMapped to between 0-2 objects, with potentially different
> keys). I have a keyBy()
>  operator directly before my map(), but I’m seeing objects with the
> same key distributed
>  to different parallel task instances, as reported by
> getRuntimeContext().getIndexOfThisSubtask().
> >> My understanding of keyBy is that it would segment the stream by
> key, and guarantee
>  that all data with a given key would hit the same instance. Am I
> possibly seeing residual
>  “keying” from the kafka topic?
> >> I’m running flink 1.1.3 in scala. Please let me know if I can add
> more info.
> >>
> >> Thanks,
> >>
> >> Andrew
> 
> >>
> >>
> >
>
>


Re: separation of JVMs for different applications

2016-12-08 Thread Manu Zhang
If there isn't an existing JIRA for standalone v2.0, may I open a new
one?

Thanks,
Manu

On Wed, Dec 7, 2016 at 12:39 PM Manu Zhang  wrote:

> Good to know that.
>
> Is it the "standalone setup v2.0" section? The wiki page has no
> Google-Doc-like change history.
> Any JIRAs opened for that? Not sure that will be noticed given FLIP-6 is
> almost finished.
>
> Thanks,
> Manu
>
> On Tue, Dec 6, 2016 at 11:55 PM Stephan Ewen  wrote:
>
> Hi!
>
> We are currently changing the resource and process model quite a bit:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> As part of that, I think it makes sense to introduce something like that.
>
> What you can do today is to set TaskManagers to use one slot only, and
> then start multiple TaskManagers per machine. That makes sure that JVMs are
> never shared across jobs.
> If you use the "start-cluster.sh" script from Flink, you can enter the
> same hostname multiple times in the workers file, and it will start
> multiple TaskManagers on a machine.
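>
> For example (a sketch; hostnames are placeholders):
>
> # conf/flink-conf.yaml
> taskmanager.numberOfTaskSlots: 1
>
> # conf/slaves (the workers file): one line per TaskManager to start
> worker-host-1
> worker-host-1
> worker-host-2
> worker-host-2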
>
> Best,
> Stephan
>
>
>
> On Tue, Dec 6, 2016 at 3:51 AM, Manu Zhang 
> wrote:
>
> Thanks Stephan,
>
> They don't use YARN now but I think they will consider it. Do you think
> it would be beneficial to provide such an option as "separate-jvm" in
> stand-alone mode for streaming processors and long-running services? Or do
> you think it would introduce too much complexity?
>
> Manu
>
> On Tue, Dec 6, 2016 at 1:04 AM Stephan Ewen  wrote:
>
> Hi!
>
> Are your customers using YARN? In that case, the default configuration
> will start a new YARN application per Flink job, no JVMs are shared between
> jobs. By default, even each slot has its own JVM.
>
> Greetings,
> Stephan
>
> PS: I think the "spawning new JVMs" is what Till referred to when saying
> "spinning up a new cluster". Keep in mind that Flink is also a batch
> processor, and it handles sequences of short batch jobs (as issued for
> example by interactive shells) and it pre-allocates and manages a lot of
> memory for batch jobs.
>
>
>
> On Mon, Dec 5, 2016 at 3:48 PM, Manu Zhang 
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job.
>
>
> I don't think we have to spin up a new cluster for each job if every job
> gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
> new job when free slots are available. How can we share data between jobs,
> and why?
>
>
>
> On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann 
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job. This
> might be helpful for scenarios where you want to run many short-lived and
> light-weight jobs.
>
> But the important part is that you don't have to use this method. You can
> also start a new Flink cluster per job which will then execute the job
> isolated from any other jobs (given that you don't submit other jobs to
> this cluster).
>
> Cheers,
> Till
>
> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang 
> wrote:
>
> Thanks Fabian and Till.
>
> We have customers who are interested in using Flink but very concerned
> about that "multiple jobs share the same set of TMs". I've just joined the
> community recently so I'm not sure whether there has been a discussion over
> the "multi-tenant cluster mode" before.
>
> The cons are that one job/user's failure may crash another, which is
> unacceptable in a multi-tenant scenario.
> What are the pros? Do the pros outweigh the cons?
>
> Manu
>
> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann  wrote:
>
> Hi Manu,
>
> with Flip-6 we will be able to support stricter application isolation by
> starting for each job a dedicated JobManager which will execute its tasks
> on TM reserved solely for this job. But at the same time we will continue
> supporting the multi-tenant cluster mode where tasks belonging to multiple
> jobs share the same set of TMs and, thus, might share information between
> them.
>
> Cheers,
> Till
>
> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske  wrote:
>
> Hi Manu,
>
> As far as I know, there are no plans to change the stand-alone deployment.
> FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
> etc.) which allow to start Flink processes per job.
>
> Till (in CC) is more familiar with the FLIP-6 effort and might be able to
> add more detail.
>
> Best,
> Fabian
>
> 2016-12-01 4:16 GMT+01:00 Manu Zhang :
>
> Hi all,
>
> It seems tasks of different Flink applications can end up in the same JVM
> (TaskManager) in standalone mode. Isn't this fragile, since errors in one
> application could crash another? I checked FLIP-6
> 

Re: Serializers and Schemas

2016-12-08 Thread Matt
Hi people,

This is what I was talking about regarding a generic de/serializer for POJO
classes [1].

The Serde class in [2] can be used in both Kafka [3] and Flink [4], and it
works out of the box for any POJO class.

Do you see anything wrong in this approach? Any way to improve it?

Cheers,
Matt

[1] https://github.com/Dromit/StreamTest/
[2]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/Serde.java
[3]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/MainProducer.java#L19
[4]
https://github.com/Dromit/StreamTest/blob/master/src/main/java/com/stream/MainConsumer.java#L19
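
In outline, the Serde boils down to a single class that implements both
Flink's SerializationSchema and DeserializationSchema and delegates the actual
work to a generic library. A minimal sketch of that shape (not the exact code
in [2]; this variant delegates to Kryo, and keeps the Kryo instance transient
because the schema object itself has to be serializable):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class KryoPojoSerde<T> implements SerializationSchema<T>, DeserializationSchema<T> {

    private final Class<T> clazz;
    private transient Kryo kryo; // Kryo is not serializable, so create it lazily per task

    public KryoPojoSerde(Class<T> clazz) {
        this.clazz = clazz;
    }

    private Kryo kryo() {
        if (kryo == null) {
            kryo = new Kryo();
        }
        return kryo;
    }

    @Override
    public byte[] serialize(T element) {
        Output output = new Output(4096, -1); // buffer grows as needed
        kryo().writeObject(output, element);
        return output.toBytes();
    }

    @Override
    public T deserialize(byte[] message) {
        return kryo().readObject(new Input(message), clazz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false; // the Kafka stream never ends
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(clazz);
    }
}

Something of this shape can be handed to the FlinkKafkaConsumer/Producer
constructors in place of SimpleStringSchema, e.g. new KryoPojoSerde<>(Product.class).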



On Thu, Dec 8, 2016 at 4:15 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Matt,
>
> 1. There’s some in-progress work on wrapper util classes for Kafka
> de/serializers here [1] that allows
> Kafka de/serializers to be used with the Flink Kafka Consumers/Producers
> with minimal user overhead.
> The PR also has some proposed additions to the documentation for the wrappers.
>
> 2. I feel that it would be good to have more documentation on Flink’s
> de/serializers because they’ve been
> frequently asked about on the mailing lists, but at the same time,
> probably the fastest / most efficient de/serialization
> approach would be tailored to each use case, so we’d need to think more
> on the presentation and the purpose
> of the documentation.
>
> Cheers,
> Gordon
>
> [1] https://github.com/apache/flink/pull/2705
>
> On December 8, 2016 at 5:00:19 AM, milind parikh (milindspar...@gmail.com)
> wrote:
>
> Why not use a self-describing format (JSON), stream as String, and read
> through a JSON reader and avoid top-level reflection?
>
> Github.com/milindparikh/streamingsi
>
> https://github.com/milindparikh/streamingsi/tree/master/epic-poc/sprint-2-
> simulated-data-no-cdc-advanced-eventing/2-dataprocessing
>
> ?
>
> Apologies if I misunderstood the question. But I can quite see how to
> model your Product class (or indeed POJO) in a fairly generic way (assumes
> JSON).
>
> The real issue faced when you have different versions of the same POJO class
> is that it requires storing enough information to dynamically instantiate the
> actual version of the class, which I believe is beyond the simple use case.
>
> Milind
> On Dec 7, 2016 2:42 PM, "Matt"  wrote:
>
>> I've read your example, but I've found the same problem. You're
>> serializing your POJO as a string, where all fields are separated by "\t".
>> This may work for you, but not in general.
>>
>> https://github.com/luigiselmi/pilot-sc4-fcd-producer/blob/ma
>> ster/src/main/java/eu/bde/sc4pilot/fcd/FcdTaxiEvent.java#L60
>>
>> I would like to see a more "generic" approach for the class Product in my
>> last message. I believe a more general purpose de/serializer for POJOs
>> should be possible to achieve using reflection.
>>
>> On Wed, Dec 7, 2016 at 1:16 PM, Luigi Selmi  wrote:
>>
>>> Hi Matt,
>>>
>>> I had the same problem, trying to read some records in event time using
>>> a POJO, doing some transformation and save the result into Kafka for
>>> further processing. I am not yet done but maybe the code I wrote starting
>>> from the Flink Forward 2016 training docs
>>> 
>>> could be useful.
>>>
>>> https://github.com/luigiselmi/pilot-sc4-fcd-producer
>>>
>>>
>>> Best,
>>>
>>> Luigi
>>>
>>> On 7 December 2016 at 16:35, Matt  wrote:
>>>
 Hello,

 I don't quite understand how to integrate Kafka and Flink; after a lot
 of thought and hours of reading I feel I'm still missing something
 important.

 So far I haven't found a non-trivial but simple example of a stream of
 a custom class (POJO). It would be good to have such an example in the Flink
 docs; I can think of many scenarios in which using SimpleStringSchema
 is not an option, but all Kafka+Flink guides insist on using it.

 Maybe we can add a simple example to the documentation [1]; it would be
 really helpful for many of us. Also, explaining how to create a Flink
 De/SerializationSchema from a Kafka De/Serializer would be really useful
 and would save a lot of people a lot of time; it's not clear why you
 need both of them, or whether you need both of them.

 As far as I know Avro is a common choice for serialization, but I've
 read Kryo's performance is much better (true?). I guess though that the
 fastest serialization approach is writing your own de/serializer.

 1. What do you think about adding some thoughts on this to the
 documentation?
 2. Can anyone provide an example for the following class?

 ---
 public class Product {
 public String code;
 public double price;
 public String description;
 public long created;
 }
 ---

 Regards,
 Matt

 [1] 

Re: dataartisans flink training maven build error

2016-12-08 Thread Conny Gu
Hi all,

I think I found the solution: I just used a new version of Ubuntu and
installed the latest version of Maven, and then everything worked.

best regards,
Conny

2016-12-08 17:51 GMT+01:00 Conny Gu [via Apache Flink User Mailing List
archive.] :

> Hi all,
>
> I tried to follow the dataArtisans GitHub training tutorial,
> http://dataartisans.github.io/flink-training/devEnvSetup.html
>
> but I got errors while setting up the development environment.
>
>
> The errors happened at the step:
>
> "Clone and build the flink-training-exercises project"
>
> git clone https://github.com/dataArtisans/flink-training-exercises.git
> cd flink-training-exercises
> mvn clean install
>
> after running "mvn clean install", I get:
>
> 
> 
> [INFO] Scanning for projects...
> [INFO]
>
> [INFO] 
> 
> [INFO] Building Apache Flink Training Exercises 0.6
> [INFO] 
> 
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @
> flink-training-exercises ---
> [INFO]
> [INFO] --- maven-checkstyle-plugin:2.12.1:check (validate) @
> flink-training-exercises ---
> [INFO]
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @
> flink-training-exercises ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] skip non existing resourceDirectory /home/flink/workspace-neo/
> flink-training-exercises/src/main/resources
> [INFO]
> [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @
> flink-training-exercises ---
> [WARNING]  Expected all dependencies to require Scala version: 2.10.4
> [WARNING]  org.scala-lang:scala-reflect:2.10.4 requires scala version:
> 2.10.4
> [WARNING]  org.apache.flink:flink-scala_2.10:1.1.3 requires scala
> version: 2.10.4
> [WARNING]  org.apache.flink:flink-scala_2.10:1.1.3 requires scala
> version: 2.10.4
> [WARNING]  org.scala-lang:scala-compiler:2.10.4 requires scala version:
> 2.10.4
> [WARNING]  org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version:
> 2.10.4
> [WARNING]  org.apache.flink:flink-runtime_2.10:1.1.3 requires scala
> version: 2.10.4
> [WARNING]  com.typesafe.akka:akka-actor_2.10:2.3.7 requires scala
> version: 2.10.4
> [WARNING]  com.typesafe.akka:akka-remote_2.10:2.3.7 requires scala
> version: 2.10.4
> [WARNING]  com.typesafe.akka:akka-slf4j_2.10:2.3.7 requires scala
> version: 2.10.4
> [WARNING]  org.clapper:grizzled-slf4j_2.10:1.0.2 requires scala version:
> 2.10.3
> [WARNING] Multiple versions of scala libraries detected!
> [INFO] /home/flink/workspace-neo/flink-training-exercises/src/main/java:-1:
> info: compiling
> [INFO] /home/flink/workspace-neo/flink-training-exercises/src/main/scala:-1:
> info: compiling
> [INFO] Compiling 32 source files to /home/flink/workspace-neo/
> flink-training-exercises/target/classes at 1481215394963
> [ERROR] error: error while loading , invalid END header (bad central
> directory offset)
> [ERROR] error: scala.reflect.internal.MissingRequirementError: object
> scala.runtime in compiler mirror not found.
> [ERROR] at scala.reflect.internal.MissingRequirementError$.signal(
> MissingRequirementError.scala:16)
> [ERROR] at scala.reflect.internal.MissingRequirementError$.notFound(
> MissingRequirementError.scala:17)
> [ERROR] at scala.reflect.internal.Mirrors$RootsBase.
> getModuleOrClass(Mirrors.scala:48)
> [ERROR] at scala.reflect.internal.Mirrors$RootsBase.
> getModuleOrClass(Mirrors.scala:40)
> [ERROR] at scala.reflect.internal.Mirrors$RootsBase.
> getModuleOrClass(Mirrors.scala:61)
> [ERROR] at scala.reflect.internal.Mirrors$RootsBase.getPackage(
> Mirrors.scala:172)
> [ERROR] at scala.reflect.internal.Mirrors$RootsBase.
> getRequiredPackage(Mirrors.scala:175)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> RuntimePackage$lzycompute(Definitions.scala:183)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> RuntimePackage(Definitions.scala:183)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> RuntimePackageClass$lzycompute(Definitions.scala:184)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> RuntimePackageClass(Definitions.scala:184)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> AnnotationDefaultAttr$lzycompute(Definitions.scala:1024)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> AnnotationDefaultAttr(Definitions.scala:1023)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> syntheticCoreClasses$lzycompute(Definitions.scala:1153)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> syntheticCoreClasses(Definitions.scala:1152)
> [ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.
> 

dataartisans flink training maven build error

2016-12-08 Thread Conny Gu
Hi all,

I tried to follow the dataArtisans GitHub training tutorial,
http://dataartisans.github.io/flink-training/devEnvSetup.html

but I got errors while setting up the development environment.


The errors happened at the step:

"Clone and build the flink-training-exercises project"

git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install

after running "mvn clean install", I get:


[INFO] Scanning for projects...
[INFO]

[INFO]

[INFO] Building Apache Flink Training Exercises 0.6
[INFO]

[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @
flink-training-exercises ---
[INFO]
[INFO] --- maven-checkstyle-plugin:2.12.1:check (validate) @
flink-training-exercises ---
[INFO]
[INFO]
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @
flink-training-exercises ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory
/home/flink/workspace-neo/flink-training-exercises/src/main/resources
[INFO]
[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @
flink-training-exercises ---
[WARNING]  Expected all dependencies to require Scala version: 2.10.4
[WARNING]  org.scala-lang:scala-reflect:2.10.4 requires scala version:
2.10.4
[WARNING]  org.apache.flink:flink-scala_2.10:1.1.3 requires scala version:
2.10.4
[WARNING]  org.apache.flink:flink-scala_2.10:1.1.3 requires scala version:
2.10.4
[WARNING]  org.scala-lang:scala-compiler:2.10.4 requires scala version:
2.10.4
[WARNING]  org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version:
2.10.4
[WARNING]  org.apache.flink:flink-runtime_2.10:1.1.3 requires scala
version: 2.10.4
[WARNING]  com.typesafe.akka:akka-actor_2.10:2.3.7 requires scala version:
2.10.4
[WARNING]  com.typesafe.akka:akka-remote_2.10:2.3.7 requires scala version:
2.10.4
[WARNING]  com.typesafe.akka:akka-slf4j_2.10:2.3.7 requires scala version:
2.10.4
[WARNING]  org.clapper:grizzled-slf4j_2.10:1.0.2 requires scala version:
2.10.3
[WARNING] Multiple versions of scala libraries detected!
[INFO] /home/flink/workspace-neo/flink-training-exercises/src/main/java:-1:
info: compiling
[INFO]
/home/flink/workspace-neo/flink-training-exercises/src/main/scala:-1: info:
compiling
[INFO] Compiling 32 source files to
/home/flink/workspace-neo/flink-training-exercises/target/classes at
1481215394963
[ERROR] error: error while loading , invalid END header (bad central
directory offset)
[ERROR] error: scala.reflect.internal.MissingRequirementError: object
scala.runtime in compiler mirror not found.
[ERROR] at
scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
[ERROR] at
scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
[ERROR] at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
[ERROR] at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
[ERROR] at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
[ERROR] at
scala.reflect.internal.Mirrors$RootsBase.getPackage(Mirrors.scala:172)
[ERROR] at
scala.reflect.internal.Mirrors$RootsBase.getRequiredPackage(Mirrors.scala:175)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage$lzycompute(Definitions.scala:183)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage(Definitions.scala:183)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass$lzycompute(Definitions.scala:184)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass(Definitions.scala:184)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr$lzycompute(Definitions.scala:1024)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr(Definitions.scala:1023)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses$lzycompute(Definitions.scala:1153)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses(Definitions.scala:1152)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode$lzycompute(Definitions.scala:1196)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode(Definitions.scala:1196)
[ERROR] at
scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1261)
[ERROR] at scala.tools.nsc.Global$Run.(Global.scala:1290)
[ERROR] at scala.tools.nsc.Driver.doCompile(Driver.scala:32)
[ERROR] at scala.tools.nsc.Main$.doCompile(Main.scala:79)
[ERROR] at scala.tools.nsc.Driver.process(Driver.scala:54)
[ERROR] at scala.tools.nsc.Driver.main(Driver.scala:67)
[ERROR] at 

Re: Resource under-utilization when using RocksDb state backend [SOLVED]

2016-12-08 Thread Cliff Resnick
It turns out that most of the time in RocksDBFoldingState was spent on
serialization/deserialization. RocksDb read/write was performing well. By
moving from Kryo to custom serialization we were able to increase
throughput dramatically. Load is now where it should be.
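
For anyone hitting the same thing: one way to get custom serialization without
writing a full TypeSerializer is to have the state type implement Flink's Value
interface, so Flink uses the hand-written read/write methods instead of falling
back to Kryo. A minimal sketch with a made-up state type (the fields are
placeholders, not our actual job's state):

import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

public class MyAggState implements Value {

    public String key;
    public int count;
    public long lastUpdated;

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeUTF(key);
        out.writeInt(count);
        out.writeLong(lastUpdated);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        key = in.readUTF();
        count = in.readInt();
        lastUpdated = in.readLong();
    }
}

Registering a custom Kryo serializer via env.getConfig().registerTypeWithKryoSerializer(...)
is another option if you want to stay on Kryo but speed up specific types.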

On Mon, Dec 5, 2016 at 1:15 PM, Robert Metzger  wrote:

> Another Flink user using RocksDB with large state on SSDs recently posted
> this video for optimizing the performance of RocksDB on SSDs:
> https://www.youtube.com/watch?v=pvUqbIeoPzM
> That could be relevant for you.
>
> For how long did you look at iotop? It could be that the IO access happens
> in bursts, depending on how data is cached.
>
> I'll also add Stefan Richter to the conversation; maybe he has some more
> ideas about what we can do here.
>
>
> On Mon, Dec 5, 2016 at 6:19 PM, Cliff Resnick  wrote:
>
>> Hi Robert,
>>
>> We're following 1.2-SNAPSHOT, using event time. I have tried "iotop" and
>> I usually see less than 1% IO. The most I've seen was a quick flash here
>> or there of something substantial (e.g. 19%, 52%) then back to nothing. I
>> also assumed we were disk-bound, but to use your metaphor I'm having
>> trouble finding any smoke. However, I'm not very experienced in sussing out
>> IO issues so perhaps there is something else I'm missing.
>>
>> I'll keep investigating. If I continue to come up empty then I guess my
>> next steps may be to stage some independent tests directly against RocksDb.
>>
>> -Cliff
>>
>>
>> On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> which Flink version are you using?
>>> Are you using Eventtime or processing time windows?
>>>
>>> I suspect that your disks are "burning" (= your job is IO bound). Can
>>> you check with a tool like "iotop" how much disk IO Flink is producing?
>>> Then, I would set this number in relation to the theoretical maximum
>>> of your SSDs (a good rough estimate is to use dd for that).
>>>
>>> If you find that your disk bandwidth is saturated by Flink, you could
>>> look into tuning the RocksDB settings so that it uses more memory for
>>> caching.
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick  wrote:
>>>
 In tests comparing RocksDb to fs state backend we observe much lower
 throughput, around 10x slower. While the lowered throughput is expected,
 what's perplexing is that machine load is also very low with RocksDb,
 typically falling to < 25% CPU and negligible IO wait (around 0.1%). Our
 test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
 each running a single TaskManager in YARN, with 6.5G allocated memory per
 TaskManager. The instances also have 2x40G attached SSDs which we have
 mapped to `taskmanager.tmp.dir`.

 With FS state and 4 slots per TM, we will easily max out with a
 load average around 5 or 6, so we actually need to throttle down the
 slots to 3. With RocksDb using the Flink SSD-configured options we see a
 load average at around 1. Also, load (and actual) throughput remain more or
 less constant no matter how many slots we use. The weak load is spread over
 all CPUs.

 Here is a sample top:

 Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
  0.0%st
 Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
  0.0%st
 Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
  0.0%st
 Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
  0.0%st

 Our pipeline uses tumbling windows, each with a ValueState keyed to a
 3-tuple of one string and two ints. Each ValueState comprises a small set
 of tuples of around 5-7 fields each. The WindowFunction simply diffs against
 the set and updates state if there is a diff.

 Any ideas as to what the bottleneck is here? Any suggestions welcomed!

 -Cliff







>>>
>>
>


Re: Parallelism and stateful mapping with Flink

2016-12-08 Thread Chesnay Schepler

Done. https://issues.apache.org/jira/browse/FLINK-5299

On 08.12.2016 16:50, Ufuk Celebi wrote:

Would you like to open an issue for this for starters Chesnay? Would be good to 
fix for the upcoming release even.


On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:

It would be neat if we could support arrays as keys directly; it should
boil down to checking the key type and in case of an array injecting a
KeySelector that calls Arrays.hashCode(array).
This worked for me when I ran into the same issue while experimenting
with some stuff.
  
The batch API can use arrays as keys as well, so it's also a matter of

consistency imo.
  
Regards,

Chesnay
  
On 08.12.2016 16:23, Ufuk Celebi wrote:

@Aljoscha: I remember that someone else ran into this, too. Should we address 
arrays

as keys specifically in the API? Prohibit? Document this?

– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:

Sure!

(Aside, it turns out that the issue was using an `Array[Byte]` as a key - byte 
arrays

don’t

appear to have a stable hashCode. I’ll provide the skeleton for fullness, 
though.)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
.flatMap(transformToRecords(_))
.keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
.map(new StatefulAggregator())
.addSink(hbaseSink)


Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!

-a




On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:

Hi,

could you maybe provide the (minimal) code for the problematic job? Also, are 
you

sure

that the keyBy is working on the correct key attribute?

Best,
Stefan


On 07.12.2016 at 15:57, Andrew Roberts wrote:

Hello,

I’m trying to perform a stateful mapping of some objects coming in from Kafka 
in a

parallelized

flink job (set on the job using env.setParallelism(3)). The data source is a 
kafka

topic,

but the partitions aren’t meaningfully keyed for this operation (each kafka 
message
is flatMapped to between 0-2 objects, with potentially different keys). I have 
a keyBy()
operator directly before my map(), but I’m seeing objects with the same key 
distributed
to different parallel task instances, as reported by 
getRuntimeContext().getIndexOfThisSubtask().

My understanding of keyBy is that it would segment the stream by key, and 
guarantee

that all data with a given key would hit the same instance. Am I possibly 
seeing residual
“keying” from the kafka topic?

I’m running flink 1.1.3 in scala. Please let me know if I can add more info.

Thanks,

Andrew


  
  






Re: Parallelism and stateful mapping with Flink

2016-12-08 Thread Ufuk Celebi
Would you like to open an issue for this for starters Chesnay? Would be good to 
fix for the upcoming release even.


On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:
> It would be neat if we could support arrays as keys directly; it should
> boil down to checking the key type and in case of an array injecting a
> KeySelector that calls Arrays.hashCode(array).
> This worked for me when I ran into the same issue while experimenting
> with some stuff.
>  
> The batch API can use arrays as keys as well, so it's also a matter of
> consistency imo.
>  
> Regards,
> Chesnay
>  
> On 08.12.2016 16:23, Ufuk Celebi wrote:
> > @Aljoscha: I remember that someone else ran into this, too. Should we 
> > address arrays  
> as keys specifically in the API? Prohibit? Document this?
> >
> > – Ufuk
> >
> > On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
> >> Sure!
> >>
> >> (Aside, it turns out that the issue was using an `Array[Byte]` as a key - 
> >> byte arrays  
> don’t
> >> appear to have a stable hashCode. I’ll provide the skeleton for fullness, 
> >> though.)  
> >>
> >> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> env.setParallelism(Config.callAggregator.parallelism)
> >>
> >> env.addSource(kafkaSource)
> >> .flatMap(transformToRecords(_))
> >> .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >> .map(new StatefulAggregator())
> >> .addSink(hbaseSink)
> >>
> >>
> >> Again, wrapping my keyBy function in `new String()` has fixed my issue. 
> >> Thanks!
> >>
> >> -a
> >>
> >>
> >>
> >>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>
> >>> Hi,
> >>>
> >>> could you maybe provide the (minimal) code for the problematic job? Also, 
> >>> are you  
> sure
> >> that the keyBy is working on the correct key attribute?
> >>> Best,
> >>> Stefan
> >>>
>  Am 07.12.2016 um 15:57 schrieb Andrew Roberts :
> 
>  Hello,
> 
>  I’m trying to perform a stateful mapping of some objects coming in from 
>  Kafka in a  
> parallelized
> >> flink job (set on the job using env.setParallelism(3)). The data source is 
> >> a kafka  
> topic,
> >> but the partitions aren’t meaningfully keyed for this operation (each 
> >> kafka message  
> >> is flatMapped to between 0-2 objects, with potentially different keys). I 
> >> have a keyBy()  
> >> operator directly before my map(), but I’m seeing objects with the same 
> >> key distributed  
> >> to different parallel task instances, as reported by 
> >> getRuntimeContext().getIndexOfThisSubtask().  
>  My understanding of keyBy is that it would segment the stream by key, 
>  and guarantee  
> >> that all data with a given key would hit the same instance. Am I possibly 
> >> seeing residual  
> >> “keying” from the kafka topic?
>  I’m running flink 1.1.3 in scala. Please let me know if I can add more 
>  info.
> 
>  Thanks,
> 
>  Andrew
> >>
> >>
> >
>  
>  



Re: conditional dataset output

2016-12-08 Thread Chesnay Schepler

Hello Lars,

The only other way I can think of to do this is by wrapping the used
OutputFormat in a custom format that calls open() on the wrapped OutputFormat
only when the first record arrives.

This should work, but it is quite hacky as it interferes with the format
life-cycle.
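
A rough sketch of what such a wrapper could look like (untested; it only delays
opening the wrapped format, and whether that actually prevents empty files
depends on the concrete OutputFormat):

import java.io.IOException;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

public class LazyOpenOutputFormat<T> implements OutputFormat<T> {

    private final OutputFormat<T> wrapped;
    private transient boolean opened;
    private transient int taskNumber;
    private transient int numTasks;

    public LazyOpenOutputFormat(OutputFormat<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void configure(Configuration parameters) {
        wrapped.configure(parameters);
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        // remember the parameters, but don't open the wrapped format yet
        this.taskNumber = taskNumber;
        this.numTasks = numTasks;
        this.opened = false;
    }

    @Override
    public void writeRecord(T record) throws IOException {
        if (!opened) {
            wrapped.open(taskNumber, numTasks);
            opened = true;
        }
        wrapped.writeRecord(record);
    }

    @Override
    public void close() throws IOException {
        if (opened) {
            wrapped.close();
        }
    }
}

It would be used via dataset.output(new LazyOpenOutputFormat<>(...)) instead of
the convenience write methods.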


Regards,
Chesnay

On 08.12.2016 16:39, lars.bachm...@posteo.de wrote:

Hi,

let's assume I have a dataset that, depending on the input data and
different filter operations, can be empty. Now I want to output the
dataset to HD, but I want files to be created only if the dataset is not
empty. If the dataset is empty I don't want any files. The default way,
dataset.write(...), will always create as many files as the configured
parallelism of this operator - in case of an empty dataset all files would
be empty as well. I thought about doing something like:


if (dataset.count() > 0) {
   dataset.write(...)
}

but I don't think that's the way to go, because dataset.count()
triggers an execution of the (sub)program.


Is there a simple way to avoid creating empty files for empty
datasets?


Regards,

Lars





Re: Parallelism and stateful mapping with Flink

2016-12-08 Thread Chesnay Schepler
It would be neat if we could support arrays as keys directly; it should 
boil down to checking the key type and in case of an array injecting a 
KeySelector that calls Arrays.hashCode(array).
This worked for me when I ran into the same issue while experimenting
with some stuff.
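
As a workaround on the user side, the same trick can be written by hand with a
KeySelector (sketch; MyRecord and its rowKey field are placeholders for the
actual record type):

import java.util.Arrays;
import org.apache.flink.api.java.functions.KeySelector;

public class ByteArrayHashKey implements KeySelector<MyRecord, Integer> {
    @Override
    public Integer getKey(MyRecord record) throws Exception {
        // value-based and stable, unlike byte[].hashCode()
        return Arrays.hashCode(record.rowKey);
    }
}

Note that a hash can collide, so two distinct arrays could end up sharing state;
wrapping the bytes in a String (as in the keyBy(b => new String(b.rowKey)) below)
or a ByteBuffer keeps the key exact.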


The batch API can use arrays as keys as well, so it's also a matter of 
consistency imo.


Regards,
Chesnay

On 08.12.2016 16:23, Ufuk Celebi wrote:

@Aljoscha: I remember that someone else ran into this, too. Should we address 
arrays as keys specifically in the API? Prohibit? Document this?

– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:

Sure!
  
(Aside, it turns out that the issue was using an `Array[Byte]` as a key - byte arrays don’t

appear to have a stable hashCode. I’ll provide the skeleton for fullness, 
though.)
  
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(Config.callAggregator.parallelism)
  
env.addSource(kafkaSource)

.flatMap(transformToRecords(_))
.keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
.map(new StatefulAggregator())
.addSink(hbaseSink)
  
  
Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!
  
-a
  
  
  

On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:

Hi,

could you maybe provide the (minimal) code for the problematic job? Also, are 
you sure

that the keyBy is working on the correct key attribute?

Best,
Stefan


On 07.12.2016 at 15:57, Andrew Roberts wrote:

Hello,

I’m trying to perform a stateful mapping of some objects coming in from Kafka 
in a parallelized

flink job (set on the job using env.setParallelism(3)). The data source is a 
kafka topic,
but the partitions aren’t meaningfully keyed for this operation (each kafka 
message
is flatMapped to between 0-2 objects, with potentially different keys). I have 
a keyBy()
operator directly before my map(), but I’m seeing objects with the same key 
distributed
to different parallel task instances, as reported by 
getRuntimeContext().getIndexOfThisSubtask().

My understanding of keyBy is that it would segment the stream by key, and 
guarantee

that all data with a given key would hit the same instance. Am I possibly 
seeing residual
“keying” from the kafka topic?

I’m running flink 1.1.3 in scala. Please let me know if I can add more info.

Thanks,

Andrew
  
  






conditional dataset output

2016-12-08 Thread lars . bachmann

Hi,

let's assume I have a dataset that, depending on the input data and
different filter operations, can be empty. Now I want to output the
dataset to HD, but I want files to be created only if the dataset is not
empty. If the dataset is empty I don't want any files. The default way,
dataset.write(...), will always create as many files as the configured
parallelism of this operator - in case of an empty dataset all files would
be empty as well. I thought about doing something like:


if (dataset.count() > 0) {
   dataset.write(...)
}

but I don't think that's the way to go, because dataset.count() triggers
an execution of the (sub)program.


Is there a simple way to avoid creating empty files for empty
datasets?


Regards,

Lars


Re: Parallelism and stateful mapping with Flink

2016-12-08 Thread Ufuk Celebi
@Aljoscha: I remember that someone else ran into this, too. Should we address 
arrays as keys specifically in the API? Prohibit? Document this?

– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
> Sure!
>  
> (Aside, it turns out that the issue was using an `Array[Byte]` as a key - 
> byte arrays don’t  
> appear to have a stable hashCode. I’ll provide the skeleton for fullness, 
> though.)
>  
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(Config.callAggregator.parallelism)
>  
> env.addSource(kafkaSource)
> .flatMap(transformToRecords(_))
> .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> .map(new StatefulAggregator())
> .addSink(hbaseSink)
>  
>  
> Again, wrapping my keyBy function in `new String()` has fixed my issue. 
> Thanks!
>  
> -a
>  
>  
>  
> > On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:  
> >
> > Hi,
> >
> > could you maybe provide the (minimal) code for the problematic job? Also, 
> > are you sure  
> that the keyBy is working on the correct key attribute?
> >
> > Best,
> > Stefan
> >
> >> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>
> >> Hello,
> >>
> >> I’m trying to perform a stateful mapping of some objects coming in from 
> >> Kafka in a parallelized  
> flink job (set on the job using env.setParallelism(3)). The data source is a 
> kafka topic,  
> but the partitions aren’t meaningfully keyed for this operation (each kafka 
> message  
> is flatMapped to between 0-2 objects, with potentially different keys). I 
> have a keyBy()  
> operator directly before my map(), but I’m seeing objects with the same key 
> distributed  
> to different parallel task instances, as reported by 
> getRuntimeContext().getIndexOfThisSubtask().  
> >>
> >> My understanding of keyBy is that it would segment the stream by key, and 
> >> guarantee  
> that all data with a given key would hit the same instance. Am I possibly 
> seeing residual  
> “keying” from the kafka topic?
> >>
> >> I’m running flink 1.1.3 in scala. Please let me know if I can add more 
> >> info.
> >>
> >> Thanks,
> >>
> >> Andrew
> >
>  
>  



Re: Replace Flink job while cluster is down

2016-12-08 Thread Ufuk Celebi
With HA enabled, Flink checks the configured ZooKeeper node for pre-existing 
jobs and checkpoints when starting.

What Stefan meant was that you can configure a different ZooKeeper node, which 
will start the cluster with a clean state.

You can check the available config options here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#high-availability-ha
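
For example, something along these lines in flink-conf.yaml starts the new
cluster under a fresh ZooKeeper path (key names as in the 1.2 docs linked above;
the quorum and path values are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181
high-availability.zookeeper.path.root: /flink/fresh-cluster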


On 8 December 2016 at 15:05:52, Al-Isawi Rami (rami.al-is...@comptel.com) wrote:
> >



RE: Collect() freeze on yarn cluster on strange recover/deserialization error

2016-12-08 Thread Ufuk Celebi
Great! :)


On 8 December 2016 at 15:28:05, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> -yjm works, and suits me better than a global flink-conf.yaml parameter. I've
> looked for
> a command line parameter like that, but I missed it in the doc; my mistake.
> Thanks,
> Arnaud
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, December 8, 2016 14:43
> To: LINZ, Arnaud ; user@flink.apache.org
> Cc: rmetz...@apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange
> recover/deserialization error
>  
> Good point with the collect() docs. Would you mind opening a JIRA issue for 
> that?
>  
> I'm not sure whether you can specify it via that key for YARN. Can you try to 
> use -yjm 8192  
> when submitting the job?
>  
> Looping in Robert who knows best whether this config key is picked up or not 
> for YARN.
>  
> – Ufuk
>  
> On 8 December 2016 at 14:05:41, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > Hi Ufuk,
> >
> > Yes, I have a large set of data to collect for a data science job that
> > cannot be distributed easily. Increasing the akka.framesize size does
> > get rid of the collect hang (maybe you should highlight this parameter
> > in the collect() documentation, 10 MB is not that big), thanks.
> >
> > However my job manager now fails with OutOfMemory.
> >
> > Despite the fact that I have setup
> > jobmanager.heap.mb: 8192
> >
> > in my flink-conf.yaml, logs shows that it was created with less memory 
> > (1374 Mb) :
> >
> > 2016-12-08 13:50:13,808 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > -
> > --  
> > --
> > 2016-12-08 13:50:13,809 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3,
> > Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> > 2016-12-08 13:50:13,809 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - Current user: datcrypt
> > 2016-12-08 13:50:13,809 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation -
> > 1.7/24.45-b08
> > 2016-12-08 13:50:13,809 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - Maximum heap size: 1374 MiBytes
> > 2016-12-08 13:50:13,810 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - JAVA_HOME: /usr/java/default
> > 2016-12-08 13:50:13,811 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - Hadoop version: 2.6.3
> > 2016-12-08 13:50:13,811 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - JVM Options:
> > 2016-12-08 13:50:13,811 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > - -Xmx1434M
> > 2016-12-08 13:50:13,811 INFO
> > org.apache.flink.yarn.YarnApplicationMasterRunner
> > -
> > -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/cont  
> > ainer_e17_1480512120243_3635_01_01/jobmanager.log
> >
> >
> > Is there a command line option of flink / env variable that overrides
> > it or am I missing something ?
> > -- Arnaud
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org] Sent: Thursday, December 8,
> > 2016 10:49 To: LINZ, Arnaud ; user@flink.apache.org Subject: RE:
> > Collect() freeze on yarn cluster on strange recover/deserialization
> > error
> >
> > I also don't get why the job is recovering, but the oversized message
> > is very likely the cause for the freezing collect, because the data set is 
> > gather via  
> Akka.
> >
> > You can configure the frame size via "akka.framesize", which defaults
> > to 10485760b
> > (10 MB).
> >
> > Is the collected result larger than that? Could you try to increase
> > the frame size and report back?
> >
> > – Ufuk
> >
> > On 7 December 2016 at 17:57:22, LINZ, Arnaud (al...@bouyguestelecom.fr) 
> > wrote:
> > > Hi,
> > >
> > > Any news? It's maybe caused by an oversized akka payload (many
> > > akka.remote.OversizedPayloadException: Discarding oversized payload
> > > sent to 
> > > Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:  
> > > max allowed size 10485760 bytes, actual size of encoded class
> > > org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMe
> > > ss
> > > age
> > > was 69074412 bytes in the log)
> > >
> > > How do I set akka's maximum-payload-bytes in my flink cluster?
> > >
> > > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about
> > > that. I do not use ExecutionEnvironment.createRemoteEnvironment() but 
> > > ExecutionEnvironment.getExecutionEnvironment().  
> > >
> > > Do I have to change the way I'm doing things ? How ?
> > >
> > > Thanks,
> > > Arnaud
> > >
> > > -----Original Message-----
> > > From: LINZ, Arnaud
> > > Sent: Wednesday, November 30, 2016 08:59 To: user@flink.apache.org
> > > Subject: RE: Collect() freeze on yarn cluster on strange
> > > recover/deserialization error
> > >
> > > Hi,
> > >
> > > 

Re: Replace Flink job while cluster is down

2016-12-08 Thread Al-Isawi Rami
Hi Stefan,

Yes, a cluster of 3 machines. Version 1.1.1

I did not get the difference between “removing the entry from ZooKeeper” and
“using Flink's ZooKeeper namespaces feature”.

Eventually, I started the cluster and it did recover the old program. However,
I was fast enough to click Cancel within 4 seconds, before the first checkpoint
kicked in (5 seconds).

Wouldn’t it make sense if we could still use the flink CLI to deal with jobs and
cancel them even if the cluster is offline or semi-offline? I am not sure what
the best solution is to the case I have faced.

Regards,
-Rami

> On 7 Dec 2016, at 16:10, Stefan Richter  wrote:
>
> Hi,
>
> first a few quick questions: I assume you are running in HA mode, right? Also 
> what version of Flink are you running?
>
> In case you are not running HA, nothing is automatically recovered. With HA, 
> you would need to manually remove the corresponding entry from Zookeeper. If 
> this is the problem, I suggest using Flink’s Zookeeper namespaces feature, to 
> isolate different runs of a job.
>
> Best,
> Stefan
>
>
>> On 07.12.2016 at 13:20, Al-Isawi Rami wrote:
>>
>> Hi,
>>
>> I have faulty flink streaming program running on a cluster that is consuming 
>> from kafka,so I brought the cluster down. Now I have a new version that has 
>> the fix. Now if I bring up the flink cluster again, the old faulty program 
>> will be recovered and it will consume and stream faulty results. How can i 
>> cancel it before brining up the cluster again? there is a million of kafka 
>> messages waiting to be consumed and I do not want the old program to consume 
>> them. The cluster is backed by S3 and I found some blobs there that flink 
>> will recover the old program from, but it sounds like bad idea to just 
>> delete them.
>>
>> Any ideas?
>>
>>
>> Regards,
>> -Rami
>



RE: Collect() freeze on yarn cluster on strange recover/deserialization error

2016-12-08 Thread Ufuk Celebi
Good point with the collect() docs. Would you mind opening a JIRA issue for 
that?

I'm not sure whether you can specify it via that key for YARN. Can you try to 
use -yjm 8192 when submitting the job?

Looping in Robert who knows best whether this config key is picked up or not 
for YARN.

– Ufuk

On 8 December 2016 at 14:05:41, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hi Ufuk,
>  
> Yes, I have a large set of data to collect for a data science job that cannot 
> be distributed  
> easily. Increasing the akka.framesize size does get rid of the collect hang
> (maybe you  
> should highlight this parameter in the collect() documentation, 10 MB is not
> that big),  
> thanks.
>  
> However my job manager now fails with OutOfMemory.
>  
> Despite the fact that I have setup
> jobmanager.heap.mb: 8192
>  
> in my flink-conf.yaml, logs shows that it was created with less memory (1374 
> Mb) :
>  
> 2016-12-08 13:50:13,808 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - 
> 
>   
> 2016-12-08 13:50:13,809 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, 
> Date:10.10.2016  
> @ 13:26:32 UTC)
> 2016-12-08 13:50:13,809 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - Current user: datcrypt
> 2016-12-08 13:50:13,809 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
> 2016-12-08 13:50:13,809 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - Maximum heap size: 1374 MiBytes
> 2016-12-08 13:50:13,810 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - JAVA_HOME: /usr/java/default
> 2016-12-08 13:50:13,811 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - Hadoop version: 2.6.3
> 2016-12-08 13:50:13,811 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - JVM Options:
> 2016-12-08 13:50:13,811 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - -Xmx1434M
> 2016-12-08 13:50:13,811 INFO 
> org.apache.flink.yarn.YarnApplicationMasterRunner  
> - 
> -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_01/jobmanager.log
>   
>  
>  
> Is there a command line option of flink / env variable that overrides it or 
> am I missing  
> something ?
> -- Arnaud
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, December 8, 2016 10:49
> To: LINZ, Arnaud ; user@flink.apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange
> recover/deserialization error  
>  
> I also don't get why the job is recovering, but the oversized message is very 
> likely the  
> cause of the freezing collect, because the data set is gathered via Akka.
>  
> You can configure the frame size via "akka.framesize", which defaults to 
> 10485760b  
> (10 MB).
>  
> Is the collected result larger than that? Could you try to increase the frame 
> size and  
> report back?
>  
> – Ufuk
>  
> On 7 December 2016 at 17:57:22, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > Hi,
> >
> > Any news? It's maybe caused by an oversized akka payload (many
> > akka.remote.OversizedPayloadException: Discarding oversized payload
> > sent to 
> > Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:  
> > max allowed size 10485760 bytes, actual size of encoded class
> > org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMess  
> > age
> > was 69074412 bytes in the log)
> >
> > How do I set akka's maximum-payload-bytes in my flink cluster?
> >
> > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about
> > that. I do not use ExecutionEnvironment.createRemoteEnvironment() but 
> > ExecutionEnvironment.getExecutionEnvironment().  
> >
> > Do I have to change the way I'm doing things ? How ?
> >
> > Thanks,
> > Arnaud
> >
> > -----Original Message-----
> > From: LINZ, Arnaud
> > Sent: Wednesday, November 30, 2016 08:59 To: user@flink.apache.org
> > Subject: RE: Collect() freeze on yarn cluster on strange
> > recover/deserialization error
> >
> > Hi,
> >
> > Don't think so. I always delete the ZK path before launching the batch
> > (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS
> > rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears 
> > only before  
> the collect() phase, not at the beginning.
> >
> > Full log is availlable here :
> > https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
> >
> > Thanks,
> > Arnaud
> >
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org] Sent: Tuesday, November 29,
> > 2016 18:43 To: LINZ, Arnaud ; user@flink.apache.org Subject: Re:
> > Collect() freeze on yarn cluster on strange recover/deserialization
> > error
> >
> > Hey Arnaud,
> >
> > could this be a left 

Re: Recursive directory traversal with TextInputFormat

2016-12-08 Thread Ufuk Celebi
Looping in Kostas who recently worked on the continuous file inputs.

@Kostas: do you have an idea what's happening here?

– Ufuk

On 8 December 2016 at 08:43:32, Lukas Kircher (lukas.kirc...@uni-konstanz.de) 
wrote:
> Hi Stefan,
>  
> thanks for your answer.
>  
> > I think there is a field in FileInputFormat (which TextInputFormat is 
> > subclassing)  
> that could serve your purpose if you override the default:
>  
> That was my first guess as well. I use the basic setup from 
> org.apache.flink.api.java.io.TextInputFormatTest.java  
> and call setNestedFileEnumeration(true), but once the stream is processed 
> only the  
> content of the .csv file in the top-most folder is printed. The example is 
> just a few lines  
> of self-contained code, see below. Does anybody have an idea?
>  
> Cheers,
> Lukas
>  
>  
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.BufferedWriter;
> import java.io.File;
> import java.io.FileWriter;
>
> public class ReadDirectorySSCCE {
>     public static void main(String[] args) throws Exception {
>         // create the given dirs and add a .csv file to each one
>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>         for (String dir : dirs) {
>             // create input file
>             File tmpDir = new File(dir);
>             if (!tmpDir.exists()) {
>                 tmpDir.mkdirs();
>             }
>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>             w.write("content of " + dir + "/file.csv");
>             w.close();
>             tempFile.deleteOnExit();
>         }
>         File root = new File("tmp");
>
>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>         inputFormat.setNestedFileEnumeration(true);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.createInput(inputFormat).print();
>         env.execute();
>     }
> }
>  
> > On 7 Dec 2016, at 17:44, Stefan Richter wrote:
> >
> > Hi,
> >
> > I think there is a field in FileInputFormat (which TextInputFormat is 
> > subclassing)  
> that could serve your purpose if you override the default:
> >
> > /**
> > * The flag to specify whether recursive traversal of the input directory
> > * structure is enabled.
> > */
> > protected boolean enumerateNestedFiles = false;
> > As for compression, I think this class also provides an InflaterInputStreamFactory
> > to read compressed data.
> >
> > Best,
> > Stefan
> >
> >> On 07.12.2016 at 12:10, Lukas Kircher wrote:
> >>
> >> Hi all,
> >>
> >> I am trying to read nested .csv files from a directory and want to switch 
> >> from a custom  
> SourceFunction I implemented to the TextInputFormat. I have two questions:
> >>
> >> 1) Somehow only the file in the root directory is processed, nested files 
> >> are skipped.  
> What am I missing? See the attachment for an SSCCE. I get the same result 
> with flink 1.1.3  
> no matter if I run it via the IDE or submit the job to the standalone binary. 
> The file input  
> splits are all there, yet they don't seem to be processed.
> >>
> >> 2) What is the easiest way to read compressed .csv files (.zip)?
> >>
> >> Thanks for your help, cheers
> >> Lukas
> >>
> >>  
> >
>  
>  
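
For the batch DataSet API, recursive enumeration of nested files can also be
requested through the documented "recursive.file.enumeration" parameter. A
minimal sketch, assuming the tmp directory layout created in the example above
(the input path string is a placeholder):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadDirectoryBatchSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // ask the FileInputFormat-based source to enumerate nested files
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        env.readTextFile("file:///absolute/path/to/tmp") // placeholder path
           .withParameters(parameters)
           .print(); // print() triggers execution in the DataSet API
    }
}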



RE: Collect() freeze on yarn cluster on strange recover/deserialization error

2016-12-08 Thread Ufuk Celebi
I also don't get why the job is recovering, but the oversized message is very
likely the cause of the freezing collect, because the data set is gathered via
Akka.

You can configure the frame size via "akka.framesize", which defaults to 
10485760b (10 MB).
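
As a minimal illustration (the 128 MB value is only an assumption that
comfortably exceeds the 69074412-byte message reported in the quoted log; pick
whatever covers your largest collected result), the corresponding
flink-conf.yaml entry would look like:

    akka.framesize: 134217728b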

Is the collected result larger than that? Could you try to increase the frame 
size and report back?

– Ufuk

On 7 December 2016 at 17:57:22, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hi,
>  
> Any news? It may be caused by an oversized Akka payload
> (many akka.remote.OversizedPayloadException: Discarding oversized payload 
> sent  
> to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:  
> max allowed size 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage  
> was 69074412 bytes in the log)
>  
> How do I set akka's maximum-payload-bytes in my flink cluster?
>  
> https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that. I 
> do  
> not use ExecutionEnvironment.createRemoteEnvironment() but 
> ExecutionEnvironment.getExecutionEnvironment().  
>  
> Do I have to change the way I'm doing things? How?
>  
> Thanks,
> Arnaud
>  
> -----Original Message-----
> From: LINZ, Arnaud
> Sent: Wednesday, 30 November 2016 08:59
> To: user@flink.apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange
> recover/deserialization error
>  
> Hi,
>  
> Don't think so. I always delete the ZK path before launching the batch (with 
> /usr/bin/zookeeper-client  
> -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and  
> the "recovery" log line appears only before the collect() phase, not at the 
> beginning.  
>  
> Full log is available here:
> https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r  
>  
> Thanks,
> Arnaud
>  
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org] Sent: Tuesday, 29 November 2016
> 18:43 To: LINZ,
> Arnaud ; user@flink.apache.org Subject: Re: Collect()
> freeze on yarn cluster on strange recover/deserialization error
>  
> Hey Arnaud,
>  
> could this be a left over job that is recovered from ZooKeeper? Recovery only 
> happens  
> if the configured ZK root contains data.
>  
> A job is removed from ZooKeeper only if it terminates (e.g. finishes, fails 
> terminally  
> w/o restarting, cancelled). If you just shut down the cluster this is treated 
> as a failure.  
>  
> – Ufuk
>  
> The complete JM logs will be helpful to further check what's happening there.
>  
>  
> On 29 November 2016 at 18:15:16, LINZ, Arnaud (al...@bouyguestelecom.fr) 
> wrote:
> > Hello,
> >
> > I have a Flink 1.1.3 batch application that makes a simple aggregation
> > but freezes when
> > collect() is called if the app is deployed on an HA-enabled YARN
> > cluster (it works on a local cluster).
> > Just before it hangs, I have the following deserialization error in the
> > logs:
> >
> > (...)
> > 2016-11-29 15:10:10,422 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph
> > - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96)
> > switched from DEPLOYING to RUNNING
> > 2016-11-29 15:10:13,175 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph
> > - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map
> > (Key Remover)
> > (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to
> > FINISHED
> > 2016-11-29 15:10:17,816 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph
> > - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map
> > (Key Remover)
> > (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to
> > FINISHED
> > 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager -
> > Attempting to recover all jobs.
> > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal 
> > error:  
> > Failed to recover jobs.
> > java.io.StreamCorruptedException: invalid type code: 00 at
> > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> > at
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:199  
> > 0) at
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > at
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:17  
> > 98) at
> > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > at
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:199  
> > 0) at
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > at
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:17  
> > 98) at
> > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > at java.util.HashMap.readObject(HashMap.java:1184)
> > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at
> > 
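For reference, the ZooKeeper path wiped by the rmr command quoted above comes
from the HA settings in flink-conf.yaml. A minimal sketch using the Flink
1.1-era key names (the quorum and path values are illustrative placeholders,
not taken from the thread; a dedicated root per batch job is what makes that
cleanup possible):

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    recovery.zookeeper.path.root: /flink-batch
    recovery.zookeeper.storageDir: hdfs:///flink/recovery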

Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-08 Thread Robert Metzger
Thank you for speaking up, Kanstantsin. I really don't want to downgrade the
experience on the user@ list.

I wonder if jobs@flink would be too narrowly scoped as a mailing list.
Maybe we could also start a community@flink (alternatively also general@)
mailing list for everything relating to the broader Flink community,
including job offers, meetups, conferences and everything else that is
important for the community to grow.

On Thu, Dec 8, 2016 at 3:10 AM, Radu Tudoran 
wrote:

> Hi,
>
>
>
> I think the idea of having such a monthly thread is very good and it might
> even help to attract new people to the community.
>
> At the same time, I do not think that one extra mail per month necessarily
> counts as spam :)
>
> We could also consider a jobs@flink mailing list.
>
>
>
>
>
> Dr. Radu Tudoran
>
> Senior Research Engineer - Big Data Expert
>
> IT R&D Division
>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: radu.tudo...@huawei.com
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
>
>
>
> From: Kanstantsin Kamkou [mailto:kkam...@gmail.com]
> Sent: Wednesday, December 07, 2016 9:57 PM
> To: user@flink.apache.org
> Subject: Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the
> mailing lists?
>
>
>
> Is it possible to avoid such spam here? If I need a new job, I can search
> for it myself. In the same way, I might prefer to subscribe to a dedicated
> list like jobs@flink. The idea itself is great, though.
>
>
>
> On Tue, 6 Dec 2016 at 14:04, Kostas Tzoumas wrote:
>
> yes, of course!
>
> On Tue, Dec 6, 2016 at 12:54 PM, Márton Balassi wrote:
>
> +1. It keeps things organized and the overhead to a reasonable minimum.
>
> Would you volunteer to start the mail thread each month then, Kostas?
>
> Best,
>
> Marton
>
> On Tue, Dec 6, 2016 at 6:42 AM, Kostas Tzoumas wrote:
>
> Hi folks,
>
> I'd like to see how the community feels about a monthly "Who is hiring on
> Flink" email thread on the dev@ and user@ mailing lists where folks can
> post job positions related to Flink.
>
> I personally think that posting individual job offerings in the mailing
> list is off-topic (hence I have refrained from doing that wearing my company
> hat, and I have discouraged others when they asked for my opinion on this),
> but I thought that a monthly thread like this would be both helpful to the
> community and not cause overhead.
>
> Cheers,
> Kostas


Re: Partitioning operator state

2016-12-08 Thread Stefan Richter
Hi Dominik,

as Gordon’s response only covers keyed state, I will briefly explain what
happens for non-keyed operator state. In contrast to Flink 1.1, Flink 1.2
checkpointing does not write such state as a single black-box object (e.g. ONE
object holding the set of all Kafka offsets), but as a list of black-box
objects (e.g. think of all Kafka offsets being emitted individually, as
MULTIPLE objects). While Flink 1.2 still has no knowledge about the emitted
objects in the list (they remain black boxes), the contract allows those
objects to be freely redistributed in case of scale-out or scale-in: scaling
is merely a matter of splitting or merging the checkpointed lists.

Best,
Stefan 
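
A minimal sketch of what such redistributable non-keyed state looks like in
user code, assuming the Flink 1.2 ListCheckpointed interface (the CounterSource
class below is illustrative, not taken from the thread):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits an ever-increasing counter and checkpoints its value as a one-element
// list. Flink may redistribute the list entries across subtasks on rescaling:
// a subtask can receive zero, one, or several elements back on restore.
public class CounterSource implements SourceFunction<Long>, ListCheckpointed<Long> {

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // emit and update state under the checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // each list element is an independently redistributable unit of state
        return Collections.singletonList(counter);
    }

    @Override
    public void restoreState(List<Long> state) {
        counter = 0L;
        for (Long c : state) {
            counter += c;
        }
    }
}

Keyed state, in contrast, is redistributed along key groups, which is what the
FLINK-3755 link in Gordon's reply describes.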

> Am 08.12.2016 um 08:00 schrieb Tzu-Li (Gordon) Tai :
> 
> Hi Dominik,
> 
> Do you mean how Flink redistributes an operator’s state when the parallelism 
> of the operator is changed?
> If so, you can take a look at [1] and [2].
> 
> Cheers,
> Gordon
> 
> [1] https://issues.apache.org/jira/browse/FLINK-3755 
> 
> [2] 
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#
>  
> 
> 
> On December 8, 2016 at 4:40:18 AM, Dominik Safaric (dominiksafa...@gmail.com 
> ) wrote:
> 
>> Hi everyone, 
>> 
>> In the case of scaling out a Flink cluster, how does Flink handle operator 
>> state partitioning of a staged topology?  
>> 
>> Regards, 
>> Dominik