[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-15 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333119#comment-15333119 ]

Aamir Abbas commented on SPARK-15919:
-------------------------------------

I have tried the solution you suggested, i.e. the window() function. Here's my code.

{code}
Duration batchInterval = new Duration(5 * 60 * 1000); // 5 minutes (Duration takes milliseconds)
javaStream.window(batchInterval, batchInterval)
    .dstream()
    .saveAsTextFiles(getBaseOutputPath(), "");
{code}

The actual output of this snippet is that it fetches the base output path once, 
creates folders under that path, and saves each record from the RDDs as a 
separate file.

The expected output was to get a new base output path every time the window() 
function is applied, and to save all the records from the RDDs in a single file.

Please let me know if I am applying the window() function incorrectly, and how 
to do it correctly.
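
In case it helps the discussion, here is a minimal sketch of how window() and 
foreachRDD() could be combined so that the path is recomputed per window. This 
is a sketch only, assuming javaStream is a JavaDStream<String> and 
getBaseOutputPath() is the time-based helper mentioned above:

{code}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;

// Sketch: foreachRDD's body runs once per window, so the path helper is
// re-invoked each time, and coalesce(1) yields a single part file.
javaStream.window(Durations.minutes(5), Durations.minutes(5))
    .foreachRDD(new VoidFunction<JavaRDD<String>>() {
        public void call(JavaRDD<String> windowed) {
            if (!windowed.isEmpty()) {
                windowed.coalesce(1).saveAsTextFile(getBaseOutputPath());
            }
        }
    });
{code}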




[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-14 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329308#comment-15329308 ]

Aamir Abbas commented on SPARK-15919:
-------------------------------------

This is exactly where our requirement is different. As per our use case, we 
need to save all the RDDs arriving in an interval, say 5 minutes, in one file. 
So if 1000 RDDs arrive every 5 minutes, we need to save them in one file 
instead of 1000 files.

Please let us know whether and how Spark supports this, as it is a legitimate 
requirement. If it is not currently supported, should this be filed as a 
feature request?
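
One clarification that may matter here, offered tentatively: in Spark 
Streaming, each batch interval yields a single RDD on the DStream, so 1000 
items arriving in 5 minutes are 1000 records inside one RDD, not 1000 RDDs. 
Collapsing them into one file is then a matter of merging partitions before 
saving (getBaseOutputPath() below stands in for whatever path helper is used):

{code}
// Sketch: one RDD arrives per interval; repartition(1) merges its
// partitions so saveAsTextFile emits a single part-00000 file.
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    public void call(JavaRDD<String> batch) {
        batch.repartition(1).saveAsTextFile(getBaseOutputPath());
    }
});
{code}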




[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-14 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329212#comment-15329212 ]

Aamir Abbas commented on SPARK-15919:
-------------------------------------

Thank you for the response.

If I generate a new path in the call() function, it will save each RDD to a 
different location, which is not what we need. The requirement is to save an 
entire batch of data in one file at a given path, and that path should be 
different for the next batch of data.

Please suggest what I should do in this case.
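
For what it's worth, generating the path inside call() should not scatter a 
batch across locations: call() is invoked once per batch, not once per record, 
so every record in the batch shares the path computed there. A sketch, assuming 
getOutputPath() is the time-based helper from the description:

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    public void call(JavaRDD<String> batch) {
        // Runs once per batch: the whole batch shares this one path.
        String path = getOutputPath();
        batch.coalesce(1).saveAsTextFile(path);
    }
});
{code}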




[jira] [Comment Edited] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-14 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329201#comment-15329201 ]

Aamir Abbas edited comment on SPARK-15919 at 6/14/16 9:49 AM:
--------------------------------------------------------------

Here's the updated code, after using the foreachRDD function.

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

    private static final long serialVersionUID = -2508020533798690700L;
    private int random = (int) (Math.random() * 256);
    private String pathPrefix = getBaseOutputPathPrefix();
    private String pathSuffix = getBaseOutputPathSuffix();

    public void call(JavaRDD<String> s) {
        String path = pathPrefix + random + pathSuffix;
        s.saveAsTextFile(path);
    }

    private String getBaseOutputPathPrefix() {
        return "s3://bucket/folder/";
    }

    private String getBaseOutputPathSuffix() {
        DecimalFormat format = new DecimalFormat("00");
        DateTime dateTime = new DateTime();

        return "/" + dateTime.getYear()
            + format.format(dateTime.getMonthOfYear())
            + format.format(dateTime.getDayOfMonth())
            + format.format(dateTime.getHourOfDay())
            + format.format(dateTime.getMinuteOfHour())
            + format.format(dateTime.getSecondOfMinute()) + "/";
    }
});
{code}

The above code saves the output at the following location, split into multiple 
part- files rather than a single file:

s3://bucket/folder/random_number/date_time/

The expected behaviour is to save the output at the very same location, but 
with all of the data in one file.

For multiple batches of data, the output should be as follows:

s3://bucket/folder/random_number_1/date_time_1/
s3://bucket/folder/random_number_2/date_time_2/
s3://bucket/folder/random_number_3/date_time_3/
s3://bucket/folder/random_number_1/date_time_4/

and so on. I executed this code as a Spark job on an AWS EMR cluster.

Please let me know if you need me to add more explanation anywhere.
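
A possible explanation, offered as a guess rather than a confirmed diagnosis: 
random and pathSuffix are instance-field initializers, so they are evaluated 
exactly once, when the anonymous VoidFunction is constructed, and every 
subsequent call() reuses the same values. The multiple part- files likewise 
come from the partitions of the one batch RDD, not from separate RDDs. Moving 
the computation into call() and merging partitions would look like this 
(reusing the two path helpers from the snippet above):

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    public void call(JavaRDD<String> s) {
        // Recomputed on every batch, unlike a field initializer:
        int random = (int) (Math.random() * 256);
        String path = getBaseOutputPathPrefix() + random + getBaseOutputPathSuffix();
        s.coalesce(1).saveAsTextFile(path); // one part file per batch
    }
});
{code}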




[jira] [Reopened] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-14 Thread Aamir Abbas (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aamir Abbas reopened SPARK-15919:
---------------------------------

I tried the solution you mentioned. It didn't work.

I applied foreachRDD and added a random number to change the output path. That 
random number remained the same across different batches of data. Also, it 
saved each RDD in a separate file instead of one file per batch of data.
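
A tentative explanation for the unchanging random number: if it is computed in 
a field initializer of the anonymous function, it is fixed once when the object 
is constructed and reused for every batch; only code inside call() runs per 
batch. Illustrated as a sketch:

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    // Field initializer: evaluated once, at construction.
    private final int fixedRandom = (int) (Math.random() * 256);

    public void call(JavaRDD<String> batch) {
        // Local variable: evaluated again on every batch.
        int perBatchRandom = (int) (Math.random() * 256);
        batch.coalesce(1) // one part file for the whole batch
             .saveAsTextFile("s3://bucket/folder/" + perBatchRandom + "/");
    }
});
{code}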




[jira] [Reopened] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-13 Thread Aamir Abbas (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aamir Abbas reopened SPARK-15919:
---------------------------------

This is an issue, as I do not actually need the current timestamp in the 
output path. I need a new path each time, one that is not based on the current 
timestamp but is a genuinely new output path.
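
For reference, the DStream API documentation describes saveAsTextFiles as 
varying the directory per batch on its own: the prefix string is captured once, 
but the batch time in milliseconds is appended, so each interval writes to its 
own prefix-<timeMs>.suffix directory. The paths in the comments below are 
illustrative only:

{code}
javaStream.dstream().saveAsTextFiles("s3://bucket/folder/out", "txt");
// batch at t1 -> s3://bucket/folder/out-1465900200000.txt/part-00000
// batch at t2 -> s3://bucket/folder/out-1465900230000.txt/part-00000
{code}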




[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-13 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327229#comment-15327229 ]

Aamir Abbas commented on SPARK-15919:
-------------------------------------

foreachRDD is fine in case you want to save individual RDDs separately; I need 
to do this for the entire batch of the stream. Could you please share a link 
to the relevant documentation that would help me save the entire batch of the 
stream like this?
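
One point worth checking against the programming guide: foreachRDD already 
operates on the entire batch, since a DStream produces exactly one RDD per 
batch interval; the RDD handed to call() is the whole batch, not an individual 
record. A minimal sketch to verify this:

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    public void call(JavaRDD<String> batch) {
        // 'batch' holds every record received in this interval.
        System.out.println("records in this batch: " + batch.count());
    }
});
{code}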




[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-13 Thread Aamir Abbas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327212#comment-15327212 ]

Aamir Abbas commented on SPARK-15919:
-------------------------------------

I need to save the output of each batch in a different place. This is 
available for a regular Spark job, so it should be available for streaming 
data as well. Should I add this as a feature request?




[jira] [Created] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint

2016-06-13 Thread Aamir Abbas (JIRA)
Aamir Abbas created SPARK-15919:
--------------------------------

 Summary: DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
 Key: SPARK-15919
 URL: https://issues.apache.org/jira/browse/SPARK-15919
 Project: Spark
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.6.1
 Environment: Amazon EMR
Reporter: Aamir Abbas


I have a Spark streaming job that reads a data stream and saves it as a text 
file after a predefined time interval, using the following call:

stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");

The function getOutputPath() generates a new path every time it is called, 
based on the current system time.

However, the output path prefix remains the same for all the batches, which 
effectively means the function is not called again for the next batch of the 
stream, although the files are being saved after each checkpoint interval.
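
A plausible reading of this behaviour: saveAsTextFiles is invoked once, while 
the streaming graph is being built, so getOutputPath() runs once and only the 
returned String is retained; the per-batch variation Spark applies is limited 
to appending the batch time to that fixed prefix. To re-invoke the helper on 
every batch, the call has to sit inside a per-batch construct such as 
foreachRDD. A sketch, assuming stream is a JavaDStream<String> (the 
VoidFunction2 overload also supplies the batch Time):

{code}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Time;

// Evaluated ONCE, at graph construction: only the String result is kept.
stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");

// Evaluated EVERY batch: the body runs once per interval.
stream.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    public void call(JavaRDD<String> rdd, Time time) {
        rdd.repartition(1).saveAsTextFile(getOutputPath() + "-" + time.milliseconds());
    }
});
{code}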


