Is there any plans for this in future. I could see at the plans and without
these stats I am bit lost on what to look for like what are pain points
etc. I can see some very obvious things but not too much with these plans.

My question is there a guide or document which describes what your plans
should look like and what needs to look into this?

Also, I would like to know if there is a very complex execution plan(maybe
not expensive but very complex) is it usually beneficial to save the
intermediate datasets/tables and read them back and do the next steps.

Thanks

On Tue, Feb 20, 2018 at 9:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> No, there is no size or cardinality estimation happening at the moment.
>
> Best, Fabian
>
> 2018-02-19 21:56 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:
>
>> Thanks , is there a metric or other way to know how much space each
>> task/job is taking? Does execution plan has these details?
>>
>> Thanks
>>
>> On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> that's a difficult question without knowing the details of your job.
>>> A NoSpaceLeftOnDevice error occurs when a file system is full.
>>>
>>> This can happen if:
>>> - A Flink algorithm writes to disk, e.g., an external sort or the hash
>>> table of a hybrid hash join. This can happen for GroupBy, Join, Distinct,
>>> or any other operation that requires to group or join data. Filters will
>>> never spill to disk.
>>> - An OutputFormat writes to disk.
>>>
>>> The data is written to a temp directory, that can be configured in the
>>> ./conf/flink-conf.yaml file.
>>>
>>> Did you check how the tasks are distributed across the task managers?
>>> The web UI can help to diagnose such problems.
>>>
>>> Best, Fabian
>>>
>>> 2018-02-19 11:22 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:
>>>
>>>> Thanks Fabian for such detailed explanation.
>>>>
>>>> I am using a datset in between so i guess csv is read once. Now to my
>>>> real issue i have 6 task managers each having 4 cores and i have 2 slots
>>>> per task manager.
>>>>
>>>> Now my csv file is jus 1 gb and i create table and transform to dataset
>>>> and then run 15 different filters and extra processing which all run in
>>>> almost parallel.
>>>>
>>>> However it fails with error no space left on device on one of the task
>>>> manager. Space on each task manager is 32 gb in /tmp. So i am not sure why
>>>> it is running out of space. I do use some joins with othrr tables but those
>>>> are few megabytes.
>>>>
>>>> So i was assuming that somehow all parallel executions were storing
>>>> data in /tmp and were filling it.
>>>>
>>>> So i would like to know wht could be filling space.
>>>>
>>>> Thanks
>>>>
>>>> On 19 Feb 2018 10:10 am, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> this works as follows.
>>>>
>>>> - Table API and SQL queries are translated into regular DataSet jobs
>>>> (assuming you are running in a batch ExecutionEnvironment).
>>>> - A query is translated into a sequence of DataSet operators when you
>>>> 1) transform the Table into a DataSet or 2) write it to a TableSink. In
>>>> both cases, the optimizer is invoked and recursively goes back from the
>>>> converted/emitted Table back to its roots, i.e., a TableSource or a
>>>> DataSet.
>>>>
>>>> This means, that if you create a Table from a TableSource and apply
>>>> multiple filters on it and write each filter to a TableSink, the CSV file
>>>> will be read 10 times, filtered 10 times and written 10 times. This is not
>>>> efficient, because, you could also just read the file once and apply all
>>>> filters in parallel.
>>>> You can do this by converting the Table that you read with a
>>>> TableSource into a DataSet and register the DataSet again as a Table. In
>>>> that case, the translations of all TableSinks will stop at the DataSet and
>>>> not include the TableSource which reads the file.
>>>>
>>>> The following figures illustrate the difference:
>>>>
>>>> 1) Without DataSet in the middle:
>>>>
>>>> TableSource -> Filter1 -> TableSink1
>>>> TableSource -> Filter2 -> TableSink2
>>>> TableSource -> Filter3 -> TableSink3
>>>>
>>>> 2) With DataSet in the middle:
>>>>
>>>>                         /-> Filter1 -> TableSink1
>>>> TableSource -<-> Filter2 -> TableSink2
>>>>                         \-> Filter3 -> TableSink3
>>>>
>>>> I'll likely add a feature to internally translate an intermediate Table
>>>> to make this a bit easier.
>>>> The underlying problem is that the SQL optimizer cannot translate
>>>> queries with multiple sinks.
>>>> Instead, each sink is individually translated and the optimizer does
>>>> not know that common execution paths could be shared.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>>
>>>> 2018-02-19 2:19 GMT+01:00 Darshan Singh <darshan.m...@gmail.com>:
>>>>
>>>>> Thanks for reply.
>>>>>
>>>>> I guess I am not looking for alternate. I am trying to understand what
>>>>> flink does in this scenario and if 10 tasks ar egoing in parallel I am 
>>>>> sure
>>>>> they will be reading csv as there is no other way.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Feb 19, 2018 at 12:48 AM, Niclas Hedhman <nic...@hedhman.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Do you really need the large single table created in step 2?
>>>>>>
>>>>>> If not, what you typically do is that the Csv source first do the
>>>>>> common transformations. Then depending on whether the 10 outputs have
>>>>>> different processing paths or not, you either do a split() to do 
>>>>>> individual
>>>>>> processing depending on some criteria, or you just have the sink put each
>>>>>> record in separate tables.
>>>>>> You have full control, at each step along the transformation path
>>>>>> whether it can be parallelized or not, and if there are no sequential
>>>>>> constraints on your model, then you can easily fill all cores on all 
>>>>>> hosts
>>>>>> quite easily.
>>>>>>
>>>>>> Even if you need the step 2 table, I would still just treat that as a
>>>>>> split(), a branch ending in a Sink that does the storage there. No need 
>>>>>> to
>>>>>> read records from file over and over again, nor to store them first in 
>>>>>> step
>>>>>> 2 table and read them out again.
>>>>>>
>>>>>> Don't ask *me* about what happens in failure scenarios... I have
>>>>>> myself not figured that out yet.
>>>>>>
>>>>>> HTH
>>>>>> Niclas
>>>>>>
>>>>>> On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh <
>>>>>> darshan.m...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi I would like to understand the execution model.
>>>>>>>
>>>>>>> 1. I have a csv files which is say 10 GB.
>>>>>>> 2. I created a table from this file.
>>>>>>>
>>>>>>> 3. Now I have created filtered tables on this say 10 of these.
>>>>>>> 4. Now I created a writetosink for all these 10 filtered tables.
>>>>>>>
>>>>>>> Now my question is that are these 10 filetered tables be written in
>>>>>>> parallel (suppose i have 40 cores and set up parallelism to say 40 as 
>>>>>>> well.
>>>>>>>
>>>>>>> Next question I have is that the table which I created form the csv
>>>>>>> file which is common wont be persisted by flink internally rather for 
>>>>>>> all
>>>>>>> 10 filtered tables it will read csv files and then apply the filter and
>>>>>>> write to sink.
>>>>>>>
>>>>>>> I think that for all 10 filtered tables it will read csv again and
>>>>>>> again in this case it will be read 10 times.  Is my understanding 
>>>>>>> correct
>>>>>>> or I am missing something.
>>>>>>>
>>>>>>> What if I step 2 I change table to dataset and back?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Niclas Hedhman, Software Developer
>>>>>> http://polygene.apache.org - New Energy for Java
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to