Dear all,
We are building a workflow with forks and joins and we get this exception:
Caused by: org.apache.oozie.workflow.WorkflowException: E0743: Multiple "ok to" transitions to the same node, [calculateGlobalMoviesUse], are not allowed
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:206)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:227)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:234)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:265)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:227)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:234)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:219)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:214)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateForkJoin(LiteWorkflowAppParser.java:174)
    at org.apache.oozie.workflow.lite.LiteWorkflowAppParser.validateAndParse(LiteWorkflowAppParser.java:141)
    at org.apache.oozie.workflow.lite.LiteWorkflowLib.parseDef(LiteWorkflowLib.java:54)
    at org.apache.oozie.service.LiteWorkflowAppService.parseDef(LiteWorkflowAppService.java:47)
    at org.apache.oozie.service.LiteWorkflowAppService.parseDef(LiteWorkflowAppService.java:42)
    at org.apache.oozie.command.wf.SubmitXCommand.execute
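To illustrate what the parser seems to be complaining about, here is a minimal, hypothetical sketch (node names "f", "a", "b", "doWork" are made up, not from our workflow) of the pattern that E0743 flags: two "ok to" transitions pointing at the same node, which as far as we understand is only allowed when the target is a join:

```xml
<!-- Hypothetical sketch: both fork paths transition into the same
     non-join node "doWork"; the parser rejects this with E0743. -->
<fork name="f">
    <path start="a"/>
    <path start="b"/>
</fork>
<action name="a">
    <fs/>
    <ok to="doWork"/>      <!-- first transition into doWork -->
    <error to="fail"/>
</action>
<action name="b">
    <fs/>
    <ok to="doWork"/>      <!-- second transition into the same node: not allowed -->
    <error to="fail"/>
</action>
```

In a valid layout the two paths would instead meet at a `<join>`, and the join alone would transition to "doWork".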
My workflow.xml is quite big, sorry for the inconvenience (and thanks in advance):
<workflow-app xmlns='uri:oozie:workflow:0.2' name='koios-${service}-export'>
<start to='eventTimeConsumer' />
<!-- INCREMENTAL LOAD FIRST STEP -->
<action name='eventTimeConsumer'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.pdi.koios.orchestrator.ConsumerMain</main-class>
<arg>${mysql_url}</arg>
<arg>${mysql_user}</arg>
<arg>${mysql_pass}</arg>
<arg>${workflowId}</arg>
<capture-output />
</java>
<ok to="setupFork" />
<error to="fail" />
</action>
<fork name="setupFork">
<path start="checkDailyLoad"/>
<path start="checkWeeklyLoad"/>
<path start="checkMonthlyLoad"/>
</fork>
<join name="setupJoin" to="calculateGlobalFork"/>
<!-- MONTHLY LOAD -->
<decision name="checkMonthlyLoad">
<switch>
<case to="setupMonthlyFork">
${wf:actionData('eventTimeConsumer')['continue_monthly']}
</case>
<default to="setupJoin" />
</switch>
</decision>
<fork name="setupMonthlyFork">
<path start="setupMonthlyTmpHistogramCustomers"/>
<path start="setupMonthlyTmpHistogramMovies"/>
</fork>
<join name="setupMonthlyJoin" to="calculateMonthlyFork"/>
<action name='setupMonthlyTmpHistogramCustomers'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/monthlyCustomerHistTemporal.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_month=${wf:actionData('eventTimeConsumer')['initial.timestamp.monthly']}</param>
<param>end_month=${wf:actionData('eventTimeConsumer')['end.timestamp.monthly']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupMonthlyJoin"/>
<error to="fail"/>
</action>
<action name='setupMonthlyTmpHistogramMovies'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/monthlyMovieHistTemporal.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_month=${wf:actionData('eventTimeConsumer')['initial.timestamp.monthly']}</param>
<param>end_month=${wf:actionData('eventTimeConsumer')['end.timestamp.monthly']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupMonthlyJoin"/>
<error to="fail"/>
</action>
<fork name="calculateMonthlyFork">
<path start="calculateMonthlyTops"/>
<path start="calculateMonthlyHistograms"/>
</fork>
<join name="calculateMonthlyJoin" to="setupJoin"/>
<action name='calculateMonthlyTops'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/monthlyTops.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_month=${wf:actionData('eventTimeConsumer')['initial.timestamp.monthly']}</param>
<param>end_month=${wf:actionData('eventTimeConsumer')['end.timestamp.monthly']}</param>
<param>service=${service}</param>
</hive>
<ok to="loadMonthlyTops"/>
<error to="fail"/>
</action>
<action name='loadMonthlyTops'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>TOPS</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/tops</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="calculateMonthlyJoin"/>
<error to="fail"/>
</action>
<action name='calculateMonthlyHistograms'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/monthlyHistograms.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_month=${wf:actionData('eventTimeConsumer')['initial.timestamp.monthly']}</param>
<param>end_month=${wf:actionData('eventTimeConsumer')['end.timestamp.monthly']}</param>
<param>service=${service}</param>
</hive>
<ok to="loadMonthlyHistograms"/>
<error to="fail"/>
</action>
<action name='loadMonthlyHistograms'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>HISTOGRAMS</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/histograms</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="calculateMonthlyJoin"/>
<error to="fail"/>
</action>
<!-- WEEKLY LOAD -->
<decision name="checkWeeklyLoad">
<switch>
<!-- empty or doesn't exist -->
<case to="setupWeeklyFork">
${wf:actionData('eventTimeConsumer')['continue_weekly']}
</case>
<default to="setupJoin" />
</switch>
</decision>
<fork name="setupWeeklyFork">
<path start="setupWeeklyTmpHistogramCustomers"/>
<path start="setupWeeklyTmpHistogramMovies"/>
</fork>
<join name="setupWeeklyJoin" to="setupJoin"/>
<action name='setupWeeklyTmpHistogramCustomers'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/weeklyCustomerHistTemporal.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_week=${wf:actionData('eventTimeConsumer')['initial.timestamp.weekly']}</param>
<param>end_week=${wf:actionData('eventTimeConsumer')['end.timestamp.weekly']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupWeeklyJoin"/>
<error to="fail"/>
</action>
<action name='setupWeeklyTmpHistogramMovies'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/weeklyMovieHistTemporal.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_week=${wf:actionData('eventTimeConsumer')['initial.timestamp.weekly']}</param>
<param>end_week=${wf:actionData('eventTimeConsumer')['end.timestamp.weekly']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupWeeklyJoin"/>
<error to="fail"/>
</action>
<action name='calculateWeeklyResults'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/weeklyActivityIndexes.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>service=${service}</param>
</hive>
<ok to="setupJoin"/>
<error to="fail"/>
</action>
<!-- DAILY LOAD -->
<decision name="checkDailyLoad">
<switch>
<case to="setupDailyFork">
${wf:actionData('eventTimeConsumer')['continue_daily']}
</case>
<default to="setupJoin" />
</switch>
</decision>
<!-- ALL TIME LOAD -->
<fork name="setupDailyFork">
<path start="setupSummarizeAllFields"/>
<path start="setupSummarizeNoGenres"/>
<path start="setupSummarizeOnlyGenres"/>
</fork>
<join name="setupDailyJoin" to="calculateDailyResults"/>
<action name='setupSummarizeAllFields'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/dailySummarizeAllFields.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_day=${wf:actionData('eventTimeConsumer')['initial.timestamp.daily']}</param>
<param>end_day=${wf:actionData('eventTimeConsumer')['end.timestamp.daily']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupDailyJoin"/>
<error to="fail"/>
</action>
<action name='setupSummarizeNoGenres'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/dailySummarizeNoGenres.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_day=${wf:actionData('eventTimeConsumer')['initial.timestamp.daily']}</param>
<param>end_day=${wf:actionData('eventTimeConsumer')['end.timestamp.daily']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupDailyJoin"/>
<error to="fail"/>
</action>
<action name='setupSummarizeOnlyGenres'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/dailySummarizeOnlyGenres.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>init_day=${wf:actionData('eventTimeConsumer')['initial.timestamp.daily']}</param>
<param>end_day=${wf:actionData('eventTimeConsumer')['end.timestamp.daily']}</param>
<param>service=${service}</param>
</hive>
<ok to="setupDailyJoin"/>
<error to="fail"/>
</action>
<action name='calculateDailyResults'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/dailySummarizes.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>service=${service}</param>
</hive>
<ok to="loadDailySummarizes"/>
<error to="fail"/>
</action>
<action name='loadDailySummarizes'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>CONSUMPTION_SUMMARIZE</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/consumption_summarize</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="setupJoin"/>
<error to="fail"/>
</action>
<!-- Common aggregates for different periods -->
<fork name="calculateGlobalFork">
<path start="checkGlobalActivityIndexes"/>
<path start="checkGlobalMoviesUse"/>
<path start="checkGlobalCustomersUse"/>
</fork>
<join name="calculateGlobalJoin" to="checkFinalizer"/>
<decision name="checkGlobalActivityIndexes">
<switch>
<case to="calculateGlobalActivityIndexes">
${wf:actionData('eventTimeConsumer')['continue_monthly'] ||
wf:actionData('eventTimeConsumer')['continue_weekly']}
</case>
<default to="calculateGlobalJoin" />
</switch>
</decision>
<action name='calculateGlobalActivityIndexes'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/globalActivityIndexes.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>service=${service}</param>
</hive>
<ok to="loadGlobalActivityIndexes"/>
<error to="fail"/>
</action>
<action name='loadGlobalActivityIndexes'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>ACTIVITY_INDEXES</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/activity_indexes</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="calculateGlobalJoin"/>
<error to="fail"/>
</action>
<decision name="checkGlobalMoviesUse">
<switch>
<case to="calculateGlobalMoviesUse">
${wf:actionData('eventTimeConsumer')['continue_monthly'] ||
wf:actionData('eventTimeConsumer')['continue_weekly']}
</case>
<default to="calculateGlobalJoin" />
</switch>
</decision>
<action name='calculateGlobalMoviesUse'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/globalMoviesUse.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>service=${service}</param>
</hive>
<ok to="loadGlobalMoviesUse"/>
<error to="fail"/>
</action>
<action name='loadGlobalMoviesUse'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>MOVIES_USE</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/movies_use</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="calculateGlobalJoin"/>
<error to="fail"/>
</action>
<decision name="checkGlobalCustomersUse">
<switch>
<case to="calculateGlobalMoviesUse">
${wf:actionData('eventTimeConsumer')['continue_monthly'] ||
wf:actionData('eventTimeConsumer')['continue_weekly']}
</case>
<default to="calculateGlobalJoin" />
</switch>
</decision>
<action name='calculateGlobalCustomersUse'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>resources/hive-site.xml</job-xml>
<script>resources/globalCustomersUse.hql</script>
<param>udfs_jar=${workflow_path}/lib/etl-0.0.1-SNAPSHOT.jar</param>
<param>service=${service}</param>
</hive>
<ok to="loadGlobalCustomersUse"/>
<error to="fail"/>
</action>
<action name='loadGlobalCustomersUse'>
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>export</arg>
<arg>--username</arg>
<arg>${mysql_user}</arg>
<arg>--password</arg>
<arg>${mysql_pass}</arg>
<arg>--connect</arg>
<arg>${mysql_url}</arg>
<arg>--table</arg>
<arg>CUSTOMERS_USE</arg>
<arg>--export-dir</arg>
<arg>/user/hive/warehouse/${service}.db/customers_use</arg>
<arg>--input-fields-terminated-by</arg>
<arg>\001</arg>
</sqoop>
<ok to="calculateGlobalJoin"/>
<error to="fail"/>
</action>
<decision name="checkFinalizer">
<switch>
<case to="eventTimeFinalizer">
${wf:actionData('eventTimeConsumer')['continue_daily']}
</case>
<default to="end" />
</switch>
</decision>
<!-- LAST ACTION OF THE INCREMENTAL LOAD -->
<action name='eventTimeFinalizer'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.pdi.koios.orchestrator.FinalizerMain</main-class>
<arg>${mysql_url}</arg>
<arg>${mysql_user}</arg>
<arg>${mysql_pass}</arg>
<arg>${workflowId}</arg>
<arg>${wf:actionData('eventTimeConsumer')['end.date']}</arg>
<capture-output />
</java>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Map/Reduce failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
<end name='end' />
</workflow-app>
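In case it matters: as far as we can tell from the documentation, the fork/join validation that raises E0743 can be relaxed per job in recent Oozie versions by adding the following property to the job configuration (property name taken from the docs; we have not verified it on our cluster):

```xml
<!-- Disables strict fork/join validation for this job only. -->
<property>
    <name>oozie.wf.validate.ForkJoin</name>
    <value>false</value>
</property>
```

We would rather understand why the workflow is considered invalid, though.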