Re: Parallelizing a tumbling group window

2017-12-29 Thread Colin Williams
Hi Timo and flink-user,


It's been a few weeks and we've made some changes to the application
mentioned in this email. We've also updated to Flink 1.4. We are using
the SQL / Table API with a tumbling window and a user-defined aggregate to
generate a SQL query string like:


SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE).
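
For context, the query string is submitted through the Table API roughly like
this (a simplified sketch of our setup; the LINES table and the tag_AppId /
field_Adds scalar functions are registered elsewhere, and OurAggregateFunction
stands in for our actual UDAGG class):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // register the user-defined aggregate under the name used in the query
    tableEnv.registerFunction("UDFAGG", new OurAggregateFunction());

    // generatedQuery holds the SQL string shown above
    Table result = tableEnv.sqlQuery(generatedQuery);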



I've experimented with the parallelism of the operators and with setting the
environment's parallelism as suggested. I've been setting parallelism values
of 2 or 4 on all operators except the consumer and sink.
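
Concretely, the wiring looks roughly like this (simplified, with placeholder
operator names):

    env.setParallelism(4);  // environment default, which the SQL operators pick up

    DataStream<Line> lines = env
        .addSource(kafkaConsumer).setParallelism(1)   // consumer left at 1
        .map(new ParseLine()).setParallelism(4);      // non-Table operators set explicitly

    // ... the Table API / SQL tumbling-window query runs at the environment parallelism ...

    resultStream
        .addSink(sink).setParallelism(1);             // sink left at 1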


For some jobs with large Kafka source topics, under load we experience back
pressure and see some lag. But when trying to address this via parallelism,
so far I've only seen severely degraded performance from the increased
parallelism settings.


Furthermore, the suspect jobs are grouping by a field of constant values.
These jobs usually have 40,000 or so grouped records entering the
aggregator for each minute window.



I would have thought that the tumbling windows would allow the job to process
each window in a different task slot, parallelizing across windows. But maybe
that's not happening?



Can you help us understand why parallelizing the job only degrades
performance, and what I can do to change this?




Happy New Year!



Colin Williams










On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther  wrote:

> Hi Colin,
>
> unfortunately, selecting the parallelism for parts of a SQL query is not
> supported yet. By default, tumbling window operators use the default
> parallelism of the environment. Simple project and select operations have
> the same parallelism as the inputs they are applied on.
>
> I think the easiest solution so far is to explicitly set the parallelism
> of operators that are not part of the Table API and use the environment's
> parallelism to scale the SQL query.
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> On 12/9/17 at 3:06 AM, Colin Williams wrote:
>
> Hello,
>
> I've inherited some Flink application code.
>
> We're currently creating a table using a tumbling-window SQL query similar to
> the first example in
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>
> Where each generated SQL query looks something like
>
> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>
> We are also using a UDFAGG function in some of the queries, which I think
> might be cleaned up and optimized a bit (it uses Scala types and is possibly
> not well implemented).
>
> We then turn the result table back into a DataStream using toAppendStream,
> and eventually add a derivative stream to a sink. We've configured the
> TimeCharacteristic for event-time processing.
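>
> Roughly (a trimmed sketch with placeholder names):
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>     // resultTable is the Table produced by the tumbling-window SQL query above
>     DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
>
>     resultStream
>         .map(new ToSinkRecord())   // the "derivative" stream
>         .addSink(sink);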
>
> In some streaming scenarios everything is working fine with a parallelism
> of 1, but in others it appears that we can't keep up with the event source.
>
> So we are investigating how to enable parallelism specifically on the
> SQL table query or aggregator.
>
> Can anyone suggest a good way to go about this? It wasn't clear from the
> documentation.
>
> Best,
>
> Colin Williams
>
>
>
>
>


Parquet Format Read and Write

2017-12-29 Thread Imran Tariq
Hi,

I am new to Flink and exploring it. Just curious to know: does the Flink API
support the Parquet format, the way the Spark API can easily read and write Parquet?

--

Thanks & Regards
Muhammad Imran Tariq


queryable state and maintaining all time counts

2017-12-29 Thread jelmer
Hi,

I've been going through various talks on Flink's support for queryable
state, like this talk by Jamie Grier at Flink Forward 2016:

https://www.youtube.com/watch?v=uuv-lnOrD0o

I see how you can easily use this to produce time series data, e.g. to
calculate the number of events per hour.

But I am wondering how one would go about using it to also maintain all-time
counts, e.g. counting the number of events since the beginning of time. Is
anybody doing this? And if so, what is your strategy?
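
Something like the sketch below is what I have in mind (rough and untested;
Event is a placeholder type): keep an all-time per-key count in a ValueState
and mark it queryable, so the total since the beginning of time can be read
from the outside with the queryable state client.

    public class AllTimeCount extends RichFlatMapFunction<Event, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("all-time-count", Long.class);
            descriptor.setQueryable("all-time-count");  // expose to queryable state clients
            count = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Event event, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(Tuple2.of(event.getKey(), updated));
        }
    }

Is an ever-updating ValueState like this a sensible approach, or is there a
better strategy?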


Re: MergingWindow

2017-12-29 Thread jincheng sun
Hi aitozi,

`MergingWindowSet` is a utility used for keeping track of merging windows
when using a `MergingWindowAssigner` in a `WindowOperator`.

In Flink, `MergingWindowAssigner` is only used for session windows. The
implementations of `MergingWindowAssigner` are `EventTimeSessionWindows`
and `ProcessingTimeSessionWindows`. As we know, session windows depend on
the time gap between elements to split windows, so if you encounter
out-of-order elements, a window-merge situation can arise.


In the `processElement` method of `WindowOperator`, adding a new window
might result in a merge. The merge logic is in the `addWindow` method of
`MergingWindowSet`; the snippet you mentioned is in that method. To
understand it, we must understand how the window merge is carried out.
Let me give an example to illustrate the merging logic: suppose we have two
windows WinA and WinB, and when WinC is added we find that WinA, WinB and
WinC should be merged. At this point WinC is the new window, while WinA and
WinB are already in `MergingWindowSet.mapping`. The `mapping` is a
Map<W, W> from a window to the window that keeps the window state. When we
are incrementally merging windows starting from some window, we keep that
starting window as the state window, to prevent costly state juggling. As
shown below:
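(Roughly, and assuming WinA, WinB and WinC merge into one result window,
call it WinABC:)

    before WinC arrives:  mapping = { WinA -> WinA, WinB -> WinB }
    after the merge:      mapping = { WinABC -> WinA }

WinA is kept as the state window for WinABC, WinB's state window gets merged
into it, and WinC never had any state of its own.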


Now that we know the logic of window merging, let's talk about the snippet
you mentioned:

> if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
>     mergeFunction.merge(mergeResult,
>             mergedWindows,
>             this.mapping.get(mergeResult),
>             mergedStateWindows);
> }


This check guarantees that we don't merge the new window itself, since it
never had any state associated with it, i.e. the case where we are only
merging one pre-existing window into itself without extending the
pre-existing window.

I am not sure if the explanation is clear, but I hope it is helpful to you. :)
Feedback from anybody is welcome... :)

Best, Jincheng

2017-12-27 16:58 GMT+08:00 Ufuk Celebi :

> Please check your email before sending it the next time as three
> emails for the same message is a little spammy ;-)
>
> This is internal code that is used to implement session windows as far
> as I can tell. The idea is to not merge the new window as it never had
> any state associated with it. The general idea of merging windows is
> to keep one of the original windows as the state window, i.e. the
> window that is used as namespace to store the window elements.
> Elements from the state windows of merged windows must be merged into
> this one state window.
>
> For more details, this should be directed to the dev mailing list.
>
> – Ufuk
>
> On Tue, Dec 26, 2017 at 4:58 AM, aitozi  wrote:
> > Hi,
> >
> > I can't understand the usage of this snippet of code in
> > MergingWindowSet.java, can anyone explain it for me?
> >
> >
> > if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
> >     mergeFunction.merge(mergeResult,
> >             mergedWindows,
> >             this.mapping.get(mergeResult),
> >             mergedStateWindows);
> > }
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: keyby() issue

2017-12-29 Thread Jinhua Luo
I misused the key selector. I checked the docs and found that it must return
a deterministic key, so using a random key is wrong, but I still can't
understand why it would cause an OOM.
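
For reference, a deterministic selector derives the key only from the record
itself, e.g. (getAppId() is just a placeholder for whatever field is keyed on):

    .keyBy(new KeySelector<MyEvent, Integer>() {
        public Integer getKey(MyEvent ev) {
            return ev.getAppId();
        }
    })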



2017-12-28 21:57 GMT+08:00 Jinhua Luo :
> It's very strange: when I change the key selector to use a random key,
> the JVM reports an OOM.
>
>     .keyBy(new KeySelector<MyEvent, Integer>() {
>         public Integer getKey(MyEvent ev) {
>             return ThreadLocalRandom.current().nextInt(1, 100);
>         }
>     })
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>     at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>     at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>
> Could anybody explain the internals of keyBy()?
>
> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi :
>> Hey Jinhua,
>>
>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo  wrote:
>>> keyBy() on a field generates a unique key per field value, so if the
>>> number of distinct values is huge, Flink would have trouble with both
>>> CPU and memory. Is this considered in the design of Flink?
>>
>> Yes, keyBy hash partitions the data across the nodes of your Flink
>> application and thus you can easily scale your application up if you
>> need more processing power.
>>
>> I'm not sure that this is the problem in your case though. Can you
>> provide some more details about what you are doing exactly? Are you
>> aggregating by time (for the keyBy you mention no windowing, but then
>> you mention windowAll)? What kind of aggregation are you doing? If
>> possible, feel free to share some code.
>>
>>> Since windowAll() can have its parallelism set, I tried to use a key
>>> selector based on the field's hash rather than its value, hoping it would
>>> decrease the number of keys, but Flink throws a key out-of-range
>>> exception. How do I use a key selector in the correct way?
>>
>> Can you paste the exact exception you see? I think this might indicate
>> that you don't correctly extract the key from your record, e.g. you
>> extract a different key on sender and receiver.
>>
>> I'm sure we can figure this out after you provide more context. :-)
>>
>> – Ufuk


Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-29 Thread Ufuk Celebi
Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright  wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
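>
> A rough, untested sketch (MyEvent, fromBytes and isEofMarker are placeholders
> for your own type and decoding logic):
>
>     public class EofAwareSchema implements KeyedDeserializationSchema<MyEvent> {
>
>         @Override
>         public MyEvent deserialize(byte[] messageKey, byte[] message,
>                                    String topic, int partition, long offset) throws IOException {
>             return MyEvent.fromBytes(message);
>         }
>
>         @Override
>         public boolean isEndOfStream(MyEvent nextElement) {
>             // once this returns true, the consumer stops and the source finishes
>             return nextElement.isEofMarker();
>         }
>
>         @Override
>         public TypeInformation<MyEvent> getProducedType() {
>             return TypeInformation.of(MyEvent.class);
>         }
>     }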
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi  wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle methods
>> of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend the Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is probably
>> not feasible and would take some work to get familiar with the code.
>> Just putting it out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an Http client and cancel your job via the Http endpoint
>>
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu  wrote:
>> > I would like to stop the FlinkKafkaConsumer from consuming data from Kafka
>> > manually. But I find it won't be closed when I invoke the "cancel()" method.
>> > What I am trying to do is add an EOF symbol marking the end of my Kafka data,
>> > and when the FlatMap operator reads the symbol it invokes the
>> > FlinkKafkaConsumer "cancel()" method. It doesn't work. A Flink streaming job
>> > won't finish unless it gets canceled or fails, when I use Kafka as the source.
>> >
>> > Could somebody give me some help, thx~~
>
>