Re-keying / sub-keying a stream without repartitioning

2017-04-21 Thread Elias Levy
This is something that has come up before on the list, but in a different
context.  I need to re-key a stream but would prefer that the stream not be
repartitioned.  There is no gain in repartitioning, as the new partition key
is a composite of the current stream key, going from a key of A to a key of
(A, B), so all values for the resulting streams are already routed to the same
node; repartitioning them to other nodes would simply generate unnecessary
network traffic and serde overhead.

Unlike the previous use cases, I am not trying to perform aggregate
operations.  Instead, I am executing CEP patterns.  Some patterns apply to the
stream keyed by A and some to the stream keyed by (A, B).

The API does not appear to offer an obvious solution to this situation.
keyBy() will repartition, and there isn't something like subKey() to
subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the
second element in the key, and delegating to the default partitioner
passing it only the first element, thus resulting in no change of task
assignment.
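
A rough Java sketch of that partitionCustom() idea (the tuple types and hash
function are illustrative assumptions; matching exactly the subtask that
keyBy(A) would pick depends on mirroring Flink's internal key-group
assignment, which this sketch does not attempt):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class SubKeyPartitioning {

    // Partition a stream of (A, B) records on A only, ignoring B.
    public static DataStream<Tuple2<String, String>> partitionOnA(
            DataStream<Tuple2<String, String>> stream) {
        return stream.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String keyA, int numPartitions) {
                        // Simple hash partitioning on A; B never influences the
                        // partition, so records stay grouped by A.
                        return Math.abs(keyA.hashCode() % numPartitions);
                    }
                },
                new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> value) {
                        return value.f0; // the second element (B) plays no role here
                    }
                });
    }
}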

Thoughts?


Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-04-21 Thread Ted Yu
Benjamin has an implementation of Hierarchical Timing Wheels (Apache
License):

https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java

If there is some interest, we can port the above over.

Cheers

On Fri, Apr 21, 2017 at 12:44 PM, Gyula Fóra  wrote:

> The timer will actually fire and will be removed at the original time, but
> we don't trigger any action on it. We also remove the tombstone state
> afterwards.
>
> So we use more memory yes depending on the length and number of timers
> that were deleted. But it is eventually cleaned up.
>
> Gyula
>
> Ted Yu  ezt írta (időpont: 2017. ápr. 21., P, 21:38):
>
>> A bit curious: wouldn't using "tombstone" markers constitute some memory
>> leak (since Timers are not released) ?
>>
>> Cheers
>>
>> On Fri, Apr 21, 2017 at 12:23 PM, Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> I thought I would drop my opinion here maybe it is relevant.
>>>
>>> We have used the Flink internal timer implementation in many of our
>>> production applications, this supports the Timer deletion but the deletion
>>> actually turned out to be a huge performance bottleneck because of the bad
>>> deletion performance of the Priority queue.
>>>
>>> In many of our cases deletion could have been avoided by some more
>>> clever registration/firing logic and we also ended up completely avoiding
>>> deletion and instead using "tombstone" markers by setting a flag in the
>>> state which timers not to fire when they actually trigger.
>>>
>>> Gyula
>>>
>>>
>>>
>>> Aljoscha Krettek  ezt írta (időpont: 2017. ápr.
>>> 21., P, 14:47):
>>>
 Hi,
 the reasoning behind the limited user facing API was that we were (are)
 not sure whether we would be able to support efficient deletion of timers
 for different ways of storing timers.

 @Stephan, If I remember correctly you were the strongest advocate for
 not allowing timer deletion. What’s your thinking on this? There was also a
 quick discussion on https://issues.apache.org/jira/browse/FLINK-3089 where
 Xiaogang explained that the (new, not merged) RocksDB based timers would
 have efficient timer deletion.

 Best,
 Aljoscha

 On 20. Apr 2017, at 11:56, Jagadish Bihani 
 wrote:

 Hi

 I am working on a use case where I want to start a timer for a given
 event type and when that timer expires it will perform certain action. This
 can be done using Process Function.

 But I also want to cancel scheduled timer in case of some other types
 of events. I also checked the implementation of HeapInternalTimerService
 which implements InternalTimerService interface has those implementations
 already. Also SimpleTimerService which overrides TimerService also uses
 InternalTimerService and simply passes VoidNamespace.INSTANCE.

 So in a way we are using InternalTimerService interface's
 implementations everywhere.

 So what is the reason that ProcessFunction.Context uses TimerService?
 Any reason 'deleteEventTimeTimer' is not exposed to users? If I want to use
 the deleteEvent functionality how should I go about it?

 --
 Thanks and Regards,
 Jagadish Bihani



>>


Re: UnilateralSortMerger error (again)

2017-04-21 Thread Flavio Pompermaier
Thanks for the explanation. Is there a way to force this behaviour in a
local environment (to try to debug the problem)?
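
One rough way to provoke spilling in a local environment is to shrink the
managed memory of the embedded task manager. A sketch, assuming Flink 1.2's
taskmanager.memory.size option is honored by the local environment:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalSpillDebug {

    public static ExecutionEnvironment createLowMemoryLocalEnv() {
        Configuration conf = new Configuration();
        // Give the embedded task manager very little managed memory (in MB), so the
        // sorter runs out of in-memory buffers early and spills sorted runs to disk.
        conf.setLong("taskmanager.memory.size", 32);
        return ExecutionEnvironment.createLocalEnvironment(conf);
    }
}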

On 21 Apr 2017 21:49, "Fabian Hueske"  wrote:

> Hi Flavio,
>
> these files are used for spilling data to disk. In your case sorted runs
> of records.
> Later all (up to a fanout threshold) these sorted runs are read and merged
> to get a completely sorted record stream.
>
> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier :
>
>> The error appears as soon as some taskmanager generates some inputchannel
>> file.
>> What are those files used for?
>>
>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> In another run of the job I had another Exception. Could it be helpful?
>>>
>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>> k.java:355)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>> Serializer consumed more bytes than the record had. This indicates broken
>>> serialization. If you are using custom serialization types (Value or
>>> Writable), check their serialization methods. If you are using a
>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> .getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>> ask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>> (GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>> daptiveSpanningRecordDeserializer.java:123)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>> Reader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>> ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>> ySegment.java:104)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:193)
>>> at 

Re: UnilateralSortMerger error (again)

2017-04-21 Thread Fabian Hueske
Hi Flavio,

these files are used for spilling data to disk; in your case, sorted runs of
records.
Later, all of these sorted runs (up to a fan-out threshold) are read and merged
to produce a completely sorted record stream.
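
Purely as an illustration of the merge step described above (this is not Flink
code), a small Java sketch that merges already-sorted runs into one sorted
stream with a priority queue:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSortedRuns {

    public static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // One heap entry per run: [current value, index of the run it came from].
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (int i = 0; i < sortedRuns.size(); i++) {
            Iterator<Integer> it = sortedRuns.get(i).iterator();
            iterators.add(it);
            if (it.hasNext()) {
                heap.add(new int[] {it.next(), i});
            }
        }
        List<Integer> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] smallest = heap.poll();
            result.add(smallest[0]);
            // Refill the heap from the run that produced the smallest element.
            Iterator<Integer> it = iterators.get(smallest[1]);
            if (it.hasNext()) {
                heap.add(new int[] {it.next(), smallest[1]});
            }
        }
        return result;
    }
}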

2017-04-21 14:09 GMT+02:00 Flavio Pompermaier :

> The error appears as soon as some taskmanager generates some inputchannel
> file.
> What are those files used for?
>
> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier  > wrote:
>
>> In another run of the job I had another Exception. Could it be helpful?
>>
>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> daptiveSpanningRecordDeserializer.java:123)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>> dReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>> Reader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>> ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>> ySegment.java:104)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
>> readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>> gate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> 

Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-04-21 Thread Gyula Fóra
The timer will actually fire and be removed at its original time, but we
don't trigger any action on it. We also remove the tombstone state
afterwards.

So yes, we use more memory, depending on how long the timers live and how many
of them were deleted, but it is eventually cleaned up.

Gyula

Ted Yu  wrote (21 Apr 2017, Fri, 21:38):

> A bit curious: wouldn't using "tombstone" markers constitute some memory
> leak (since Timers are not released) ?
>
> Cheers
>
> On Fri, Apr 21, 2017 at 12:23 PM, Gyula Fóra  wrote:
>
>> Hi!
>>
>> I thought I would drop my opinion here maybe it is relevant.
>>
>> We have used the Flink internal timer implementation in many of our
>> production applications, this supports the Timer deletion but the deletion
>> actually turned out to be a huge performance bottleneck because of the bad
>> deletion performance of the Priority queue.
>>
>> In many of our cases deletion could have been avoided by some more clever
>> registration/firing logic and we also ended up completely avoiding deletion
>> and instead using "tombstone" markers by setting a flag in the state which
>> timers not to fire when they actually trigger.
>>
>> Gyula
>>
>>
>>
>> Aljoscha Krettek  ezt írta (időpont: 2017. ápr.
>> 21., P, 14:47):
>>
>>> Hi,
>>> the reasoning behind the limited user facing API was that we were (are)
>>> not sure whether we would be able to support efficient deletion of timers
>>> for different ways of storing timers.
>>>
>>> @Stephan, If I remember correctly you were the strongest advocate for
>>> not allowing timer deletion. What’s your thinking on this? There was also a
>>> quick discussion on https://issues.apache.org/jira/browse/FLINK-3089 where
>>> Xiaogang explained that the (new, not merged) RocksDB based timers would
>>> have efficient timer deletion.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 20. Apr 2017, at 11:56, Jagadish Bihani 
>>> wrote:
>>>
>>> Hi
>>>
>>> I am working on a use case where I want to start a timer for a given
>>> event type and when that timer expires it will perform certain action. This
>>> can be done using Process Function.
>>>
>>> But I also want to cancel scheduled timer in case of some other types of
>>> events. I also checked the implementation of HeapInternalTimerService which
>>> implements InternalTimerService interface has those implementations
>>> already. Also SimpleTimerService which overrides TimerService also uses
>>> InternalTimerService and simply passes VoidNamespace.INSTANCE.
>>>
>>> So in a way we are using InternalTimerService interface's
>>> implementations everywhere.
>>>
>>> So what is the reason that ProcessFunction.Context uses TimerService?
>>> Any reason 'deleteEventTimeTimer' is not exposed to users? If I want to use
>>> the deleteEvent functionality how should I go about it?
>>>
>>> --
>>> Thanks and Regards,
>>> Jagadish Bihani
>>>
>>>
>>>
>


Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-04-21 Thread Ted Yu
A bit curious: wouldn't using "tombstone" markers constitute some memory
leak (since timers are not released)?

Cheers

On Fri, Apr 21, 2017 at 12:23 PM, Gyula Fóra  wrote:

> Hi!
>
> I thought I would drop my opinion here maybe it is relevant.
>
> We have used the Flink internal timer implementation in many of our
> production applications, this supports the Timer deletion but the deletion
> actually turned out to be a huge performance bottleneck because of the bad
> deletion performance of the Priority queue.
>
> In many of our cases deletion could have been avoided by some more clever
> registration/firing logic and we also ended up completely avoiding deletion
> and instead using "tombstone" markers by setting a flag in the state which
> timers not to fire when they actually trigger.
>
> Gyula
>
>
>
> Aljoscha Krettek  ezt írta (időpont: 2017. ápr. 21.,
> P, 14:47):
>
>> Hi,
>> the reasoning behind the limited user facing API was that we were (are)
>> not sure whether we would be able to support efficient deletion of timers
>> for different ways of storing timers.
>>
>> @Stephan, If I remember correctly you were the strongest advocate for not
>> allowing timer deletion. What’s your thinking on this? There was also a
>> quick discussion on https://issues.apache.org/jira/browse/FLINK-3089 where
>> Xiaogang explained that the (new, not merged) RocksDB based timers would
>> have efficient timer deletion.
>>
>> Best,
>> Aljoscha
>>
>> On 20. Apr 2017, at 11:56, Jagadish Bihani 
>> wrote:
>>
>> Hi
>>
>> I am working on a use case where I want to start a timer for a given
>> event type and when that timer expires it will perform certain action. This
>> can be done using Process Function.
>>
>> But I also want to cancel scheduled timer in case of some other types of
>> events. I also checked the implementation of HeapInternalTimerService which
>> implements InternalTimerService interface has those implementations
>> already. Also SimpleTimerService which overrides TimerService also uses
>> InternalTimerService and simply passes VoidNamespace.INSTANCE.
>>
>> So in a way we are using InternalTimerService interface's implementations
>> everywhere.
>>
>> So what is the reason that ProcessFunction.Context uses TimerService? Any
>> reason 'deleteEventTimeTimer' is not exposed to users? If I want to use the
>> deleteEvent functionality how should I go about it?
>>
>> --
>> Thanks and Regards,
>> Jagadish Bihani
>>
>>
>>


Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-04-21 Thread Gyula Fóra
Hi!

I thought I would drop my opinion here, as it may be relevant.

We have used the Flink internal timer implementation in many of our
production applications. It supports timer deletion, but deletion actually
turned out to be a huge performance bottleneck because of the poor deletion
performance of the priority queue.

In many of our cases deletion could have been avoided with more clever
registration/firing logic, and we ended up avoiding deletion completely,
instead using "tombstone" markers: a flag in the state that tells us which
timers not to act on when they actually trigger.

Gyula
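
A minimal Java sketch of this tombstone approach (the event encoding, names,
and output are made up for illustration; the function is meant to be applied
on a keyed stream):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Input: (event type, timer timestamp); output: a message when a timer really fires.
public class TombstoneTimerFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient ValueState<Long> tombstone;

    @Override
    public void open(Configuration parameters) {
        tombstone = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cancelled-timer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx, Collector<String> out)
            throws Exception {
        if ("START".equals(event.f0)) {
            ctx.timerService().registerEventTimeTimer(event.f1);
        } else if ("CANCEL".equals(event.f0)) {
            // "Delete" the timer logically: remember its timestamp instead of removing it.
            tombstone.update(event.f1);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        Long cancelled = tombstone.value();
        if (cancelled != null && cancelled == timestamp) {
            // The timer still fires at its original time, but no action is taken and
            // the tombstone state is cleaned up afterwards.
            tombstone.clear();
            return;
        }
        out.collect("timer fired at " + timestamp);
    }
}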



Aljoscha Krettek  wrote (21 Apr 2017, Fri, 14:47):

> Hi,
> the reasoning behind the limited user facing API was that we were (are)
> not sure whether we would be able to support efficient deletion of timers
> for different ways of storing timers.
>
> @Stephan, If I remember correctly you were the strongest advocate for not
> allowing timer deletion. What’s your thinking on this? There was also a
> quick discussion on https://issues.apache.org/jira/browse/FLINK-3089 where
> Xiaogang explained that the (new, not merged) RocksDB based timers would
> have efficient timer deletion.
>
> Best,
> Aljoscha
>
> On 20. Apr 2017, at 11:56, Jagadish Bihani  wrote:
>
> Hi
>
> I am working on a use case where I want to start a timer for a given event
> type and when that timer expires it will perform certain action. This can
> be done using Process Function.
>
> But I also want to cancel scheduled timer in case of some other types of
> events. I also checked the implementation of HeapInternalTimerService which
> implements InternalTimerService interface has those implementations
> already. Also SimpleTimerService which overrides TimerService also uses
> InternalTimerService and simply passes VoidNamespace.INSTANCE.
>
> So in a way we are using InternalTimerService interface's implementations
> everywhere.
>
> So what is the reason that ProcessFunction.Context uses TimerService? Any
> reason 'deleteEventTimeTimer' is not exposed to users? If I want to use the
> deleteEvent functionality how should I go about it?
>
> --
> Thanks and Regards,
> Jagadish Bihani
>
>
>


Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-21 Thread Kostas Kloudas
Hi Luis and Aljoscha,

In Flink 1.2, late events were not dropped but were processed as normal ones.
This is fixed for Flink 1.3 with
https://issues.apache.org/jira/browse/FLINK-6205.

I would recommend switching to the master branch (this will become the upcoming
Flink 1.3 release) and trying it out to see if everything works as expected.

CEP in Flink 1.3 will come with richer patterns and a lot of bug fixes, and by
trying it out you will also help us stabilize it even further before its
official release.

Thanks a lot,
Kostas
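
A minimal Java sketch of a timestamp assigner with a non-zero out-of-orderness
bound, which is the usual way to handle timestamps that are not strictly
ascending (the 10-second bound is an arbitrary example, and LogEntry and
ProcessorHelper are the types from the quoted post below):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LogEntryTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<LogEntry> {

    public LogEntryTimestampExtractor() {
        // Tolerate events that arrive up to 10 seconds out of order; with a bound of
        // Time.seconds(0) every out-of-order event is immediately late.
        super(Time.seconds(10));
    }

    @Override
    public long extractTimestamp(LogEntry entry) {
        return ProcessorHelper.toTimestamp(entry.timestamp).getTime();
    }
}

Such an assigner would take the place of the AscendingTimestampExtractor used
in the quoted code, with the bound chosen to cover the expected disorder.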

> On Apr 19, 2017, at 3:28 PM, Luis Lázaro  wrote:
> 
> 
> Hi everyone, 
> i am working on a use case  with CEP and Flink:
> 
> Flink 1.2
> Source is Kafka configured with one single partition.
> Data are syslog standard messages parsed as LogEntry (object with attributes 
> like timestamp, service, severity, etc)
> An event is a LogEntry.
> If two consecutives LogEntry with severity ERROR (3) and same service are 
> matched in 10 minutes period, an ErrorAlert must be triggered.
> 
> 
> Allthough i cannot warrant the ascending order of events (LogEntry) when 
> consuming from kafka, i decided to try this implementation:
> Timestamps per Kafka partition 
> 
> 
> 
> //My events provide its own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
> 
> //"Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new 
> FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"), new 
> LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
> 
> //may not be ascending order
> val kafkaSourceAssignedTimesTamp = 
> kafkaSource.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor[LogEntry] {
>   override def extractAscendingTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)
> 
>  I implemented a pattern like:
> 
> myPattern = 
>  Pattern
>   .begin[LogEntry]("First Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .next("Second Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .within(Time.minutes(10))
>   }
> 
>   This pattern will trigger alert when two consecutives LogEntry with 
> severity ERROR and with same service (it will be generate alerts for each 
> service individually)
> 
>   CEP.pattern(stream
>   .keyBy(_.service),
>   myPattern)
> 
> 
> An alert is made of two logEntry:
> 
> ErrorAlert:
> service_name-ERROR-timestamp first event
> service_name-ERROR-timestamp second event
> 
> I am getting alerts like this:
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
> 
> 
> I also tried this approach:
> bounded out-of-orderness 
> 
> 
> kafkaSource.assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>   override def extractTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> Time.seconds(0) —> if i set like this, do i prevent the events from being 
> delivered with delayed ?
> 
> But i get the same problem as decribed above:
> 
> ……
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> 

R: WELCOME to user@flink.apache.org

2017-04-21 Thread giacom...@libero.it
Dear Users and Apache Flink devs,

     For each one of my distributed computations, I'm generating and
reading the JSON files produced by getExecutionPlan() in order to motivate
my benchmarks. Is there some guide providing an explanation of the exact
meaning of the fields of the generated JSON file? I'm trying to differentiate,
from the timing results, which part of the computation time was spent sending
messages and which was spent on either I/O or CPU operations.
     By the way, I also noticed that I do not get any information
concerning the actual data that is being used and transmitted throughout the
network (the actual data size and the messages' data size).
     Moreover, I'm currently using the following way to get the JSON file:

> createAndRegisterDataSinks();
> String plan = globalEnvironment.getExecutionPlan();
> createAndRegisterDataSinks();
> globalEnvironment.execute(getClass().getSimpleName()); // Running the actual class

     Is there a better way to do it?
  Thanks in advance for your support,

Giacomo90



Re: Failed checkpointing on HDFS : Flink don't use the right authentication

2017-04-21 Thread Aljoscha Krettek
+Gordon Could you please have a look at this? You probably know Kafka best by
now and have also been working on security-related stuff for a while.

I’m afraid I’m not much help here but I’m hoping Gordon can help.

Best,
Aljoscha
> On 21. Apr 2017, at 12:46, Bruno Michelin Rakotondranaivo 
>  wrote:
> 
> Hi,
>  
> With flink-1.2.0, I want to consume datas from secured kafka 0.10 with 
> SASL_PLAINTEXT protocol using login/pwd from a JAAS file and store them on 
> HDFS in a kerberized cluster with user ‘hive’ as kerberos principal login.  
>  
> Checkpointing is enabled and states are back end on HDFS ‘filesystem’.
>  
> There is an error when the job want to initialize checkpoints. The app uses 
> JAAS authentication instead of Kerberos one to write on HDFS.
>  
> 15:32:56,300 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) 
> (373063aff1529c7e2ba362ad30c8b03c) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom 
> Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
> at 
> org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator 
> Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:534)
> at 
> org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:)
> ... 5 more
> Caused by: java.io.IOException: The given file URI 
> (hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints 
> ) points to 
> the HDFS NameNode at mynamenode:8020, but the File System could not be 
> initialized with that address.
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:288)
> at 
> org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:105)
> at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529)
> ... 6 more
> Caused by: java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
> at 
> org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.(SaslRpcServer.java:378)
> at 
> org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:183)
> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:568)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:668)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:604)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)
> ... 17 more
>  
>  
> What I 

Re: Setting operator parallelism of a running job - Flink 1.2

2017-04-21 Thread Aljoscha Krettek
Hi,
both savepoints and checkpoints use the configured state backend. Right now,
the only difference between a checkpoint and a savepoint is that the savepoint
has additional metadata stored with it that makes it persistent and
relocatable. In the future, the (on-disk) format of savepoints and checkpoints
will diverge, though.
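
For illustration, a small Java sketch (the path and interval are placeholders)
of configuring the state backend that both periodic checkpoints and manually
triggered savepoints are written through:

import java.io.IOException;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {

    public static StreamExecutionEnvironment configure() throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Periodic checkpoints every 60 seconds, written through the configured backend.
        env.enableCheckpointing(60_000);
        // Both checkpoint and savepoint state is snapshotted via this backend;
        // savepoints themselves are triggered from outside the program (CLI/REST).
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        return env;
    }
}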

Best,
Aljoscha
> On 21. Apr 2017, at 16:09, Dominik Safaric  wrote:
> 
> But what is then the difference between statepoints and checkpoints as 
> configured by using e.g. the StreamExecutionEnv’s setStateBackend() function? 
> 
> Best,
> Dominik
> 
>> On 21 Apr 2017, at 15:53, Aljoscha Krettek > > wrote:
>> 
>> Correct, the max-parallelism only sets bounds on how high you can set the 
>> parallelism in the future (by restoring from a savepoint).
>> 
>> Internally, the keyed state is partitioned into key groups where you have as 
>> many key groups as max parallelism. This is the unit of state that we can 
>> redistribute when the parallelism is changed and therefore the upper bound.
>> 
>> Best,
>> Aljoscha
>>> On 21. Apr 2017, at 15:50, Dominik Safaric >> > wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> In other words, jobs must be restarted manually? 
>>> 
>>> What about using maxParallelism() at the client level? I would expect that 
>>> it is complementary to parallelism.default in terms of allowing Flink to 
>>> handle the parallelism of operators, and changing it in accordance to 
>>> runtime conditions. However, it is not the case. 
>>> 
>>> Best,
>>> Dominik
>>> 
 On 21 Apr 2017, at 15:36, Aljoscha Krettek > wrote:
 
 Hi,
 changing the parallelism is not possible while a job is running 
 (currently). What you would have to do to change the parallelism is create 
 a savepoint and then restore from that savepoint with a different 
 parallelism.
 
 This is the savepoints documentation: 
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
  
 
 
 Best,
 Aljoscha
> On 21. Apr 2017, at 15:22, Dominik Safaric  > wrote:
> 
> Hi all,
> 
> Is it possible to set the operator parallelism using Flink CLI while a 
> job is running? 
> 
> I have a cluster of 4 worker nodes, where each node has 4 CPUs, hence the 
> number of task slots is set to 4, whereas the paralellism.default to 16. 
> 
> However, if a worker fails, whereas the jobs were configured at system 
> level to run with 16 task slots, I get the exception “Not enough free 
> slots available to run the job.” raised and the job is not able to 
> continue but instead of aborts. 
> 
> Is this the excepted behaviour? Shouldn’t Flink continue the job 
> execution with in this case only 12 slots available? If not, can someone 
> change the parallelism of a job while in the restart mode in order to 
> allow the job to continue? 
> 
> Thanks,
> Dominik
 
>>> 
>> 
> 



Re: Setting operator parallelism of a running job - Flink 1.2

2017-04-21 Thread Dominik Safaric
Hi Aljoscha,

In other words, jobs must be restarted manually? 

What about using maxParallelism() at the client level? I would expect it to be
complementary to parallelism.default in terms of allowing Flink to handle the
parallelism of operators and to change it in accordance with runtime
conditions. However, that is not the case.
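
For reference, a small Java sketch of the two settings being discussed (the
values are arbitrary): the parallelism is what the job actually runs with,
while the max parallelism only fixes the number of key groups and thus the
upper bound for later rescaling via savepoints; neither adapts automatically
to lost slots.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSettings {

    public static StreamExecutionEnvironment configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(16);      // current runtime parallelism of the operators
        env.setMaxParallelism(128);  // ceiling for future rescaling (number of key groups)
        return env;
    }
}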

Best,
Dominik

> On 21 Apr 2017, at 15:36, Aljoscha Krettek  wrote:
> 
> Hi,
> changing the parallelism is not possible while a job is running (currently). 
> What you would have to do to change the parallelism is create a savepoint and 
> then restore from that savepoint with a different parallelism.
> 
> This is the savepoints documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  
> 
> 
> Best,
> Aljoscha
>> On 21. Apr 2017, at 15:22, Dominik Safaric > > wrote:
>> 
>> Hi all,
>> 
>> Is it possible to set the operator parallelism using Flink CLI while a job 
>> is running? 
>> 
>> I have a cluster of 4 worker nodes, where each node has 4 CPUs, hence the 
>> number of task slots is set to 4, whereas the paralellism.default to 16. 
>> 
>> However, if a worker fails, whereas the jobs were configured at system level 
>> to run with 16 task slots, I get the exception “Not enough free slots 
>> available to run the job.” raised and the job is not able to continue but 
>> instead of aborts. 
>> 
>> Is this the excepted behaviour? Shouldn’t Flink continue the job execution 
>> with in this case only 12 slots available? If not, can someone change the 
>> parallelism of a job while in the restart mode in order to allow the job to 
>> continue? 
>> 
>> Thanks,
>> Dominik
> 



Re: Setting operator parallelism of a running job - Flink 1.2

2017-04-21 Thread Aljoscha Krettek
Hi,
changing the parallelism is not possible while a job is running (currently). 
What you would have to do to change the parallelism is create a savepoint and 
then restore from that savepoint with a different parallelism.

This is the savepoints documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
 


Best,
Aljoscha
> On 21. Apr 2017, at 15:22, Dominik Safaric  wrote:
> 
> Hi all,
> 
> Is it possible to set the operator parallelism using Flink CLI while a job is 
> running? 
> 
> I have a cluster of 4 worker nodes, where each node has 4 CPUs, hence the 
> number of task slots is set to 4, whereas the paralellism.default to 16. 
> 
> However, if a worker fails, whereas the jobs were configured at system level 
> to run with 16 task slots, I get the exception “Not enough free slots 
> available to run the job.” raised and the job is not able to continue but 
> instead of aborts. 
> 
> Is this the excepted behaviour? Shouldn’t Flink continue the job execution 
> with in this case only 12 slots available? If not, can someone change the 
> parallelism of a job while in the restart mode in order to allow the job to 
> continue? 
> 
> Thanks,
> Dominik



Setting operator parallelism of a running job - Flink 1.2

2017-04-21 Thread Dominik Safaric
Hi all,

Is it possible to set the operator parallelism using the Flink CLI while a job
is running?

I have a cluster of 4 worker nodes, where each node has 4 CPUs; hence the
number of task slots is set to 4, whereas parallelism.default is set to 16.

However, if a worker fails while the job is configured at the system level to
run with 16 task slots, I get the exception "Not enough free slots available to
run the job." and the job is not able to continue but instead aborts.

Is this the expected behaviour? Shouldn't Flink continue the job execution
with, in this case, only 12 slots available? If not, can someone change the
parallelism of a job while it is in restart mode in order to allow the job to
continue?

Thanks,
Dominik

Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-21 Thread Aljoscha Krettek
+Kostas and +Dawid

Could you please have a look? You two have worked on these parts most recently.
I recall that there were some problems with event time and out-of-order
processing in CEP in Flink 1.2.

Best,
Aljoscha
> On 19. Apr 2017, at 15:28, Luis Lázaro  wrote:
> 
> 
> Hi everyone, 
> i am working on a use case  with CEP and Flink:
> 
> Flink 1.2
> Source is Kafka configured with one single partition.
> Data are syslog standard messages parsed as LogEntry (object with attributes 
> like timestamp, service, severity, etc)
> An event is a LogEntry.
> If two consecutives LogEntry with severity ERROR (3) and same service are 
> matched in 10 minutes period, an ErrorAlert must be triggered.
> 
> 
> Allthough i cannot warrant the ascending order of events (LogEntry) when 
> consuming from kafka, i decided to try this implementation:
> Timestamps per Kafka partition 
> 
> 
> 
> //My events provide its own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
> 
> //"Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new 
> FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"), new 
> LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
> 
> //may not be ascending order
> val kafkaSourceAssignedTimesTamp = 
> kafkaSource.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor[LogEntry] {
>   override def extractAscendingTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)
> 
>  I implemented a pattern like:
> 
> myPattern = 
>  Pattern
>   .begin[LogEntry]("First Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .next("Second Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .within(Time.minutes(10))
>   }
> 
>   This pattern will trigger alert when two consecutives LogEntry with 
> severity ERROR and with same service (it will be generate alerts for each 
> service individually)
> 
>   CEP.pattern(stream
>   .keyBy(_.service),
>   myPattern)
> 
> 
> An alert is made of two logEntry:
> 
> ErrorAlert:
> service_name-ERROR-timestamp first event
> service_name-ERROR-timestamp second event
> 
> I am getting alerts like this:
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
> 
> 
> I also tried this approach:
> bounded out-of-orderness 
> 
> 
> kafkaSource.assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>   override def extractTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> Time.seconds(0) —> if i set like this, do i prevent the events from being 
> delivered with delayed ?
> 
> But i get the same problem as decribed above:
> 
> ……
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> …...
> 
> Initially i thought my pattern was not correctly implemented but the problem 
> seems to be i am unable to assign timestamp and consequently emit watermark 
> properly when events are unordered.
> 
> Any sugestion is well apreciated, 

Re: Why TimerService interface in ProcessFunction doesn't have deleteEventTimeTimer

2017-04-21 Thread Aljoscha Krettek
Hi,
the reasoning behind the limited user-facing API was that we were (and are) not
sure whether we would be able to support efficient deletion of timers for the
different ways of storing timers.

@Stephan, If I remember correctly you were the strongest advocate for not 
allowing timer deletion. What’s your thinking on this? There was also a quick 
discussion on https://issues.apache.org/jira/browse/FLINK-3089 
 where Xiaogang explained 
that the (new, not merged) RocksDB based timers would have efficient timer 
deletion.

Best,
Aljoscha
> On 20. Apr 2017, at 11:56, Jagadish Bihani  wrote:
> 
> Hi
> 
> I am working on a use case where I want to start a timer for a given event 
> type and when that timer expires it will perform certain action. This can be 
> done using Process Function.
> 
> But I also want to cancel scheduled timer in case of some other types of 
> events. I also checked the implementation of HeapInternalTimerService which 
> implements InternalTimerService interface has those implementations already. 
> Also SimpleTimerService which overrides TimerService also uses 
> InternalTimerService and simply passes VoidNamespace.INSTANCE.
> 
> So in a way we are using InternalTimerService interface's implementations 
> everywhere.
> 
> So what is the reason that ProcessFunction.Context uses TimerService? Any 
> reason 'deleteEventTimeTimer' is not exposed to users? If I want to use the 
> deleteEvent functionality how should I go about it?
> 
> -- 
> Thanks and Regards,
> Jagadish Bihani



Re: Flink slots, threads, task, etc

2017-04-21 Thread Aljoscha Krettek
Hi,
there are currently no built-in metrics for InputSplit consumption but I do see 
that this could be quite helpful. I think you can have a custom RichInputFormat 
that uses metrics to record stuff, though.
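
A rough sketch of that custom-metrics route (the class and metric names are
made up): an input format that counts how many input splits it has opened via
the task's metric group.

import java.io.IOException;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;

public class SplitCountingTextInputFormat extends TextInputFormat {

    private transient Counter consumedSplits;

    public SplitCountingTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        if (consumedSplits == null) {
            // Register a counter on this task's metric group; it shows up in the
            // metrics reporters / REST API alongside the other task metrics.
            consumedSplits = getRuntimeContext().getMetricGroup().counter("consumedSplits");
        }
        // Incremented once per input split this subtask actually opens.
        consumedSplits.inc();
    }
}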

I think adding built-in metrics should be possible at this point in the code: 
https://github.com/apache/flink/blob/8f3d6d239996c83f7cbd102dc8a85ee626a56bf5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java#L144-L144
 


Best,
Aljoscha
> On 19. Apr 2017, at 09:25, Flavio Pompermaier  wrote:
> 
> Hi Aljoscha,
> thanks for the reply, it was not urgent and I was aware of the FF...btw, 
> congratulations for it, I saw many interesting talks!
> Flink community has grown a lot since it was Stratosphere ;)
> Just one last question: in many of my use cases it could be helpful to see 
> how many of the created splits were "consumed" by an inputFormat/source.
> Is it possible to monitor this part somewhere in the dashboards or with a 
> custom metric?
> 
> On Tue, Apr 18, 2017 at 5:24 PM, Aljoscha Krettek  > wrote:
> Hi,
> sorry for not getting any responses but I think everyone was quite busy with 
> Flink Forward SF. I’m also no expert on the topic but I’ll try and give some 
> answers.
> 
> Regarding a Google Doc version, I don’t think that there is any. You would 
> have to modify the Markdown version we have in the doc.
> 
> For the other answers I’ll reuse an example program that consists of Source 
> -> Map -> Sink, with chaining disabled and parallelism 2. We’ll this have 
> three Tasks: Source, Map, and Sink, with each having two subtasks. Let’s 
> denote the subtasks by a number in parenthesis so the first subtask for 
> Source is Source(1), second one is Source(2). I’ll also refer to Source(1) -> 
> Map(1) -> Sink(1) as a slice of the execution graph since these can be 
> executed within one slot.
> 
> Regarding 1, I think this is true. However, a single slot can execute a 
> complete slice of the execution graph where each subtask (from a different 
> task) would be executed by its own thread.
> 
> Regarding 2.1, Yes, I think it cannot run multiple subtasks of the same task 
> while it is possible (and in fact done) to execute all the subtasks of a 
> slide in the same slot.
> 
> Regarding 2.2, This is so to allow executing a pipeline of parallelism 8 
> using a cluster that has 8 free slots. Basically, each slice fills one slot.
> 
> Regarding 3, I don’t really have an answer.
> 
> Regarding 4, Yes, this can get a bit out of hand if you have very long 
> pipelines.
> 
> Best,
> Aljoscha
> 
>> On 11. Apr 2017, at 14:37, Flavio Pompermaier > > wrote:
>> 
>> Any feedback here..?
>> 
>> On Wed, Apr 5, 2017 at 7:43 PM, Flavio Pompermaier > > wrote:
>> Hi to all,
>> I had a very long but useful chat with Fabian and I understood a lot of 
>> concepts that was not clear at all to me. We started from the Flink runtime 
>> documentation page 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html
>>  
>> )
>>  but
>> I discovered that the terminology is very inconsistent and misleading along 
>> the page...
>> 
>> For example, one of the very first sentences is :
>> "Flink chains operator subtasks together into tasks. Each task is executed 
>> by one thread."
>> What I first understood was that every operator can be executed only by a 
>> single thread in all the cluster... probably it should be better "one thread 
>> per task slot" (at least). 
>> Moreover, if I'm not wrong, a Task Slot can execute only 1 subtask (aka 
>> parallel instance) of each task and there's no limit to the number of 
>> subtasks per slot (and this is not highlighted at all in that document). The 
>> only constraint is that they should belong to different tasks (right?).
>> 
>> If there's a google doc version of that page I could try to rewrite it down 
>> in order to make it easier to understand some parts...however I still have 
>> some more questions:
>> Is it correct that a single Task Slot can execute only a single subtask of 
>> each task and that this task is executed by a single thread within the slot)?
>> If it so:
>> why at that page there's written "By default, Flink allows subtasks to share 
>> slots even if they are subtasks of different tasks, so long as they are from 
>> the same job"? It seems that it is more common to run multiple subtasks of 
>> the same task (in a slot) than executing different substasks of different 
>> tasks, although this is still permitted...from what I understood a slot 
>> cannot run multiple subtask 

Re: UnilateralSortMerger error (again)

2017-04-21 Thread Flavio Pompermaier
The error appears as soon as some taskmanager generates some inputchannel
file.
What are those files used for?

On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier 
wrote:

> In another run of the job I had another Exception. Could it be helpful?
>
> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> Serializer consumed more bytes than the record had. This indicates broken
> serialization. If you are using custom serialization types (Value or
> Writable), check their serialization methods. If you are using a
> Kryo-serialized type, check the corresponding Kryo serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:123)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
> at org.apache.flink.core.memory.HeapMemorySegment.get(
> HeapMemorySegment.java:104)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(
> SpillingAdaptiveSpanningRecordDeserializer.java:226)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
> readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:109)
> ... 5 more
>
> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
> stefano.bort...@huawei.com> wrote:
>
>> In fact the old problem was with the KryoSerializer missed initialization
>> on the exception that would trigger the spilling on disk. This would lead
>> to dirty serialization buffer that would eventually 

Failed checkpointing on HDFS : Flink don't use the right authentication

2017-04-21 Thread Bruno Michelin Rakotondranaivo
Hi,

With flink-1.2.0, I want to consume data from a secured Kafka 0.10 with the
SASL_PLAINTEXT protocol, using a login/password from a JAAS file, and store it
on HDFS in a kerberized cluster with the user 'hive' as the Kerberos principal.

Checkpointing is enabled and state is backed by the HDFS 'filesystem' state backend.

There is an error when the job wants to initialize checkpoints. The app uses
the JAAS authentication instead of the Kerberos one to write to HDFS.

15:32:56,300 INFO  org.apache.flink.runtime.taskmanager.Task
 - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) 
(373063aff1529c7e2ba362ad30c8b03c) switched from RUNNING to FAILED.
java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom 
Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
at 
org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator 
Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:534)
at 
org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:)
... 5 more
Caused by: java.io.IOException: The given file URI 
(hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints) points to the 
HDFS NameNode at mynamenode:8020, but the File System could not be initialized 
with that address.
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:288)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529)
... 6 more
Caused by: java.lang.NullPointerException
at 
org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
at 
org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:378)
at 
org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:183)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:568)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)
... 17 more


What have I misunderstood?
How can I use JAAS in combination with Kerberos?
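
For reference, a minimal flink-conf.yaml sketch of one way such a mixed setup is often
wired; the paths, principal, realm and JAAS file name below are placeholders (assumptions
to adapt), not values taken from this job:

  # Kerberos credentials Flink should use for HDFS / Hadoop access
  security.kerberos.login.use-ticket-cache: false
  security.kerberos.login.keytab: /path/to/hive.keytab
  security.kerberos.login.principal: hive@EXAMPLE.COM

  # JAAS contexts to back with the Kerberos credentials (e.g. ZooKeeper's Client);
  # KafkaClient is omitted here because Kafka uses a separate SASL_PLAIN login
  security.kerberos.login.contexts: Client

  # one way to point the JVM at the JAAS file holding the SASL_PLAIN login/pwd for Kafka
  env.java.opts: -Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf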

Thanks in advance

MR



Re: UnilateralSortMerger error (again)

2017-04-21 Thread Flavio Pompermaier
In another run of the job I had another Exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception:
Serializer consumed more bytes than the record had. This indicates broken
serialization. If you are using custom serialization types (Value or
Writable), check their serialization methods. If you are using a
Kryo-serialized type, check the corresponding Kryo serializer.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
at
org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
... 5 more

On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
stefano.bort...@huawei.com> wrote:

> In fact, the old problem was the KryoSerializer's missed initialization on the
> exception that would trigger the spilling to disk. This would lead to a dirty
> serialization buffer that would eventually break the program. Till worked on it by
> debugging the source code generating the error. Perhaps someone could try the same
> this time as well, if Flavio can make the problem reproducible with a shareable
> program+data.
>
>
>
> Stefano
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Friday, April 21, 2017 10:04 AM
> *To:* user 
> *Subject:* Re: 

RE: Kafka offset commits

2017-04-21 Thread Gwenhael Pasquiers
We need more tests, but we think we found the cause of the loss of our Kafka 
consumer offsets in Kafka 0.10. It might be the server-side broker parameter 
“offsets.retention.minutes”, which defaults to 1440 minutes (1 day), and our Flink 
consumer was “off” for more than a day before restarting.
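
For reference, a minimal sketch of that broker-side setting in the Kafka
server.properties (the 10080-minute value is only an illustrative assumption; pick a
retention that comfortably exceeds your longest expected downtime):

  # committed consumer offsets are discarded after this many minutes of inactivity;
  # the default of 1440 (1 day) is easily exceeded when a job stays down for a while
  offsets.retention.minutes=10080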

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: mercredi 19 avril 2017 11:39
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Thanks for the clarification Aljoscha!
Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the 
“1.0” part on my first reply).

@Gwenhael, I’ll try to reclarify some of the questions you asked:

Does that mean that Flink does not rely on the offsets written to ZooKeeper 
anymore, but relies on the snapshot data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2?

For the case of 1.0 —> 1.2, you’ll have to rely on committed offsets in Kafka / 
ZK for the migration. State migration from 1.0 to 1.2 is not possible.

As Aljoscha pointed out, if you are using the same “group.id”, then there 
shouldn’t be a problem w.r.t. retaining the offset position. You just have to 
keep [1] in mind, as you would need to manually increase all committed 
offsets in Kafka / ZK by 1 for that consumer group.

Note that there is no state migration happening here, but just simply relying 
on offsets committed in Kafka / ZK to define the starting position when you’re 
starting the job in 1.2.

We were also wondering if the Flink consumer was able to restore its offset 
from ZooKeeper.

For FlinkKafkaConsumer08, the starting offset is actually always read from ZK.
Again, this isn’t a “restore”, but just defining start position using committed 
offsets.
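
For illustration, a minimal Scala sketch of how that start position is tied to the
consumer group (topic name, hosts and group.id are placeholders, and the job needs the
flink-connector-kafka-0.8 dependency on its classpath):

  import java.util.Properties
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
  import org.apache.flink.streaming.util.serialization.SimpleStringSchema

  val props = new Properties()
  props.setProperty("zookeeper.connect", "zk-host:2181")    // where the 0.8 consumer reads/commits offsets
  props.setProperty("bootstrap.servers", "kafka-host:9092")
  props.setProperty("group.id", "my-existing-group")        // keep the old group.id so committed offsets are picked up

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(60000)                            // offsets are committed back to ZK when checkpoints complete
  val stream: DataStream[String] =
    env.addSource(new FlinkKafkaConsumer08[String]("my-topic", new SimpleStringSchema(), props))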

Another question: is there an expiration on the snapshots? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore the offset correctly and started 
consuming from the oldest offset, creating duplicated data (the Kafka queue holds 
over a week of buffer).

There is no expiration to the offsets stored in the snapshots. The only issue 
would be if Kafka has expired that offset due to data retention settings.
If you’re sure that at the time of the restore the data hasn’t expired yet, 
there might be something weird going on.
AFAIK, the only issue that was previously known to possibly cause this was [2].
Could you check if that issue may be the case?

[1] https://issues.apache.org/jira/browse/FLINK-4723
[2] https://issues.apache.org/jira/browse/FLINK-6006

On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek 
(aljos...@apache.org) wrote:
Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. 
Only restoring from Flink 1.1 savepoints is supported.

@Gordon If the consumer group stays the same the new Flink job should pick up 
where the old one stopped, right?

Best,
Aljoscha

On 18. Apr 2017, at 16:19, Gwenhael Pasquiers 
> wrote:

Thanks for your answer.
Does that mean that Flink does not rely on the offsets written to ZooKeeper 
anymore, but relies on the snapshot data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2?
We were also wondering if the Flink consumer was able to restore its offset 
from ZooKeeper.
Another question: is there an expiration on the snapshots? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore the offset correctly and started 
consuming from the oldest offset, creating duplicated data (the Kafka queue holds 
over a week of buffer).
B.R.

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: lundi 17 avril 2017 07:40
To: user@flink.apache.org
Subject: Re: Kafka offset commits

Hi,

The FlinkKafkaConsumer in 1.2 is able to restore from older version state 
snapshots and bridge the migration, so there should be no problem in reading 
the offsets from older state. The smallest or highest offsets will only be used 
if the offset no longer exists due to Kafka data retention settings.

Besides this, there was one fix related to the Kafka 0.8 offsets for Flink 
1.2.0 [1]. Put shortly, before the fix, the offsets committed to ZK were off by 
1 (w.r.t. how Kafka itself defines the committed offsets).
However, this should not affect the behavior of restoring from offsets in 
savepoints, so it should be fine.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4723


On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers 
(gwenhael.pasqui...@ericsson.com) wrote:
Hello,

We’re going to migrate some applications that consume data from a Kafka 0.8 
cluster from Flink 1.0 to Flink 1.2.

We are wondering if the 

Re: UnilateralSortMerger error (again)

2017-04-21 Thread Flavio Pompermaier
The types I read are:

[String, String, String, String, String, String, String, String, String,
Boolean, Long, Long, Long, Integer, Integer, Long, String, String, Long,
Long, String, Long, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String]

On Fri, Apr 21, 2017 at 10:03 AM, Stephan Ewen  wrote:

> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
> What types are you using at that point? The stack trace reveals Row and
> StringValue; are any other types involved?
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier 
> wrote:
>
>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately..
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I debugged a bit the process repeating the job on a sub-slice of the
>>> entire data (using the id value to filter data with parquet push down
>>> filters) and all slices completed successfully :(
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> if this was somehow a factor of stress but it didn't cause any error.
>>> Then I almost doubled the number of rows to process and finally the
>>> error showed up again.
>>> It seems somehow related to spilling to disk but I can't really
>>> understand what's going on :(
>>> This is a summary of my debug attempts:
>>>
>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000   => 20.926.903 rows =>
>>> OK
>>> id >= 99.945.000.000 && id < 99.960.000.000   => 23.888.750  rows =>
>>> OK
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>
>>> Any help is appreciated..
>>> Best,
>>> Flavio
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 I could but only if there's a good probability that it fixes the
 problem... How confident are you about it?

 On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu  wrote:

> Looking at git log of DataInputDeserializer.java , there has been some
> recent change.
>
> If you have time, maybe try with 1.2.1 RC and see if the error is
> reproducible ?
>
> Cheers
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to all,
>> I think I'm again on the weird Exception with the
>> SpillingAdaptiveSpanningRecordDeserializer...
>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills
>> to disk but the Exception thrown is not very helpful. Any idea?
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>> null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>> ava:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>> gnedByte(DataInputDeserializer.java:306)
>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>> va:747)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>> serialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>> serialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>> gate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> 

Re: UnilateralSortMerger error (again)

2017-04-21 Thread Stephan Ewen
In the past, these errors were most often caused by bugs in the
serializers, not in the sorter.

What types are you using at that point? The stack trace reveals Row and
StringValue; are any other types involved?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier 
wrote:

> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
> spilling to disk) and the job failed almost immediately..
>
> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier  > wrote:
>
>> I debugged a bit the process repeating the job on a sub-slice of the
>> entire data (using the id value to filter data with parquet push down
>> filters) and all slices completed successfully :(
>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>> if this was somehow a factor of stress but it didn't cause any error.
>> Then I almost doubled the number of rows to process and finally the error
>> showed up again.
>> It seems somehow related to spilling to disk but I can't really
>> understand what's going on :(
>> This is a summary of my debug attempts:
>>
>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>
>> id < 10.000.000.000  => 1.857.365 rows => OK
>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>> id >= 10.010.000.000 && id < 99.945.000.000   => 20.926.903 rows =>
>> OK
>> id >= 99.945.000.000 && id < 99.960.000.000   => 23.888.750  rows =>
>> OK
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>
>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>
>> Any help is appreciated..
>> Best,
>> Flavio
>>
>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier > > wrote:
>>
>>> I could but only if there's a good probability that it fixes the
>>> problem... How confident are you about it?
>>>
>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu  wrote:
>>>
 Looking at git log of DataInputDeserializer.java , there has been some
 recent change.

 If you have time, maybe try with 1.2.1 RC and see if the error is
 reproducible ?

 Cheers

 On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi to all,
> I think I'm again on the weird Exception with the
> SpillingAdaptiveSpanningRecordDeserializer...
> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
> disk but the Exception thrown is not very helpful. Any idea?
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted
> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
> null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
> .getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
> ask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
> (GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
> ava:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
> $ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
> gnedByte(DataInputDeserializer.java:306)
> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.
> deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.
> deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.
> deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
> serialize(RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
> serialize(RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
> gate.read(ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.Spilli
> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
> daptiveSpanningRecordDeserializer.java:144)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
> dReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
> Reader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
> ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
> $ReadingThread.go(UnilateralSortMerger.java:1035)
> at 

Please participate in a research survey on graphs

2017-04-21 Thread Siddhartha Sahu
Hi,

My name is Siddhartha Sahu and I am a Master's student at University of
Waterloo working on graph processing with Prof. Semih Salihoglu. As part of
my research, I am running a survey on how graphs are used in the industry
and academia.

If you work with any kind of graph technology, such as the Apache Flink
Graph API: Gelly, please participate in the survey:

*Survey link*:
https://dsg-uwaterloo.typeform.com/to/EBihxG

The survey will take about *5-10 minutes* to finish. Most of the questions
are *multiple-choice questions*, and you can *skip any number of questions*.

I would really appreciate it if you filled out my survey :)
Also, kindly forward the survey to anyone else you know who also works on
graph processing.

Our goal is to help researchers who work on graph processing understand the
types of data, computations, and software that are used in practice. We
hope the information we can gather from participation in this survey can
guide the research people like ourselves do in universities. We plan to
share the information we gather from the survey with the wider research
community through a publication.

This survey has been reviewed and received ethics clearance through a
University of Waterloo Research Ethics Committee. You will see that the
first page of the survey contains a long consent page as required by the
university.

Thank you very much in advance for your support.

Regards,
Siddhartha Sahu
s3s...@uwaterloo.ca


Re: Using FlinkML from Java?

2017-04-21 Thread Till Rohrmann
Hi Steve,

unfortunately, FlinkML's pipeline mechanism depends on Scala's implicit
value feature. Therefore, FlinkML can only be used with Scala if you don't
want to construct the pipelines manually (which I wouldn't recommend).
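
For illustration, a minimal Scala sketch of the same SVM setup as in Steve's Java snippet
below (the two-point training set is made up just to keep the example self-contained):

  import org.apache.flink.api.scala._
  import org.apache.flink.ml.classification.SVM
  import org.apache.flink.ml.common.LabeledVector
  import org.apache.flink.ml.math.DenseVector

  object SvmExample {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // tiny made-up training set; SVM expects labels of +1.0 / -1.0
      val training: DataSet[LabeledVector] = env.fromElements(
        LabeledVector(1.0, DenseVector(1.0, 2.0)),
        LabeledVector(-1.0, DenseVector(-1.0, -2.0)))

      val svm = SVM()
        .setBlocks(env.getParallelism)
        .setIterations(100)
        .setRegularization(0.001)
        .setStepsize(0.1)
        .setSeed(42)

      // the implicit FitOperation is resolved here, which is why this only compiles in Scala
      svm.fit(training)
    }
  }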

Cheers,
Till

On Thu, Apr 20, 2017 at 6:56 PM, Steve Jerman  wrote:

> Hi Folks,
>
> I’m trying to use FlinkML 1.2 from Java … getting this:
>
> SVM svm = new SVM()
>   .setBlocks(env.getParallelism())
>   .setIterations(100)
>   .setRegularization(0.001)
>   .setStepsize(0.1)
>   .setSeed(42);
>
>
> svm.fit(labelledTraining);
>
> The type org.apache.flink.api.scala.DataSet cannot be resolved. It is
> indirectly referenced from required .class files.
>
> Are there any tricks required to get it running? Or is Java not supported?
>
> Steve
>


Re: Flink memory usage

2017-04-21 Thread Till Rohrmann
Hi Billy,

if it's possible that you can share some parts of your code privately with
me, then I can try to figure out what's going wrong.

Cheers,
Till

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy 
wrote:

> Ok
>
> The consensus seems to be that it’s us, not Flink :) So we’ll look harder
> at what we’re doing in case there is anything silly. We are using 16K
> network buffers BTW, which is around 0.5 GB with the defaults.
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Thursday, April 20, 2017 11:52 AM
> *To:* Stefano Bortoli
> *Cc:* Newport, Billy [Tech]; Fabian Hueske; user@flink.apache.org
>
> *Subject:* Re: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> if you didn't split the different data sets up into different slot sharing
> groups, then your maximum parallelism is 40. Thus, it should be enough to
> assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough,
> because you have more than 4 shuffling steps running in parallel, then you
> have to increase the last term.
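
For illustration, a sketch of that recommendation spelled out as configuration, using
the p = 40 and t = 20 figures above and the default 32 KB buffer size (adjust to your
own topology):

  # 40^2 * 20 * 4 = 128000 buffers; at 32 KB each this is roughly 4 GB of buffer memory
  taskmanager.network.numberOfBuffers: 128000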
>
>
>
> OOM exceptions should actually only occur due to user code objects. Given
> that you have reserved a massive amount of memory for the network buffers,
> the remaining heap for the user code is probably very small. Try whether
> you can decrease the number of network buffers. Moreover, check whether
> your user code keeps references to objects somewhere, which could cause
> the OOM.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli <
> stefano.bort...@huawei.com> wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC, because
> we were loading more data than could fit in memory. Hence optimizing the
> code gave us a lot of improvements. FlatMaps are also dangerous, as objects
> can multiply beyond what you expect, making a co-group extremely costly. :-) A
> well-placed distinct() saves a lot of time and memory.
>
>
>
> My point is that having worked with scarce resources I learned that almost
> all the time the issue was my code, not the framework.
>
>
>
> Good luck.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com]
> *Sent:* Thursday, April 20, 2017 4:46 PM
> *To:* Stefano Bortoli ; 'Fabian Hueske' <
> fhue...@gmail.com>
>
>
> *Cc:* 'user@flink.apache.org' 
> *Subject:* RE: Flink memory usage
>
>
>
> Your reuse idea kind of implies that it’s a GC generation rate issue, i.e.
> it’s not collecting fast enough so it’s running out of memory versus heap
> that’s actually anchored, right?
>
>
>
>
>
> *From:* Stefano Bortoli [mailto:stefano.bort...@huawei.com
> ]
> *Sent:* Thursday, April 20, 2017 10:33 AM
> *To:* Newport, Billy [Tech]; 'Fabian Hueske'
> *Cc:* 'user@flink.apache.org'
> *Subject:* RE: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> The only suggestion I can give is to check your code very carefully for
> useless variable allocations, and foster reuse as much as possible. Don’t
> create a new collection on every map execution, but rather clear and reuse the
> collected output of the flatMap, and so on. In the past we ran long
> processes over lots of data with little memory without problems, including
> much more complex co-groups, joins and so on, without any issue.
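
For illustration, a minimal Scala sketch of that reuse pattern (the class and field
names are invented for the example; it would be applied as lines.flatMap(new
TokenizeReusing) on a DataSet[String]):

  import org.apache.flink.api.common.functions.RichFlatMapFunction
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.util.Collector

  // a mutable holder so the same instance can be refilled and emitted repeatedly
  class WordWithCount(var word: String, var count: Int) {
    def this() = this(null, 0)
  }

  class TokenizeReusing extends RichFlatMapFunction[String, WordWithCount] {
    @transient private var reuse: WordWithCount = _

    override def open(parameters: Configuration): Unit = {
      reuse = new WordWithCount()    // one allocation per parallel task instance, not one per record
    }

    override def flatMap(line: String, out: Collector[WordWithCount]): Unit = {
      for (token <- line.split("\\s+") if token.nonEmpty) {
        reuse.word = token           // refill the same holder instead of allocating a new object
        reuse.count = 1
        out.collect(reuse)
      }
    }
  }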
>
>
>
> My2c. Hope it helps.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com ]
>
> *Sent:* Thursday, April 20, 2017 1:31 PM
> *To:* 'Fabian Hueske' 
> *Cc:* 'user@flink.apache.org' 
> *Subject:* RE: Flink memory usage
>
>
>
> I don’t think our function are memory heavy they typically are cogroups
> and merge the records on the left with the records on the right.
>
>
>
> We’re currently requiring 720GB of heap to do our processing which frankly
> appears ridiculous to us. Could too much parallelism be causing the
> problem? Looking at:
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
> 
>
>
>
> If we are processing 17 “datasets” in a single job and each has an
> individual parallelism of 40, is that a total (potential) parallelism of
> 17*40? And given your network buffer calculation of parallelism squared,
> would that do it, or only if we explicitly configure it that way:
>
>
>
> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
>
> where p is the maximum parallelism of the job and t is the number of task
> managers.
>
> You can process more than one parallel task per TM if you configure 

Re: UnilateralSortMerger error (again)

2017-04-21 Thread Flavio Pompermaier
As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
spilling to disk) and the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier 
wrote:

> I debugged a bit the process repeating the job on a sub-slice of the
> entire data (using the id value to filter data with parquet push down
> filters) and all slices completed successfully :(
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if
> this was somehow a factor of stress but it didn't cause any error.
> Then I almost doubled the number of rows to process and finally the error
> showed up again.
> It seems somehow related to spilling to disk but I can't really understand
> what's going on :(
> This is a summary of my debug attempts:
>
> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>
> id < 10.000.000.000  => 1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000   => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000   => 23.888.750  rows =>
> OK
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
> 4 TM with 8 GB and 4 slot each, parallelism 16
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>
> Any help is appreciated..
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier 
> wrote:
>
>> I could but only if there's a good probability that it fixes the
>> problem... How confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu  wrote:
>>
>>> Looking at git log of DataInputDeserializer.java , there has been some
>>> recent change.
>>>
>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>> reproducible ?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi to all,
 I think I'm again on the weird Exception with the
 SpillingAdaptiveSpanningRecordDeserializer...
 I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
 disk but the Exception thrown is not very helpful. Any idea?

 Caused by: java.lang.RuntimeException: Error obtaining the sorted
 input: Thread 'SortMerger Reading Thread' terminated due to an exception:
 null
 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
 .getIterator(UnilateralSortMerger.java:619)
 at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
 ask.java:1094)
 at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
 (GroupReduceDriver.java:99)
 at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
 ... 3 more
 Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
 terminated due to an exception: null
 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
 $ThreadBase.run(UnilateralSortMerger.java:799)
 Caused by: java.io.EOFException
 at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
 gnedByte(DataInputDeserializer.java:306)
 at org.apache.flink.types.StringValue.readString(StringValue.java:747)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.
 deserialize(StringSerializer.java:69)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.
 deserialize(StringSerializer.java:74)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.
 deserialize(StringSerializer.java:28)
 at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
 serialize(RowSerializer.java:193)
 at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
 serialize(RowSerializer.java:36)
 at org.apache.flink.runtime.plugable.ReusingDeserializationDele
 gate.read(ReusingDeserializationDelegate.java:57)
 at org.apache.flink.runtime.io.network.api.serialization.Spilli
 ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
 daptiveSpanningRecordDeserializer.java:144)
 at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
 dReader.getNextRecord(AbstractRecordReader.java:72)
 at org.apache.flink.runtime.io.network.api.reader.MutableRecord
 Reader.next(MutableRecordReader.java:42)
 at org.apache.flink.runtime.operators.util.ReaderIterator.next(
 ReaderIterator.java:59)
 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
 $ReadingThread.go(UnilateralSortMerger.java:1035)
 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
 $ThreadBase.run(UnilateralSortMerger.java:796)


 Best,
 Flavio

>>>
>>>
>>
>>
>