[GitHub] spark pull request #20952: [SPARK-6951][core] Speed up parsing of event logs...

2018-04-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20952


---




[GitHub] spark pull request #20952: [SPARK-6951][core] Speed up parsing of event logs...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20952#discussion_r180187517
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
--- End diff --

The class comment is no longer correct where it talks about finding new
attempts based on modification time; with this change there should be a
listing entry for every file in the directory.


---




[GitHub] spark pull request #20952: [SPARK-6951][core] Speed up parsing of event logs...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20952#discussion_r180197210
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -417,15 +419,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .filter { entry =>
         try {
           val info = listing.read(classOf[LogInfo], entry.getPath().toString())
-          if (info.fileSize < entry.getLen()) {
-            // Log size has changed, it should be parsed.
-            true
-          } else {
+
+          if (info.appId.isDefined) {
             // If the SHS view has a valid application, update the time the file was last seen so
             // that the entry is not deleted from the SHS listing.
-            if (info.appId.isDefined) {
-              listing.write(info.copy(lastProcessed = newLastScanTime))
+            listing.write(info.copy(lastProcessed = newLastScanTime))
+          }
+
+          if (info.fileSize < entry.getLen()) {
+            if (info.appId.isDefined && fastInProgressParsing) {
+              // When fast in-progress parsing is on, we don't need to re-parse when the
+              // size changes, but we do need to invalidate any existing UIs.
+              invalidateUI(info.appId.get, info.attemptId)
+              false
--- End diff --

Do you need to update info.fileSize here too? Because you skip the
re-parse, it isn't updated in mergeApplicationListings, so I think once
you hit this condition you'll keep invalidating the UI on every iteration.
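
To make the concern concrete, here is a self-contained sketch; the LogInfo
here is a stand-in modeled on the field shown in the diff above, not Spark's
actual class:

```
object RepeatedInvalidationSketch {
  // Stand-in for the listing entry; only the field relevant to the check is modeled.
  case class LogInfo(fileSize: Long)

  // The "has the log grown?" check from the diff above.
  def sizeChanged(stored: LogInfo, onDiskLen: Long): Boolean =
    stored.fileSize < onDiskLen

  def main(args: Array[String]): Unit = {
    var stored = LogInfo(fileSize = 100L)
    // First scan: the file grew, the UI gets invalidated, but fileSize stays at 100.
    println(sizeChanged(stored, onDiskLen = 150L)) // true
    // Next scan, no further growth: still true, so the UI would be invalidated again.
    println(sizeChanged(stored, onDiskLen = 150L)) // true
    // Writing back the observed length is what stops the repeated hits.
    stored = stored.copy(fileSize = 150L)
    println(sizeChanged(stored, onDiskLen = 150L)) // false
  }
}
```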


---




[GitHub] spark pull request #20952: [SPARK-6951][core] Speed up parsing of event logs...

2018-03-30 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/20952

[SPARK-6951][core] Speed up parsing of event logs during listing.

This change introduces two optimizations to help speed up generation
of listing data when parsing event logs.

The first one allows the parser to be stopped when enough data to
create the listing entry has been read. This is currently the start
event plus environment info, to capture UI ACLs. If the end event is
needed, the code will skip to the end of the log to try to find that
information, instead of parsing the whole log file.

Unfortunately this works better with uncompressed logs. Skipping bytes
on compressed logs only saves the work of parsing lines and some events,
so not a lot of gains are observed.
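
To illustrate the idea, here is a rough, standalone sketch that uses plain
file I/O and string matching on event names; it is not the actual
FsHistoryProvider / ReplayListenerBus code, and the helper names and
tail-scan size are assumptions for illustration only:

```
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object PartialLogScanSketch {

  // Read the head of the log and stop as soon as the two events needed for the
  // listing entry (application start plus environment update, for the UI ACLs)
  // have been seen; returns how many lines were actually read.
  def scanHead(log: File): Int = {
    val in = new BufferedReader(
      new InputStreamReader(new FileInputStream(log), StandardCharsets.UTF_8))
    try {
      var sawStart = false
      var sawEnv = false
      var read = 0
      var line = in.readLine()
      while (line != null && !(sawStart && sawEnv)) {
        if (line.contains("SparkListenerApplicationStart")) sawStart = true
        if (line.contains("SparkListenerEnvironmentUpdate")) sawEnv = true
        read += 1
        line = in.readLine()
      }
      read
    } finally {
      in.close()
    }
  }

  // If the end event is needed, skip straight to the last `tailBytes` of the file
  // and scan only the tail instead of replaying the whole log. As noted above,
  // this mostly pays off for uncompressed logs.
  def findEndEvent(log: File, tailBytes: Long): Option[String] = {
    val raw = new FileInputStream(log)
    try {
      raw.skip(math.max(0L, log.length() - tailBytes))
      val in = new BufferedReader(new InputStreamReader(raw, StandardCharsets.UTF_8))
      Iterator.continually(in.readLine()).takeWhile(_ != null)
        .find(_.contains("SparkListenerApplicationEnd"))
    } finally {
      raw.close()
    }
  }
}
```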

The second optimization deals with in-progress logs. It works in two
ways: first, it completely avoids parsing the rest of the log for
these apps when enough listing data is read. This, unlike the above,
also speeds things up for compressed logs, since only the very beginning
of the log has to be read.

On top of that, the code that decides whether to re-parse logs to get
updated listing data will ignore in-progress applications until they've
completed.

Both optimizations can be disabled but are enabled by default.
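
For reference, the knobs look roughly like this in the history server's
spark-defaults.conf; the key names are assumed from a reading of this patch,
so verify them against the merged change (the 16k / 1m sizes in the table
below refer to the reparse chunk size):

```
# Assumed key names; verify against the merged change.
spark.history.fs.inProgressOptimization.enabled   true
spark.history.fs.endEventReparseChunkSize         1m
```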

I tested this on some fake event logs to see the effect. I created
500 logs of about 60M each (so ~30G uncompressed; each log was 1.7M
when compressed with zstd). Below, C = completed, IP = in-progress, and
the size is the amount of data re-parsed at the end of the logs when
necessary.

```
            none/C    none/IP   zstd/C    zstd/IP
On / 16k    2s        2s        22s       2s
On / 1m     3s        2s        24s       2s
Off         1.1m      1.1m      26s       24s
```

This was with 4 threads on a single local SSD. As expected from the
previous explanations, there are considerable gains for in-progress
logs, and for uncompressed logs, but not so much when looking at the
full compressed log.

As a side note, I removed the custom code to get the scan time by
creating a file on HDFS; since file mod times are not used to detect
changed logs anymore, local time is enough for the current use of
the SHS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-6951

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20952


commit 5d3e0f40a3861374cf934887dec231a83a31ac4e
Author: Marcelo Vanzin 
Date:   2018-03-09T20:00:04Z

[SPARK-6951][core] Speed up parsing of event logs during listing.
