[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333119#comment-15333119 ] Aamir Abbas commented on SPARK-15919:

I have tried the solution you suggested, i.e. the window() function. Here's my code.

{code}
// Duration is in milliseconds, so 5 minutes is 5 * 60 * 1000 ms.
Duration batchInterval = new Duration(5 * 60 * 1000); // 5 minutes
javaStream.window(batchInterval, batchInterval).dstream().saveAsTextFiles(getBaseOutputPath(), "");
{code}

The actual output of this snippet is that it fetches the base output path once, creates folders under that path, and saves each record of the RDDs as a separate file. The expected output was a new base output path every time the window fires, with all the records of the RDDs saved in a single file. Please let me know if I am applying the window() function incorrectly, and how to apply it correctly.

> DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
>
>                 Key: SPARK-15919
>                 URL: https://issues.apache.org/jira/browse/SPARK-15919
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.6.1
>         Environment: Amazon EMR
>            Reporter: Aamir Abbas
>
> I have a Spark streaming job that reads a data stream, and saves it as a text
> file after a predefined time interval. In the function
>
> stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");
>
> the function getOutputPath() generates a new path every time it is called,
> depending on the current system time.
> However, the output path prefix remains the same for all the batches, which
> effectively means that the function is not called again for the next batch of
> the stream, although the files are being saved after each checkpoint interval.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
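For what it's worth, the behaviour described above is consistent with how the prefix argument is evaluated: saveAsTextFiles(prefix, suffix) receives a plain String, so getBaseOutputPath() runs once, when the streaming graph is constructed, not once per window. A minimal plain-Java sketch of that difference (no Spark involved; the counter-based getBaseOutputPath() is a stand-in for the real method):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsLazy {
    static final AtomicInteger counter = new AtomicInteger();

    // Stand-in for getBaseOutputPath(): returns a fresh value on every call.
    static String getBaseOutputPath() {
        return "s3://bucket/folder/" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        // Eager: evaluated once, here — like saveAsTextFiles(getBaseOutputPath(), "").
        String fixedPrefix = getBaseOutputPath();

        // Lazy: a Supplier defers the call until each simulated "batch".
        Supplier<String> perBatchPrefix = EagerVsLazy::getBaseOutputPath;

        for (int batch = 0; batch < 3; batch++) {
            // fixedPrefix never changes; perBatchPrefix.get() changes each iteration.
            System.out.println("eager: " + fixedPrefix + "  lazy: " + perBatchPrefix.get());
        }
    }
}
```

To get a genuinely per-batch path, the call has to happen inside a per-batch callback such as foreachRDD, which is what is attempted elsewhere in this thread.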
[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329308#comment-15329308 ] Aamir Abbas commented on SPARK-15919:

This is exactly where our requirement is different. In our use case we need to save all the RDDs from an interval, say 5 minutes, in one file. So if 1,000 RDDs arrive every 5 minutes, we need to save them in one file instead of 1,000 files. Please let us know if and how Spark supports this, as it is a legitimate requirement, or whether it should be filed as a feature request if it is not currently supported.
[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329212#comment-15329212 ] Aamir Abbas commented on SPARK-15919:

Thank you for the response. If I generate a new path in the call() function, it will save each RDD in a different location, which is not what we need. The requirement is to save an entire batch of data in one file at a given path, and that path should be different for the next batch of data. Please suggest what I should do in this case.
[jira] [Comment Edited] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329201#comment-15329201 ] Aamir Abbas edited comment on SPARK-15919 at 6/14/16 9:49 AM:

Here's the updated code, after using the foreachRDD function.

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = -2508020533798690700L;
    private int random = (int) (Math.random() * 256);
    private String pathPrefix = getBaseOutputPathPrefix();
    private String pathSuffix = getBaseOutputPathSuffix();

    public void call(JavaRDD<String> s) {
        String path = pathPrefix + random + pathSuffix;
        s.saveAsTextFile(path);
    }

    private String getBaseOutputPathPrefix() {
        return "s3://bucket/folder/";
    }

    private String getBaseOutputPathSuffix() {
        DecimalFormat format = new DecimalFormat("00");
        DateTime dateTime = new DateTime();
        return "/" + dateTime.getYear()
                + format.format(dateTime.getMonthOfYear())
                + format.format(dateTime.getDayOfMonth())
                + format.format(dateTime.getHourOfDay())
                + format.format(dateTime.getMinuteOfHour())
                + format.format(dateTime.getSecondOfMinute()) + "/";
    }
});
{code}

The above code saves the output at the following location as part-* files, where each file contains one RDD.

s3://bucket/folder/random_number/date_time/

The expected behaviour is to save the output at the very same location, but with all of the RDDs in one file. For multiple batches of data, the output should be as follows:

s3://bucket/folder/random_number_1/date_time_1/
s3://bucket/folder/random_number_2/date_time_2/
s3://bucket/folder/random_number_3/date_time_3/
s3://bucket/folder/random_number_1/date_time_4/

and so on. I executed this code as a Spark job on an AWS EMR cluster. Please let me know if you need me to add more explanation anywhere.
[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15329201#comment-15329201 ] Aamir Abbas commented on SPARK-15919:

Here's the updated code, after using the foreachRDD function.

{code}
javaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = -2508020533798690700L;
    private int random = (int) (Math.random() * 256);
    private String pathPrefix = getBaseOutputPathPrefix();
    private String pathSuffix = getBaseOutputPathSuffix();

    public void call(JavaRDD<String> s) {
        String path = pathPrefix + random + pathSuffix;
        s.saveAsTextFile(path);
    }

    private String getBaseOutputPathPrefix() {
        return "s3://bucket/folder/";
    }

    private String getBaseOutputPathSuffix() {
        DecimalFormat format = new DecimalFormat("00");
        DateTime dateTime = new DateTime();
        return "/" + dateTime.getYear()
                + format.format(dateTime.getMonthOfYear())
                + format.format(dateTime.getDayOfMonth())
                + format.format(dateTime.getHourOfDay())
                + format.format(dateTime.getMinuteOfHour())
                + format.format(dateTime.getSecondOfMinute()) + "/";
    }
});
{code}

The above code saves the output at the following location as part-* files, where each file contains one RDD.

s3://bucket/folder/random_number/date_time/

The expected behaviour is to save the output at the very same location, but with all of the RDDs in one file. For multiple batches of data, the output should be as follows:

s3://bucket/folder/random_number_1/date_time_1/
s3://bucket/folder/random_number_2/date_time_2/
s3://bucket/folder/random_number_3/date_time_3/
s3://bucket/folder/random_number_1/date_time_4/

and so on. I executed this code as a Spark job on an AWS EMR cluster. Please let me know if you need me to add more explanation anywhere.
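As a side note on the date formatting in the comment above: the Joda-Time-plus-DecimalFormat suffix can be expressed more compactly with java.time, and making the path builder a pure function of a supplied clock value makes it testable. A sketch under those assumptions (the bucket and folder names are the placeholders from the comment, not real paths):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BatchPath {
    // One pattern replaces the six getXxx() + DecimalFormat("00") calls.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Builds <prefix><random>/<yyyyMMddHHmmss>/ for a given instant.
    static String outputPath(String prefix, int random, LocalDateTime now) {
        return prefix + random + "/" + STAMP.format(now) + "/";
    }

    public static void main(String[] args) {
        System.out.println(outputPath("s3://bucket/folder/", 42,
                LocalDateTime.of(2016, 6, 14, 9, 49, 0)));
        // → s3://bucket/folder/42/20160614094900/
    }
}
```

On the "one file per batch" requirement: inside call(), `s.coalesce(1).saveAsTextFile(path)` would write the whole batch as a single part-00000 file, though still inside a directory — the Hadoop output format always writes a directory of part files, so a literal single bare file would need a rename afterwards.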
[jira] [Reopened] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aamir Abbas reopened SPARK-15919:

I tried the solution you mentioned, but it didn't work. I applied foreachRDD and added a random number to change the output path, but that random number remained the same across different batches of data. It also saved each of the RDDs in a separate file instead of one file per batch of data.
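The "random number remained the same" symptom follows from where the fields are initialized: in the VoidFunction posted earlier, `random`, `pathPrefix`, and `pathSuffix` are set once when the function object is constructed, and that same object's call() is then invoked for every batch. Anything that should vary per batch has to be computed inside call(). A plain-Java sketch of the two behaviours (no Spark; a batch counter stands in for Math.random() so the output is deterministic):

```java
public class PerBatchState {
    // Mirrors the posted VoidFunction: the field is fixed at construction time.
    static class FixedPathFn {
        private final int random = 7; // imagine (int) (Math.random() * 256)
        String call() { return "s3://bucket/folder/" + random + "/"; }
    }

    // Fix: derive the varying part inside call(), once per batch.
    static class PerBatchPathFn {
        private int batch = 0;
        String call() { return "s3://bucket/folder/" + (batch++) + "/"; }
    }

    public static void main(String[] args) {
        FixedPathFn fixed = new FixedPathFn();
        PerBatchPathFn fresh = new PerBatchPathFn();
        for (int i = 0; i < 3; i++) {            // three simulated batches
            System.out.println(fixed.call());    // same path every time
            System.out.println(fresh.call());    // new path each batch
        }
    }
}
```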
[jira] [Reopened] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aamir Abbas reopened SPARK-15919:

This is still an issue, as I do not actually need the current timestamp in the output path. What I need is a genuinely new output path for each batch, not one that merely embeds the current timestamp.
[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327229#comment-15327229 ] Aamir Abbas commented on SPARK-15919:

foreachRDD is fine in case you want to save individual RDDs separately, but I need to do this for the entire batch of the stream. Could you please share a link to the relevant documentation that would help me save the entire batch of the stream like this?
[jira] [Commented] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
[ https://issues.apache.org/jira/browse/SPARK-15919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327212#comment-15327212 ] Aamir Abbas commented on SPARK-15919:

I need to save the output of each batch in a different place. This is possible for a regular Spark job, and it should be possible for streaming data as well. Should I file this as a feature request?
[jira] [Created] (SPARK-15919) DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
Aamir Abbas created SPARK-15919:

                 Summary: DStream "saveAsTextFile" doesn't update the prefix after each checkpoint
                     Key: SPARK-15919
                     URL: https://issues.apache.org/jira/browse/SPARK-15919
                 Project: Spark
              Issue Type: Bug
              Components: Java API
        Affects Versions: 1.6.1
             Environment: Amazon EMR
                Reporter: Aamir Abbas

I have a Spark streaming job that reads a data stream and saves it as a text file after a predefined time interval, with the call

stream.dstream().repartition(1).saveAsTextFiles(getOutputPath(), "");

The function getOutputPath() generates a new path every time it is called, depending on the current system time. However, the output path prefix remains the same for all the batches, which effectively means that the function is not called again for the next batch of the stream, although the files are being saved after each checkpoint interval.
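For context on the reported behaviour: as far as I understand Spark Streaming's output naming, saveAsTextFiles deliberately takes a fixed prefix and itself appends the batch time, producing one "prefix-<timeInMs>[.suffix]" directory per batch; the prefix argument is evaluated only once, when the graph is built. A sketch that approximately mirrors that naming scheme (the method name rddToFileName echoes Spark's internal helper; treat the exact format as an assumption):

```java
public class RddFileName {
    // Approximate mirror of Spark Streaming's per-batch output naming:
    // the prefix stays fixed and the batch time in ms is appended.
    static String rddToFileName(String prefix, String suffix, long timeMs) {
        return (suffix == null || suffix.isEmpty())
                ? prefix + "-" + timeMs
                : prefix + "-" + timeMs + "." + suffix;
    }

    public static void main(String[] args) {
        System.out.println(rddToFileName("s3://bucket/out", "", 1465897740000L));
        // → s3://bucket/out-1465897740000
    }
}
```

This would explain why the files vary per checkpoint interval even though getOutputPath() is never re-invoked: the variation comes from the appended batch time, not from re-evaluating the prefix.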