Re: Tez : Any way to avoid creating subdirectories by "Insert with union all"?

2015-08-19 Thread Gopal Vijayaraghavan
> Is there any way to avoid creating sub-directories? Or is this by design
> and cannot be changed?

This is because of the way file formats generate Hadoop file names without
collisions.

For instance, any change to that would break Parquet-MR for Tez. That's
why we generate a compatible, but colliding mapreduce.task.attempt.id
artificially for Tez jobs.

"Map 1" and "Map 2" would both have an attempt 0 of task 1, generating
colliding file names (0001_0).

The easy workaround is a "re-load" of the table.

insert overwrite table h1_passwords_target select * from
h1_passwords_target;


The slightly more complex one is to add a DISTRIBUTE BY & trigger a
reducer after the UNION ALL.
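A sketch of that variant (hedged: the distribute-by expression below is
illustrative; any column or expression that forces a reducer stage works):

```sql
-- Hypothetical sketch: forcing a reducer stage after the UNION ALL.
-- The DISTRIBUTE BY makes the final writes happen in a single reduce
-- vertex, so files land directly in the table directory instead of in
-- per-branch subdirectories.
insert overwrite table h1_passwords_target
select * from
 (select * from h1_passwords limit 1
  union all
  select * from h1_passwords limit 2) sub
distribute by rand();
```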

Cheers,
Gopal




Re: Logical to Physical DAG conversion

2015-08-19 Thread Raajay Viswanathan
Thanks Jeff and Hitesh. Wonderful pointers / summary to get me started.

Raajay


> On Aug 19, 2015, at 4:20 PM, Hitesh Shah  wrote:
> 
> If you are mainly looking at this from a “mapper” and “reducer” perspective, 
> there are 2 main ways in which Tez affects parallelism: 
> 
> i) For the mappers, the number of maps is effectively governed by how many 
> splits are created and how splits are assigned to tasks. The initial split 
> creation is controlled by the InputFormat. Tez then looks at the cluster 
> capacity to decide whether the number of splits is optimal. For example, the 
> InputFormat could create 1000 splits, which would imply 1000 tasks. However, 
> if the max number of containers that can be launched is only 50, this would 
> imply 20 waves of tasks for the full stage/vertex to complete. So in this 
> case, if grouping is enabled, Tez tries to re-shuffle the splits into around 
> 1.5 waves ( 1.5 is configurable ), but it also ensures that splits are kept 
> within a bounded range with respect to the amount of data being processed. 
> For example, if each split was processing 1 TB of data, there is no need to 
> group splits, as 1 TB is already too large. Whereas if each split was 1 MB, 
> Tez will likely end up bundling more and more splits into a single task 
> until a minimum limit is reached on a per-task basis. Disabling grouping 
> would allow you to control the number of tasks by configuring the 
> InputSplits statically, as needed.
> 
> ii) For reducers, the main problem everyone has is that today you could 
> configure 1000 reducers while the mappers generated only 10 MB of data in 
> total, which could be processed by a single reducer. Tez monitors the data 
> being generated by the mappers and dynamically tries to decide how many 
> reducers it can scale down to. This is the auto-reduce parallelism that Jeff 
> referred to in his earlier email. Again, a static configuration for the 
> reducer stage, i.e. setParallelism(), plus disabling auto-reduce, would stop 
> Tez from making any runtime changes.
> 
> To answer your first question, each stage/vertex is governed by its own 
> VertexManager, which in the end is user code (most of the common ones are 
> implemented within the framework but can be overridden if needed). These VMs 
> can be as simple or as complex as needed and can make runtime decisions. 
> There are certain points in the processing up to which the VM can tweak 
> parallelism. After a certain point, parallelism becomes final (due to the 
> requirement that failures be handled deterministically, such that re-running 
> a task produces the same data output each time).
> 
> thanks
> — Hitesh
> 
> On Aug 19, 2015, at 3:24 AM, Raajay Viswanathan  wrote:
> 
>> Hello,
>> 
>> I am just getting started with Tez and need some clarification about the 
>> logical to physical DAG conversion in Tez. Does Tez convert a Logical DAG to 
>> physical DAG at one go or are the number of mappers and reducers for a stage 
>> determined only when the stage is ready to run ?
>> 
>> Also, what are some rules of thumb regarding the number of mappers/reducers 
>> for a stage ? I would ideally like to fix the #mappers / #reducers in the 
>> application itself rather than let Tez determine it. What are the common 
>> pitfalls in doing so ?
>> 
>> Thanks,
>> Raajay
> 



Tez : Any way to avoid creating subdirectories by "Insert with union all"?

2015-08-19 Thread Jim Green
Hi Team,

Below insert with union-all will create sub-directories:
set hive.execution.engine=tez;
create table h1_passwords_target like h1_passwords;

insert overwrite table h1_passwords_target
select * from
(select * from h1_passwords limit 1
 union all
 select * from h1_passwords limit 2) sub;


[root@h1 h1_passwords_target]# ls -altr
total 2
drwxrwxrwx 115 xxx xxx 113 Aug 19 21:24 ..
drwxr-xr-x   2 xxx xxx   1 Aug 19 21:25 2
drwxr-xr-x   2 xxx xxx   1 Aug 19 21:25 1
drwxr-xr-x   4 xxx xxx   2 Aug 19 21:25 .

Is there any way to avoid creating sub-directories? Or is this by design and
cannot be changed?

This matters because, by default, non-Tez queries cannot read such tables,
since hive.mapred.supports.subdirectories=false.

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Logical to Physical DAG conversion

2015-08-19 Thread Hitesh Shah
If you are mainly looking at this from a “mapper” and “reducer” perspective, 
there are 2 main ways in which Tez affects parallelism: 

i) For the mappers, the number of maps is effectively governed by how many splits 
are created and how splits are assigned to tasks. The initial split creation is 
controlled by the InputFormat. Tez then looks at the cluster capacity to 
decide whether the number of splits is optimal. For example, the InputFormat 
could create 1000 splits, which would imply 1000 tasks. However, if the max 
number of containers that can be launched is only 50, this would imply 20 waves of 
tasks for the full stage/vertex to complete. So in this case, if grouping is 
enabled, Tez tries to re-shuffle the splits into around 1.5 waves ( 1.5 is 
configurable ), but it also ensures that splits are kept within a bounded range 
with respect to the amount of data being processed. For example, if each split 
was processing 1 TB of data, there is no need to group splits, as 1 TB is 
already too large. Whereas if each split was 1 MB, Tez will likely end up 
bundling more and more splits into a single task until a minimum limit is 
reached on a per-task basis. Disabling grouping would allow you to control the 
number of tasks by configuring the InputSplits statically, as needed.
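For reference, the split-grouping behavior described above is tunable; to the
best of my knowledge the knobs are the tez.grouping.* properties below, but
verify the names and defaults against your Tez version:

```sql
-- Hedged sketch: property names believed current for Tez 0.7-era releases.
set tez.grouping.split-waves=1.5;      -- target no. of task waves per vertex
set tez.grouping.min-size=16777216;    -- don't group below ~16 MB per task
set tez.grouping.max-size=1073741824;  -- don't grow a grouped split past ~1 GB
```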

ii) For reducers, the main problem everyone has is that today you could 
configure 1000 reducers while the mappers generated only 10 MB of data in 
total, which could be processed by a single reducer. Tez monitors the data being 
generated by the mappers and dynamically tries to decide how many reducers it 
can scale down to. This is the auto-reduce parallelism that Jeff referred to in 
his earlier email. Again, a static configuration for the reducer stage, i.e. 
setParallelism(), plus disabling auto-reduce, would stop Tez from making any 
runtime changes.
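From the Hive side, the auto-reduce behavior can be toggled roughly like this
(hedged sketch; mapred.reduce.tasks is the classic static knob, and the value
shown is illustrative):

```sql
-- Let Tez scale the reducer count down at runtime:
set hive.tez.auto.reducer.parallelism=true;

-- Or pin the reducer count statically and opt out of runtime changes:
set hive.tez.auto.reducer.parallelism=false;
set mapred.reduce.tasks=50;
```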

To answer your first question, each stage/vertex is governed by its own 
VertexManager, which in the end is user code (most of the common ones are 
implemented within the framework but can be overridden if needed). These VMs 
can be as simple or as complex as needed and can make runtime decisions. There 
are certain points in the processing up to which the VM can tweak parallelism. 
After a certain point, parallelism becomes final (due to the requirement that 
failures be handled deterministically, such that re-running a task produces 
the same data output each time).

thanks
— Hitesh

On Aug 19, 2015, at 3:24 AM, Raajay Viswanathan  wrote:

> Hello,
> 
> I am just getting started with Tez and need some clarification about the 
> logical to physical DAG conversion in Tez. Does Tez convert a Logical DAG to 
> physical DAG at one go or are the number of mappers and reducers for a stage 
> determined only when the stage is ready to run ?
> 
> Also, what are some rules of thumb regarding the number of mappers/reducers 
> for a stage ? I would ideally like to fix the #mappers / #reducers in the 
> application itself rather than let Tez determine it. What are the common 
> pitfalls in doing so ?
> 
> Thanks,
> Raajay



Re: Logical to Physical DAG conversion

2015-08-19 Thread Jianfeng (Jeff) Zhang

You can specify the number of tasks (number of mappers/reducers) of a
stage when you build the DAG, before you run it. Tez also allows the user to
change the number of tasks dynamically at runtime. ShuffleVertexManager is
the user plugin that changes the parallelism based on the outputs of its
upstream stage.
Of course, you can write your own VertexManager plugin for a different
strategy for determining the parallelism, but I would suggest using
ShuffleVertexManager, because it is not easy to write your own
VertexManager if you are not familiar with the Tez internals. Also note that
ShuffleVertexManager can only be used for a ScatterGather edge (similar to
the shuffle of MapReduce).

Here's the set of parameters you can use with ShuffleVertexManager to control
how the parallelism is determined (copied from the source code); let me know
if you have any further questions.


  /**
   * In case of a ScatterGather connection, the fraction of source tasks which
   * should complete before tasks for the current vertex are scheduled
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION =
      "tez.shuffle-vertex-manager.min-src-fraction";

  /**
   * In case of a ScatterGather connection, once this fraction of source tasks
   * have completed, all tasks on the current vertex can be scheduled. Number of
   * tasks ready for scheduling on the current vertex scales linearly between
   * min-fraction and max-fraction
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION =
      "tez.shuffle-vertex-manager.max-src-fraction";

  /**
   * Enables automatic parallelism determination for the vertex. Based on input
   * data statistics the parallelism is decreased to a desired level.
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL =
      "tez.shuffle-vertex-manager.enable.auto-parallel";

  /**
   * The desired size of input per task. Parallelism will be changed to meet
   * this criterion.
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE =
      "tez.shuffle-vertex-manager.desired-task-input-size";

  /**
   * Automatic parallelism determination will not decrease parallelism below
   * this value
   */
  public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM =
      "tez.shuffle-vertex-manager.min-task-parallelism";
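As a usage sketch, these properties can be set as ordinary configuration, e.g.
in a Hive session (the values below are illustrative, not recommendations):

```sql
set tez.shuffle-vertex-manager.enable.auto-parallel=true;
set tez.shuffle-vertex-manager.desired-task-input-size=104857600; -- ~100 MB per task
set tez.shuffle-vertex-manager.min-task-parallelism=1;
set tez.shuffle-vertex-manager.min-src-fraction=0.25;
set tez.shuffle-vertex-manager.max-src-fraction=0.75;
```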








Best Regards,
Jeff Zhang





On 8/19/15, 6:24 PM, "Raajay Viswanathan"  wrote:

>Hello,
>
>I am just getting started with Tez and need some clarification about the
>logical to physical DAG conversion in Tez. Does Tez convert a Logical DAG
>to physical DAG at one go or are the number of mappers and reducers for a
>stage determined only when the stage is ready to run ?
>
>Also, what are some rules of thumb regarding the number of
>mappers/reducers for a stage ? I would ideally like to fix the #mappers /
>#reducers in the application itself rather than let Tez determine it.
>What are the common pitfalls in doing so ?
>
>Thanks,
>Raajay



Logical to Physical DAG conversion

2015-08-19 Thread Raajay Viswanathan
Hello,

I am just getting started with Tez and need some clarification about the 
logical-to-physical DAG conversion in Tez. Does Tez convert a logical DAG to a 
physical DAG in one go, or are the number of mappers and reducers for a stage 
determined only when the stage is ready to run?

Also, what are some rules of thumb regarding the number of mappers/reducers for 
a stage? I would ideally like to fix the #mappers / #reducers in the 
application itself rather than let Tez determine it. What are the common 
pitfalls in doing so?

Thanks,
Raajay