[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2021-06-10 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361310#comment-17361310
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~parshabani]
Please follow up on SPARK-27188.

But there are lots of other problems in the file stream source/sink, and they 
require major effort to deal with. Third-party data lake solutions (e.g. Apache 
Hudi, Apache Iceberg, Delta Lake, sorted alphabetically) are trying to deal 
with these problems (and they have already resolved the known issues), so you 
may want to look into these solutions.

> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Major
> Attachments: spark_metadatalog_compaction_perfbug_repro.tar.gz
>
>
> FileStreamSinkLog metadata logs are concatenated to a single compact file 
> after a defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing 
> slowness while reading the data from the FileStreamSinkLog dir, as Spark 
> defaults to the "_spark_metadata" dir for the read.
> We need functionality to purge the compact file data.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2021-06-10 Thread Parsa Shabani (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361245#comment-17361245
 ] 

Parsa Shabani commented on SPARK-24295:
---

Hello all,

Any updates on this? [~kabhwan], are you still planning to merge your PR?

We are having severe issues with the growing size of the .compact files. 

Functionality for purging and truncating records from older syncs is badly 
needed, in my opinion.




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2020-09-08 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192574#comment-17192574
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~sta...@gmail.com]

Thanks for sharing the workaround. I've proposed applying a TTL to FileStreamSink 
output, which does something similar to your workaround but purges on every 
compact batch. Unfortunately, it hasn't drawn enough interest from committers.

SPARK-27188 ([https://github.com/apache/spark/pull/28363])
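To make the TTL idea concrete, here is a minimal, Spark-free sketch of what purging by entry age during compaction looks like. The names (`SinkFileStatus` fields, `pruneByTtl`) are simplified stand-ins for illustration, not Spark's internal API or the actual SPARK-27188 patch:

{code:java}
// Illustrative sketch of TTL-based purging of sink-log entries during
// compaction. These names are simplified stand-ins, not Spark's internals.
case class SinkFileStatus(path: String, size: Long, modificationTime: Long)

// Keep only entries whose output file is younger than the TTL.
def pruneByTtl(entries: Seq[SinkFileStatus],
               ttlMillis: Long,
               nowMillis: Long): Seq[SinkFileStatus] =
  entries.filter(e => nowMillis - e.modificationTime <= ttlMillis)

val now = 1000000L
val entries = Seq(
  SinkFileStatus("part-old.parquet", 10L, now - 500000L),
  SinkFileStatus("part-new.parquet", 10L, now - 1000L))
assert(pruneByTtl(entries, 100000L, now).map(_.path) == Seq("part-new.parquet"))
{code}

Applied on every compact batch, this bounds the compact file to roughly one TTL window's worth of entries instead of the whole job history.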




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2020-09-08 Thread Avner Livne (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192280#comment-17192280
 ] 

Avner Livne commented on SPARK-24295:
-

For those looking for a temporary workaround, run this code before you start 
the streaming query:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.execution.streaming._

/**
  * regex to find the last compact file
  */
val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored
val fs = FileSystem.get(sc.hadoopConfiguration)

/**
  * implicit Hadoop RemoteIterator converter
  */
implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(underlying: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = underlying.hasNext
    override def next: T = underlying.next
  }
  wrapper(underlying)
}

/**
  * delete a file or folder recursively
  */
def removePath(dstPath: String, fs: FileSystem): Unit = {
  val path = new Path(dstPath)
  if (fs.exists(path)) {
    println(s"deleting ${dstPath}...")
    fs.delete(path, true)
  }
}

/**
  * remove JSON entries older than `days` from the compact file,
  * preserving the "v1" header at the head of the file, then
  * write the smaller file back to the original destination
  */
def compact(file: Path, days: Int = 20): Unit = {
  val ttl = new java.util.Date().getTime -
    java.util.concurrent.TimeUnit.DAYS.toMillis(days)
  val compacted_file = s"/tmp/${file.getName}"
  removePath(compacted_file, fs)
  val lines = sc.textFile(file.toString)
  val reduced_lines = lines.mapPartitions { p =>
    implicit val formats = DefaultFormats
    p.collect {
      case "v1" => "v1"
      case x if parse(x).extract[SinkFileStatus].modificationTime > ttl => x
    }
  }.coalesce(1)
  println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...")
  reduced_lines.saveAsTextFile(compacted_file)
  // saveAsTextFile with coalesce(1) produces a single "part-00000" file
  FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file,
    false, sc.hadoopConfiguration)
  removePath(compacted_file, fs)
}

/**
  * get the last compact file, if one exists
  */
def getLastCompactFile(path: Path): Option[Path] = {
  fs.listFiles(path, true).toList
    .sortBy(_.getModificationTime).reverse
    .collectFirst {
      case x if file_pattern.findFirstMatchIn(x.getPath.toString).isDefined =>
        x.getPath
    }
}

val my_folder = "/my/root/spark/structured/streaming/output/folder"
val metadata_folder = new Path(s"$my_folder/_spark_metadata")
getLastCompactFile(metadata_folder).map(x => compact(x, 20))

val df1 = spark
  .readStream
  .format("kafka") // ...or whatever stream you like
{code}


This example retains SinkFileStatus entries from the last 20 days and purges 
everything else.
I run this code on driver startup, but it could certainly run asynchronously in 
some sidecar cron job.

Tested on Spark 3.0.0 writing Parquet files.





[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2020-02-25 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17045011#comment-17045011
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~iqbal_khattra] [~alfredo-gimenez-bv]

Hi, if you're open to trying something out in your environment, could you please 
try out SPARK-30946 and see how much it helps? You will need to back up your 
checkpoint and the "_spark_metadata" directory in the output directory, as 
SPARK-30946 will convert them to the V2 format, which is still a proposal (no 
guarantee whether or when it will be accepted).

If you're not open to trying something out but are open to providing your 
metadata files, please upload them somewhere and let me know. The latest compact 
file would be OK, but it would be better if you can provide a set spanning one 
compact interval (9.compact to XXX(X+1)8, 9 files). If you would like to do it 
privately, please contact me via mail: kabhwan-opensource AT gmail.com

Thanks!




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-08-12 Thread Mohan Parthasarathy (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905824#comment-16905824
 ] 

Mohan Parthasarathy commented on SPARK-24295:
-

Can someone explain how it is useful to know about every file that has been 
written since the beginning? I can see the check for the batchId corresponding 
to the offsets directory. Is the content of the compact file even checked? If 
we just truncated the file, would it still work?




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-18 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16795519#comment-16795519
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~iqbal_khattra] [~alfredo-gimenez-bv]

I would really appreciate it if you could review SPARK-27188 and see whether 
it works for your cases. Thanks in advance!




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-07 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787511#comment-16787511
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~alfredo-gimenez-bv]

I meant removing "output files deleted by another process", not "old metadata 
files".

See here: 
[https://github.com/apache/spark/blob/d8f77e11a42bf664a02124a8b6830797979550b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala#L109-L112]

and here:

[https://github.com/apache/spark/blob/d8f77e11a42bf664a02124a8b6830797979550b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala#L100-L108]

The issue is that while the code to get rid of obsolete output files exists, 
FileStreamSinkLog.DELETE_ACTION is never actually set.
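To illustrate the point, here is a simplified, hypothetical model of that compaction filter (not Spark's actual code; see the links above for the real implementation). Entries carrying a delete action, and the paths they name, are dropped during compaction, but since nothing ever emits the delete action, nothing is ever dropped:

{code:java}
// Simplified model of the compaction filter: entries whose path is marked
// with a delete action are removed when batches are merged into a compact
// file. Names here are illustrative stand-ins for Spark's internals.
case class LogEntry(path: String, action: String)

val AddAction = "add"
val DeleteAction = "delete"

def compactLogs(logs: Seq[LogEntry]): Seq[LogEntry] = {
  val deletedPaths = logs.collect { case LogEntry(p, DeleteAction) => p }.toSet
  logs.filter(e => e.action != DeleteAction && !deletedPaths.contains(e.path))
}

val merged = Seq(
  LogEntry("a.parquet", AddAction),
  LogEntry("b.parquet", AddAction),
  LogEntry("a.parquet", DeleteAction)) // never emitted by the sink today
assert(compactLogs(merged).map(_.path) == Seq("b.parquet"))
{code}

With no producer of the delete action, the filter is a no-op and the compact file only ever grows.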




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-07 Thread Alfredo Gimenez (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787502#comment-16787502
 ] 

Alfredo Gimenez commented on SPARK-24295:
-

Currently, old metadata files are already deleted after expiring (see 
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala#L222]).
 The problem is that each compaction file includes the contents of the 
previous compaction file, so every compaction is at least as large as the last 
one, and they grow linearly.

I am tempted to say that old compaction files should be deleted or not included 
in the next compaction, but I don't know if that will negatively affect 
checkpointing or any other internals that may depend on the metadata logs.
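The linear growth is easy to see with a toy model (illustrative numbers, not measurements from Spark):

{code:java}
// Toy model of compact-file growth: each compaction carries forward every
// entry from the previous compact file plus one interval's worth of batches,
// so the nth compact file holds n * interval * entriesPerBatch entries.
val entriesPerBatch = 100 // files written per micro-batch (illustrative)
val compactInterval = 10  // cf. spark.sql.streaming.fileSink.log.compactInterval

def compactFileEntries(nthCompaction: Int): Int =
  nthCompaction * compactInterval * entriesPerBatch

assert(compactFileEntries(1) == 1000)
assert(compactFileEntries(10) == 10000) // the 10th compact is 10x the first
{code}

Nothing is ever subtracted, so for a long-running job the compact file size is proportional to the total number of batches ever run.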




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-07 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16786569#comment-16786569
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~iqbal_khattra] [~alfredo-gimenez-bv]

Would we be happy if the file stream sink periodically checked the existence of 
output files (maybe starting from the older ones?) and marked missing ones as 
deleted, so that the next compaction would filter them out? If this sounds 
good, I'll take a step forward.

cc [~tdas] [~zsxwing] [~joseph.torres], since this looks like an ongoing issue 
that has affected multiple end users.




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-02 Thread Alfredo Gimenez (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782510#comment-16782510
 ] 

Alfredo Gimenez commented on SPARK-24295:
-

Our current workaround, FWIW:

We've added a streaming query listener that, at every query progress event, 
writes out a manual checkpoint (from the QueryProgressEvent sourceOffset member, 
which contains the last used source offsets). We gracefully stop the streaming 
job every 6 hours, purge the _spark_metadata and Spark checkpoints, and upon 
restart check for the existence of the manual checkpoint and use it if 
available. We do the stop/purge/restart via Airflow, but it would be trivial to 
do this by looping around the stream's awaitTermination with a provided timeout.

A simple solution would be an option to disable metadata file compaction while 
still allowing old metadata files to be deleted after a delay. Currently, it 
appears that all files stay around until compaction, at which point files older 
than the delay and not in the compaction are purged.
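The save/restore half of the workaround above can be sketched without Spark. This is a hedged illustration only: the file path, topic name, and offset JSON are assumptions modeled on the Kafka source's `{"topic":{"partition":offset}}` offset format, not the poster's actual code:

{code:java}
// Spark-free sketch of the manual-checkpoint idea: persist the last observed
// source offsets as JSON and read them back as "startingOffsets" on restart.
// The path, topic, and offsets below are illustrative assumptions.
import java.nio.charset.StandardCharsets
import java.nio.file.Files

val manualCheckpoint = Files.createTempFile("manual-offsets", ".json")

// On each query progress event, the listener would write something like:
val lastEndOffset = """{"events":{"0":1234,"1":5678}}"""
Files.write(manualCheckpoint, lastEndOffset.getBytes(StandardCharsets.UTF_8))

// On restart, after purging _spark_metadata and the checkpoints, read it
// back and fall back to "earliest" when no manual checkpoint exists:
val startingOffsets =
  if (Files.exists(manualCheckpoint))
    new String(Files.readAllBytes(manualCheckpoint), StandardCharsets.UTF_8)
  else "earliest"

assert(startingOffsets == lastEndOffset)
{code}

The recovered string would then be passed as the Kafka reader's "startingOffsets" option, which is exactly where the fragility the poster mentions comes from: the JSON shape is an implementation detail of the checkpoint format.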




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-03-01 Thread Iqbal Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781879#comment-16781879
 ] 

Iqbal Singh commented on SPARK-24295:
-

Agree, [~kabhwan]. It's very tricky to purge metadata, as it can mess up reads 
for the downstream job. I think the best way to add purging is:
 * Add a retention period based on the age of the FileSink data in the 
streaming job. We already have a delete flag in the code for the compact file. 
By default it can be set to false, and the user can enable it based on their 
requirements.
 * Add functionality for downstream jobs to avoid using "_spark_metadata" 
for reading old datasets (using the metadata by default), as we are purging 
not the output data but only the metadata log for the output. This is a bit 
risky too.

 

We do not have any graceful kill for Structured Streaming jobs; whenever we 
need to stop a job, we kill it from the command line or the resource manager. 
That can cause issues if the job is in the middle of processing a batch, 
leaving partially processed data in the output directory. In such cases, 
reading from the "_spark_metadata" dir is required for an exactly-once 
guarantee; otherwise, downstream will see duplicate data.

Thanks for working on it. I will also look into the PR for understanding.

--Iqbal Singh




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-02-25 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777240#comment-16777240
 ] 

Jungtaek Lim commented on SPARK-24295:
--

[~alfredo-gimenez-bv]
Unfortunately no, because I don't think metadata on the file stream sink can be 
reliably purged, as I explained earlier. My PR resolves the issue, but it also 
breaks the internals of the metadata, so it has to introduce an unattractive 
option too.

Btw, there's another concern, SPARK-26411, which you may be interested in 
considering.




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-02-25 Thread Alfredo Gimenez (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777197#comment-16777197
 ] 

Alfredo Gimenez commented on SPARK-24295:
-

We've run into the exact same issue; I uploaded a minimal reproducible example 
showing the continuously growing metadata compaction files. This is especially 
an issue in streaming jobs that rely on checkpointing, as we cannot purge 
metadata files and restart: the checkpointing mechanism depends on the metadata. 
A current workaround we have is to manually grab the last checkpoint offsets, 
purge both checkpoints and metadata, and set "startingOffsets" to the latest 
offsets that we grabbed. This is obviously not ideal, as it relies on the 
current serialized data structure for the checkpoints, which can change with 
Spark versions. It also introduces the possibility of losing checkpoint data 
if a Spark job fails before creating a new checkpoint file.

[~kabhwan] can you point us to your PR? 

Is there another reliable workaround for this setup?




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-02-19 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772588#comment-16772588
 ] 

Jungtaek Lim commented on SPARK-24295:
--

While I submitted the PR to reflect the last comment, I enumerated other 
options in the PR and am open to applying any of them if we feel better about 
another option.




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-02-19 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772322#comment-16772322
 ] 

Jungtaek Lim commented on SPARK-24295:
--

The FileStreamSinkLog cannot be removed even when we don't leverage the file 
sink metadata in another query, but it can be purged, since only the last 
batch is leveraged.

Given that at high volume both maintaining and reading the file sink metadata 
would be problematic, I guess we could add an option to disable reading from 
the metadata: let the file stream sink purge the metadata log just after 
adding a new batch, and let the file stream source skip using the file sink 
metadata even when it is available. Does that make sense?




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2019-02-19 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772296#comment-16772296
 ] 

Jungtaek Lim commented on SPARK-24295:
--

Please correct me if I'm missing something here. I just skimmed the codebase to 
determine the usage around FileStreamSinkLog, and it looks like it exists so 
that other queries can speed up reading the source file list when queries are 
chained.

If I'm not mistaken, then I'm not sure Spark can purge this metadata, because 
Spark can't determine which files have been processed by all queries; in fact 
that point never arrives, because a new query can be started at any time.

If end users periodically run file deletion based on their data retention 
policy, the metadata should be purged, but that cannot be done in Spark, 
because Spark doesn't even know the file deletion is happening. So IMHO what 
[~iqbal_khattra] is doing actually seems to be the right approach; it's just 
not ideal, because end users must understand the metadata and be able to 
modify it.

I can see the benefit of the file sink metadata (it avoids listing files, which 
can take too long), but given that it can only grow, and that it goes out of 
sync when separate processes (like data retention) delete part of the sink 
output, we may need data retention on the file sink itself (though I would find 
it very strange for a sink to remove its own output) and purge metadata based 
on file deletion - or just not rely on sink metadata at all.
(I feel we may need an explicit option to not leverage file stream sink 
metadata - on both the source and sink sides.)

[~iqbal_khattra]
Could the in-memory file index help in your case in the long run? Passing a 
glob path would skip using the file sink metadata.
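To illustrate the glob-path idea with a stdlib-only sketch (this is not Spark's actual listing code, and the file names are made up): when the input path contains a glob, Spark lists matching files itself instead of consulting the sink's _spark_metadata log, so the metadata directory is simply never consulted:

```python
# Stdlib-only illustration of what a glob path buys you: direct file listing
# that naturally ignores the _spark_metadata sink log directory.
import glob
import os
import tempfile

out = tempfile.mkdtemp()
os.makedirs(os.path.join(out, "_spark_metadata"))
for name in ["part-00000.parquet", "part-00001.parquet"]:
    open(os.path.join(out, name), "w").close()
open(os.path.join(out, "_spark_metadata", "0.compact"), "w").close()

# A glob over data files only; _spark_metadata/0.compact does not match.
files = sorted(glob.glob(os.path.join(out, "part-*.parquet")))
print([os.path.basename(f) for f in files])
# → ['part-00000.parquet', 'part-00001.parquet']
```

With a literal (non-glob) path pointing at streaming output, Spark instead trusts the sink metadata as the source of truth, which is exactly why a stale or unbounded compact file hurts.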




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-10-04 Thread Iqbal Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16638635#comment-16638635
 ] 

Iqbal Singh commented on SPARK-24295:
-

Hey [~XuanYuan],

Do we have any plans to pull this in for a coming version? This is a real pain 
for long-running, high-volume queries.




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-07-18 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547873#comment-16547873
 ] 

Li Yuanjian commented on SPARK-24295:
-

Thanks for your detailed explanation. 
You can check SPARK-17604; it seems to cover the same requirement about purging 
aged compact files. The small difference is that we need the purge logic in 
FileStreamSinkLog while that JIRA adds it in FileStreamSourceLog, but I think 
the strategy can be reused. Also cc'ing the original author of SPARK-17604, 
[~jerryshao2015]. 


For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-07-16 Thread Iqbal Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545577#comment-16545577
 ] 

Iqbal Singh commented on SPARK-24295:
-

Hey [~XuanYuan],

We are processing 3000 files every 5 minutes, 24x7, using Structured Streaming; 
the file size is 120 MB on average.
 * Every Structured Streaming batch commit file is around 800 KB to 1000 KB, 
and the compact file keeps track of all the data from the start of the process. 
It grows to 8 GB after 45 days, and the Structured Streaming process takes more 
than 15 minutes to compact the file every 10th batch.

 * We are using dynamic partitions while dumping the data, which also increases 
the output file count; the ratio for each micro-batch is 2:3 (2 input files 
give us 3 output files).

 * Spark forces jobs to read the data using the _spark_metadata files if the 
input directory of the job is Structured Streaming output, which wastes another 
10-15 minutes generating the list of files from the "_spark_metadata" compact 
file.

 * The compact file has data in JSON format and grows in size very fast if we 
have too many files to process in each batch.
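A rough back-of-envelope check of that growth rate (the per-entry byte size is an assumption, not measured - each output file contributes one serialized SinkFileStatus JSON object, typically on the order of a couple hundred bytes):

```python
# Back-of-envelope estimate of compact-file growth for the workload above.
# The ~150-byte JSON entry size per output file is an assumption.
batches_per_day = 24 * 60 // 5            # one micro-batch every 5 minutes
output_files_per_batch = 3000 * 3 // 2    # 2:3 input-to-output file ratio
entries_after_45_days = 45 * batches_per_day * output_files_per_batch
est_bytes = entries_after_45_days * 150   # assumed bytes per JSON entry
print(entries_after_45_days)              # → 58320000
print(round(est_bytes / 2**30, 1))        # → 8.1 (GiB), consistent with ~8 GB
```

So roughly 58 million entries after 45 days is enough to reach the observed ~8 GB even at a modest per-entry size, which squares with the numbers reported in this comment.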

 

*File:* org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

-- The delete action is defined in the class "FileStreamSinkLog", but it is not 
implemented anywhere in the code.

 
{code:java}
object FileStreamSinkLog {
  val VERSION = 1
  val DELETE_ACTION = "delete"
  val ADD_ACTION = "add"
}
{code}
 

-- The code below, which drops sink log entries carrying the "delete" action 
while compacting the files, never takes the delete branch:
{code:java}
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
  val deletedFiles = logs
    .filter(_.action == FileStreamSinkLog.DELETE_ACTION)
    .map(_.path)
    .toSet
  if (deletedFiles.isEmpty) {
    logs
  } else {
    logs.filter(f => !deletedFiles.contains(f.path))
  }
}
{code}
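As a hedged illustration of why that branch is dead: a direct transliteration of compactLogs into Python (entries mirror only the SinkFileStatus fields used here) shows that as long as nothing ever writes a "delete" entry, the function always returns its input unchanged:

```python
# Python transliteration of FileStreamSinkLog.compactLogs, for illustration
# only; each dict mirrors just the SinkFileStatus fields the logic touches.
DELETE_ACTION = "delete"

def compact_logs(logs):
    deleted = {e["path"] for e in logs if e["action"] == DELETE_ACTION}
    if not deleted:
        return logs  # no delete entries: input passes through untouched
    return [e for e in logs if e["path"] not in deleted]

logs = [{"path": "part-0", "action": "add"},
        {"path": "part-1", "action": "add"}]
print(compact_logs(logs) == logs)  # → True: no writer ever emits "delete"
```

Note the Scala original also drops the "delete" marker entries themselves, since their paths are in the deleted set; the Python version preserves that behavior.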
 

-- We do not have batch number info in the compact file as a metric, so it is 
tough to keep a defined number of batches in the file. We do have the 
modification time, and can use it to mark sink metadata log records for 
deletion based on a time-based data retention policy. 

 

We have developed a Spark job that reads the metadata and generates the list of 
files for a particular batch, preserving the exactly-once guarantee, and passes 
that list to the downstream Spark job; it takes 60 seconds to read the compact 
file using Spark.

 

We are working on an explicit data purge job for the compact file to keep its 
size under control. Please let me know if more details are required, and also 
if there is anything we are missing.
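A minimal sketch of what such a purge job could look like, under two assumptions: that the v1 compact-file layout is a "v1" header line followed by one JSON-serialized SinkFileStatus object per line, and that "modificationTime" is in epoch millis. The function name and cutoff are illustrative, this is not Spark code, and it should only run while the query is stopped:

```python
# Hypothetical purge sketch for a FileStreamSinkLog compact file.
# Assumed v1 layout: "v1" header, then one JSON SinkFileStatus per line.
import json

def purge_compact(lines, cutoff_ms):
    """Keep the version header plus entries at or newer than cutoff_ms."""
    header = lines[0]
    entries = [json.loads(l) for l in lines[1:] if l.strip()]
    kept = [e for e in entries if e["modificationTime"] >= cutoff_ms]
    return [header] + [json.dumps(e) for e in kept]

lines = [
    "v1",
    json.dumps({"path": "part-old", "modificationTime": 1_000, "action": "add"}),
    json.dumps({"path": "part-new", "modificationTime": 9_000, "action": "add"}),
]
print(purge_compact(lines, cutoff_ms=5_000))  # header + the "part-new" entry
```

Purged entries must correspond to files already deleted by the retention job, otherwise chained queries that read via the sink metadata would lose their exactly-once view of the output.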

 

Appreciate your help.

 

Thanks,

Iqbal Singh




[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-07-15 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544453#comment-16544453
 ] 

Li Yuanjian commented on SPARK-24295:
-

Could you give more detailed information about how the compact file size grows 
to 10 GB in your scenario? In the implementation of FileStreamSinkLog, batches 
within compactInterval (default value 10) are merged into a single file, and 
all the content in this file is serialized SinkFileStatus, so it seems it could 
hardly grow to 10 GB.
