Hi Garrett,

Data skew might be a reason for the performance degradation.

The plan you shared is pretty simple. The following happens when you run
the program:
- The data source starts to read data and pushes the records to the
FlatMapFunction. From there the records are shuffled (using
hash-partitioning) to the sorter.
- The sorter tasks consume the records and write them into a memory buffer.
When the buffer is full, it is sorted and spilled to disk. Once the buffer
has been spilled, it is filled again with records, sorted, and spilled.
- Processing is fast at first because the sort buffers are still empty, so
the sorter does not yet have to wait for buffers to be sorted or spilled.
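
To illustrate the fill / sort / spill cycle described in the steps above,
here is a minimal Java sketch. It is a toy model under simplifying
assumptions (a single int buffer, spilled runs kept in a list instead of
files) and not Flink's actual sorter; all names in it are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the fill / sort / spill cycle. This is NOT Flink's
// sorter; the class and method names are made up for illustration.
final class SpillingSorterSketch {
    private final int[] buffer;                                 // stands in for the sort buffer
    private int buffered = 0;                                   // records currently in the buffer
    private final List<int[]> spilledRuns = new ArrayList<>();  // stands in for spill files on disk

    SpillingSorterSketch(int bufferCapacity) {
        if (bufferCapacity <= 0) {
            throw new IllegalArgumentException("bufferCapacity must be positive");
        }
        this.buffer = new int[bufferCapacity];
    }

    // Accepts one record. If the buffer is full, it is sorted and spilled
    // first, and the caller has to wait for that to finish.
    void add(int record) {
        if (buffered == buffer.length) {
            sortAndSpill();
        }
        buffer[buffered++] = record;
    }

    private void sortAndSpill() {
        int[] run = Arrays.copyOf(buffer, buffered);
        Arrays.sort(run);
        spilledRuns.add(run);
        buffered = 0;
    }

    int spillCount() { return spilledRuns.size(); }
    int buffered() { return buffered; }
    List<int[]> runs() { return spilledRuns; }

    public static void main(String[] args) {
        SpillingSorterSketch sorter = new SpillingSorterSketch(3);
        for (int record : new int[] {5, 3, 9, 1, 8, 2, 7}) {
            sorter.add(record);
        }
        System.out.println("spills: " + sorter.spillCount()
                + ", still buffered: " + sorter.buffered());
    }
}
```

The first records go straight into the empty buffer without any waiting.
Every later record that arrives at a full buffer has to wait for a sort and
a spill, which is the slowdown described above.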

The performance of the plan depends (among other things) on the size of the
sort buffers. The sort buffers are taken from Flink's managed memory.
Unless you configured something else, 70% of the TaskManager heap memory
is reserved as managed memory.
If you use Flink only for batch jobs, I would enable preallocation and
off-heap memory (see configuration options [1]). You can also configure a
fixed size for the managed memory. The more memory you configure, the more
is available for sorting.
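
As a sketch of the settings mentioned above, the managed-memory options in
flink-conf.yaml (Flink 1.3 key names, see [1]) could look like this. The
values are placeholders, not recommendations for your cluster:

```yaml
# flink-conf.yaml (illustrative values only)
taskmanager.memory.preallocate: true   # allocate managed memory when the TaskManager starts
taskmanager.memory.off-heap: true      # keep managed memory outside the JVM heap

# Either a relative share (0.7 is the default) ...
taskmanager.memory.fraction: 0.7
# ... or a fixed amount in megabytes (placeholder value). If a size is set,
# the fraction is not evaluated.
# taskmanager.memory.size: 4096
```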

The managed memory of a TM is evenly distributed to all its processing
slots. Hence, having more slots per TM means that each slot has less
managed memory (for sorting or joins or ...).
So many slots are not necessarily good for performance (unless you increase
the number of TMs / memory as well), especially in case of data skew when
most slots receive only little data and cannot leverage their memory.
If your data is heavily skewed, it might make sense to have fewer slots
such that each slot has more memory for sorting.
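
To make the trade-off concrete, here is a back-of-the-envelope calculation
in Java. It assumes the simple model described above (managed memory is a
fixed percentage of the TM heap and is split evenly across the slots). The
8192 MB heap and the slot counts are made-up figures, and the class name is
hypothetical.

```java
// Rough estimate of managed memory per slot under the simple model above.
// Not a Flink API; all names and numbers are illustrative assumptions.
final class SlotMemorySketch {

    // Managed memory available to one slot, in MB (integer arithmetic,
    // rounded down).
    static long managedMemoryPerSlotMb(long heapMb, int managedPercent, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        long managedMb = heapMb * managedPercent / 100;
        return managedMb / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        long heapMb = 8192; // hypothetical TaskManager heap
        for (int slots : new int[] {2, 8}) {
            System.out.println(slots + " slots -> "
                    + managedMemoryPerSlotMb(heapMb, 70, slots) + " MB managed memory per slot");
        }
    }
}
```

With the same heap, going from 8 slots to 2 slots gives each sorter four
times as much memory before it has to spill.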

Skew also has an effect on downstream operations. In case of skew, some of
the sorter tasks are overloaded and cannot accept more data.
Due to the pipelined shuffles, this leads to back pressure that propagates
all the way back to the sources.
You can disable pipelining by setting the execution mode on the execution
configuration to BATCH [2]. This will break the pipeline but write the
result of the FlatMap to disk.
This might help if the FlatMap is compute intensive or filters many
records.
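
To see how a few hot keys can overload individual sorter tasks, here is a
small Java model of hash partitioning. It uses String.hashCode() modulo the
parallelism as a stand-in; Flink's own hash partitioner works differently in
detail, and the key values and class name are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of hash partitioning with a skewed key distribution.
// Not Flink code; it only shows that all records of a hot key end up in
// the same partition, no matter how many partitions exist.
final class HashSkewSketch {

    // Counts how many records each partition would receive.
    static long[] recordsPerPartition(List<String> keys, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 90; i++) {
            keys.add("hot");          // one key that dominates the input
        }
        for (int i = 0; i < 10; i++) {
            keys.add("key-" + i);     // a few rare keys
        }
        System.out.println(Arrays.toString(recordsPerPartition(keys, 4)));
    }
}
```

Adding more partitions spreads the rare keys more thinly, but the partition
that receives the hot key still has to sort at least 90 of the 100 records.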

The data sizes don't sound particularly large, so this should be something
that Flink is able to handle.

Btw. you don't need to convert the JSON plan output. You can paste it into
the plan visualizer [3].
I would not worry about the missing statistics. The optimizer does not
currently use them.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
[3] http://flink.apache.org/visualizer/

2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:

> Fabian,
>
>  Thank you for the reply.  Yes I do watch via the ui, is there another way
> to see progress through the steps?
>
> I think I just figured it out, the hangup is in the sort phase (ID 4)
> where 2 slots take all the time.  Looking in the UI most slots get less
> than 500MB of data to sort, these two have 6.7GB and 7.3GB each, together
> its about 272M records and these will run for hours at this point.  Looks
> like I need to figure out a different partitioning/sort strategy. I never
> noticed before because when I run the system at ~1400 slots I don't use the
> UI anymore as its gets unresponsive.  400 Slots is painfully slow, but
> still works.
>
>
> The getEnv output is very cool! Also very big, I've tried to summarize it
> here in more of a yaml format as its on a different network.  Note the
> parallelism was just set to 10 as I didn't know if that effected output.
> Hopefully I didn't flub a copy paste step, it looks good to me.
>
>
> This flow used to be far fewer steps, but as it wasn't scaling I broke it
> out into all the distinct pieces so I could see where it failed.  Source
> and sink are both Hive tables.  I wonder if the inputformat is expected to
> give more info to seed some of these stat values?
>
> nodes
>     id: 6
>     type: source
>     pact: Data Source
>     contents: at CreateInput(ExecutionEnvironment.java:533)
>     parallelism: 10
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: 0
>         name: Cumulative Disk I/O value: 0
>         name: Cumulative CPU value: 0
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>     id: 5
>     type: pact
>     pact: FlatMap
>     contents: FlatMap at main()
>     parallelism: 10
>     predecessors:
>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>     driver_strategy: FlatMap
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: 0
>         name: Cumulative Disk I/O value: 0
>         name: Cumulative CPU value: 0
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>     id: 4
>     type: pact
>     pact: Sort-Partition
>     contents: Sort at main()
>     parallelism: 10
>     predecessors:
>         id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>     driver_strategy: No-Op
>     global_properties:
>         name: partitioning v: HASH_PARTITIONED
>         name: Partitioned on value: [0,2]
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: [0:ASC,2:ASC,1:ASC]
>         name: Grouping value: [0,2,1]
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: unknown
>         name: Cumulative Disk I/O value: unknown
>         name: Cumulative CPU value: unknown
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>     id: 3
>     type: pact
>     pact: GroupReduce
>     contents: GroupReduce at first(SortedGrouping.java:210)
>     parallelism: 10
>     predecessors:
>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>     driver_strategy: Sorted Group Reduce
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: unknown
>         name: Cumulative Disk I/O value: unknown
>         name: Cumulative CPU value: unknown
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>
>     id: 2
>     type: pact
>     pact: Map
>     contents: Map at ()
>     parallelism: 10
>     predecessors:
>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>     driver_strategy: Map
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: unknown
>         name: Cumulative Disk I/O value: unknown
>         name: Cumulative CPU value: unknown
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>     id: 1
>     type: pact
>     pact: Map
>     contents: map at main()
>     parallelism: 10
>     predecessors:
>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>     driver_strategy: Map
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: unknown
>         name: Cumulative Disk I/O value: unknown
>         name: Cumulative CPU value: unknown
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>     id: 0
>     type: sink
>     pact: Data Sink
>     contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>     parallelism: 10
>     predecessors:
>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>     driver_strategy: Map
>     global_properties:
>         name: partitioning v: RANDOM_PARTITIONED
>         name: Partitioning Order value: none
>         name: Uniqueness value: not unique
>     local_properties:
>         name: Order value: none
>         name: Grouping value: not grouped
>         name: Uniqueness value: not unique
>     estimates:
>         name: Est. Output Size value: unknown
>         name: Est Cardinality value: unknown
>     costs:
>         name: Network value: 0
>         name: Disk I/O value 0
>         name: CPU value: 0
>         name: Cumulative Network value: unknown
>         name: Cumulative Disk I/O value: unknown
>         name: Cumulative CPU value: unknown
>     compiler_hints:
>         name: Output Size (bytes) value: none
>         name: Output Cardinality value: none
>         name: Avg. Output Record Size (bytes) value: none
>         name: Filter Factor value: none
>
>
>
>
> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink's operators are designed to work in memory as long as possible and
>> spill to disk once the memory budget is exceeded.
>> Moreover, Flink aims to run programs in a pipelined fashion, such that
>> multiple operators can process data at the same time.
>> This behavior can make it a bit tricky to analyze the runtime behavior
>> and progress of operators.
>>
>> It would be interesting to have a look at the execution plan for the
>> program that you are running.
>> The plan can be obtained from the ExecutionEnvironment by calling
>> env.getExecutionPlan() instead of env.execute().
>>
>> I would also like to know how you track the progress of the program.
>> Are you looking at the record counts displayed in the WebUI?
>>
>> Best,
>> Fabian
>>
>>
>>
>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>
>>> I have been moving some old MR and hive workflows into Flink because I'm
>>> enjoying the api's and the ease of development is wonderful.  Things have
>>> largely worked great until I tried to really scale some of the jobs
>>> recently.
>>>
>>> I have for example one etl job that reads in about 12B records at a time
>>> and does a sort, some simple transformations, validation, a re-partition
>>> and then output to a hive table.
>>> When I built it with the sample set, ~200M, it worked great, took maybe
>>> a minute and blew threw it.
>>>
>>> What I have observed is there is some kind of saturation reached
>>> depending on number of slots, number of nodes and the overall size of data
>>> to move.  When I run the 12B set, the first 1B go through in under 1
>>> minute, really really fast.  But its an extremely sharp drop off after
>>> that, the next 1B might take 15 minutes, and then if I wait for the next
>>> 1B, its well over an hour.
>>>
>>> What I cant find is any obvious indicators or things to look at,
>>> everything just grinds to a halt, I don't think the job would ever actually
>>> complete.
>>>
>>> Is there something in the design of flink in batch mode that is perhaps
>>> memory bound?  Adding more nodes/tasks does not fix it, just gets me a
>>> little further along.  I'm already running around ~1,400 slots at this
>>> point, I'd postulate needing 10,000+ to potentially make the job run, but
>>> thats too much of my cluster gone, and I have yet to get flink to be stable
>>> past 1,500.
>>>
>>> Any idea's on where to look, or what to debug?  GUI is also very
>>> cumbersome to use at this slot count too, so other measurement ideas are
>>> welcome too!
>>>
>>> Thank you all.
>>>
>>
>>
>
