Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-19 Thread XiangWei Huang
Hi Till, Thanks for your answer. It worked when I use StandaloneMiniCluster, but another problem is that I can’t find a way to cancel a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster I can do it with the code below: for (job <-

Re: need instruction on how the Flink metric works

2017-09-19 Thread Michael Fong
I just did the same test as you had with SocketWindowWordCount, and the counter showed up all right. You should probably connect JConsole to localhost:28781 (or whatever port your JMX server listens on). That's how I set up the env; perhaps there are better ways to do it. On Wed,

Re: need instruction on how the Flink metric works

2017-09-19 Thread Jiewen Shao
Still got stuck, here are my steps (on my laptop) for example: Step1: public class MetricsTest extends RichMapFunction { private static final long serialVersionUID = 1L; private org.apache.flink.metrics.Meter meter; private Counter counter; @Override public void

Re: the design of spilling to disk

2017-09-19 Thread Kurt Young
Copied from my earlier response to a similar question: "Here is a short description of how it works: there are 3 threads in total working together, one for reading, one for sorting partial data in memory, and the last one is responsible for spilling. Flink will first figure out how much memory
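The three-thread arrangement described above (one thread reading, one sorting partial data in memory, one spilling) can be illustrated with a small, self-contained Python sketch. This is not Flink's code and does not mirror UnilateralSortMerger; the bounded queues, the temporary files, the `external_sort` name, and the final merge step are all assumptions made for illustration only.

```python
import heapq
import queue
import tempfile
import threading

_DONE = object()  # sentinel marking the end of a stream (illustrative)


def external_sort(records, chunk_size):
    """Sort integers with a read -> sort -> spill pipeline, then merge the runs."""
    to_sort = queue.Queue(maxsize=2)   # filled buffers waiting to be sorted
    to_spill = queue.Queue(maxsize=2)  # sorted buffers waiting to be written
    run_files = []                     # one temporary file per spilled run

    def reader():
        # Thread 1: read records into fixed-size in-memory buffers.
        buf = []
        for rec in records:
            buf.append(rec)
            if len(buf) == chunk_size:
                to_sort.put(buf)
                buf = []
        if buf:
            to_sort.put(buf)
        to_sort.put(_DONE)

    def sorter():
        # Thread 2: sort each buffer in memory.
        while (buf := to_sort.get()) is not _DONE:
            buf.sort()
            to_spill.put(buf)
        to_spill.put(_DONE)

    def spiller():
        # Thread 3: write each sorted buffer to disk as one sorted run.
        while (buf := to_spill.get()) is not _DONE:
            f = tempfile.TemporaryFile(mode="w+")
            f.writelines(f"{rec}\n" for rec in buf)
            f.seek(0)
            run_files.append(f)

    threads = [threading.Thread(target=t) for t in (reader, sorter, spiller)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Merge phase: stream the sorted runs back from disk and merge them lazily.
    runs = [(int(line) for line in f) for f in run_files]
    try:
        return list(heapq.merge(*runs))
    finally:
        for f in run_files:
            f.close()
```

Calling `external_sort([5, 3, 9, 1, 7, 2], chunk_size=2)` spills three sorted runs and merges them back into a single sorted list. The bounded queues are one simple way to let the three stages overlap while keeping only a few buffers in memory at once; how Flink actually sizes and hands over its buffers is not covered by the quoted description.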

Re: Task Manager was lost/killed due to full GC

2017-09-19 Thread ShB
Thanks for your response! Recommendation to decrease allotted memory? Which allotted memory should be decreased? I tried decreasing taskmanager.memory.fraction to give more memory to user managed operations, that doesn't work beyond a point. Also tried increasing containerized.heap-cutoff-ratio,

Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Emily McMahon
Thanks Eron & Fabian. The issue was hitting a yarn proxy url vs the node itself. For example, this worked: http://{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink But this did not: http://

Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Eron Wright
Good news, it can be done if you carefully encode the target directory with percent-encoding, as per: https://tools.ietf.org/html/rfc3986#section-2.1 For example, given the directory `s3:///savepoint-bucket/my-awesome-job`, which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
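As a minimal sketch of the percent-encoding step described above, Python's standard `urllib.parse.quote` with `safe=""` encodes every reserved character, including `:` and `/`. The helper names `encode_target_directory` and `cancel_with_savepoint_path` are hypothetical; the path layout is taken from the URLs quoted in this thread, and the job ID is whatever the caller supplies.

```python
from urllib.parse import quote


def encode_target_directory(path: str) -> str:
    """Percent-encode a savepoint directory so it fits into one URL path segment."""
    # safe="" also encodes "/" and ":", which quote() would otherwise leave alone.
    return quote(path, safe="")


def cancel_with_savepoint_path(job_id: str, target_directory: str) -> str:
    """Build the request path (hypothetical helper; layout as quoted in this thread)."""
    encoded = encode_target_directory(target_directory)
    return f"/jobs/{job_id}/cancel-with-savepoint/target-directory/{encoded}"


print(encode_target_directory("s3:///savepoint-bucket/my-awesome-job"))
# prints s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job
```

The encoded value matches the example given in the message. Whether a proxy in front of the JobManager preserves the encoded slashes is a separate question that this sketch does not address.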

Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Fabian Hueske
Hi Emily, thanks for reaching out. I'm not familiar with the details of the Rest API but Ufuk (in CC) might be able to help you. Best, Fabian 2017-09-19 10:23 GMT+02:00 Emily McMahon : > I've tried every combination I can think of to pass an s3 path as the > target

Re: Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-19 Thread Eron Wright
Hello, the current behavior is that Flink holds onto received offers for up to two minutes while it attempts to provision the TMs. Flink can combine small offers to form a single TM, to combat fragmentation that develops over time in a Mesos cluster. Are you saying that unused offers aren't

Re: Classpath/ClassLoader issues

2017-09-19 Thread Garrett Barton
Fabian, It looks like Hive instantiates both input and output formats when doing either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler where it tries to load both. It looks like it's happening after the writes complete and Flink is in the finish/finalize stage. When I watch

Re: Flink kafka consumer that read from two partitions in local mode

2017-09-19 Thread Fabian Hueske
Hi Tovi, your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong. Just a side note: you don't need to set the parallelism to 2 to read from two partitions. A single consumer instance can read from multiple partitions. Best, Fabian 2017-09-19 17:02 GMT+02:00 Sofer,

Re: Delay in Flink timers

2017-09-19 Thread Narendra Joshi
The number of timers is about 400 per second. We have observed that onTimer calls are delayed only when the number of scheduled timers starts increasing from a minimum. It would be great if you could share pointers to code I can look at to understand it better. :) Narendra Joshi On 14 Sep 2017

Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Steven Wu
Stephan, agree that it is not a real memory leak. I haven't found it affecting the system. so it is sth odd for now. but if it is not really necessary, why do we want to defer memory release with unpredictable behavior? can StreamTask stop() method take care of the cleanup work and don't need to

Re: the design of spilling to disk

2017-09-19 Thread Florin Dinu
Hi Kostas, Thank you for the quick reply and the tips. I will check them out ! I would like to start by understanding the way secondary storage is used in batch processing. If you guys have additional pointers on that, it would certainly help me a lot. Thanks again, Florin

Re: the design of spilling to disk

2017-09-19 Thread Kostas Kloudas
Hi Florin, Unfortunately, there is no design document. The UnilateralSortMerger.java is used in the batch processing mode (not in streaming) and, in fact, the code dates some years back. I am also cc'ing Fabian as he may have more to say on this. Now for the streaming side, Flink uses 3

the design of spilling to disk

2017-09-19 Thread Florin Dinu
Hello everyone, In our group at EPFL we're doing research on understanding and potentially improving the performance of data-parallel frameworks that use secondary storage. I was looking at the Flink code to understand how spilling to disk actually works. So far I got to the

Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-19 Thread Francisco Gonzalez Barea
Hello guys, We have a flink 1.3.2 session deployed from Marathon json to Mesos with some of the following parameters as environment variables: "flink_mesos.initial-tasks": "8", "flink_mesos.resourcemanager.tasks.mem": "4096", And other environment variables including zookeeper, etc. The

Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
If I apply a sharing slot as in the example: DataStream LTzAccStream = env .addSource(new FlinkKafkaConsumer010<>("topic", new CustomDeserializer(), properties)) .assignTimestampsAndWatermarks(new CustomTimestampExtractor())

Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Elias Levy
Till, Using 1.3.2 and like Ufuk mentioned, using S3 for checkpointing. On Tue, Sep 19, 2017 at 4:28 AM, Till Rohrmann wrote: > Hi Elias, > > which version of Flink and which state backend are you running? I tried to > reproduce it and wasn't successful so far. > > We

Re: How to use operator list state like a HashMap?

2017-09-19 Thread Tony Wei
Hi Fabian, This is good advice, but I had already tried adding a random value to my data and it did not seem very useful. The key set of my data is small, around 10 ~ 20. If the range of the random number is small, the distribution might not be better, and could even be worse. I think the reason is that KeyedStream

Re: Classpath/ClassLoader issues

2017-09-19 Thread Fabian Hueske
Hi Garrett, Flink distinguishes between two classloaders: 1) the system classloader which is the main classloader of the process. This classloader loads all jars in the ./lib folder and 2) the user classloader which loads the job jar. AFAIK, the different operators do not have distinct

Flink kafka consumer that read from two partitions in local mode

2017-09-19 Thread Sofer, Tovi
Hi, I am trying to set up a FlinkKafkaConsumer which reads from two partitions in local mode, using setParallelism=2. The producer writes to two partitions (as shown in the metrics report). But the consumer always seems to read from one partition only. Am I missing something in partition

Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
The UUIDs are assigned. As far as I can see (inspecting the metrics and how the task behaves) the mergeElements apply function receives all the elements (the main element and the other elements that it expects) so it seems that the correlation is correct. Also, nothing indicates that there are

Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Ufuk Celebi
I saw this too recently when using HadoopFileSystem for checkpoints (HDFS or S3). I thought I had opened an issue for this, but I didn't. Here it is: https://issues.apache.org/jira/browse/FLINK-7643 On Tue, Sep 19, 2017 at 1:28 PM, Till Rohrmann wrote: > Hi Elias, > >

How to use operator list state like a HashMap?

2017-09-19 Thread Tony Wei
Hi, I have a basic streaming job that continuously persists data from Kafka to S3. Those data would be grouped by some dimensions and a limited amount. Originally, I used 'keyBy' and key state to fulfill the requirement. However, because the data is extremely skewed, I turned to use map function

Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Till Rohrmann
Hi Elias, which version of Flink and which state backend are you running? I tried to reproduce it and wasn't successful so far. We recently changed a bit how we load the GlobalConfiguration in combination with dynamic properties [1]. Maybe this has affected what you've reported as well. [1]

Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Sean Owen
For the curious, here's the overall task in Spark: https://issues.apache.org/jira/browse/SPARK-14220 and most of the code-related changes: https://github.com/apache/spark/pull/18645 and where it's stuck at the moment:

Re: Clean GlobalWidnow state

2017-09-19 Thread Aljoscha Krettek
Hi, Are the UUIDs randomly generated when calling .uuid or are they assigned and then .uuid will return the same UUID when calling multiple times? The latter would be problematic because we would not correctly assign state. Best, Aljoscha > On 19. Sep 2017, at 11:41, Fabian Hueske

Re: Load distribution through the cluster

2017-09-19 Thread Fabian Hueske
There is no notion of "full" in Flink except that one slot will run at most one subtask of each operator. The scheduling depends on the structure of the job, the parallelism of the operators, and the number of slots per TM. It's hard to tell without knowing the details. 2017-09-19 11:57

Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Márton Balassi
Hi Aljoscha, I am in favor of the change. No concerns on my side, just one remark: I talked to Sean last week (cc'd) and he mentioned that he has faced some technical issues while driving the transition from 2.10 to 2.12 for Spark. It had to do with changes in the scope of implicits. You

Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Till Rohrmann
Given that the last maintenance release Scala 2.10.6 is from about 2 years ago, I would also be in favour of dropping Scala 2.10 support from Flink. This will make maintenance easier for us and allow us to drop artifacts like Flakka. Cheers, Till On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha

Re: NoResourceAvailable exception

2017-09-19 Thread AndreaKinn
Thank you, unfortunately it had no effect. As I add more load to the computation, the "taskmanager killed" error appears on the node in use, without other nodes being called on to sustain the computation. I also increased akka.watch.heartbeat.interval akka.watch.heartbeat.pause

Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Stephan Ewen
Hi! From my understanding, overriding finalize() still has some use cases and is valid if done correctly (although PhantomReference has more control over the cleanup process). finalize() is still used in JDK classes as well. Whenever one overrides finalize(), the object cannot be immediately

Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
So Flink uses the other nodes only if one is completely "full"?

Re: Flink SocketTextStream source scheduled to a single machine

2017-09-19 Thread Till Rohrmann
Hi Le Xu, the reason why all different SocketTextStreamFunction sources are scheduled to the same machine is because of slot sharing. Slot sharing allows Flink to schedule tasks belonging to different operators into the same slot. This allows, for example, to achieve better colocation between

[DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Aljoscha Krettek
Hi, Talking to some people I get the impression that Scala 2.10 is quite outdated by now. I would like to drop support for Scala 2.10 and my main motivation is that this would allow us to drop our custom Flakka build of Akka that we use because newer Akka versions only support Scala 2.11/2.12

Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
If that were the case, it would be a bug in Flink. As I said before, your implementation looked good to me. All state of window and trigger should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented. I'll CC Aljoscha again for his

Re: Custom Serializers

2017-09-19 Thread nragon
createInstance(Object[] fields) at TupleSerializerBase seems not to be part of the TypeSerializer API. Will I be losing any functionality? In what cases do you use this instead of createInstance()? // We use this in the Aggregate and Distinct Operators to create instances // of immutable Tuples

Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
Thanks Fabian, I'll take a look at these improvements. I was wondering if the increasing state size could be because the UUIDs used in the keyBy are randomly generated. Maybe even if I correctly delete all the state related to a given key there is still some metadata related to the key

Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Emily McMahon
I've tried every combination I can think of to pass an s3 path as the target directory (url encode, include trailing slash, etc). I can successfully pass a local path as the target directory (i.e. /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't think there's a problem with the

Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
Hi Gerard, I had a look at your Trigger implementation but did not spot anything suspicious that would cause the state size to grow. However, I noticed a few things that can be improved: - use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to make the Trigger easier to test

Re: Custom Serializers

2017-09-19 Thread Chesnay Schepler
Have a look at the TupleTypeInfo class. It has a constructor that accepts an array of TypeInformation, and supports automatically generating a serializer from them. On 18.09.2017 18:28, nragon wrote: One other thing :). Can i set tuple generic type dynamically? Meaning, build a tuple of N