[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710643#comment-16710643 ] ASF GitHub Bot commented on NIFI-4946: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r239241291 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405074#comment-16405074 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r175496952 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405038#comment-16405038 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r175490223 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404991#comment-16404991 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r175481806 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404862#comment-16404862 ] Joseph Witt commented on NIFI-4946: --- removed fix version for now until it is closer to merge. Is in active review/discussion cycle. Will need committer review/merge. Looks like a cool feature/PR > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404732#comment-16404732 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r175417724 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392281#comment-16392281 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173350325 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392271#comment-16392271 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349239 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- Once current approach is accepted this can be taken care > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions 
>Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392268#comment-16392268 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349052 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() --- End diff -- Like said before, not yet considered. 
We just wanted to get a hang of the code with our basic requirements > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392267#comment-16392267 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348751 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") --- End diff -- Sure, will do that. 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392266#comment-16392266 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348709 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); --- End diff -- Sure, this will be removed in the next commit. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392264#comment-16392264 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348636 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -129,7 +185,13 @@ public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); +properties.add(IS_BATCH_JOB); +properties.add(PY_FILES); +//properties.add(JAR_FILES); +properties.add(MAIN_PY_FILE); +properties.add(NAME); properties.add(CODE); +//properties.add(ARGS); --- End diff -- Only `pyfiles` and `file` options are tested. Rest are yet to be tested. Plan was to go with implementing test modules and test other features, since the current manual testing takes a long routine of compile, copy and restart of the Nifi. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. 
> [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391545#comment-16391545 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173211644 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -129,7 +185,13 @@ public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); +properties.add(IS_BATCH_JOB); +properties.add(PY_FILES); +//properties.add(JAR_FILES); +properties.add(MAIN_PY_FILE); +properties.add(NAME); properties.add(CODE); +//properties.add(ARGS); --- End diff -- Comments to be removed? > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. 
> [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391551#comment-16391551 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173223531 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Mapheaders = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391552#comment-16391552 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173226045 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Mapheaders = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391553#comment-16391553 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216684 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-name") +.displayName("name") +.description("The name of this session") +.required(false) 
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor ARGS = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-args") +.displayName("args") +.description("The name of this session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-main-py-file") +.displayName("file") --- End diff -- Same as the JARs case. Most of the `PropertyDescriptor` use all lowercase characters for `displayName`. Please change it. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ***
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391549#comment-16391549 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213146 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); --- End diff -- Cosmetic change: It would be great if this log.debug message can be changed to something of a proper standard, like "JSON Response : " i.e. Remove > > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. --
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391550#comment-16391550 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173215603 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") --- End diff -- `displayName` is what will be rendered on the UI. So lets change it to JARs or Application JARs? 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391546#comment-16391546 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216966 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- This will be true all the time, right? 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391547#comment-16391547 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213566 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map<String, String> headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390898#comment-16390898 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on the issue: https://github.com/apache/nifi/pull/2521 Team, This MR was created in line with our current requirements. Currently the changes are tested manually and found working. We would like to make this changes go in mainline, for which we need community help with reviewing and adding test code, since we are new to the Nifi extensions. Next plan is to add test cases : - Create a sample PySpark example modules as part of Test resources - Use that in tests Could any one point out some existing test cases that does similar testing. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. 
> [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390869#comment-16390869 ] ASF GitHub Bot commented on NIFI-4946: -- GitHub user Mageswaran1989 opened a pull request: https://github.com/apache/nifi/pull/2521 NIFI-4946 nifi-spark-bundle : Adding support for pyfiles, file, jars options Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ x] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhiraa/nifi NIFI-4946_nifi-spark-bundle_Adding_support_for_pyfiles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2521.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2521 commit a8301cdad0194328d420282f5e3260127776ed84 Author: Mageswaran Date: 2018-03-08T07:39:26Z NIFI-4946 initial commit > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > > More details will be posted in the Git link. > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)