Hello everybody,
I am a student. I tried to run a workflow, which contains two parallel jobs
(using fork), on a single node (Hadoop installed in pseudo-distributed mode), but these
jobs are always stuck in the pending state (in the Hadoop JobTracker).
Any ideas? Thank you very much.
My workflow is below:
<workflow-app name="query1" xmlns="uri:oozie:workflow:0.1">

    <!--
      Pipeline: normalize_data -> fork(top_25_inbound || top_25_outbound)
                -> join -> top_25_total -> get_detail_result -> end

      NOTE(review): every Oozie action also runs a single-map "launcher" job.
      On a pseudo-distributed cluster with the default map-slot capacity, the
      two forked launchers can occupy all available map slots so the child Pig
      jobs stay PENDING forever (a slot deadlock). If the forked actions never
      leave the pending state, raise mapred.tasktracker.map.tasks.maximum (or
      put the launchers in a separate queue) - TODO confirm against the
      cluster configuration.
    -->

    <start to="normalize_data"/>

    <!-- Step 1: normalize the raw flight data into data_normalized. -->
    <action name="normalize_data">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <!-- Remove a previous run's output so Pig can re-create it. -->
                <delete path="${nameNode}/user/${wf:user()}/${jobOutput}/data_normalized"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>normalize_data.pig</script>
            <param>input=/user/${wf:user()}/${flight_input}</param>
            <param>output=/user/${wf:user()}/${jobOutput}/data_normalized</param>
        </pig>
        <ok to="forkSelectInboundOutbound"/>
        <error to="fail"/>
    </action>

    <!-- Step 2: compute the inbound and outbound top-25 lists in parallel. -->
    <fork name="forkSelectInboundOutbound">
        <path start="top_25_inbound"/>
        <path start="top_25_outbound"/>
    </fork>

    <action name="top_25_inbound">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${jobOutput}/top_25_inbound"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>select_top_25_inbound.pig</script>
            <param>input=/user/${wf:user()}/${jobOutput}/data_normalized/part*</param>
            <param>output=/user/${wf:user()}/${jobOutput}/top_25_inbound</param>
        </pig>
        <ok to="joinInboundOutbound"/>
        <error to="fail"/>
    </action>

    <action name="top_25_outbound">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${jobOutput}/top_25_outbound"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>select_top_25_outbound.pig</script>
            <param>input=/user/${wf:user()}/${jobOutput}/data_normalized/part*</param>
            <param>output=/user/${wf:user()}/${jobOutput}/top_25_outbound</param>
        </pig>
        <ok to="joinInboundOutbound"/>
        <error to="fail"/>
    </action>

    <!-- BUG FIX: this join previously transitioned to "end", which made
         top_25_total and get_detail_result unreachable dead nodes and
         silently skipped the second half of the pipeline. -->
    <join name="joinInboundOutbound" to="top_25_total"/>

    <!-- Step 3: merge the two top-25 lists into an overall top 25. -->
    <action name="top_25_total">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${jobOutput}/top_25_total"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>select_top_25_total.pig</script>
            <param>count_inbound=/user/${wf:user()}/${jobOutput}/top_25_inbound/part*</param>
            <param>count_outbound=/user/${wf:user()}/${jobOutput}/top_25_outbound/part*</param>
            <param>output=/user/${wf:user()}/${jobOutput}/top_25_total</param>
        </pig>
        <ok to="get_detail_result"/>
        <error to="fail"/>
    </action>

    <!-- Step 4: break the overall top 25 down by time granularity. -->
    <action name="get_detail_result">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <!-- BUG FIX: this previously deleted ${jobOutput} wholesale,
                     wiping out top_25_total (this action's own input, see the
                     "input" param below) before the script could read it.
                     Delete only this step's own output directory. -->
                <delete path="${nameNode}/user/${wf:user()}/${jobOutput}/detail_result"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>analyis_different_time_granuraties.pig</script>
            <param>input=/user/${wf:user()}/${jobOutput}/top_25_total/part*</param>
            <param>output=/user/${wf:user()}/${jobOutput}/detail_result</param>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>
Pig failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>

    <end name="end"/>
</workflow-app>