[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710643#comment-16710643 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r239241291
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro

         String sessionId = livyController.get("sessionId");
         String livyUrl = livyController.get("livyUrl");
-        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(code)) {
-            try (InputStream inputStream = session.read(flowFile)) {
-                // If no code was provided, assume it is in the content of the incoming flow file
-                code = IOUtils.toString(inputStream, charset);
-            } catch (IOException ioe) {
-                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-        }

-        code = StringEscapeUtils.escapeJavaScript(code);
-        String payload = "{\"code\":\"" + code + "\"}";
+
         try {
-            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-            if (result == null) {
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+
+            if (isBatchJob) {
+
+                String jsonResponse = null;
+
+                if (StringUtils.isEmpty(jsonResponse)) {
+                    try (InputStream inputStream = session.read(flowFile)) {
+                        // If no code was provided, assume it is in the content of the incoming flow file
+                        jsonResponse = IOUtils.toString(inputStream, charset);
+                    } catch (IOException ioe) {
+                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                log.debug(" > jsonResponse: " + jsonResponse);
+
                 try {
-                    final JSONObject output = result.getJSONObject("data");
-                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    session.transfer(flowFile, REL_SUCCESS);
-                } catch (JSONException je) {
-                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+                    headers.put("X-Requested-By", LivySessionService.USER);
+                    headers.put("Accept", "application/json");
+
+                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                     flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    flowFile = session.penalize(flowFile);
+
+                    Thread.sleep(statusCheckInterval);
+
+                    String state = jobInfo.getString("state");
+                    log.debug(" > jsonResponseObj State: " + state);
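The batch path in this hunk reads a Livy `batches` response from the incoming flow file and then polls the job's `url`. For the submission side, Livy's `batches` endpoint takes a JSON body whose `file`/`pyFiles`/`jars`/`name` fields correspond to the new processor properties; a small illustrative sketch follows (paths and names are placeholders, not values from this PR):

    import org.json.JSONArray;
    import org.json.JSONObject;

    // Illustrative sketch of a POST /batches body for a PySpark job, per the
    // Livy REST API. All paths and names below are placeholders.
    public class LivyBatchPayloadSketch {
        public static JSONObject buildPayload() {
            JSONObject payload = new JSONObject();
            payload.put("file", "/jobs/main.py");                          // main *.py file ("file" property)
            payload.put("pyFiles", new JSONArray().put("/jobs/libs.zip")); // module dependencies ("pyFiles")
            payload.put("name", "nifi-livy-batch");                        // session name ("name")
            // POST this to http://<livy-host>:8998/batches; the response
            // carries "id" and "state", which the processor then polls.
            return payload;
        }
    }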

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405074#comment-16405074 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r175496952
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405038#comment-16405038 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r175490223
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404991#comment-16404991 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r175481806
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-19 Thread Joseph Witt (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404862#comment-16404862 ]

Joseph Witt commented on NIFI-4946:
---

Removed the fix version for now until it is closer to merge. The PR is in an active review/discussion cycle and will need committer review/merge. Looks like a cool feature/PR.

> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> --
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
>Reporter: Mageswaran
>Priority: Major
> Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark-based Spark jobs (which are normally 
> structured as modules) over Livy to the existing "ExecuteSparkInteractive" 
> processor.
> This is done by reading file paths for `pyfiles` and `file`, and an option 
> from the user indicating whether the processor should trigger a batch job 
> or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
>  * Check whether the processor has to handle code or submit a Spark job
>  * Read incoming flow file
>  ** If batch == true
>  *** If flow file matches Livy `batches` JSON response through `wait` loop
>  **** Wait for Status Check Interval
>  **** Read the state
>  **** If state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  *** Else
>  **** Ignore the flow file
>  **** Trigger the Spark job over Livy `batches` endpoint
>  **** Read the state of the submitted job
>  **** If state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  ** Else:
>  *** Existing Logic to handle `Code`
>  
> !nifi-spark-options.png!
> !nifi-spark.png!
>  
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
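For readers following the workflow above: a minimal, self-contained sketch of the `batches` polling it describes, using plain `HttpURLConnection` plus the commons-io and org.json APIs already used in the PR. The class name, header values, and state handling are illustrative assumptions, not code from the PR.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import org.apache.commons.io.IOUtils;
    import org.json.JSONObject;

    // Sketch of the wait/success/dead polling loop described above: GET the
    // batch's URL, read "state", and keep waiting while the job is running.
    public class LivyBatchStatePoller {
        public static String pollUntilDone(String batchUrl, long statusCheckInterval) throws Exception {
            while (true) {
                HttpURLConnection conn = (HttpURLConnection) new URL(batchUrl).openConnection();
                conn.setRequestProperty("X-Requested-By", "nifi"); // Livy expects this header on some calls
                conn.setRequestProperty("Accept", "application/json");
                try (InputStream in = conn.getInputStream()) {
                    JSONObject jobInfo = new JSONObject(IOUtils.toString(in, StandardCharsets.UTF_8));
                    String state = jobInfo.getString("state");
                    if (!"starting".equals(state) && !"running".equals(state)) {
                        return state; // e.g. "success" or "dead" -> route accordingly
                    }
                } finally {
                    conn.disconnect();
                }
                Thread.sleep(statusCheckInterval); // the processor's Status Check Interval
            }
        }
    }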


[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404732#comment-16404732 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r175417724
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392281#comment-16392281 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173350325
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392271#comment-16392271 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173349239
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
[...]
+                if (StringUtils.isEmpty(jsonResponse)) {
--- End diff --

Once the current approach is accepted, this can be taken care of.
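For context, `jsonResponse` is initialized to null immediately before this check, so `StringUtils.isEmpty(jsonResponse)` is always true here. A sketch of the simplification being deferred (same calls as in the hunk, with the redundant guard dropped; this is a fragment of the processor's onTrigger, not standalone code):

    // The batch path always takes its JSON from the incoming flow file
    // content, so the always-true isEmpty(null) guard can simply be dropped:
    String jsonResponse;
    try (InputStream inputStream = session.read(flowFile)) {
        jsonResponse = IOUtils.toString(inputStream, charset);
    } catch (IOException ioe) {
        log.error("Error reading input flowfile, penalizing and routing to failure",
                new Object[]{flowFile, ioe.getMessage()}, ioe);
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_FAILURE);
        return;
    }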




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392268#comment-16392268 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173349052
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -83,6 +83,62 @@
             .expressionLanguageSupported(true)
             .build();

+    public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-is_batch_job")
+            .displayName("Is Batch Job")
+            .description("If true, the `Code` part is ignored and the flow file from previous stage is considered "
+                    + "as a triggering event and not as code for Spark session. When `Wait` state is self routed "
+                    + "the livy json response flow file from previous Spark job is used to poll the job status "
+                    + "for success or failure")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-pyfiles")
+            .displayName("pyFiles")
+            .description("Python files to be used in this batch session that includes *.py, *.zip files")
+            .required(false)
+            .addValidator(StandardValidators.createURLorFileValidator())
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-jarfiles")
+            .displayName("jars")
+            .description("jars to be used in this batch session")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder()
--- End diff --

As said before, this is not yet considered. We just wanted to get the hang of the code with our basic requirements.




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392267#comment-16392267 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173348751
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -83,6 +83,62 @@
[...]
+    public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-jarfiles")
+            .displayName("jars")
--- End diff --

Sure, will do that.




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392266#comment-16392266 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173348709
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
[...]
+                log.debug(" > jsonResponse: " + jsonResponse);
--- End diff --

Sure, this will be removed in the next commit.




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392264#comment-16392264 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173348636
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -129,7 +185,13 @@
     public void init(final ProcessorInitializationContext context) {
         List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(LIVY_CONTROLLER_SERVICE);
+        properties.add(IS_BATCH_JOB);
+        properties.add(PY_FILES);
+        //properties.add(JAR_FILES);
+        properties.add(MAIN_PY_FILE);
+        properties.add(NAME);
         properties.add(CODE);
+        //properties.add(ARGS);
--- End diff --

Only the `pyfiles` and `file` options are tested; the rest are yet to be tested.

The plan was to implement test modules and then test the other features, since the current manual testing requires a long routine of compiling, copying, and restarting NiFi.
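A minimal sketch of such a test module using NiFi's mock framework (nifi-mock). The property constants follow this PR, and the mock Livy controller service registration is omitted, so treat the wiring as an assumption:

    import java.nio.charset.StandardCharsets;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    // Assumes the same package as ExecuteSparkInteractive, or the proper import.
    public class ExecuteSparkInteractiveBatchTest {
        @Test
        public void testBatchJobTriggering() {
            final TestRunner runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
            runner.setProperty(ExecuteSparkInteractive.IS_BATCH_JOB, "true");
            runner.setProperty(ExecuteSparkInteractive.PY_FILES, "/jobs/libs.zip"); // illustrative path
            // A mock LivySessionController would be registered and enabled here.
            runner.enqueue("{\"url\": \"http://livy:8998/batches/42\"}".getBytes(StandardCharsets.UTF_8));
            // runner.run();
            // runner.assertAllFlowFilesTransferred(ExecuteSparkInteractive.REL_SUCCESS, 1);
        }
    }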




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391545#comment-16391545 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173211644
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -129,7 +185,13 @@
[...]
+        //properties.add(ARGS);
--- End diff --

Comments to be removed?




[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391551#comment-16391551 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173223531
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391552#comment-16391552 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173226045
  

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391553#comment-16391553 ]

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173216684
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -83,6 +83,62 @@
[...]
+    public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-name")
+            .displayName("name")
+            .description("The name of this session")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor ARGS = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-args")
+            .displayName("args")
+            .description("The name of this session")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder()
+            .name("exec-spark-iactive-main-py-file")
+            .displayName("file")
--- End diff --

Same as the JARs case: most of these `PropertyDescriptor`s use all-lowercase characters for `displayName`. Please change it.
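Applying the suggestion to the `file` descriptor above might look like the following; the display name, description text, and validator shown here are illustrative, not values from the PR:

    // Sketch: same descriptor with a human-readable, Title Case display name
    // (remaining builder calls are placeholders, since the hunk truncates here).
    public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder()
            .name("exec-spark-iactive-main-py-file")
            .displayName("Main Python File")
            .description("The main Python file to submit for the batch job") // illustrative
            .required(false)
            .addValidator(StandardValidators.createURLorFileValidator())     // illustrative
            .expressionLanguageSupported(false)
            .build();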



[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391549#comment-16391549
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173213146
  
--- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) throws
 
 String sessionId = livyController.get("sessionId");
 String livyUrl = livyController.get("livyUrl");
-String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-if (StringUtils.isEmpty(code)) {
-try (InputStream inputStream = session.read(flowFile)) {
-// If no code was provided, assume it is in the content of the incoming flow file
-code = IOUtils.toString(inputStream, charset);
-} catch (IOException ioe) {
-log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-flowFile = session.penalize(flowFile);
-session.transfer(flowFile, REL_FAILURE);
-return;
-}
-}
 
-code = StringEscapeUtils.escapeJavaScript(code);
-String payload = "{\"code\":\"" + code + "\"}";
+
 try {
-final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-if (result == null) {
-session.transfer(flowFile, REL_FAILURE);
-} else {
+
+if (isBatchJob) {
+
+String jsonResponse = null;
+
+if (StringUtils.isEmpty(jsonResponse)) {
+try (InputStream inputStream = session.read(flowFile)) {
+// If no code was provided, assume it is in the content of the incoming flow file
+jsonResponse = IOUtils.toString(inputStream, charset);
+} catch (IOException ioe) {
+log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+flowFile = session.penalize(flowFile);
+session.transfer(flowFile, REL_FAILURE);
+return;
+}
+}
+
+log.debug(" > jsonResponse: " + jsonResponse);
--- End diff --

Cosmetic change: it would be great if this log.debug message could be changed 
to a more standard form, like "JSON Response: ", 
i.e. remove the " > " prefix.
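
For illustration, the suggested rename is a one-line change; a minimal sketch of what the reviewer is asking for (not the committed code):

    // Suggested style: a plain label, without the " > " prefix.
    log.debug("JSON Response: " + jsonResponse);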


> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> --
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
>Reporter: Mageswaran
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark based Spark jobs (which are normally 
> structured as modules) over Livy on the existing "ExecuteSparkInteractive" 
> processor.
> This is done by reading file paths for pyfiles and file, and an option from the 
> user for whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
>  * Check whether the processor has to handle code or submit a Spark job
>  * Read the incoming flow file
>  ** If batch == true
>  *** If the flow file matches a Livy `batches` JSON response (arriving through the `wait` loop)
>  **** Wait for the Status Check Interval
>  **** Read the state
>  **** If the state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  *** Else
>  **** Ignore the flow file content
>  **** Trigger the Spark job over the Livy `batches` endpoint
>  **** Read the state of the submitted job
>  **** If the state is `running`, route it to `wait`; if it is `success` or 
> `dead`, route it accordingly
>  ** Else:
>  *** Existing logic to handle `Code`
>  
> !nifi-spark-options.png!
> !nifi-spark.png!
>  
> Thanks.
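
The polling loop described above reduces to a state check against the Livy `batches` response. A minimal sketch of that routing, assuming a self-routing `wait` relationship named REL_WAIT (hypothetical here) and the PR's readJSONObjectFromUrl helper:

    // Poll the batch job referenced by the flow file and route on its state.
    JSONObject jobInfo = readJSONObjectFromUrl(batchUrl, livySessionService, headers);
    String state = jobInfo.getString("state");
    switch (state) {
        case "running":
            session.transfer(flowFile, REL_WAIT);    // self-routed; polled again after the Status Check Interval
            break;
        case "success":
            session.transfer(flowFile, REL_SUCCESS);
            break;
        case "dead":
            session.transfer(flowFile, REL_FAILURE);
            break;
        default:
            session.transfer(flowFile, REL_WAIT);    // e.g. "starting"; keep waiting
    }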



--

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391550#comment-16391550
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173215603
  
--- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
@@ -83,6 +83,62 @@
 .expressionLanguageSupported(true)
 .build();
 
+public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder()
+.name("exec-spark-iactive-is_batch_job")
+.displayName("Is Batch Job")
+.description("If true, the `Code` part is ignored and the flow file from the previous stage is considered "
++ "a triggering event rather than code for the Spark session. When the `Wait` state is self-routed, "
++ "the Livy JSON response flow file from the previous Spark job is used to poll the job status "
++ "for success or failure")
+.required(true)
+.allowableValues("true", "false")
+.defaultValue("false")
+.build();
+
+public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder()
+.name("exec-spark-iactive-pyfiles")
+.displayName("pyFiles")
+.description("Python files to be used in this batch session, including *.py and *.zip files")
+.required(false)
+.addValidator(StandardValidators.createURLorFileValidator())
+.expressionLanguageSupported(false)
+.build();
+
+public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
+.name("exec-spark-iactive-jarfiles")
+.displayName("jars")
--- End diff --

`displayName` is what will be rendered in the UI. So let's change it to "JARs" 
or "Application JARs"?
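
A sketch of the suggested change (an illustration, not the committed code): the machine-readable name stays stable while the UI shows a friendlier label.

    public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
            .name("exec-spark-iactive-jarfiles")    // stable programmatic name
            .displayName("JARs")                    // what the NiFi UI renders
            .description("JARs to be used in this batch session")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(false)
            .build();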





--


[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391546#comment-16391546
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173216966
  
--- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) throws
 
 String sessionId = livyController.get("sessionId");
 String livyUrl = livyController.get("livyUrl");
-String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-if (StringUtils.isEmpty(code)) {
-try (InputStream inputStream = session.read(flowFile)) {
-// If no code was provided, assume it is in the content of the incoming flow file
-code = IOUtils.toString(inputStream, charset);
-} catch (IOException ioe) {
-log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-flowFile = session.penalize(flowFile);
-session.transfer(flowFile, REL_FAILURE);
-return;
-}
-}
 
-code = StringEscapeUtils.escapeJavaScript(code);
-String payload = "{\"code\":\"" + code + "\"}";
+
 try {
-final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-if (result == null) {
-session.transfer(flowFile, REL_FAILURE);
-} else {
+
+if (isBatchJob) {
+
+String jsonResponse = null;
+
+if (StringUtils.isEmpty(jsonResponse)) {
--- End diff --

This will be true all the time, right?
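
The observation holds: `jsonResponse` is initialized to `null` immediately above, so `StringUtils.isEmpty(jsonResponse)` always evaluates to true and the guard is redundant. A minimal sketch of the equivalent simplification (an illustration, not the committed fix):

    // The batch path always takes its JSON from the flow file content,
    // so the content can be read unconditionally.
    String jsonResponse;
    try (InputStream inputStream = session.read(flowFile)) {
        jsonResponse = IOUtils.toString(inputStream, charset);
    } catch (IOException ioe) {
        log.error("Error reading input flowfile, penalizing and routing to failure",
                new Object[]{flowFile, ioe.getMessage()}, ioe);
        session.transfer(session.penalize(flowFile), REL_FAILURE);
        return;
    }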





--


[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391547#comment-16391547
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r173213566
  
--- Diff: 
nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
 ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) throws
 
 String sessionId = livyController.get("sessionId");
 String livyUrl = livyController.get("livyUrl");
-String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-if (StringUtils.isEmpty(code)) {
-try (InputStream inputStream = session.read(flowFile)) {
-// If no code was provided, assume it is in the content of the incoming flow file
-code = IOUtils.toString(inputStream, charset);
-} catch (IOException ioe) {
-log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-flowFile = session.penalize(flowFile);
-session.transfer(flowFile, REL_FAILURE);
-return;
-}
-}
 
-code = StringEscapeUtils.escapeJavaScript(code);
-String payload = "{\"code\":\"" + code + "\"}";
+
 try {
-final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-if (result == null) {
-session.transfer(flowFile, REL_FAILURE);
-} else {
+
+if (isBatchJob) {
+
+String jsonResponse = null;
+
+if (StringUtils.isEmpty(jsonResponse)) {
+try (InputStream inputStream = session.read(flowFile)) {
+// If no code was provided, assume it is in the content of the incoming flow file
+jsonResponse = IOUtils.toString(inputStream, charset);
+} catch (IOException ioe) {
+log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+flowFile = session.penalize(flowFile);
+session.transfer(flowFile, REL_FAILURE);
+return;
+}
+}
+
+log.debug(" > jsonResponse: " + jsonResponse);
+
 try {
-final JSONObject output = result.getJSONObject("data");
-flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-session.transfer(flowFile, REL_SUCCESS);
-} catch (JSONException je) {
-// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+Map<String, String> headers = new HashMap<>();
+headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+headers.put("X-Requested-By", LivySessionService.USER);
+headers.put("Accept", "application/json");
+
+JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
 flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-flowFile = session.penalize(flowFile);
+
+Thread.sleep(statusCheckInterval);
+
+String state = jobInfo.getString("state");
+log.debug(" > jsonResponseObj State: " + state);
+

[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390898#comment-16390898
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

Github user Mageswaran1989 commented on the issue:

https://github.com/apache/nifi/pull/2521
  
Team,

This MR was created in line with our current requirements. 
The changes have currently been tested manually and found to be working.

We would like these changes to go into the mainline, for which we need the 
community's help with reviewing and adding test code, since we are new to 
NiFi extensions.

The next plan is to add test cases: 
- Create sample PySpark example modules as part of the test resources 
- Use them in tests

Could anyone point out some existing test cases that do similar testing?
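
For reference, NiFi processor tests are usually built on the mock framework in nifi-mock; a minimal sketch of the shape such a test could take (the Livy controller service would still need to be mocked and registered, which is omitted here, and the test name is hypothetical):

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class ExecuteSparkInteractiveTest {

        @Test
        public void testBatchJobReadsFlowFileContent() {
            final TestRunner runner = TestRunners.newTestRunner(ExecuteSparkInteractive.class);
            // A complete test would register a mock LivySessionService via
            // runner.addControllerService(...) and runner.enableControllerService(...).
            runner.setProperty(ExecuteSparkInteractive.IS_BATCH_JOB, "true");
            runner.enqueue("{\"url\": \"http://livy-host:8998/batches/0\"}");
            runner.run();
            // Assertions depend on the mocked Livy responses (success/wait/failure routing).
        }
    }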






--


[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options

2018-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390869#comment-16390869
 ] 

ASF GitHub Bot commented on NIFI-4946:
--

GitHub user Mageswaran1989 opened a pull request:

https://github.com/apache/nifi/pull/2521

NIFI-4946 nifi-spark-bundle : Adding support for pyfiles, file, jars options

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhiraa/nifi NIFI-4946_nifi-spark-bundle_Adding_support_for_pyfiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2521

commit a8301cdad0194328d420282f5e3260127776ed84
Author: Mageswaran 
Date:   2018-03-08T07:39:26Z

NIFI-4946 initial commit




> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> --
>
> Key: NIFI-4946
> URL: https://issues.apache.org/jira/browse/NIFI-4946
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.6.0
> Environment: Ubuntu 16.04, IntelliJ
>Reporter: Mageswaran
>Priority: Major
> Fix For: 1.6.0
>
>
> Adding support for submitting PySpark based Spark jobs (which are normally 
> structured as modules) over Livy on the existing "ExecuteSparkInteractive" 
> processor.
> This is done by reading file paths for pyfiles and file, and an option from the 
> user for whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
>  
> More details will be posted in the Git link.
>  
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)