Re: Read from and Write to Kafka through flink

2017-04-19 Thread Tzu-Li (Gordon) Tai
Hi Pradeep,

There is no single API or connector that takes a file as input and writes it to
Kafka.
In Flink, this operation consists of 2 parts, 1) source reading from input, and 
2) sink producing to Kafka.
So, all you have to do is build a job that consists of that source and sink.

You’ve already figured out 2). For 1), you can take a look at the built-in file 
reading source: `StreamExecutionEnvironment.readFile`.
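
For example, something along these lines (a minimal sketch against the 1.2 APIs;
the path and topic are placeholders, and note the final `env.execute()` call,
without which the program returns immediately without running the job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class FileToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) source: read the input file line by line
        DataStream<String> lines = env.readTextFile("file:///path/to/input.txt");

        // 2) sink: produce every line to the Kafka topic
        lines.addSink(new FlinkKafkaProducer08<String>(
                "localhost:9092",            // broker list
                "my-topic",                  // target topic
                new SimpleStringSchema()));  // serialization schema

        env.execute("file-to-kafka");
    }
}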

The program quickly executes and comes out.

I might need some more information here:
Do you mean that the job finished executing very fast?
If so, there should be an error of some kind. Could you find and paste it here?

If the job is actually running, and you’re constantly writing to the Kafka
topic, but the job just isn’t consuming the records, there are a few things you
could probably check:
1) are you sure the Kafka broker is the same version as the connector you are 
using?
2) make sure that you are using different consumer groups, if the offsets are 
committed back to Kafka. Check out [1] to see in which conditions offsets are 
committed.
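
For reference, the consumer group is just the standard “group.id” entry in the
properties you pass to the consumer (a sketch; names are placeholders, and it
assumes the same imports and `env` as in your snippet):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// pick a distinct group per job if offsets are committed back to Kafka / ZK
props.setProperty("group.id", "my-flink-job");

DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer010<String>("myTopic", new SimpleStringSchema(), props));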

By the way, I’m continuing this thread only on the user@ mailing list, as 
that’s the more suitable place for this.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
On 20 April 2017 at 7:38:36 AM, Pradeep Anumala (pradeep.anuma...@gmail.com) 
wrote:

Hi,  
I am a beginner with Apache Flink. I am trying to write the contents of a file
to Kafka and then read the data back from Kafka. I see there is an API to read
from and write to Kafka.

The following writes to Kafka:
FlinkKafkaProducer08 myProducer = new FlinkKafkaProducer08(  
"localhost:9092", // broker list  
"my-topic", // target topic  
new SimpleStringSchema()); // serialization schema  

Is there any API which takes a file as input and writes the file content to
Kafka?


My second question  
-  
I have run the Kafka producer on the terminal.
I am trying to read from Kafka using the code below, but it doesn't print any
output even though I am giving some input in the producer terminal.
The program quickly executes and comes out. Please let me know how I can read
from Kafka.

DataStream data = env.addSource(new  
FlinkKafkaConsumer010("myTopic",new SimpleStringSchema(),  
props)).print();  


Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail

2017-04-19 Thread Zhijiang(wangzhijiang999)
Hi Shannon,

Have you tried to increase the total memory size for the TaskManager
container? Maybe the maximum memory requirement is beyond your current setting.
You should also check that your UDF does not keep accumulating memory that is
never released.
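
For example (only a sketch, and the values are placeholders), the container
size for Flink 1.1 on YARN is usually set when the session or job is started,
or via flink-conf.yaml:

# flink-conf.yaml
taskmanager.heap.mb: 4096

# YARN session: 4 TaskManager containers with 4 GB and 8 slots each
./bin/yarn-session.sh -n 4 -s 8 -tm 4096

# or per job
./bin/flink run -m yarn-cluster -yn 4 -ys 8 -ytm 4096 <your-job.jar>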

If your UDF is not consuming much memory and the container still exceeds the
pmem limit after you increase the memory size, that may indicate a memory leak.
But you did not get an OOM exception, so it is probably not a heap memory
issue; maybe native memory is causing the problem. RocksDB uses native memory,
so you can try to upgrade the version as Stephan suggested. Good luck!
Cheers,
zhijiang

------------------------------------------------------------------
From: Stephan Ewen
Sent: Wednesday, 19 April 2017, 21:25
To: Shannon Carey
Cc: user@flink.apache.org
Subject: Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail
Hi Shannon!
Increasing the number of retries is definitely a good idea.
The fact that you see increasing pmem use after failures / retries - let's dig 
into that. There are various possible leaks depending on what you use:
  (1) There may be a leak in class-loading (or specifically class unloading). 
1.1.x dynamically loads code when tasks are (re)started. This requires that 
code can be unloaded, which means that tasks (after being cancelled) must have 
no more references to the classes. Class leaks typically come when you spawn 
threads (or use libraries that spawn threads) but do not shut them down when 
tasks are cancelled.
    You can check this in the Flink UI by looking at the non-heap memory 
consumption of the TaskManagers. In case of that type of leak, that number 
should continuously grow.
    1.2.x does not re-load code on each task restart in the Yarn per-job mode.

  (2) There may be a leak in the native memory allocation of some library you 
use, such as Netty or so.

  (3) As for a RocksDB leak - I am not directly aware of a known leak in 1.1.x, 
but the RocksDB code has been improved quite a bit from 1.1.x to 1.2.x. It may 
be worth checking out 1.2.x to see if that fixes the issue.
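
To make (1) a bit more concrete, here is a hypothetical sketch (class and names
are made up) of the pattern that avoids such a leak: whatever a task spawns in
open() has to be torn down in close(), which is also called when the task is
cancelled.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class AsyncLookupMap extends RichMapFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // spawned on every task (re)start
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public String map(String value) throws Exception {
        // ... hand work to the executor ...
        return value;
    }

    @Override
    public void close() {
        if (executor != null) {
            // without this, the thread outlives the task and pins the user-code classloader
            executor.shutdownNow();
        }
    }
}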

The "Association with remote system … has failed, address is now gated for 
[5000] ms. Reason is: [Disassociated]." is what akka logs if a remote system is 
lost - hence a normal artifact of taskmanager failures.
Greetings,
Stephan


On Wed, Apr 19, 2017 at 12:26 AM, Shannon Carey  wrote:
I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM due to 
exceeding pmem limits and all jobs failing as a result. I thought I had 
successfully disabled that check, but apparently the property doesn't work as 
expected in EMR.
From what I can tell in the logs, it looks like after the first TM was killed 
by Yarn, the jobs failed and were retried. However, when they are retried they 
cause increased pmem load on yet another TM, which results in Yarn killing 
another TM. That caused the jobs to fail again. This happened 5 times until our 
job retry policy gave up and allowed the jobs to fail permanently. Obviously, 
this situation is very problematic because it results in the loss of all job 
state, plus it requires manual intervention to start the jobs again.
The job retries eventually fail due to, "Could not restart the job ... The slot 
in which the task was executed has been released. Probably loss of TaskManager" 
or due to "Could not restart the job … Connection unexpectedly closed by remote 
task manager … This might indicate that the remote task manager was lost." 
Those are only the final failure causes: Flink does not appear to log the cause 
of intermediate restart failures.
I assume that the messages logged from the JobManager about "Association with
remote system … has failed, address is now gated for [5000] ms. Reason is:
[Disassociated]." are due to the TM failing, and are expected/harmless?
It seems like disabling the pmem check will fix this problem, but I am 
wondering if this is related: 
https://flink.apache.org/faq.html#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do
 ? I don't see any log messages about quarantined TMs…
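(For reference, and only as a hedged sketch since the exact property isn't
stated above: the YARN NodeManager check is normally disabled via yarn-site.xml
as below; on EMR this would be applied through a cluster configuration for the
"yarn-site" classification rather than by editing the file directly.)

<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>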
Do you think that increasing the # of job retries, so that the jobs don't fail
until all TMs are replaced with fresh ones, would fix this issue? The
"memory.percent-free" metric from Collectd did go down to 2-3% on the TMs 
before they failed, and shot back up to 30-40% on TM restart (though I'm not 
sure how much of that had to do with the loss of state).  So, memory usage may 
be a real problem, but we don't get an OOM exception so I'm not sure we can 
control this from the JVM perspective. Are there other memory adjustments we 
should make which would allow our TMs to run for long periods of time without 
having this problem? Is there perhaps a memory leak in RocksDB?
Thanks for any help you can provide,
Shannon



Re: UnilateralSortMerger error (again)

2017-04-19 Thread Flavio Pompermaier
I could, but only if there's a good probability that it fixes the
problem... how confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu  wrote:

> Looking at the git log of DataInputDeserializer.java, there has been some
> recent change.
>
> If you have time, maybe try with the 1.2.1 RC and see if the error is
> reproducible?
>
> Cheers
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier  > wrote:
>
>> Hi to all,
>> I think I'm again on the weird Exception with the
>> SpillingAdaptiveSpanningRecordDeserializer...
>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>> disk but the Exception thrown is not very helpful. Any idea?
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> Best,
>> Flavio
>>
>
>


Re: Flink memory usage

2017-04-19 Thread Fabian Hueske
Hi Billy,

Flink's internal operators are implemented to not allocate heap space
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building
a hash table) the data is serialized into managed memory. If all memory is
in use, Flink starts spilling to disk. This blog post discusses how Flink
uses its managed memory [1] (still up to date, even though it's almost 2
years old).
The runtime code should actually be quite stable. Most of the code has been
there for several years (even before Flink was donated to the ASF) and we
haven't seen many bugs reported for the DataSet runtime. Of course this
does not mean that the code doesn't contain bugs.

However, Flink does not manage the memory used by user code. For example, a
GroupReduceFunction that collects a lot of data, e.g., in a List on the heap,
can still kill a program.

I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space
available might help.
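For example (the values are placeholders), the managed memory size is
controlled in flink-conf.yaml:

# fraction of the TaskManager heap reserved as managed memory (default 0.7)
taskmanager.memory.fraction: 0.5
# or an absolute amount in megabytes instead of a fraction
# taskmanager.memory.size: 2048
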
If that doesn't solve the problem, it would be good if you could share some
details about your job (which operators, which local strategies, how many
operators) that might help to identify the misbehaving operator.

Thanks, Fabian

[1]
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html



2017-04-19 16:09 GMT+02:00 Newport, Billy :

> How does Flink use memory? We’re seeing cases when running a job on larger
> datasets where it throws OOM exceptions during the job. We’re using the
> Dataset API. Shouldn’t flink be streaming from disk to disk? We workaround
> by using fewer slots but it seems unintuitive that I need to change these
> settings given Flink != Spark. Why isn’t Flinks memory usage constant? Why
> couldn’t I run a job with a single task and a single slot for any size job
> successfully other than it takes much longer to run.
>
>
>
> Thanks
>
> Billy
>
>
>
>
>


Re: Flink groupBy

2017-04-19 Thread Fabian Hueske
Hi Alieh,

Flink uses hash partitioning to assign grouping keys to parallel tasks by
default.
You can implement a custom partitioner or use range partitioning (which has
some overhead) to control the skew.

There is no automatic load balancing happening.
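
A sketch of the custom-partitioner route (Java DataSet API; the class, key and
routing logic are made up for illustration):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SkewAwareGrouping {

    // Route a known heavy key to its own task and hash the other keys over the
    // remaining tasks, instead of relying on the default hash partitioning.
    public static DataSet<Tuple2<String, Long>> sumPerKey(DataSet<Tuple2<String, Long>> input) {
        return input
            .groupBy(0)
            .withPartitioner(new Partitioner<String>() {
                @Override
                public int partition(String key, int numPartitions) {
                    if (numPartitions == 1 || "heavy-key".equals(key)) {
                        return 0;
                    }
                    return 1 + Math.abs(key.hashCode() % (numPartitions - 1));
                }
            })
            .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                @Override
                public void reduce(Iterable<Tuple2<String, Long>> values,
                                   Collector<Tuple2<String, Long>> out) {
                    String key = null;
                    long sum = 0;
                    for (Tuple2<String, Long> v : values) {
                        key = v.f0;
                        sum += v.f1;
                    }
                    out.collect(Tuple2.of(key, sum));
                }
            });
    }
}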

Best, Fabian

2017-04-19 14:42 GMT+02:00 Alieh :

> Hi All
>
> Is there anyway in Flink to send a process to a reducer?
>
> If I do "test.groupby(1).reduceGroup", each group is processed on one
> reducer? And if the number of groups is more than the number of task slots
> we have, does Flink distribute the process evenly? I mean if we have for
> example groups of size 10, 5, 5 and we have two task slots, is the process
> distributed in this way?
>
> task slot1: group of size 10
>
> task slot2: two groups of size 5
>
> Best,
>
> Alieh
>
>


Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-19 Thread Luis Lázaro

Hi everyone, 
I am working on a use case with CEP and Flink:

Flink 1.2
Source is Kafka configured with one single partition.
Data are syslog standard messages parsed as LogEntry (object with attributes 
like timestamp, service, severity, etc)
An event is a LogEntry.
If two consecutive LogEntry events with severity ERROR (3) and the same service
are matched within a 10-minute period, an ErrorAlert must be triggered.


Although I cannot guarantee the ascending order of events (LogEntry) when
consuming from Kafka, I decided to try this implementation:
Timestamps per Kafka partition



//My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

//"Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new 
FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"), new 
LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

//may not be ascending order
val kafkaSourceAssignedTimesTamp = 
kafkaSource.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor[LogEntry] {
  override def extractAscendingTimestamp(t: LogEntry): Long = {
ProcessorHelper.toTimestamp(t.timestamp).getTime
  }
})

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)

 I implemented a pattern like:

myPattern = 
 Pattern
  .begin[LogEntry]("First Event")
  .subtype(classOf[LogEntry])
  .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
  .next("Second Event")
  .subtype(classOf[LogEntry])
  .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
  .within(Time.minutes(10))
  }

This pattern will trigger an alert when two consecutive LogEntry events with
severity ERROR and the same service occur (it will generate alerts for each
service individually).

  CEP.pattern(stream
.keyBy(_.service),
myPattern)


An alert is made of two logEntry:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:
bounded out-of-orderness 


kafkaSource.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
  override def extractTimestamp(t: LogEntry): Long = {
ProcessorHelper.toTimestamp(t.timestamp).getTime
  }
})

Time.seconds(0) —> if I set it like this, do I prevent the events from being
delivered late?

But I get the same problem as described above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!
…...

Initially I thought my pattern was not correctly implemented, but the problem
seems to be that I am unable to assign timestamps and consequently emit
watermarks properly when the events are unordered.
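
For reference, here is a sketch of the same extractor with a nonzero tolerance
(the 10-minute bound is only an assumption chosen to match the pattern window).
With a zero bound, the watermark tracks the largest timestamp seen so far, so
any event arriving with a smaller timestamp is liable to be treated as late:

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.minutes(10)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })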

Any suggestion is appreciated, thanks in advance.


Best regards, Luis



Re: Yarn terminating TM for pmem limit cascades causing all jobs to fail

2017-04-19 Thread Stephan Ewen
Hi Shannon!

Increasing the number of retries is definitely a good idea.

The fact that you see increasing pmem use after failures / retries - let's
dig into that. There are various possible leaks depending on what you use:

  (1) There may be a leak in class-loading (or specifically class
unloading). 1.1.x dynamically loads code when tasks are (re)started. This
requires that code can be unloaded, which means that tasks (after being
cancelled) must have no more references to the classes. Class leaks
typically come when you spawn threads (or use libraries that spawn threads)
but do not shut them down when tasks are cancelled.

You can check this in the Flink UI by looking at the non-heap memory
consumption of the TaskManagers. In case of that type of leak, that number
should continuously grow.

1.2.x does not re-load code on each task restart in the Yarn per-job
mode.


  (2) There may be a leak in the native memory allocation of some library
you use, such as Netty or so.


  (3) As for a RocksDB leak - I am not directly aware of a known leak in
1.1.x, but the RocksDB code has been improved quite a bit from 1.1.x to
1.2.x. It may be worth checking out 1.2.x to see if that fixes the issue.


The "Association with remote system … has failed, address is now gated for
[5000] ms. Reason is: [Disassociated]." is what akka logs if a remote
system is lost - hence a normal artifact of taskmanager failures.

Greetings,
Stephan



On Wed, Apr 19, 2017 at 12:26 AM, Shannon Carey  wrote:

> I'm on Flink 1.1.4. We had yet another occurrence of Yarn killing a TM due
> to exceeding pmem limits and all jobs failing as a result. I thought I had
> successfully disabled that check, but apparently the property doesn't work
> as expected in EMR.
>
> From what I can tell in the logs, it looks like after the first TM was
> killed by Yarn, the jobs failed and were retried. However, when they are
> retried they cause increased pmem load on yet another TM, which results in
> Yarn killing another TM. That caused the jobs to fail again. This happened
> 5 times until our job retry policy gave up and allowed the jobs to fail
> permanently. Obviously, this situation is very problematic because it
> results in the loss of all job state, plus it requires manual intervention
> to start the jobs again.
>
> The job retries eventually fail due to, "Could not restart the job ... The
> slot in which the task was executed has been released. Probably loss of
> TaskManager" or due to "Could not restart the job … Connection unexpectedly
> closed by remote task manager … This might indicate that the remote task
> manager was lost." Those are only the final failure causes: Flink does not
> appear to log the cause of intermediate restart failures.
>
> I assume that the messages logged from the JobManager about "Association
> with remote system … has failed, address is now gated for [5000] ms. Reason
> is: [Disassociated]." is due to the TM failing, and is expected/harmless?
>
> It seems like disabling the pmem check will fix this problem, but I am
> wondering if this is related: https://flink.apache.org/faq.html#the-slot-allocated-for-my-task-manager-has-been-released-what-should-i-do ? I
> don't see any log messages about quarantined TMs…
>
> Do you think that increasing the # of job retries so that the jobs don't
> fail until all TMs are replaced with fresh ones fix this issue? The
> "memory.percent-free" metric from Collectd did go down to 2-3% on the TMs
> before they failed, and shot back up to 30-40% on TM restart (though I'm
> not sure how much of that had to do with the loss of state).  So, memory
> usage may be a real problem, but we don't get an OOM exception so I'm not
> sure we can control this from the JVM perspective. Are there other memory
> adjustments we should make which would allow our TMs to run for long
> periods of time without having this problem? Is there perhaps a memory leak
> in RocksDB?
>
> Thanks for any help you can provide,
> Shannon
>


Flink groupBy

2017-04-19 Thread Alieh

Hi All

Is there any way in Flink to send a process to a reducer?

If I do "test.groupby(1).reduceGroup", each group is processed on one 
reducer? And if the number of groups is more than the number of task 
slots we have, does Flink distribute the process evenly? I mean if we 
have for example groups of size 10, 5, 5 and we have two task slots, is 
the process distributed in this way?


task slot1: group of size 10

task slot2: two groups of size 5

Best,

Alieh



Re: Kafka offset commits

2017-04-19 Thread Tzu-Li (Gordon) Tai
Thanks for the clarification Aljoscha!
Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the 
“1.0” part on my first reply).

@Gwenhael, I’ll try to reclarify some of the questions you asked:

Does that means that flink does not rely on the offset in written to zookeeper 
anymore, but relies on the snapshots data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2 ?

For the case of 1.0 —> 1.2, you’ll have to rely on committed offsets in Kafka / 
ZK for the migration. State migration from 1.0 to 1.2 is not possible.

As Aljoscha pointed out, if you are using the same “group.id”, then there
shouldn’t be a problem w.r.t. retaining the offset position. You just have to
keep [1] in mind, as you would need to manually increase all committed offsets
in Kafka / ZK by 1 for that consumer group.

Note that there is no state migration happening here, but just simply relying 
on offsets committed in Kafka / ZK to define the starting position when you’re 
starting the job in 1.2.

We were also wondering if the flink consumer was able to restore it’s offset 
from Zookeeper.

For FlinkKafkaConsumer08, the starting offset is actually always read from ZK.
Again, this isn’t a “restore”; it just defines the start position using the
committed offsets.

Another question : is there an expiration to the snapshots ? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore correctly the offset and it started 
consuming from the oldest offset, creating duplicated data (the kafka queue has 
over a week of buffer).

There is no expiration to the offsets stored in the snapshots. The only issue 
would be if Kafka has expired that offset due to data retention settings.
If you’re sure that at the time of the restore the data hasn’t expired yet, 
there might be something weird going on.
AFAIK, the only issue that was previously known to possibly cause this was [2].
Could you check if that issue may be the case?

[1] https://issues.apache.org/jira/browse/FLINK-4723
[2] https://issues.apache.org/jira/browse/FLINK-6006
On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek (aljos...@apache.org) wrote:

Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. 
Only restoring from Flink 1.1 savepoints is supported.

@Gordon If the consumer group stays the same the new Flink job should pick up 
where the old one stopped, right?

Best,
Aljoscha

On 18. Apr 2017, at 16:19, Gwenhael Pasquiers  
wrote:

Thanks for your answer.
Does that means that flink does not rely on the offset in written to zookeeper 
anymore, but relies on the snapshots data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2 ?
We were also wondering if the flink consumer was able to restore it’s offset 
from Zookeeper.
Another question : is there an expiration to the snapshots ? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore correctly the offset and it started 
consuming from the oldest offset, creating duplicated data (the kafka queue has 
over a week of buffer).
B.R.
 
From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org] 
Sent: lundi 17 avril 2017 07:40
To: user@flink.apache.org
Subject: Re: Kafka offset commits
 
Hi,
 
The FlinkKafkaConsumer in 1.2 is able to restore from older version state 
snapshots and bridge the migration, so there should be no problem in reading 
the offsets from older state. The smallest or highest offsets will only be used 
if the offset no longer exists due to Kafka data retention settings.
 
Besides this, there was one fix related to the Kafka 0.8 offsets for Flink
1.2.0 [1]. Shortly put, before the fix, the offsets committed to ZK were off by
1 (w.r.t. how Kafka itself defines the committed offsets).
However, this should not affect the behavior of restoring from offsets in 
savepoints, so it should be fine.
 
Cheers,
Gordon
 
[1] https://issues.apache.org/jira/browse/FLINK-4723
 
On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers 
(gwenhael.pasqui...@ericsson.com) wrote:

Hello,
 
We’re going to migrate some applications that consume data from a Kafka 0.8 
from Flink 1.0 to Flink 1.2.
 
We are wondering if the offset commitment system changed between those two 
versions: is there a risk that the Flink 1.2-based application will start with 
no offset (thus will use either the smallest or highest one) ?
Or can we assume that the Flink 1.2 app will resume its work at the same offset 
than the Flink 1.0 app stopped (if they use the same consumer group id) ?
 
B.R.



Re: Kafka offset commits

2017-04-19 Thread Aljoscha Krettek
Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. 
Only restoring from Flink 1.1 savepoints is supported.

@Gordon If the consumer group stays the same the new Flink job should pick up 
where the old one stopped, right?

Best,
Aljoscha

> On 18. Apr 2017, at 16:19, Gwenhael Pasquiers 
>  wrote:
> 
> Thanks for your answer.
> Does that means that flink does not rely on the offset in written to 
> zookeeper anymore, but relies on the snapshots data, implying that it’s 
> crucial to keep the same snapshot folder before and after the migration to 
> Flink 1.2 ?
> We were also wondering if the flink consumer was able to restore it’s offset 
> from Zookeeper.
> Another question : is there an expiration to the snapshots ? We’ve been 
> having issues with an app that we forgot to restart. We did it after a couple 
> of days, but it looks like it did not restore correctly the offset and it 
> started consuming from the oldest offset, creating duplicated data (the kafka 
> queue has over a week of buffer).
> B.R.
>  
> From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org] 
> Sent: lundi 17 avril 2017 07:40
> To: user@flink.apache.org
> Subject: Re: Kafka offset commits
>  
> Hi,
>  
> The FlinkKafkaConsumer in 1.2 is able to restore from older version state 
> snapshots and bridge the migration, so there should be no problem in reading 
> the offsets from older state. The smallest or highest offsets will only be 
> used if the offset no longer exists due to Kafka data retention settings.
>  
> Besides this, there was once fix related to the Kafka 0.8 offsets for Flink 
> 1.2.0 [1]. Shortly put, before the fix, the committed offsets to ZK was off 
> by 1 (wrt to how Kafka itself defines the committed offsets).
> However, this should not affect the behavior of restoring from offsets in 
> savepoints, so it should be fine.
>  
> Cheers,
> Gordon
>  
> [1] https://issues.apache.org/jira/browse/FLINK-4723 
> 
>  
> On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers 
> (gwenhael.pasqui...@ericsson.com ) 
> wrote:
> 
> Hello,
>  
> We’re going to migrate some applications that consume data from a Kafka 0.8 
> from Flink 1.0 to Flink 1.2.
>  
> We are wondering if the offset commitment system changed between those two 
> versions: is there a risk that the Flink 1.2-based application will start 
> with no offset (thus will use either the smallest or highest one) ?
> Or can we assume that the Flink 1.2 app will resume its work at the same 
> offset than the Flink 1.0 app stopped (if they use the same consumer group 
> id) ?
>  
> B.R.



Re: Flink slots, threads, task, etc

2017-04-19 Thread Flavio Pompermaier
Hi Aljoscha,
thanks for the reply, it was not urgent and I was aware of the FF... btw,
congratulations on it, I saw many interesting talks!
The Flink community has grown a lot since it was Stratosphere ;)
Just one last question: in many of my use cases it could be helpful to see
how many of the created splits were "consumed" by an inputFormat/source.
Is it possible to monitor this part somewhere in the dashboards or with a
custom metric?
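
(As a reference for the custom-metric route: a hypothetical sketch of the
user-metric API available since Flink 1.1. The counter shows up per subtask in
the web dashboard and in any configured reporter; counting the splits
themselves would need an equivalent counter incremented inside a
RichInputFormat when a split is opened.)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class RecordCountingMap<T> extends RichMapFunction<T, T> {

    private transient Counter recordsSeen;

    @Override
    public void open(Configuration parameters) {
        // registered under the task's metric group, visible in the dashboard
        recordsSeen = getRuntimeContext().getMetricGroup().counter("recordsSeen");
    }

    @Override
    public T map(T value) {
        recordsSeen.inc();
        return value;
    }
}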

On Tue, Apr 18, 2017 at 5:24 PM, Aljoscha Krettek 
wrote:

> Hi,
> sorry for not getting any responses but I think everyone was quite busy
> with Flink Forward SF. I’m also no expert on the topic but I’ll try and
> give some answers.
>
> Regarding a Google Doc version, I don’t think that there is any. You would
> have to modify the Markdown version we have in the doc.
>
> For the other answers I’ll reuse an example program that consists of
> Source -> Map -> Sink, with chaining disabled and parallelism 2. We’ll thus
> have three Tasks: Source, Map, and Sink, with each having two subtasks.
> Let’s denote the subtasks by a number in parenthesis so the first subtask
> for Source is Source(1), second one is Source(2). I’ll also refer to
> Source(1) -> Map(1) -> Sink(1) as a slice of the execution graph since
> these can be executed within one slot.
>
> Regarding 1, I think this is true. However, a single slot can execute a
> complete slice of the execution graph where each subtask (from a different
> task) would be executed by its own thread.
>
> Regarding 2.1, Yes, I think it cannot run multiple subtasks of the same
> task while it is possible (and in fact done) to execute all the subtasks of
> a slice in the same slot.
>
> Regarding 2.2, This is so to allow executing a pipeline of parallelism 8
> using a cluster that has 8 free slots. Basically, each slice fills one slot.
>
> Regarding 3, I don’t really have an answer.
>
> Regarding 4, Yes, this can get a bit out of hand if you have very long
> pipelines.
>
> Best,
> Aljoscha
>
> On 11. Apr 2017, at 14:37, Flavio Pompermaier 
> wrote:
>
> Any feedback here..?
>
> On Wed, Apr 5, 2017 at 7:43 PM, Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I had a very long but useful chat with Fabian and I understood a lot of
>> concepts that was not clear at all to me. We started from the Flink runtime
>> documentation page (https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html) but
>> I discovered that the terminology is very inconsistent and misleading
>> along the page...
>>
>> For example, one of the very first sentences is :
>> "Flink chains operator subtasks together into tasks. Each task is
>> executed by one thread."
>> What I first understood was that every operator can be executed only by a
>> single thread in all the cluster... probably it should be better "one
>> thread per task slot" (at least).
>> Moreover, if I'm not wrong, a Task Slot can execute only 1 subtask (aka
>> parallel instance) of each task and there's no limit to the number of
>> subtasks per slot (and this is not highlighted at all in that document).
>> The only constraint is that they should belong to different tasks (right?).
>>
>> If there's a google doc version of that page I could try to rewrite it
>> down in order to make it easier to understand some parts...however I still
>> have some more questions:
>>
>>    1. Is it correct that a single Task Slot can execute only a single
>>    subtask of each task, and that this subtask is executed by a single
>>    thread within the slot?
>>    2. If it is so:
>>       1. why at that page there's written "By default, Flink allows
>>       subtasks to share slots even if they are subtasks of different tasks,
>>       so long as they are from the same job"? It seems that it is more
>>       common to run multiple subtasks of the same task (in a slot) than
>>       executing different subtasks of different tasks, although this is
>>       still permitted... from what I understood a slot cannot run multiple
>>       subtasks of the same task at all!
>>       2. and why this constraint? Is there any good reason for that? A
>>       subtask is mapped to 1 thread in the TaskManager, so why can a TM
>>       with 2 slots run 2 subtasks of the same task (in the same JVM) while
>>       a TM with 1 slot cannot (while it can execute an arbitrary number of
>>       subtasks of different tasks)?
>>    3. If it is not so, there are no images representing such a situation
>>    on that page...
>>    4. Isn't it dangerous to allow (potentially) an unlimited number of
>>    threads per TM slot??
>>
>> Cheers,
>> Flavio
>>
>>
>