This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 145cc2b [LIVY-633][SERVER] session should not be gc-ed for long
running queries
145cc2b is described below
commit 145cc2b77db4c7d7bdd8f953dc3a16856d9fcf0f
Author: yihengwang
AuthorDate: Tue Sep 17 17:21:28 2019 +0800
[LIVY-633][SERVER] session should not be gc-ed for long running queries
## What changes were proposed in this pull request?
Currently, Livy records the last activity time of the session before
statement execution. If a statement runs for too long, exceeding the session
timeout, the session will be garbage collected after the statement execution.
This should not be the expected behavior. The statement execution time
should not be counted as idle time. We should update the last activity time after
the statement execution.
We cannot update the last activity time in the Session class when the session
changes state from busy to idle. So in this patch, we add a replLastActivity
field to the RSCClient, which is updated when the repl state changes. When the
session changes its state from busy to idle, this field captures the time, which
is then reflected in the session's last activity.
## How was this patch tested?
Manual test. Also, add a new unit test.
Existing unit tests and integration tests.
Author: yihengwang
Author: Yiheng Wang
Closes #224 from yiheng/fix_633.
---
rsc/pom.xml| 7 +++
rsc/src/main/java/org/apache/livy/rsc/RSCClient.java | 18 ++
.../livy/server/interactive/InteractiveSession.scala | 12
.../server/interactive/InteractiveSessionSpec.scala| 15 +++
4 files changed, 52 insertions(+)
diff --git a/rsc/pom.xml b/rsc/pom.xml
index dcb58a6..1f3d6a3 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -49,6 +49,13 @@
${project.version}
test
+
+ org.apache.livy
+ livy-core_${scala.binary.version}
+ ${project.version}
+
+ provided
+
com.esotericsoftware.kryo
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
index f2879b8..c1c9534 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
@@ -44,6 +44,7 @@ import org.apache.livy.client.common.BufferUtils;
import org.apache.livy.rsc.driver.AddFileJob;
import org.apache.livy.rsc.driver.AddJarJob;
import org.apache.livy.rsc.rpc.Rpc;
+import org.apache.livy.sessions.SessionState;
import static org.apache.livy.rsc.RSCConf.Entry.*;
@@ -64,6 +65,8 @@ public class RSCClient implements LivyClient {
private Process driverProcess;
private volatile boolean isAlive;
private volatile String replState;
+ // Record the last activity timestamp of the repl
+ private volatile long replLastActivity = System.nanoTime();
RSCClient(RSCConf conf, Promise ctx, Process driverProcess)
throws IOException {
this.conf = conf;
@@ -315,6 +318,16 @@ public class RSCClient implements LivyClient {
return replState;
}
+ /**
+ * Get the timestamp of the last activity of the repl. It will be updated
when the repl state
+ * changed from busy to idle
+ *
+ * @return last activity timestamp
+ */
+ public long getReplLastActivity() {
+return replLastActivity;
+ }
+
private class ClientProtocol extends BaseProtocol {
JobHandleImpl submit(Job job) {
@@ -411,6 +424,11 @@ public class RSCClient implements LivyClient {
private void handle(ChannelHandlerContext ctx, ReplState msg) {
LOG.trace("Received repl state for {}", msg.state);
+ // Update last activity timestamp when state change is from busy to idle.
+ if (SessionState.Busy$.MODULE$.state().equals(replState) && msg != null
&&
+SessionState.Idle$.MODULE$.state().equals(msg.state)) {
+replLastActivity = System.nanoTime();
+ }
replState = msg.state;
}
}
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index bccdb4d..cdeddda 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -626,4 +626,16 @@ class InteractiveSession(
}
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
+
+ override def lastActivity: Long = {
+val serverSideLastActivity = super.lastActivity
+if (serverSideState == SessionState.Running) {
+ // If the rsc client is running, we compare the lastActivity of the
sessi