Thanks for the reply. Unfortunately, that project was unexpectedly cancelled,
though for other reasons. I was happy to work on it, and hopefully gained some
insight. I have another, unrelated question today about Elasticsearch sinks,
and will ask it in a separate thread.

On Fri, Jan 5, 2018 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Colin,
>
> There are two things that come to my mind:
>
> 1) You mentioned "suspect jobs are grouping by a field of constant
> values". Does that mean that the grouping key is always constant? Flink
> parallelizes the window computation per key, i.e., there is one thread per
> key. Although it would be possible to perform pre-aggregations, this is not
> done yet. There is an effort to add support for this to the DataStream API
> [1]. The Table API will hopefully leverage this once it has been added to
> the DataStream API.
> 2) Another reason for backpressure can be non-aligned watermarks, i.e.,
> the watermarks of different partitions diverge too much from each other. In
> this case, windows cannot be finalized because everything is aligned to the
> lowest watermark.
>
> Hope this helps to clarify things.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-7561
>
> 2017-12-30 0:11 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com>:
>
>> Hi Timo and flink-user,
>>
>>
>> It's been a few weeks and we've made some changes to the application
>> mentioned in this email. We've also updated to Flink 1.4. We are using
>> the SQL / Table API with a tumbling window and a user-defined aggregate
>> to generate a SQL query string like:
>>
>>
>> SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE).
>>
>>
>>
>> I've experimented with the parallelism of the operators and with setting
>> the environment's parallelism as suggested. I've been setting parallelism
>> values of 2 or 4 on all operators except the consumer and sink.
>>
>>
>> For some jobs with large Kafka source topics, we experience back pressure
>> under load and see some lag. But when trying to address this via
>> parallelism, so far I've only seen heavily degraded performance from the
>> increased parallelism settings.
>>
>>
>> Furthermore, the suspect jobs are grouping by a field of constant values.
>> These jobs usually have 40,000 or so grouped records entering the
>> aggregator for each minute window.
>>
>>
>>
>> I would think that the tumbling windows would allow the job to process
>> each window in another task slot, parallelizing each window. But maybe
>> that's not happening?
>>
>>
>>
>> Can you help us understand why parallelizing the job only degrades
>> performance, and what I can do to change this?
>>
>>
>>
>>
>> Happy New Year!
>>
>>
>>
>> Colin Williams
>>
>> On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Colin,
>>>
>>> unfortunately, selecting the parallelism for parts of a SQL query is not
>>> supported yet. By default, tumbling window operators use the default
>>> parallelism of the environment. Simple project and select operations have
>>> the same parallelism as the inputs they are applied on.
>>>
>>> I think the easiest solution so far is to explicitly set the parallelism
>>> of operators that are not part of the Table API and use the environment's
>>> parallelism to scale the SQL query.
>>>
>>> I hope that helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 12/9/17 at 3:06 AM, Colin Williams wrote:
>>>
>>> Hello,
>>>
>>> I've inherited some flink application code.
>>>
>>> We're currently creating a table using a Tumbling SQL query similar to
>>> the first example in
>>>
>>>  https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>>
>>> Where each generated SQL query looks something like
>>>
>>> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
>>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>>
>>> We are also using a UDFAGG function in some of the queries, which I think
>>> might be cleaned up and optimized a bit (it uses Scala types and is
>>> possibly not well implemented).
>>>
>>> We then turn the result table back into a datastream using
>>> toAppendStream, and eventually add a derivative stream to a sink. We've
>>> configured TimeCharacteristic to event-time processing.
>>>
>>> In some streaming scenarios everything is working fine with a
>>> parallelism of 1, but in others it appears that we can't keep up with the
>>> event source.
>>>
>>> We are therefore investigating how to enable parallelism specifically on
>>> the SQL table query or aggregator.
>>>
>>> Can anyone suggest a good way to go about this? It wasn't clear from the
>>> documentation.
>>>
>>> Best,
>>>
>>> Colin Williams
>>>
>>>
>>>
>>>
>>>
>>
>
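
Fabian's first point in the quoted reply (window computation is parallelized
per key) can be illustrated with a small simulation. This is only a sketch in
Python, not Flink code: subtask_for and load_per_subtask are hypothetical
helpers, the key names are made up, and CRC32 is an assumed stand-in for
Flink's real key hashing. It shows that when every record carries the same
grouping key, all of them land on one subtask whatever the parallelism is.

```python
import zlib
from collections import Counter


def subtask_for(key: str, parallelism: int) -> int:
    # Assumption: CRC32 stands in for Flink's key hashing. Any deterministic
    # hash shows the same effect: equal keys always map to the same subtask.
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys, parallelism):
    # Count how many records each parallel subtask would receive.
    counts = Counter(subtask_for(k, parallelism) for k in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


# 40,000 records per window, as in the thread, all with one constant key.
constant_keys = ["app-1"] * 40000
# The same volume spread over 1,000 distinct (hypothetical) keys.
varied_keys = [f"app-{i % 1000}" for i in range(40000)]

constant_load = load_per_subtask(constant_keys, 4)
varied_load = load_per_subtask(varied_keys, 4)

print("constant key:", constant_load)
print("varied keys: ", varied_load)
```

With the constant key, one entry of constant_load holds all 40,000 records and
the other three stay at zero, which may explain why a higher parallelism adds
overhead without speeding up the aggregation.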

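Fabian's second point (windows are aligned to the lowest watermark) can be
sketched the same way. Again this is plain Python under stated assumptions,
not the Flink API: operator_watermark and window_can_fire are hypothetical
names, and the timestamps are invented millisecond values for a 10-minute
tumbling window.

```python
def operator_watermark(channel_watermarks):
    # An operator with several input partitions advances its event-time
    # clock only as far as the slowest partition allows.
    return min(channel_watermarks)


def window_can_fire(window_end, channel_watermarks):
    # A window [start, end) holds timestamps up to end - 1, so it can be
    # finalized once the operator watermark has reached end - 1.
    return operator_watermark(channel_watermarks) >= window_end - 1


# Hypothetical 10-minute window ending at 600,000 ms.
window_end = 600_000

# Two partitions are well past the window end, one lags far behind.
diverging = [650_000, 640_000, 120_000]
# The lagging partition has caught up.
aligned = [650_000, 640_000, 600_000]

print(window_can_fire(window_end, diverging))
print(window_can_fire(window_end, aligned))
```

In the diverging case the window stays open and its state keeps accumulating
until the slowest partition catches up, which is one way lag can build up even
when most partitions are keeping pace.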