http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala ---------------------------------------------------------------------- diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala new file mode 100644 index 0000000..9fdd702 --- /dev/null +++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/HeartbeatClientSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.protocol.v5.client.socket + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.testkit.{TestProbe, ImplicitSender, TestKit} +import com.ibm.spark.communication.ZMQMessage +import com.ibm.spark.kernel.protocol.v5.client.ActorLoader +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSpecLike} +import org.mockito.Matchers._ +import org.mockito.Mockito._ + +class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec")) + with ImplicitSender with FunSpecLike with Matchers with MockitoSugar { + + describe("HeartbeatClientActor") { + val socketFactory = mock[SocketFactory] + val mockActorLoader = mock[ActorLoader] + val probe : TestProbe = TestProbe() + when(socketFactory.HeartbeatClient(any(classOf[ActorSystem]), any(classOf[ActorRef]))).thenReturn(probe.ref) + + val heartbeatClient = system.actorOf(Props( + classOf[HeartbeatClient], socketFactory, mockActorLoader, true + )) + + describe("send heartbeat") { + it("should send ping ZMQMessage") { + heartbeatClient ! HeartbeatMessage + probe.expectMsgClass(classOf[ZMQMessage]) + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala ---------------------------------------------------------------------- diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala new file mode 100644 index 0000000..b592dcd --- /dev/null +++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala @@ -0,0 +1,300 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.protocol.v5.client.socket + +import java.util.UUID + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import akka.util.Timeout +import com.ibm.spark.comm.{CommCallbacks, CommRegistrar, CommStorage, CommWriter} +import com.ibm.spark.communication.ZMQMessage +import com.ibm.spark.kernel.protocol.v5 +import com.ibm.spark.kernel.protocol.v5._ +import com.ibm.spark.kernel.protocol.v5.client.Utilities._ +import com.ibm.spark.kernel.protocol.v5.client.execution.{DeferredExecution, DeferredExecutionManager} +import com.ibm.spark.kernel.protocol.v5.client.{ActorLoader, Utilities} +import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent} +import com.typesafe.config.ConfigFactory +import org.mockito.Matchers.{eq => mockEq, _} +import org.mockito.Mockito._ +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.mock.MockitoSugar +import org.scalatest.time.{Milliseconds, Span} +import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} +import play.api.libs.json.Json + +import scala.concurrent.duration._ +import scala.concurrent.{Future, Promise} +import scala.util.Failure + +object IOPubClientSpec { + val config =""" + akka { + loglevel = "WARNING" + }""" +} + +class IOPubClientSpec extends TestKit(ActorSystem( + "IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config) +)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar + with ScalaFutures with BeforeAndAfter with Eventually +{ + private val TestTimeout = Timeout(10.seconds) + implicit override val patienceConfig = PatienceConfig( + timeout = scaled(Span(200, Milliseconds)), + interval = scaled(Span(5, Milliseconds)) + ) + private val SignatureEnabled = true + + private var clientSocketProbe: TestProbe = _ + private var mockClientSocketFactory: SocketFactory = _ + private var mockActorLoader: ActorLoader = _ + private var mockCommRegistrar: CommRegistrar = _ + private var spyCommStorage: CommStorage = _ + private var mockCommCallbacks: CommCallbacks = _ + private var ioPubClient: ActorRef = _ + + private var kmBuilder: KMBuilder = _ + + private val id = UUID.randomUUID().toString + private val TestTargetName = "some target" + private val TestCommId = UUID.randomUUID().toString + + before { + kmBuilder = KMBuilder() + mockCommCallbacks = mock[CommCallbacks] + mockCommRegistrar = mock[CommRegistrar] + + spyCommStorage = spy(new CommStorage()) + + clientSocketProbe = TestProbe() + mockActorLoader = mock[ActorLoader] + mockClientSocketFactory = mock[SocketFactory] + + // Stub the return value for the socket factory + when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef])) + .thenReturn(clientSocketProbe.ref) + + // Construct the object we will test against + ioPubClient = system.actorOf(Props( + classOf[IOPubClient], mockClientSocketFactory, mockActorLoader, + SignatureEnabled, mockCommRegistrar, spyCommStorage + )) + } + + describe("IOPubClient") { + describe("#receive") { + it("should execute all Comm open callbacks on comm_open message") { + val message: ZMQMessage = kmBuilder + .withHeader(CommOpen.toTypeString) + .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty)) + .build + + // Mark as target being provided + doReturn(Some(mockCommCallbacks)).when(spyCommStorage) + .getTargetCallbacks(anyString()) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is triggered + eventually { + verify(mockCommCallbacks).executeOpenCallbacks( + any[CommWriter], mockEq(TestCommId), + mockEq(TestTargetName), any[v5.MsgData]) + } + } + + it("should not execute Comm open callbacks if the target is not found") { + val message: ZMQMessage = kmBuilder + .withHeader(CommOpen.toTypeString) + .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty)) + .build + + // Mark as target NOT being provided + doReturn(None).when(spyCommStorage).getTargetCallbacks(anyString()) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is NOT triggered + eventually { + // Check that we have checked if the target exists + verify(spyCommStorage).getTargetCallbacks(TestTargetName) + + verify(mockCommCallbacks, never()).executeOpenCallbacks( + any[CommWriter], mockEq(TestCommId), + mockEq(TestTargetName), any[v5.MsgData]) + verify(mockCommRegistrar, never()).link(TestTargetName, TestCommId) + } + } + + it("should execute all Comm msg callbacks on comm_msg message") { + val message: ZMQMessage = kmBuilder + .withHeader(CommMsg.toTypeString) + .withContentString(CommMsg(TestCommId, v5.MsgData.Empty)) + .build + + // Mark as target being provided + doReturn(Some(mockCommCallbacks)).when(spyCommStorage) + .getCommIdCallbacks(any[v5.UUID]) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is triggered + eventually { + verify(mockCommCallbacks).executeMsgCallbacks( + any[CommWriter], mockEq(TestCommId), any[v5.MsgData]) + } + } + + it("should not execute Comm msg callbacks if the Comm id is not found") { + val message: ZMQMessage = kmBuilder + .withHeader(CommMsg.toTypeString) + .withContentString(CommMsg(TestCommId, v5.MsgData.Empty)) + .build + + // Mark as target NOT being provided + doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID]) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is NOT triggered + eventually { + // Check that we have checked if the target exists + verify(spyCommStorage).getCommIdCallbacks(TestCommId) + + verify(mockCommCallbacks, never()).executeMsgCallbacks( + any[CommWriter], mockEq(TestCommId), any[v5.MsgData]) + } + } + + it("should execute all Comm close callbacks on comm_close message") { + val message: ZMQMessage = kmBuilder + .withHeader(CommClose.toTypeString) + .withContentString(CommClose(TestCommId, v5.MsgData.Empty)) + .build + + // Mark as target being provided + doReturn(Some(mockCommCallbacks)).when(spyCommStorage) + .getCommIdCallbacks(any[v5.UUID]) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is triggered + eventually { + verify(mockCommCallbacks).executeCloseCallbacks( + any[CommWriter], mockEq(TestCommId), any[v5.MsgData]) + } + } + + it("should not execute Comm close callbacks if Comm id is not found") { + val message: ZMQMessage = kmBuilder + .withHeader(CommClose.toTypeString) + .withContentString(CommClose(TestCommId, v5.MsgData.Empty)) + .build + + // Mark as target NOT being provided + doReturn(None).when(spyCommStorage).getCommIdCallbacks(any[v5.UUID]) + + // Simulate receiving a message from the kernel + ioPubClient ! message + + // Check to see if "eventually" the callback is NOT triggered + eventually { + // Check that we have checked if the target exists + verify(spyCommStorage).getCommIdCallbacks(TestCommId) + + verify(mockCommCallbacks, never()).executeCloseCallbacks( + any[CommWriter], mockEq(TestCommId), any[v5.MsgData]) + } + } + + it("should call a registered callback on stream message") { + val result = StreamContent("foo", "bar") + val header = Header(id, "spark", id, + MessageType.Outgoing.Stream.toString, "5.0") + val parentHeader = Header(id, "spark", id, + MessageType.Incoming.ExecuteRequest.toString, "5.0") + + val kernelMessage = new KernelMessage( + Seq[String](), + "", + header, + parentHeader, + Metadata(), + Json.toJson(result).toString() + ) + val promise: Promise[String] = Promise() + val de: DeferredExecution = DeferredExecution().onStream( + (content: StreamContent) => { + promise.success(content.text) + } + ) + DeferredExecutionManager.add(id, de) + // Send the message to the IOPubClient + val zmqMessage: ZMQMessage = kernelMessage + + ioPubClient ! zmqMessage + + whenReady(promise.future) { + case res: String => + res should be eq("bar") + case _ => + fail(s"Received failure when asking IOPubClient") + } + } + + it("should not invoke callback when stream message's parent header is null") { + // Construct the kernel message + val result = StreamContent("foo", "bar") + val header = Header(id, "spark", id, + MessageType.Outgoing.Stream.toString, "5.0") + + val kernelMessage = new KernelMessage( + Seq[String](), + "", + header, + null, + Metadata(), + Json.toJson(result).toString() + ) + + // Send the message to the IOPubClient + val zmqMessage: ZMQMessage = kernelMessage + val futureResult: Future[Any] = ioPubClient.ask(zmqMessage)(TestTimeout) + whenReady(futureResult) { + case result: Failure[Any] => + // Getting the value of the failure will cause the underlying exception will be thrown + try { + result.get + } catch { + case t:RuntimeException => + t.getMessage should be("Parent Header was null in Kernel Message.") + } + case result => + fail(s"Did not receive failure!! ${result}") + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala ---------------------------------------------------------------------- diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala new file mode 100644 index 0000000..0110dfd --- /dev/null +++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala @@ -0,0 +1,79 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.protocol.v5.client.socket + +import java.util.UUID + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.testkit.{TestProbe, ImplicitSender, TestKit} +import com.ibm.spark.communication.ZMQMessage +import com.ibm.spark.communication.security.SecurityActorType +import com.ibm.spark.kernel.protocol.v5._ +import com.ibm.spark.kernel.protocol.v5.client.ActorLoader +import com.ibm.spark.kernel.protocol.v5.content.ExecuteRequest +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSpecLike} +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import play.api.libs.json.Json + +class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec")) + with ImplicitSender with FunSpecLike with Matchers with MockitoSugar { + private val SignatureEnabled = true + + describe("ShellClientActor") { + val socketFactory = mock[SocketFactory] + val mockActorLoader = mock[ActorLoader] + val probe : TestProbe = TestProbe() + when(socketFactory.ShellClient( + any(classOf[ActorSystem]), any(classOf[ActorRef]) + )).thenReturn(probe.ref) + + val signatureManagerProbe = TestProbe() + doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString)) + .when(mockActorLoader).load(SecurityActorType.SignatureManager) + + val shellClient = system.actorOf(Props( + classOf[ShellClient], socketFactory, mockActorLoader, SignatureEnabled + )) + + describe("send execute request") { + it("should send execute request") { + val request = ExecuteRequest( + "foo", false, true, UserExpressions(), true + ) + val header = Header( + UUID.randomUUID().toString, "spark", + UUID.randomUUID().toString, MessageType.Incoming.ExecuteRequest.toString, + "5.0" + ) + val kernelMessage = KernelMessage( + Seq[String](), "", + header, HeaderBuilder.empty, + Metadata(), Json.toJson(request).toString + ) + shellClient ! kernelMessage + + // Echo back the kernel message sent to have a signature injected + signatureManagerProbe.expectMsgClass(classOf[KernelMessage]) + signatureManagerProbe.reply(kernelMessage) + + probe.expectMsgClass(classOf[ZMQMessage]) + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala ---------------------------------------------------------------------- diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala new file mode 100644 index 0000000..877c4f5 --- /dev/null +++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/StdinClientSpec.scala @@ -0,0 +1,160 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.protocol.v5.client.socket + +import akka.actor.{ActorRef, Props, ActorSystem} +import akka.testkit.{TestProbe, ImplicitSender, TestKit} +import com.ibm.spark.communication.ZMQMessage +import com.ibm.spark.communication.security.SecurityActorType +import com.ibm.spark.kernel.protocol.v5._ +import com.ibm.spark.kernel.protocol.v5.client.ActorLoader +import com.ibm.spark.kernel.protocol.v5.client.socket.StdinClient.ResponseFunction +import com.ibm.spark.kernel.protocol.v5.content.{InputReply, InputRequest, ClearOutput, ExecuteRequest} +import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} +import com.ibm.spark.kernel.protocol.v5.client.Utilities._ +import play.api.libs.json.Json +import scala.concurrent.duration._ + +import org.mockito.Mockito._ +import org.mockito.Matchers._ + +class StdinClientSpec extends TestKit(ActorSystem("StdinActorSpec")) + with ImplicitSender with FunSpecLike with Matchers with MockitoSugar + with BeforeAndAfter +{ + private val SignatureEnabled = true + private val TestReplyString = "some value" + private val TestResponseFunc: ResponseFunction = (_, _) => TestReplyString + + private var mockSocketFactory: SocketFactory = _ + private var mockActorLoader: ActorLoader = _ + private var signatureManagerProbe: TestProbe = _ + private var socketProbe: TestProbe = _ + private var stdinClient: ActorRef = _ + + before { + socketProbe = TestProbe() + signatureManagerProbe = TestProbe() + mockSocketFactory = mock[SocketFactory] + mockActorLoader = mock[ActorLoader] + doReturn(system.actorSelection(signatureManagerProbe.ref.path.toString)) + .when(mockActorLoader).load(SecurityActorType.SignatureManager) + doReturn(socketProbe.ref).when(mockSocketFactory) + .StdinClient(any[ActorSystem], any[ActorRef]) + + stdinClient = system.actorOf(Props( + classOf[StdinClient], mockSocketFactory, mockActorLoader, SignatureEnabled + )) + + // Set the response function for our client socket + stdinClient ! TestResponseFunc + } + + describe("StdinClient") { + describe("#receive") { + it("should update the response function if receiving a new one") { + val expected = "some other value" + val replacementFunc: ResponseFunction = (_, _) => expected + + // Update the function + stdinClient ! replacementFunc + + val inputRequestMessage: ZMQMessage = KMBuilder() + .withHeader(InputRequest.toTypeString) + .withContentString(InputRequest("", false)) + .build + + stdinClient ! inputRequestMessage + + // Echo back the kernel message sent to have a signature injected + signatureManagerProbe.expectMsgPF() { + case kernelMessage: KernelMessage => + signatureManagerProbe.reply(kernelMessage) + true + } + + socketProbe.expectMsgPF() { + case zmqMessage: ZMQMessage => + val kernelMessage: KernelMessage = zmqMessage + val inputReply = + Json.parse(kernelMessage.contentString).as[InputReply] + inputReply.value should be (expected) + } + } + + it("should do nothing if the incoming message is not an input_request") { + val notInputRequestMessage: ZMQMessage = KMBuilder() + .withHeader(ClearOutput.toTypeString) + .build + + stdinClient ! notInputRequestMessage + + socketProbe.expectNoMsg(300.milliseconds) + } + + it("should respond with an input_reply if the incoming message is " + + "an input_request") { + val inputRequestMessage: ZMQMessage = KMBuilder() + .withHeader(InputRequest.toTypeString) + .withContentString(InputRequest("", false)) + .build + + stdinClient ! inputRequestMessage + + // Echo back the kernel message sent to have a signature injected + signatureManagerProbe.expectMsgPF() { + case kernelMessage: KernelMessage => + signatureManagerProbe.reply(kernelMessage) + true + } + + socketProbe.expectMsgPF() { + case zmqMessage: ZMQMessage => + val kernelMessage: KernelMessage = zmqMessage + val messageType = kernelMessage.header.msg_type + messageType should be (InputReply.toTypeString) + } + } + + it("should use the result from the response function if the incoming " + + "message is an input_request") { + val inputRequestMessage: ZMQMessage = KMBuilder() + .withHeader(InputRequest.toTypeString) + .withContentString(InputRequest("", false)) + .build + + stdinClient ! inputRequestMessage + + // Echo back the kernel message sent to have a signature injected + signatureManagerProbe.expectMsgPF() { + case kernelMessage: KernelMessage => + signatureManagerProbe.reply(kernelMessage) + true + } + + socketProbe.expectMsgPF() { + case zmqMessage: ZMQMessage => + val kernelMessage: KernelMessage = zmqMessage + val inputReply = + Json.parse(kernelMessage.contentString).as[InputReply] + inputReply.value should be (TestReplyString) + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala deleted file mode 100644 index 994360f..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication - -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import com.ibm.spark.communication.socket._ -import org.zeromq.ZMQ - -import scala.collection.JavaConverters._ - -/** - * Represents the factory for sockets that also manages ZMQ contexts and - * facilitates closing of sockets created by the factory. - */ -class SocketManager { - /** - * Creates a new ZMQ context with a single IO thread. - * - * @return The new ZMQ context - */ - protected def newZmqContext(): ZMQ.Context = ZMQ.context(1) - - private val socketToContextMap = - new ConcurrentHashMap[SocketLike, ZMQ.Context]().asScala - - /** - * Provides and registers a new ZMQ context, used for creating a new socket. - * @param mkSocket a function that creates a socket using a given context - * @return the new socket - * @see newZmqContext - */ - private def withNewContext[A <: SocketLike](mkSocket: ZMQ.Context => A): A = { - val ctx = newZmqContext() - val socket = mkSocket(ctx) - socketToContextMap.put(socket, ctx) - socket - } - - /** - * Closes the socket provided and also closes the context if no more sockets - * are using the context. - * - * @param socket The socket to close - */ - def closeSocket(socket: SocketLike) = { - socket.close() - - socketToContextMap.remove(socket).foreach(context => { - if (!socketToContextMap.values.exists(_ == context)) context.close() - }) - } - - /** - * Creates a new request socket. - * - * @param address The address to associate with the socket - * @param inboundMessageCallback The callback to use for incoming messages - * - * @return The new socket instance - */ - def newReqSocket( - address: String, - inboundMessageCallback: (Seq[String]) => Unit - ): SocketLike = withNewContext{ ctx => - new JeroMQSocket(new ReqSocketRunnable( - ctx, - Some(inboundMessageCallback), - Connect(address), - Linger(0) - )) - } - - /** - * Creates a new reply socket. - * - * @param address The address to associate with the socket - * @param inboundMessageCallback The callback to use for incoming messages - * - * @return The new socket instance - */ - def newRepSocket( - address: String, - inboundMessageCallback: (Seq[String]) => Unit - ): SocketLike = withNewContext{ ctx => - new JeroMQSocket(new ZeroMQSocketRunnable( - ctx, - RepSocket, - Some(inboundMessageCallback), - Bind(address), - Linger(0) - )) - } - - /** - * Creates a new publish socket. - * - * @param address The address to associate with the socket - * - * @return The new socket instance - */ - def newPubSocket( - address: String - ): SocketLike = withNewContext{ ctx => - new JeroMQSocket(new PubSocketRunnable( - ctx, - Bind(address), - Linger(0) - )) - } - - /** - * Creates a new subscribe socket. - * - * @param address The address to associate with the socket - * @param inboundMessageCallback The callback to use for incoming messages - * - * @return The new socket instance - */ - def newSubSocket( - address: String, - inboundMessageCallback: (Seq[String]) => Unit - ): SocketLike = withNewContext { ctx => - new JeroMQSocket(new ZeroMQSocketRunnable( - ctx, - SubSocket, - Some(inboundMessageCallback), - Connect(address), - Linger(0), - Subscribe.all - )) - } - - /** - * Creates a new router socket. - * - * @param address The address to associate with the socket - * @param inboundMessageCallback The callback to use for incoming messages - * - * @return The new socket instance - */ - def newRouterSocket( - address: String, - inboundMessageCallback: (Seq[String]) => Unit - ): SocketLike = withNewContext { ctx => - new JeroMQSocket(new ZeroMQSocketRunnable( - ctx, - RouterSocket, - Some(inboundMessageCallback), - Bind(address), - Linger(0) - )) - } - - /** - * Creates a new dealer socket. - * - * @param address The address to associate with the socket - * @param inboundMessageCallback The callback to use for incoming messages - * - * @return The new socket instance - */ - def newDealerSocket( - address: String, - inboundMessageCallback: (Seq[String]) => Unit, - identity: String = UUID.randomUUID().toString - ): SocketLike = withNewContext{ ctx => - new JeroMQSocket(new ZeroMQSocketRunnable( - ctx, - DealerSocket, - Some(inboundMessageCallback), - Connect(address), - Linger(0), - Identity(identity.getBytes(ZMQ.CHARSET)) - )) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala b/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala deleted file mode 100644 index ffa7705..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/ZMQMessage.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication - -import akka.util.ByteString - -/** - * Represents a ZeroMQ message containing a collection of Akka ByteString - * instances. - * - * @note This is left in for backwards compatibility! - * - * @param frames The collection of Akka ByteString instances - */ -case class ZMQMessage(frames: ByteString*) { - def frame(i: Int) = frames(i) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala deleted file mode 100644 index 0f0497d..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/DealerSocketActor.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.{Actor, ActorRef} -import akka.util.ByteString -import com.ibm.spark.communication.{ZMQMessage, SocketManager} -import com.ibm.spark.utils.LogLike -import org.zeromq.ZMQ - -/** - * Represents an actor containing a dealer socket. - * - * @param connection The address to connect to - * @param listener The actor to send incoming messages back to - */ -class DealerSocketActor(connection: String, listener: ActorRef) - extends Actor with LogLike -{ - logger.debug(s"Initializing dealer socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newDealerSocket(connection, (message: Seq[String]) => { - listener ! ZMQMessage(message.map(ByteString.apply): _*) - }) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case zmqMessage: ZMQMessage => - val frames = zmqMessage.frames.map(byteString => - new String(byteString.toArray, ZMQ.CHARSET)) - socket.send(frames: _*) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala deleted file mode 100644 index f74764e..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/PubSocketActor.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.Actor -import com.ibm.spark.communication.utils.OrderedSupport -import com.ibm.spark.communication.{SocketManager, ZMQMessage} -import com.ibm.spark.kernel.protocol.v5.KernelMessage -import com.ibm.spark.utils.LogLike -import org.zeromq.ZMQ - -/** - * Represents an actor containing a publish socket. - * - * Note: OrderedSupport is used to ensure correct processing order. - * A similar pattern may be useful for other socket actors if - * issues arise in the future. - * - * @param connection The address to bind to - */ -class PubSocketActor(connection: String) - extends Actor with LogLike with OrderedSupport -{ - logger.debug(s"Initializing publish socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newPubSocket(connection) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case zmqMessage: ZMQMessage => withProcessing { - val frames = zmqMessage.frames.map(byteString => - new String(byteString.toArray, ZMQ.CHARSET)) - - socket.send(frames: _*) - } - } - - /** - * Defines the types that will be stashed by {@link #waiting() waiting} - * while the Actor is in processing state. - * @return - */ - override def orderedTypes(): Seq[Class[_]] = Seq(classOf[ZMQMessage]) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala deleted file mode 100644 index b8643f5..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/RepSocketActor.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.{Actor, ActorRef} -import akka.util.ByteString -import com.ibm.spark.communication.{SocketManager, ZMQMessage} -import com.ibm.spark.utils.LogLike -import org.zeromq.ZMQ - -/** - * Represents an actor containing a reply socket. - * - * @param connection The address to bind to - * @param listener The actor to send incoming messages back to - */ -class RepSocketActor(connection: String, listener: ActorRef) - extends Actor with LogLike -{ - logger.debug(s"Initializing reply socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newRepSocket(connection, (message: Seq[String]) => { - listener ! ZMQMessage(message.map(ByteString.apply): _*) - }) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case zmqMessage: ZMQMessage => - val frames = zmqMessage.frames.map(byteString => - new String(byteString.toArray, ZMQ.CHARSET)) - socket.send(frames: _*) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala deleted file mode 100644 index e38f2a0..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/ReqSocketActor.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.{Actor, ActorRef} -import akka.util.ByteString -import com.ibm.spark.communication.{ZMQMessage, SocketManager} -import com.ibm.spark.utils.LogLike -import org.zeromq.ZMQ - -/** - * Represents an actor containing a request socket. - * - * @param connection The address to connect to - * @param listener The actor to send incoming messages back to - */ -class ReqSocketActor(connection: String, listener: ActorRef) - extends Actor with LogLike -{ - logger.debug(s"Initializing request socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newReqSocket(connection, (message: Seq[String]) => { - listener ! ZMQMessage(message.map(ByteString.apply): _*) - }) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case zmqMessage: ZMQMessage => - val frames = zmqMessage.frames.map(byteString => - new String(byteString.toArray, ZMQ.CHARSET)) - socket.send(frames: _*) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala deleted file mode 100644 index 6aa3bc5..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/RouterSocketActor.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.{Actor, ActorRef} -import akka.util.ByteString -import com.ibm.spark.communication.{SocketManager, ZMQMessage} -import com.ibm.spark.utils.LogLike -import org.zeromq.ZMQ - -/** - * Represents an actor containing a router socket. - * - * @param connection The address to bind to - * @param listener The actor to send incoming messages back to - */ -class RouterSocketActor(connection: String, listener: ActorRef) - extends Actor with LogLike -{ - logger.debug(s"Initializing router socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newRouterSocket(connection, (message: Seq[String]) => { - listener ! ZMQMessage(message.map(ByteString.apply): _*) - }) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case zmqMessage: ZMQMessage => - val frames = zmqMessage.frames.map(byteString => - new String(byteString.toArray, ZMQ.CHARSET)) - socket.send(frames: _*) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala b/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala deleted file mode 100644 index 8fef496..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/actors/SubSocketActor.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.actors - -import akka.actor.{Actor, ActorRef} -import akka.util.ByteString -import com.ibm.spark.communication.{ZMQMessage, SocketManager} -import com.ibm.spark.utils.LogLike - -/** - * Represents an actor containing a subscribe socket. - * - * @param connection The address to connect to - * @param listener The actor to send incoming messages back to - */ -class SubSocketActor(connection: String, listener: ActorRef) - extends Actor with LogLike -{ - logger.debug(s"Initializing subscribe socket actor for $connection") - private val manager: SocketManager = new SocketManager - private val socket = manager.newSubSocket(connection, (message: Seq[String]) => { - listener ! ZMQMessage(message.map(ByteString.apply): _*) - }) - - override def postStop(): Unit = { - manager.closeSocket(socket) - } - - override def receive: Actor.Receive = { - case _ => - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala b/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala deleted file mode 100644 index 9f44177..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/security/Hmac.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication.security - -import javax.crypto.Mac -import javax.crypto.spec.SecretKeySpec - -import com.ibm.spark.communication.security.HmacAlgorithm.HmacAlgorithm - -object HmacAlgorithm extends Enumeration { - type HmacAlgorithm = Value - - def apply(key: String) = Value(key) - - val MD5 = Value("HmacMD5") - val SHA1 = Value("HmacSHA1") - val SHA256 = Value("HmacSHA256") -} - -object Hmac { - - def apply(key: String, algorithm: HmacAlgorithm = HmacAlgorithm.SHA256) = - new Hmac(key, algorithm) - - def newMD5(key: String): Hmac = this(key, HmacAlgorithm.MD5) - def newSHA1(key: String): Hmac = this(key, HmacAlgorithm.SHA1) - def newSHA256(key: String): Hmac = this(key, HmacAlgorithm.SHA256) -} - -class Hmac( - val key: String, - val algorithm: HmacAlgorithm = HmacAlgorithm.SHA256 -) { - - private var mac: Mac = _ - private var secretKeySpec: SecretKeySpec = _ - - if (key.nonEmpty) { - mac = Mac.getInstance(algorithm.toString) - secretKeySpec = new SecretKeySpec(key.getBytes, algorithm.toString) - mac.init(secretKeySpec) - } - - def apply(items: String*): String = digest(items) - - def digest(items: Seq[String]): String = if (key.nonEmpty) { - mac synchronized { - items.map(_.getBytes("UTF-8")).foreach(mac.update) - mac.doFinal().map("%02x" format _).mkString - } - } else "" -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala deleted file mode 100644 index c3fabd7..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureCheckerActor.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication.security - -import akka.actor.Actor -import com.ibm.spark.communication.utils.OrderedSupport -import com.ibm.spark.utils.LogLike - -/** - * Verifies whether or not a kernel message has a valid signature. - * @param hmac The HMAC to use for signature validation - */ -class SignatureCheckerActor( - private val hmac: Hmac -) extends Actor with LogLike with OrderedSupport { - override def receive: Receive = { - case (signature: String, blob: Seq[_]) => withProcessing { - val stringBlob: Seq[String] = blob.map(_.toString) - val hmacString = hmac(stringBlob: _*) - val isValidSignature = hmacString == signature - logger.trace(s"Signature ${signature} validity checked against " + - s"hmac ${hmacString} with outcome ${isValidSignature}") - sender ! isValidSignature - } - } - - /** - * Defines the types that will be stashed by {@link #waiting() waiting} - * while the Actor is in processing state. - * @return - */ - override def orderedTypes(): Seq[Class[_]] = Seq(classOf[(String, Seq[_])]) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala deleted file mode 100644 index f381644..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureManagerActor.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication.security - -import akka.actor.{Props, ActorRef, Actor} -import akka.util.Timeout -import com.ibm.spark.communication.utils.OrderedSupport -import com.ibm.spark.kernel.protocol.v5.KernelMessage -import com.ibm.spark.utils.LogLike - -import scala.concurrent.duration._ -import akka.pattern.ask -import akka.pattern.pipe - -class SignatureManagerActor( - key: String, scheme: String -) extends Actor with LogLike with OrderedSupport { - private val hmac = Hmac(key, HmacAlgorithm(scheme)) - - def this(key: String) = this(key, HmacAlgorithm.SHA256.toString) - - // NOTE: Required to provide the execution context for futures with akka - import context._ - - // NOTE: Required for ask (?) to function... maybe can define elsewhere? - implicit val timeout = Timeout(5.seconds) - - // - // List of child actors that the signature manager contains - // - private var signatureChecker: ActorRef = _ - private var signatureProducer: ActorRef = _ - - /** - * Initializes all child actors performing tasks for the interpreter. - */ - override def preStart() = { - signatureChecker = context.actorOf( - Props(classOf[SignatureCheckerActor], hmac), - name = SignatureManagerChildActorType.SignatureChecker.toString - ) - signatureProducer = context.actorOf( - Props(classOf[SignatureProducerActor], hmac), - name = SignatureManagerChildActorType.SignatureProducer.toString - ) - } - - override def receive: Receive = { - // Check blob strings for matching digest - case (signature: String, blob: Seq[_]) => - startProcessing() - val destActor = sender() - val sigFuture = signatureChecker ? ((signature, blob)) - - sigFuture foreach { case isValid => - destActor ! isValid - finishedProcessing() - } - - case message: KernelMessage => - startProcessing() - val destActor = sender() - - // TODO: Proper error handling for possible exception from mapTo - val sigFuture = (signatureProducer ? message).mapTo[String].map( - result => message.copy(signature = result) - ) - - sigFuture foreach { case kernelMessage => - destActor ! kernelMessage - finishedProcessing() - } - } - - /** - * Defines the types that will be stashed by {@link #waiting() waiting} - * while the Actor is in processing state. - * @return - */ - override def orderedTypes(): Seq[Class[_]] = Seq( - classOf[(String, Seq[_])], - classOf[KernelMessage] - ) -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala b/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala deleted file mode 100644 index 36b5688..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/security/SignatureProducerActor.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication.security - -import akka.actor.Actor -import com.ibm.spark.communication.utils.OrderedSupport -import com.ibm.spark.kernel.protocol.v5.KernelMessage -import com.ibm.spark.utils.LogLike -import play.api.libs.json.Json - -/** - * Constructs a signature from any kernel message received. - * @param hmac The HMAC to use for signature construction - */ -class SignatureProducerActor( - private val hmac: Hmac -) extends Actor with LogLike with OrderedSupport { - override def receive: Receive = { - case message: KernelMessage => withProcessing { - val signature = hmac( - Json.stringify(Json.toJson(message.header)), - Json.stringify(Json.toJson(message.parentHeader)), - Json.stringify(Json.toJson(message.metadata)), - message.contentString - ) - sender ! signature - } - } - - /** - * Defines the types that will be stashed by {@link #waiting() waiting} - * while the Actor is in processing state. - * @return - */ - override def orderedTypes(): Seq[Class[_]] = Seq(classOf[KernelMessage]) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/security/package.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala b/communication/src/main/scala/com/ibm/spark/communication/security/package.scala deleted file mode 100644 index 5c5b1d5..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/security/package.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication - -package object security { - object SecurityActorType extends Enumeration { - type SecurityActorType = Value - - val SignatureManager = Value("signature_manager") - } - - object SignatureManagerChildActorType extends Enumeration { - type SignatureManagerChildActorType = Value - - val SignatureChecker = Value("signature_checker") - val SignatureProducer = Value("signature_producer") - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala deleted file mode 100644 index c95eb69..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/JeroMQSocket.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import org.zeromq.ZMsg - -/** - * Represents a socket implemented using JeroMQ. - * - * @param runnable The underlying ZeroMQ socket runnable to use for the thread - * managed by this socket - */ -class JeroMQSocket(private val runnable: ZeroMQSocketRunnable) - extends SocketLike { - - private val socketThread = new Thread(runnable) - socketThread.start() - - /** - * Sends a message using this socket. - * - * @param message The message to send - */ - override def send(message: String*): Unit = { - assert(isAlive, "Socket is not alive to be able to send messages!") - - runnable.offer(ZMsg.newStringMsg(message: _*)) - } - - /** - * Closes the socket by closing the runnable and waiting for the underlying - * thread to close. - */ - override def close(): Unit = { - runnable.close() - socketThread.join() - } - - /** - * Indicates whether or not this socket is alive. - * - * @return True if alive (thread running), otherwise false - */ - override def isAlive: Boolean = socketThread.isAlive - - /** - * Indicates whether or not this socket is ready to send/receive messages. - * - * @return True if ready (runnable processing messages), otherwise false - */ - override def isReady: Boolean = runnable.isProcessing -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala deleted file mode 100644 index 43f3f3d..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/PubSocketRunnable.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import org.zeromq.ZMQ.{Socket, Context} - -/** - * Represents the runnable component of a socket specifically targeted towards - * publish sockets. No incoming messages are processed. - * - * @param context The ZMQ context to use with this runnable to create a socket - * @param socketOptions The options to use when creating the socket - */ -class PubSocketRunnable( - private val context: Context, - private val socketOptions: SocketOption* -) extends ZeroMQSocketRunnable( - context, - PubSocket, - None, - socketOptions: _* -) { - /** Does nothing. */ - override protected def processNextInboundMessage( - socket: Socket, - flags: Int - ): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala deleted file mode 100644 index 0aa527f..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/ReqSocketRunnable.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import org.zeromq.ZMQ.{Socket, Context} - -/** - * Represents the runnable component of a socket that processes messages and - * sends messages placed on an outbound queue. Targeted towards the request - * socket, this runnable ensures that a message is sent out first and then a - * response is received before sending the next message. - * - * @param context The ZMQ context to use with this runnable to create a socket - * @param inboundMessageCallback The callback to invoke when receiving a message - * on the socket created - * @param socketOptions The options to use when creating the socket - */ -class ReqSocketRunnable( - private val context: Context, - private val inboundMessageCallback: Option[(Seq[String]) => Unit], - private val socketOptions: SocketOption* -) extends ZeroMQSocketRunnable( - context, - ReqSocket, - inboundMessageCallback, - socketOptions: _* -) { - /** Does nothing. */ - override protected def processNextInboundMessage( - socket: Socket, - flags: Int - ): Unit = {} - - /** - * Sends a message and then waits for an incoming response (if a message - * was sent from the outbound queue). - * - * @param socket The socket to use when sending the message - * - * @return True if a message was sent, otherwise false - */ - override protected def processNextOutboundMessage(socket: Socket): Boolean = { - val shouldReceiveMessage = super.processNextOutboundMessage(socket) - - if (shouldReceiveMessage) { - super.processNextInboundMessage(socket, 0) - } - - shouldReceiveMessage - } -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala deleted file mode 100644 index 9bf752d..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketLike.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -/** - * Represents a generic interface for socket communication. - */ -trait SocketLike { - /** - * Sends a message through the socket if alive. - * - * @throws AssertionError If the socket is not alive when attempting to send - * a message - * - * @param message The message to send - */ - def send(message: String*): Unit - - /** - * Closes the socket, marking it no longer able to process or send messages. - */ - def close(): Unit - - /** - * Returns whether or not the socket is alive (processing new messages and - * capable of sending out messages). - * - * @return True if alive, otherwise false - */ - def isAlive: Boolean - - /** - * Returns whether or not the socket is ready to send/receive messages. - * - * @return True if ready, otherwise false - */ - def isReady: Boolean -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala deleted file mode 100644 index 7685239..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketOption.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import org.zeromq.ZMQ - -/** Represents an option to provide to a socket. */ -sealed trait SocketOption - -/** - * Represents the linger option used to communicate the millisecond duration - * to continue processing messages after the socket has been told to close. - * - * @note Provide -1 as the duration to wait until all messages are processed - * - * @param milliseconds The duration in milliseconds - */ -case class Linger(milliseconds: Int) extends SocketOption - -/** - * Represents the subscribe option used to filter messages coming into a - * socket subscribing to a publisher. Uses the provided byte prefix to filter - * incoming messages. - * - * @param topic The array of bytes to use as a filter based on the - * bytes at the beginning of incoming messages - */ -case class Subscribe(topic: Array[Byte]) extends SocketOption -object Subscribe { - val all = Subscribe(ZMQ.SUBSCRIPTION_ALL) -} - -/** - * Represents the identity option used to identify the socket. - * - * @param identity The identity to use with the socket - */ -case class Identity(identity: Array[Byte]) extends SocketOption - -/** - * Represents the bind option used to tell the socket what address to bind to. - * - * @param address The address for the socket to use - */ -case class Bind(address: String) extends SocketOption - -/** - * Represents the connect option used to tell the socket what address to - * connect to. - * - * @param address The address for the socket to use - */ -case class Connect(address: String) extends SocketOption http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala deleted file mode 100644 index 6c033cc..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketRunnable.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import java.util.concurrent.ConcurrentLinkedQueue - -/** - * Represents the interface for a runnable used to send and receive messages - * for a socket. - * - * @param inboundMessageCallback The callback to use when receiving a message - * through this runnable - */ -abstract class SocketRunnable[T]( - private val inboundMessageCallback: Option[(Seq[String]) => Unit] -) extends Runnable { - - /** The collection of messages to be sent out through the socket. */ - val outboundMessages: ConcurrentLinkedQueue[T] = - new ConcurrentLinkedQueue[T]() - - /** - * Attempts to add a new message to the outbound queue to be sent out. - * - * @param message The message to add to the queue - * - * @return True if successfully queued the message, otherwise false - */ - def offer(message: T): Boolean = outboundMessages.offer(message) - - /** - * Indicates whether or not the runnable is processing messages (both - * sending and receiving). - * - * @return True if processing, otherwise false - */ - def isProcessing: Boolean - - /** - * Closes the runnable such that it no longer processes messages and also - * closes the underlying socket associated with the runnable. - */ - def close(): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala deleted file mode 100644 index 7062d85..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/SocketType.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import org.zeromq.ZMQ - -/** - * Represents the type option used to indicate the type of socket to create. - * - * @param `type` The type as an integer - */ -sealed class SocketType(val `type`: Int) - -/** Represents a publish socket. */ -case object PubSocket extends SocketType(ZMQ.PUB) - -/** Represents a subscribe socket. */ -case object SubSocket extends SocketType(ZMQ.SUB) - -/** Represents a reply socket. */ -case object RepSocket extends SocketType(ZMQ.REP) - -/** Represents a request socket. */ -case object ReqSocket extends SocketType(ZMQ.REQ) - -/** Represents a router socket. */ -case object RouterSocket extends SocketType(ZMQ.ROUTER) - -/** Represents a dealer socket. */ -case object DealerSocket extends SocketType(ZMQ.DEALER) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala deleted file mode 100644 index 6fee716..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.communication.socket - -import com.ibm.spark.utils.LogLike -import org.zeromq.{ZMsg, ZMQ} -import org.zeromq.ZMQ.Context - -import scala.collection.JavaConverters._ -import scala.util.Try - -/** - * Represents the runnable component of a socket that processes messages and - * sends messages placed on an outbound queue. - * - * @param context The ZMQ context to use with this runnable to create a socket - * @param socketType The type of socket to create - * @param inboundMessageCallback The callback to invoke when receiving a message - * on the socket created - * @param socketOptions The options to use when creating the socket - */ -class ZeroMQSocketRunnable( - private val context: Context, - private val socketType: SocketType, - private val inboundMessageCallback: Option[(Seq[String]) => Unit], - private val socketOptions: SocketOption* -) extends SocketRunnable[ZMsg](inboundMessageCallback) - with LogLike { - require(socketOptions.count { - case _: Bind => true - case _: Connect => true - case _ => false - } == 1, "ZeroMQ socket needs exactly one bind or connect!") - - @volatile private var notClosed: Boolean = true - @volatile private var _isProcessing: Boolean = false - - /** - * Indicates the processing state of this runnable. - * - * @return True if processing messages, otherwise false - */ - override def isProcessing: Boolean = _isProcessing - - /** - * Processes the provided options, performing associated actions on the - * specified socket. - * - * @param socket The socket to apply actions on - */ - protected def processOptions(socket: ZMQ.Socket): Unit = { - val socketOptionsString = socketOptions.map("\n- " + _.toString).mkString("") - logger.trace( - s"Processing options for socket $socketType: $socketOptionsString" - ) - - // Split our options based on connection (bind/connect) and everything else - val (connectionOptions, otherOptions) = socketOptions.partition { - case Bind(_) | Connect(_) => true - case _ => false - } - - // Apply non-connection options first since some (like identity) must be - // run before the socket does a bind/connect - otherOptions.foreach { - case Linger(milliseconds) => socket.setLinger(milliseconds) - case Subscribe(topic) => socket.subscribe(topic) - case Identity(identity) => socket.setIdentity(identity) - case option => logger.warn(s"Unknown option: $option") - } - - // Perform our bind or connect - connectionOptions.foreach { - case Bind(address) => socket.bind(address) - case Connect(address) => socket.connect(address) - case option => - logger.warn(s"Unknown connection option: $option") - } - - _isProcessing = true - } - - /** - * Sends the next outbound message from the outbound message queue. - * - * @param socket The socket to use when sending the message - * - * @return True if a message was sent, otherwise false - */ - protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = { - val message = Option(outboundMessages.poll()) - - message.foreach(_.send(socket)) - - message.nonEmpty - } - - /** - * Retrieves the next inbound message (if available) and invokes the - * inbound message callback. - * - * @param socket The socket whose next incoming message to retrieve - */ - protected def processNextInboundMessage( - socket: ZMQ.Socket, - flags: Int = ZMQ.DONTWAIT - ): Unit = { - Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => { - inboundMessageCallback.foreach(_(zMsg.asScala.toSeq - .map(zFrame => new String(zFrame.getData, ZMQ.CHARSET)) - )) - }) - } - - /** - * Creates a new instance of a ZMQ Socket. - * - * @param zmqContext The context to use to create the socket - * @param socketType The type of socket to create - * - * @return The new ZMQ.Socket instance - */ - protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: Int) = - zmqContext.socket(socketType) - - override def run(): Unit = { - val socket = newZmqSocket(context, socketType.`type`)//context.socket(socketType.`type`) - - try { - processOptions(socket) - - while (notClosed) { - Try(processNextOutboundMessage(socket)).failed.foreach( - logger.error("Failed to send next outgoing message!", _: Throwable) - ) - Try(processNextInboundMessage(socket)).failed.foreach( - logger.error("Failed to retrieve next incoming message!", _: Throwable) - ) - Thread.sleep(1) - } - } catch { - case ex: Exception => - logger.error("Unexpected exception in 0mq socket runnable!", ex) - } finally { - try{ - socket.close() - } catch { - case ex: Exception => - logger.error("Failed to close socket!", _: Throwable) - } - } - } - - /** - * Marks the runnable as closed such that it eventually stops processing - * messages and closes the socket. - * - * @throws AssertionError If the runnable is not processing messages or has - * already been closed - */ - override def close(): Unit = { - assert(_isProcessing && notClosed, - "Runnable is not processing or is closed!") - - _isProcessing = false - notClosed = false - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala ---------------------------------------------------------------------- diff --git a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala b/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala deleted file mode 100644 index 8f41861..0000000 --- a/communication/src/main/scala/com/ibm/spark/communication/utils/OrderedSupport.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.communication.utils - -import akka.actor.{Actor, Stash} -import com.ibm.spark.utils.LogLike - -/** - * A trait to enforce ordered processing for messages of particular types. - */ -trait OrderedSupport extends Actor with Stash with LogLike { - /** - * Executes instead of the default receive when the Actor has begun - * processing. Stashes incoming messages of particular types, defined by - * {@link #orderedTypes() orderedTypes} function, for later processing. Uses - * the default receive method for all other types. Upon receiving a - * FinishedProcessing message, resumes processing all messages with the - * default receive. - * @return - */ - def waiting : Receive = { - case FinishedProcessing => - context.unbecome() - unstashAll() - case aVal: Any if (orderedTypes().contains(aVal.getClass)) => - logger.trace(s"Stashing message ${aVal} of type ${aVal.getClass}.") - stash() - case aVal: Any => - logger.trace(s"Forwarding message ${aVal} of type ${aVal.getClass} " + - "to default receive.") - receive(aVal) - } - - /** - * Suspends the default receive method for types defined by the - * {@link #orderedTypes() orderedTypes} function. - */ - def startProcessing(): Unit = { - logger.debug("Actor is in processing state and will stash messages of " + - s"types: ${orderedTypes.mkString(" ")}") - context.become(waiting, discardOld = false) - } - - /** - * Resumes the default receive method for all message types. - */ - def finishedProcessing(): Unit = { - logger.debug("Actor is no longer in processing state.") - self ! FinishedProcessing - } - - /** - * Executes a block of code, wrapping it in start/finished processing - * needed for ordered execution. - * - * @param block The block to execute - * @tparam T The return type of the block - * @return The result of executing the block - */ - def withProcessing[T](block: => T): T = { - startProcessing() - val results = block - finishedProcessing() - results - } - - /** - * Defines the types that will be stashed by {@link #waiting() waiting} - * while the Actor is in processing state. - * @return - */ - def orderedTypes(): Seq[Class[_]] - - case object FinishedProcessing -}