Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Chesnay Schepler
It would also be useful to know which source/sink you are using and which Kafka version it is. On 07.02.2018 08:58, Chesnay Schepler wrote: Thanks for reporting this. To narrow things down a bit, is your job using both Kafka sources and sinks? On 06.02.2018 21:30, Edward wrote: I'm

Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Chesnay Schepler
Thanks for reporting this. To narrow things down a bit, is your job using both Kafka sources and sinks? On 06.02.2018 21:30, Edward wrote: I'm having an issue where off-heap memory is growing unchecked until I get OOM exceptions. I was hoping that upgrading to 1.4 would solve these, since the

Re: How to monitor the latency?

2018-02-06 Thread Chesnay Schepler
Correct, in pre-1.5 the latency metric can only be used via the JMXReporter. With 1.5 you will be able to access the latency metric via any reporter or the REST API, but as it stands still not via the WebUI. On 07.02.2018 05:05, Marvin777 wrote: As far as I know, the latency metric
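
For reference, a minimal flink-conf.yaml sketch for enabling the JMXReporter (standard metrics reporter configuration; the port range is illustrative):

    metrics.reporters: jmx
    metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
    # optional: pin the JMX server to a known port range
    metrics.reporter.jmx.port: 8789-8799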

Re: Get the JobID when Flink job fails

2018-02-06 Thread Chesnay Schepler
As far as I know there's no way to specify the JobID when submitting a job. I've responded to your previous question in a separate mail. On 06.02.2018 18:14, Vinay Patil wrote: Hi, I see we can generate our own JobID, but how do I use it to submit the job to the cluster? I am using

Re: Get the JobID when Flink job fails

2018-02-06 Thread Chesnay Schepler
Yes, I think that's the only way to get it. I'll open a JIRA issue to also print the JobID for failed jobs. On 01.02.2018 20:50, Vinay Patil wrote: Hi, when the Flink job executes successfully I get the JobID; however, when the Flink job fails the JobID is not returned. How do I get the JobID in
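
For reference, a minimal Java sketch of the limitation being discussed ("my-job" is a placeholder): the JobID is only delivered through the JobExecutionResult that a successful execute() call returns, so a failed submission leaves you without it.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build the topology ...
    // execute() blocks until the job finishes and throws if it fails,
    // so the JobID below is only reachable for successful jobs
    JobExecutionResult result = env.execute("my-job");
    JobID jobId = result.getJobID();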

Maximizing resource allocation in EMR (EC2)

2018-02-06 Thread Ishwara Varnasi
Hello, my question is regarding running Flink (streaming) on EMR. How do I maximize resource allocation? For example, how do I allocate the optimal number of YARN job managers, slots, heap sizes, etc. given the EC2 instance types I'll be using in EMR? I see that (by trial/error) I'm able to allocate just
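
For orientation, a hedged sketch of the knobs involved (Flink's YARN session CLI; the numbers are illustrative and depend on the EC2 instance type):

    # start a YARN session with explicit resource settings
    ./bin/yarn-session.sh -n 4 -s 4 -jm 1024 -tm 4096
    # -n  number of TaskManager containers
    # -s  processing slots per TaskManager
    # -jm JobManager heap size in MB
    # -tm TaskManager heap size in MB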

Re: How to monitor the latency?

2018-02-06 Thread Marvin777
As far as I know, the latency metric is currently only available via the JMXReporter. FLINK-7608 may help you. 2018-02-07 11:40 GMT+08:00 FatMouse <934336...@qq.com>: > Hello: > > I have set the LatencyTrackingInterval to 2000, but in the `Task

How to monitor the latency?

2018-02-06 Thread FatMouse
Hello: I have set the LatencyTrackingInterval to 2000, but in the `Task Metrics` the latency was always NaN. How can I monitor the latency? Thanks. Code: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setLatencyTrackingInterval(2000);

Triggering a Savepoint

2018-02-06 Thread Gregory Fee
Hi group, I want to bootstrap some aggregates based on historic data in S3 and then keep them updated based on a stream. To do this I was thinking of doing something like processing all of the historic data, taking a savepoint, then restoring my program from that savepoint but with a stream
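
A sketch of the trigger-then-restore cycle being described (standard Flink CLI; <jobId>, <savepointPath>, and the jar name are placeholders):

    # trigger a savepoint for the running bootstrap job
    bin/flink savepoint <jobId>
    # resubmit the program from that savepoint, now reading from the stream
    bin/flink run -s <savepointPath> my-streaming-job.jar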

Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Edward
I'm having an issue where off-heap memory is growing unchecked until I get OOM exceptions. I was hoping that upgrading to 1.4 would solve these, since the child-first classloader is supposed to resolve issues with Avro classes cached in a different classloader (which prevents the classloaders

Fwd: Question about flink checkpoint

2018-02-06 Thread Chengzhi Zhao
Hey, I am new to Flink and I have a question and want to see if anyone can help here. We have an S3 path that Flink is monitoring for newly available files. val avroInputStream_activity = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1) I am doing both
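
For context, a hedged Java sketch of the continuous file monitoring being used (MyRecord is a hypothetical Avro type, the AvroInputFormat package location varies by Flink version, and the last argument is the monitoring interval in milliseconds):

    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    AvroInputFormat<MyRecord> format =
        new AvroInputFormat<>(new Path("s3://bucket/path"), MyRecord.class);
    // re-scan the path every 1000 ms and read files that newly appear
    DataStream<MyRecord> activity = env.readFile(
        format, "s3://bucket/path", FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);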

Re: Get the JobID when Flink job fails

2018-02-06 Thread Vinay Patil
Hi, I see we can generate our own JobID, but how do I use it to submit the job to the cluster? I am using the RemoteExecutionEnvironment to submit the job to the cluster. Also, can you please answer the query in my earlier mail. Regards, Vinay Patil On Thu, Feb 1, 2018 at 1:50 PM, Vinay Patil

Re: Flink CEP exception during RocksDB update

2018-02-06 Thread Kostas Kloudas
Hi Varun, The branch I previously sent you has been now merged to the master. So could you try the master and tell us if you see any change in the behavior? Has the problem been fixed, or has the message of the exception changed? Thanks, Kostas > On Jan 29, 2018, at 10:09 AM, Kostas Kloudas

Re: CEP issue

2018-02-06 Thread Kostas Kloudas
Thanks a lot Vishal! We are looking forward to a test case that reproduces the failure. Kostas > On Feb 2, 2018, at 4:05 PM, Vishal Santoshi wrote: > > This is the pattern. Will create a test case. > /** > * > * @param condition a single condition is applied as

Re: how to match external checkpoints with jobs during recovery

2018-02-06 Thread Aljoscha Krettek
Hi, I'm afraid it's currently not possible to distinguish between externalised checkpoints when running multiple jobs on one JobManager because the externalised checkpoints of all jobs would be written to the same directory. In Flink 1.5 (which is not yet released, but the code for this is in
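
For context, a minimal Java sketch of how externalised checkpoints are enabled (Flink 1.4-era API); the retained checkpoints land under the cluster-wide state.checkpoints.dir, which is why the checkpoints of all jobs end up in the same directory:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 seconds
    // retain the latest checkpoint on cancellation so it can be used for recovery
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);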

Re: deduplication with streaming sql

2018-02-06 Thread Fabian Hueske
Hi Henkka, This should be fairly easy to implement in a ProcessFunction. You're making a good call to worry about the number of timers. If you register a timer multiple times for the same time, the timer is deduplicated ;-) and will only fire once for that time. That's why the state retention time
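
A minimal sketch of the ProcessFunction approach described above (the class name and the one-hour TTL are illustrative; it assumes the function runs on a keyed stream, e.g. stream.keyBy(...).process(new Deduplicate<>())):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits the first element per key immediately, drops duplicates,
    // and clears the key's state once the TTL expires.
    public class Deduplicate<T> extends ProcessFunction<T, T> {
        private static final long TTL_MS = 60 * 60 * 1000; // illustrative retention time
        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
            if (seen.value() == null) {
                out.collect(value); // new key: emit immediately
                seen.update(true);
                // timers registered for the same time are deduplicated by Flink
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) {
            seen.clear(); // free the state so the key can be emitted again later
        }
    }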

Re: deduplication with streaming sql

2018-02-06 Thread Henri Heiskanen
Hi, Thanks. Doing this deduplication would be easy just by using the vanilla Flink API and state (check if this is a new key and then emit), but the issue has been automatic state cleanup. However, it looks like this streaming SQL retention time implementation uses the process function and timer. I

kafka as recovery only source

2018-02-06 Thread Sofer, Tovi
Hi group, I wanted to get your suggestion on how to implement two requirements we have: one is to read from an external message queue (JMS) with very low latency; the second is to support zero data loss, so that in case of restart and recovery, messages not checkpointed (and not

Re: deduplication with streaming sql

2018-02-06 Thread Fabian Hueske
Hi Henri, thanks for reaching out and providing code and data to reproduce the issue. I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not result in a retraction stream. However, with the current implementation we internally need a retraction stream if a state retention time is

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread Fabian Hueske
According to the JavaDocs of BucketingSink, in-progress files are still being written to. I don't know what would cause a file to remain in that state. Another thing to mention, you might want to ensure that only the last task that renames the file generates the _SUCCESS file. Otherwise, the data

Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther
Hi Henri, I just noticed that I had a tiny mistake in my little test program, so SELECT DISTINCT is officially supported. But the question of whether this is a valid append stream is still up for discussion. I will loop in Fabian (in CC). For the general behavior you can also look into the code and

Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther
Hi Henri, I'll try to answer your questions: 1) You are right, SELECT DISTINCT should not need a retract stream. Internally, this is translated into an aggregation without an aggregate function call. So this definitely needs improvement. 2) The problem is that SELECT DISTINCT is not officially

deduplication with streaming sql

2018-02-06 Thread Henri Heiskanen
Hi, I have a use case where I would like to find distinct rows over a certain period of time. The requirement is that a new row is emitted ASAP. Otherwise the requirement is mainly just to filter out data to get a smaller dataset for downstream. I noticed that SELECT DISTINCT and a state retention time of
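
For reference, a hedged sketch of the setup in question (Flink 1.4-era Table API; the table name, fields, and retention times are illustrative, and whether the retention-time variant can stay an append stream is exactly what this thread discusses):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.StreamQueryConfig;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    // clean up state for keys that have been idle between 12 and 24 hours
    StreamQueryConfig qConfig = tEnv.queryConfig();
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    Table distinct = tEnv.sqlQuery("SELECT DISTINCT a, b, c FROM X");
    // with retention time set, the current implementation produces a retract stream
    DataStream<Tuple2<Boolean, Row>> result =
        tEnv.toRetractStream(distinct, Row.class, qConfig);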

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread xiaobin yan
Hi, Thanks for your reply! See here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread Fabian Hueske
The notifyCheckpointComplete() method will be called when all subtasks of a job have completed their checkpoints. Check the JavaDocs of the CheckpointListener class [1]. Please note that you need to handle the case where multiple tasks try to create the _SUCCESS file concurrently. So, there is a good
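
A hedged, simplified Java sketch of the idea (hypothetical subclass that marks only the sink's base path, not individual buckets; as noted above, real code must cope with several subtasks racing to create the marker):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SuccessMarkingSink<T> extends BucketingSink<T> {
        private final String basePath;

        public SuccessMarkingSink(String basePath) {
            super(basePath);
            this.basePath = basePath;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // let BucketingSink finalize its pending files first
            super.notifyCheckpointComplete(checkpointId);
            Path marker = new Path(basePath, "_SUCCESS");
            FileSystem fs = marker.getFileSystem(new Configuration());
            if (!fs.exists(marker)) { // racy: several subtasks may pass this check at once
                fs.create(marker, true).close();
            }
        }
    }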

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Tony Wei
Hi Timo, Thanks a lot. I will try it out. Best Regards, Tony Wei 2018-02-06 17:25 GMT+08:00 Timo Walther: > With heap-based state I meant state that is stored using the > MemoryStateBackend or FsStateBackend [1]. In general, even if you are just > using ValueState, the key

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Timo Walther
With heap-based state I meant state that is stored using the MemoryStateBackend or FsStateBackend [1]. In general, even if you are just using ValueState, the key might be used internally to store your value state in a hash table. I think the migration should work in your case. Otherwise feel
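
A sketch of the kind of key POJO under discussion (field names are illustrative): overriding both equals() and hashCode() keeps lookups in the heap state backend's hash tables consistent.

    import java.util.Objects;

    // Flink POJO: public fields plus a public no-arg constructor.
    public class UserKey {
        public String userId;
        public int region;

        public UserKey() {}

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof UserKey)) return false;
            UserKey other = (UserKey) o;
            return region == other.region && Objects.equals(userId, other.userId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(userId, region);
        }
    }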

Re: Rebalance to subtasks in same TaskManager instance

2018-02-06 Thread Piotr Nowojski
Hi, Unfortunately I don’t think it’s currently possible in Flink. Please feel free to submit a feature request for it on our JIRA: https://issues.apache.org/jira/projects/FLINK/summary Have you tried out the setup using rebalance? In

Re: Reduce parallelism without network transfer.

2018-02-06 Thread Piotr Nowojski
Hi, rebalance is the safer default setting because it protects against data skew, and even the smallest data skew can create a bottleneck much larger than the serialisation/network transfer cost. Especially if one changes the parallelism to a value that’s not a result of multiplication or division
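
For comparison, a short sketch of the two redistribution modes being weighed (assumes an existing DataStream named stream and a hypothetical MyMapper):

    // rebalance(): round-robin across ALL downstream subtasks;
    // evens out skew, but most records cross the network.
    stream.rebalance().map(new MyMapper()).setParallelism(4);

    // rescale(): round-robin within a local subset of downstream subtasks;
    // cheaper, but skew inside one subset stays there.
    stream.rescale().map(new MyMapper()).setParallelism(4);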