http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java new file mode 100644 index 0000000..ffee177 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.client.IDriverServiceClient; +import org.apache.reef.bridge.client.JVMClientProcess; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * Allocated Evaluator Stub. + */ +@Private +public final class AllocatedEvaluatorBridge implements AllocatedEvaluator { + + private final String evaluatorId; + + private final EvaluatorDescriptor evaluatorDescriptor; + + private final IDriverServiceClient driverServiceClient; + + private final List<File> addFileList = new ArrayList<>(); + + private final List<File> addLibraryList = new ArrayList<>(); + + private JVMClientProcess evaluatorProcess = null; + + public AllocatedEvaluatorBridge( + final String evaluatorId, + final EvaluatorDescriptor evaluatorDescriptor, + final IDriverServiceClient driverServiceClient) { + this.evaluatorId = evaluatorId; + this.evaluatorDescriptor = evaluatorDescriptor; + this.driverServiceClient = driverServiceClient; + } + + @Override + public String getId() { + return this.evaluatorId; + } + + @Override + public void addFile(final File file) { + this.addFileList.add(file); + } + + @Override + public void addLibrary(final File file) { + this.addLibraryList.add(file); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public void setProcess(final EvaluatorProcess process) { + if (process instanceof JVMClientProcess) { + this.evaluatorProcess = (JVMClientProcess) process; + } else { + throw new IllegalArgumentException(JVMClientProcess.class.getCanonicalName() + " required."); + } + } + + 
@Override + public void close() { + this.driverServiceClient.onEvaluatorClose(getId()); + } + + @Override + public void submitTask(final Configuration taskConfiguration) { + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.<Configuration>empty(), + Optional.of(taskConfiguration), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.of(contextConfiguration), + Optional.<Configuration>empty(), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitContextAndTask( + final Configuration contextConfiguration, + final Configuration taskConfiguration) { + + this.driverServiceClient.onEvaluatorSubmit( + getId(), + Optional.of(contextConfiguration), + Optional.of(taskConfiguration), + this.evaluatorProcess== null ? + Optional.<JVMClientProcess>empty() : + Optional.of(this.evaluatorProcess), + this.addFileList.size() == 0 ? + Optional.<List<File>>empty() : + Optional.of(this.addFileList), + this.addLibraryList.size() == 0 ? 
+ Optional.<List<File>>empty() : + Optional.of(this.addLibraryList)); + } + + @Override + public void submitContextAndServiceAndTask( + final Configuration contextConfiguration, + final Configuration serviceConfiguration, + final Configuration taskConfiguration) { + throw new UnsupportedOperationException(); + } + + +}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java new file mode 100644 index 0000000..d40f052 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +/** + * Closed context bridge. 
+ */ +@Private +public final class ClosedContextBridge implements ClosedContext { + + private final String contextId; + + private final String evaluatorId; + + private final ActiveContext parentContext; + + private final EvaluatorDescriptor evaluatorDescriptor; + + public ClosedContextBridge( + final String contextId, + final String evaluatorId, + final ActiveContext parentContext, + final EvaluatorDescriptor evaluatorDescriptor) { + this.contextId = contextId; + this.evaluatorId = evaluatorId; + this.parentContext = parentContext; + this.evaluatorDescriptor = evaluatorDescriptor; + } + + @Override + public ActiveContext getParentContext() { + return this.parentContext; + } + + @Override + public String getId() { + return this.contextId; + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return Optional.of(this.parentContext.getId()); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java new file mode 100644 index 0000000..12f6e3b --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.driver.evaluator.CompletedEvaluator; + +/** + * Completed Evaluator bridge. + */ +public final class CompletedEvaluatorBridge implements CompletedEvaluator { + + private final String id; + + public CompletedEvaluatorBridge(final String id) { + this.id = id; + } + + @Override + public String getId() { + return this.id; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java new file mode 100644 index 0000000..bed9129 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.CompletedTask; + +/** + * Completed task bridge. + */ +@Private +public final class CompletedTaskBridge implements CompletedTask { + + private final String taskId; + + private final ActiveContext context; + + private final byte[] result; + + public CompletedTaskBridge( + final String taskId, + final ActiveContext context, + final byte[] result) { + this.taskId = taskId; + this.context = context; + this.result = result; + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public String getId() { + return this.taskId; + } + + @Override + public byte[] get() { + return this.result; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java new file mode 100644 index 0000000..aea29f6 --- /dev/null +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ContextMessage; + +/** + * Context message bridge. 
+ */ +@Private +public final class ContextMessageBridge implements ContextMessage { + + private final String contextId; + + private final String messageSourceId; + + private final long sequenceNumber; + + private final byte[] message; + + public ContextMessageBridge( + final String contextId, + final String messageSourceId, + final long sequenceNumber, + final byte[] message) { + this.contextId = contextId; + this.messageSourceId = messageSourceId; + this.sequenceNumber = sequenceNumber; + this.message = message; + } + + @Override + public byte[] get() { + return this.message; + } + + @Override + public String getId() { + return this.contextId; + } + + @Override + public String getMessageSourceID() { + return this.messageSourceId; + } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java new file mode 100644 index 0000000..45bb8af --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +/** + * Failed context bridge. + */ +public final class FailedContextBridge implements FailedContext { + + private final String contextId; + + private final String evaluatorId; + + private final String message; + + private final EvaluatorDescriptor evaluatorDescriptor; + + private final Optional<ActiveContext> parentContext; + + private final Optional<byte[]> data; + + public FailedContextBridge( + final String contextId, + final String evaluatorId, + final String message, + final EvaluatorDescriptor evaluatorDescriptor, + final Optional<ActiveContext> parentContext, + final Optional<byte[]> data) { + this.contextId = contextId; + this.evaluatorId = evaluatorId; + this.message = message; + this.evaluatorDescriptor = evaluatorDescriptor; + this.parentContext = parentContext; + this.data = data; + } + + @Override + public Optional<ActiveContext> getParentContext() { + return this.parentContext; + } + + @Override + public String getMessage() { + return this.message; + } + + @Override + public Optional<String> getDescription() { + return Optional.of(message); + } + + @Override + public Optional<Throwable> getReason() { + return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message)); + } + + @Override + public 
Optional<byte[]> getData() { + return this.data; + } + + @Override + public Throwable asError() { + return new EvaluatorException(this.evaluatorId, this.message); + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return this.parentContext.isPresent() ? + Optional.of(this.parentContext.get().getId()) : Optional.<String>empty(); + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public String getId() { + return this.contextId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java new file mode 100644 index 0000000..40bdc58 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +import java.util.List; + +/** + * Failed Evaluator bridge. + */ +@Private +public final class FailedEvaluatorBridge implements FailedEvaluator { + + private final String id; + + private final EvaluatorException evaluatorException; + + private final List<FailedContext> failedContextList; + + private Optional<FailedTask> failedTask; + + public FailedEvaluatorBridge( + final String id, + final EvaluatorException evaluatorException, + final List<FailedContext> failedContextList, + final Optional<FailedTask> failedTask) { + this.id = id; + this.evaluatorException = evaluatorException; + this.failedContextList = failedContextList; + this.failedTask = failedTask; + } + + @Override + public EvaluatorException getEvaluatorException() { + return this.evaluatorException; + } + + @Override + public List<FailedContext> getFailedContextList() { + return this.failedContextList; + } + + @Override + public Optional<FailedTask> getFailedTask() { + return this.failedTask; + } + + @Override + public String getId() { + return this.id; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java new file mode 100644 
index 0000000..a24c294 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.client.IDriverServiceClient; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.runtime.common.driver.task.TaskRepresenter; +import org.apache.reef.util.Optional; + +/** + * Running task bridge. 
+ */ +@Private +public final class RunningTaskBridge implements RunningTask { + + private final IDriverServiceClient driverServiceClient; + + private final String taskId; + + private final ActiveContext context; + + + public RunningTaskBridge( + final IDriverServiceClient driverServiceClient, + final String taskId, + final ActiveContext context) { + this.driverServiceClient = driverServiceClient; + this.taskId = taskId; + this.context = context; + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public void send(final byte[] message) { + this.driverServiceClient.onTaskMessage(this.taskId, message); + } + + @Override + public void suspend(final byte[] message) { + throw new UnsupportedOperationException("Suspend task not supported"); + } + + @Override + public void suspend() { + throw new UnsupportedOperationException("Suspend task not supported"); + } + + @Override + public void close(final byte[] message) { + this.driverServiceClient.onTaskClose(this.taskId, Optional.of(message)); + } + + @Override + public void close() { + this.driverServiceClient.onTaskClose(this.taskId, Optional.<byte[]>empty()); + } + + @Override + public TaskRepresenter getTaskRepresenter() { + throw new UnsupportedOperationException("Not a public API"); + } + + @Override + public String getId() { + return this.taskId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java new file mode 100644 index 0000000..625f3cc --- /dev/null +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.task.TaskMessage; + +/** + * Task message bridge. 
+ */ +@Private +public final class TaskMessageBridge implements TaskMessage { + + private final String taskId; + + private final String contextId; + + private final String messageSourceId; + + private final long sequenceNumber; + + private final byte[] message; + + public TaskMessageBridge( + final String taskId, + final String contextId, + final String messageSourceId, + final long sequenceNumber, + final byte[] message) { + this.taskId = taskId; + this.contextId = contextId; + this.messageSourceId = messageSourceId; + this.sequenceNumber = sequenceNumber; + this.message = message; + } + + @Override + public byte[] get() { + return this.message; + } + + @Override + public String getId() { + return this.taskId; + } + + @Override + public long getSequenceNumber() { + return this.sequenceNumber; + } + + @Override + public String getContextId() { + return this.contextId; + } + + @Override + public String getMessageSourceID() { + return this.messageSourceId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java new file mode 100644 index 0000000..0c9504d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * REEF event stubs. + */ +package org.apache.reef.bridge.client.events; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java new file mode 100644 index 0000000..7c65ce5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.bridge.client.grpc;
+
+import org.apache.reef.bridge.client.IDriverClientService;
+import org.apache.reef.bridge.client.IDriverServiceClient;
+import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+/**
+ * Configuration module for Grpc runtime.
+ *
+ * <p>{@code CONF} binds {@code IDriverClientService} to
+ * {@code DriverClientService}, binds {@code IDriverServiceClient} to
+ * {@code DriverServiceClient}, and binds the {@code DriverServicePort}
+ * named parameter to {@code DRIVER_SERVICE_PORT}.
+ *
+ * <p>{@code DRIVER_SERVICE_PORT} is a required parameter of type
+ * {@code Integer}. It is declared before {@code CONF} because the
+ * {@code CONF} initializer reads it and static initializers run in
+ * textual order.
+ */
+public final class DriverClientGrpcConfiguration extends ConfigurationModuleBuilder {
+
+  public static final RequiredParameter<Integer> DRIVER_SERVICE_PORT = new RequiredParameter<>();
+
+  public static final ConfigurationModule CONF = new DriverClientGrpcConfiguration()
+      .bindImplementation(IDriverClientService.class, DriverClientService.class)
+      .bindImplementation(IDriverServiceClient.class, DriverServiceClient.class)
+      .bindNamedParameter(DriverServicePort.class, DRIVER_SERVICE_PORT)
+      .build();
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java
new file mode 100644
index 0000000..87f2241
--- /dev/null
+++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.grpc; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.reef.bridge.client.DriverClientDispatcher; +import org.apache.reef.bridge.client.IDriverClientService; +import org.apache.reef.bridge.client.JVMClientProcess; +import org.apache.reef.bridge.client.events.*; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; +import org.apache.reef.bridge.service.DriverClientException; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The driver 
client service that accepts incoming messages driver service and + * dispatches appropriate objects to the application. + */ +public final class DriverClientService extends DriverClientGrpc.DriverClientImplBase + implements IDriverClientService { + + private static final Logger LOG = Logger.getLogger(DriverClientService.class.getName()); + + private Server server; + + private final InjectionFuture<Clock> clock; + + private final DriverServiceClient driverServiceClient; + + private final TcpPortProvider tcpPortProvider; + + private final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher; + + private final Map<String, AllocatedEvaluatorBridge> evaluatorBridgeMap = new HashMap<>(); + + private final Map<String, ActiveContextBridge> activeContextBridgeMap = new HashMap<>(); + + @Inject + private DriverClientService( + final DriverServiceClient driverServiceClient, + final TcpPortProvider tcpPortProvider, + final InjectionFuture<Clock> clock, + final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) { + this.driverServiceClient = driverServiceClient; + this.tcpPortProvider = tcpPortProvider; + this.clock = clock; + this.clientDriverDispatcher = clientDriverDispatcher; + } + + @Override + public void start() throws IOException { + for (final Integer port : this.tcpPortProvider) { + try { + this.server = ServerBuilder.forPort(port) + .addService(this) + .build() + .start(); + LOG.info("Driver Client Server started, listening on " + port); + break; + } catch (IOException e) { + LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); + } + } + if (this.server == null || this.server.isTerminated()) { + throw new IOException("Unable to start gRPC server"); + } + this.driverServiceClient.registerDriverClientService("localhost", this.server.getPort()); + } + + @Override + public void awaitTermination() throws InterruptedException { + if (this.server != null) { + this.server.awaitTermination(); + } + } + + @Override + public void 
idlenessCheckHandler(final Void request, final StreamObserver<IdleStatus> responseObserver) { + responseObserver.onNext(IdleStatus.newBuilder() + .setReason("DriverClient checking idleness") + .setIsIdle( + !this.clock.get().isIdle() && + this.activeContextBridgeMap.isEmpty() && + this.evaluatorBridgeMap.isEmpty()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void startHandler(final StartTimeInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "StartHandler at time {0}", request.getStartTime()); + final StartTime startTime = new StartTime(request.getStartTime()); + this.clientDriverDispatcher.get().dispatch(startTime); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void stopHandler(final StopTimeInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime()); + final StopTime stopTime = new StopTime(request.getStopTime()); + this.clientDriverDispatcher.get().dispatch(stopTime); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void alarmTrigger(final AlarmTriggerInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Alarm Trigger id {0}", request.getAlarmId()); + this.clientDriverDispatcher.get().dispatchAlarm(request.getAlarmId()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void allocatedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluatorId()); + final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge( + request.getEvaluatorId(), + toEvaluatorDescriptor(request.getDescriptorInfo()), + this.driverServiceClient); + this.evaluatorBridgeMap.put(eval.getId(), eval); + 
this.clientDriverDispatcher.get().dispatch(eval); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void completedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Completed Evaluator id {0}", request.getEvaluatorId()); + this.evaluatorBridgeMap.remove(request.getEvaluatorId()); + this.clientDriverDispatcher.get().dispatch(new CompletedEvaluatorBridge(request.getEvaluatorId())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void failedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorId()); + final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.remove(request.getEvaluatorId()); + List<FailedContext> failedContextList = new ArrayList<>(); + if (request.getFailure().getFailedContextsList() != null) { + for (final String failedContextId : request.getFailure().getFailedContextsList()) { + final ActiveContextBridge context = this.activeContextBridgeMap.get(failedContextId); + failedContextList.add(new FailedContextBridge( + context.getId(), + eval.getId(), + request.getFailure().getMessage(), + eval.getEvaluatorDescriptor(), + context.getParentId().isPresent() ? + Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : + Optional.<ActiveContext>empty(), + Optional.<byte[]>empty())); + } + for (final String failedContextId : request.getFailure().getFailedContextsList()) { + this.activeContextBridgeMap.remove(failedContextId); + } + } + this.clientDriverDispatcher.get().dispatch( + new FailedEvaluatorBridge( + eval.getId(), + new EvaluatorException(request.getEvaluatorId(), request.getFailure().getMessage()), + failedContextList, + request.getFailure().getFailedTaskId() != null ? 
+ Optional.of(new FailedTask( + request.getFailure().getFailedTaskId(), + request.getFailure().getMessage(), + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.<ActiveContext>empty())) : + Optional.<FailedTask>empty())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void activeContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Active context id {0}", request.getContextId()); + final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.get(request.getEvaluatorId()); + final ActiveContextBridge context = new ActiveContextBridge( + this.driverServiceClient, + request.getContextId(), + request.getParentId() != null ? Optional.of(request.getParentId()) : Optional.<String>empty(), + eval.getId(), + eval.getEvaluatorDescriptor()); + this.activeContextBridgeMap.put(context.getId(), context); + this.clientDriverDispatcher.get().dispatch(context); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void closedContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Closed context id {0}", request.getContextId()); + try { + final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId()); + this.clientDriverDispatcher.get().dispatch( + new ClosedContextBridge( + context.getId(), + context.getEvaluatorId(), + this.activeContextBridgeMap.get(request.getParentId()), + context.getEvaluatorDescriptor())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError( + new DriverClientException("Unknown context id " + request.getContextId() + " in close")); + } + } + + @Override + public void failedContextHandler(final 
ContextInfo request, final StreamObserver<Void> responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Failed context id {0}", request.getContextId()); + try { + final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId()); + final Optional<ActiveContext> parent = context.getParentId().isPresent() ? + Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : + Optional.<ActiveContext>empty(); + final Optional<byte[]> data = request.getException().getData() != null ? + Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); + this.clientDriverDispatcher.get().dispatch( + new FailedContextBridge( + context.getId(), + context.getEvaluatorId(), + request.getException().getMessage(), + context.getEvaluatorDescriptor(), + parent, + data)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError( + new DriverClientException("Unknown context id " + request.getContextId() + " in close")); + } + } + + @Override + public void contextMessageHandler(final ContextMessageInfo request, final StreamObserver<Void> responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Message context id {0}", request.getContextId()); + try { + this.clientDriverDispatcher.get().dispatch( + new ContextMessageBridge( + request.getContextId(), + request.getMessageSourceId(), + request.getSequenceNumber(), + request.getPayload().toByteArray())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError( + new DriverClientException("Unknown context id " + request.getContextId() + " in close")); + } + } + + @Override + public void runningTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { + if 
(this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Running task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContextId()); + this.clientDriverDispatcher.get().dispatch( + new RunningTaskBridge(this.driverServiceClient, request.getTaskId(), context)); + } finally { + responseObserver.onCompleted(); + } + } else { + responseObserver.onError( + new DriverClientException("Unknown context id: " + request.getContextId())); + } + } + + @Override + public void failedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { + try { + LOG.log(Level.INFO, "Failed task id {0}", request.getTaskId()); + final Optional<ActiveContext> context = this.activeContextBridgeMap.containsKey(request.getContextId()) ? + Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContextId())) : + Optional.<ActiveContext>empty(); + final Optional<byte[]> data = request.getException().getData() != null ? 
+ Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); + this.clientDriverDispatcher.get().dispatch( + new FailedTask( + request.getTaskId(), + request.getException().getMessage(), + Optional.of(request.getException().getName()), + Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())), + data, + context)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void completedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Completed task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContextId()); + this.clientDriverDispatcher.get().dispatch( + new CompletedTaskBridge( + request.getTaskId(), + context, + request.getResult() != null ? request.getResult().toByteArray() : null)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } else { + responseObserver.onError( + new DriverClientException("Unknown context id: " + request.getContextId())); + } + } + + @Override + public void suspendedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { + responseObserver.onError(new DriverClientException("Not supported")); + } + + @Override + public void taskMessageHandler(final TaskMessageInfo request, final StreamObserver<Void> responseObserver) { + if (this.activeContextBridgeMap.containsKey(request.getContextId())) { + LOG.log(Level.INFO, "Message task id {0}", request.getTaskId()); + try { + this.clientDriverDispatcher.get().dispatch( + new TaskMessageBridge( + request.getTaskId(), + request.getContextId(), + request.getMessageSourceId(), + request.getSequenceNumber(), + request.getPayload().toByteArray())); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + 
} else { + responseObserver.onError( + new DriverClientException("Unknown context id: " + request.getContextId())); + } + } + + @Override + public void clientMessageHandler(final ClientMessageInfo request, final StreamObserver<Void> responseObserver) { + LOG.log(Level.INFO, "Client message"); + try { + this.clientDriverDispatcher.get().clientMessageDispatch(request.getPayload().toByteArray()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void clientCloseHandler(final Void request, final StreamObserver<Void> responseObserver) { + LOG.log(Level.INFO, "Client close"); + try { + this.clientDriverDispatcher.get().clientCloseDispatch(); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + @Override + public void clientCloseWithMessageHandler( + final ClientMessageInfo request, + final StreamObserver<Void> responseObserver) { + LOG.log(Level.INFO, "Client close with message"); + try { + this.clientDriverDispatcher.get().clientCloseWithMessageDispatch(request.getPayload().toByteArray()); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } + } + + // Helper methods + + private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) { + return new EvaluatorDescriptorImpl( + null, + info.getMemory(), + info.getCores(), + new JVMClientProcess(), + info.getRuntimeName()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java new file mode 100644 index 0000000..0bc29ce --- /dev/null +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.grpc; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.client.IDriverServiceClient; +import org.apache.reef.bridge.client.JVMClientProcess; +import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort; +import org.apache.reef.bridge.proto.*; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.Optional; + +import javax.inject.Inject; +import java.io.File; +import java.util.List; + +/** + * The client that exposes methods for communicating back to the + * driver service. 
+ */ +@Private +public final class DriverServiceClient implements IDriverServiceClient { + + /** Used for the evaluator configuration, which is not needed in Java. */ + private static final Configuration EMPTY_CONF = + Tang.Factory.getTang().newConfigurationBuilder().build(); + + private final ConfigurationSerializer configurationSerializer; + + private final DriverServiceGrpc.DriverServiceBlockingStub serviceStub; + + @Inject + private DriverServiceClient( + final ConfigurationSerializer configurationSerializer, + @Parameter(DriverServicePort.class) final Integer driverServicePort) { + this.configurationSerializer = configurationSerializer; + final ManagedChannel channel = ManagedChannelBuilder + .forAddress("localhost", driverServicePort) + .usePlaintext(true) + .build(); + this.serviceStub = DriverServiceGrpc.newBlockingStub(channel); + } + + public void registerDriverClientService(final String host, final int port) { + this.serviceStub.registerDriverClient( + DriverClientRegistration.newBuilder() + .setHost(host) + .setPort(port) + .build()); + } + + @Override + public void onShutdown() { + this.serviceStub.shutdown(ShutdownRequest.newBuilder().build()); + } + + @Override + public void onShutdown(final Throwable ex) { + this.serviceStub.shutdown(ShutdownRequest.newBuilder() + .setException(ExceptionInfo.newBuilder() + .setName(ex.getCause() != null ? 
ex.getCause().toString() : ex.toString()) + .setMessage(ex.getMessage()) + .build()) + .build()); + } + + @Override + public void onSetAlarm(final String alarmId, final int timeoutMS) { + this.serviceStub.setAlarm( + AlarmRequest.newBuilder() + .setAlarmId(alarmId) + .setTimeoutMs(timeoutMS) + .build()); + } + + @Override + public void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest) { + this.serviceStub.requestResources( + ResourceRequest.newBuilder() + .setCores(evaluatorRequest.getNumberOfCores()) + .setMemorySize(evaluatorRequest.getMegaBytes()) + .setRelaxLocality(evaluatorRequest.getRelaxLocality()) + .setResourceCount(evaluatorRequest.getNumber()) + .setRuntimeName(evaluatorRequest.getRuntimeName()) + .addAllRackNameList(evaluatorRequest.getRackNames()) + .addAllNodeNameList(evaluatorRequest.getNodeNames()) + .build()); + } + + @Override + public void onEvaluatorClose(final String evalautorId) { + this.serviceStub.allocatedEvaluatorOp( + AllocatedEvaluatorRequest.newBuilder() + .setEvaluatorId(evalautorId) + .setCloseEvaluator(true) + .build()); + } + + @Override + public void onEvaluatorSubmit( + final String evaluatorId, + final Optional<Configuration> contextConfiguration, + final Optional<Configuration> taskConfiguration, + final Optional<JVMClientProcess> evaluatorProcess, + final Optional<List<File>> addFileList, + final Optional<List<File>> addLibraryList) { + final AllocatedEvaluatorRequest.Builder builder = + AllocatedEvaluatorRequest.newBuilder().setEvaluatorId(evaluatorId); + if (addFileList.isPresent()) { + for (final File file : addFileList.get()) { + builder.addAddFiles(file.getAbsolutePath()); + } + } + if (addLibraryList.isPresent()) { + for (final File file : addLibraryList.get()) { + builder.addAddLibraries(file.getAbsolutePath()); + } + } + if (evaluatorProcess.isPresent()) { + final JVMClientProcess rawEP = evaluatorProcess.get(); + builder.setSetProcess( + AllocatedEvaluatorRequest.EvaluatorProcessRequest.newBuilder() + 
.setConfigurationFileName(rawEP.getConfigurationFileName()) + .setMemoryMb(rawEP.getMemory()) + .setStandardOut(rawEP.getStandardOut()) + .setStandardErr(rawEP.getStandardErr()) + .addAllOptions(rawEP.getOptions()) + .build()); + } + if (contextConfiguration.isPresent()) { + builder.setContextConfiguration( + this.configurationSerializer.toString(contextConfiguration.get())); + } + if (taskConfiguration.isPresent()) { + builder.setTaskConfiguration( + this.configurationSerializer.toString(taskConfiguration.get())); + } + this.serviceStub.allocatedEvaluatorOp(builder.build()); + } + + // Context Operations + + @Override + public void onContextClose(final String contextId) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setCloseContext(true) + .build()); + } + + @Override + public void onContextSubmitContext( + final String contextId, + final Configuration contextConfiguration) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setNewContextRequest(this.configurationSerializer.toString(contextConfiguration)) + .build()); + } + + @Override + public void onContextSubmitTask( + final String contextId, + final Configuration taskConfiguration) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setNewTaskRequest(this.configurationSerializer.toString(taskConfiguration)) + .build()); + } + + @Override + public void onContextMessage(final String contextId, final byte[] message) { + this.serviceStub.activeContextOp( + ActiveContextRequest.newBuilder() + .setContextId(contextId) + .setMessage(ByteString.copyFrom(message)) + .build()); + } + + // Task operations + + @Override + public void onTaskClose(final String taskId, final Optional<byte[]> message) { + this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setCloseTask(true) + .setMessage(message.isPresent() ? 
ByteString.copyFrom(message.get()) : null) + .build()); + } + + @Override + public void onTaskMessage(final String taskId, final byte[] message) { + this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setMessage(ByteString.copyFrom(message)) + .build()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java new file mode 100644 index 0000000..63518d0 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * gRPC specific implementations of the driver client bridge. 
+ */ +package org.apache.reef.bridge.client.grpc; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java new file mode 100644 index 0000000..f8ac2d6 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client.grpc.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * gRPC driver service port. 
/**
 * gRPC driver service port.
 * <p>
 * Integer-valued named parameter; it declares no default value.
 */
@NamedParameter(doc = "Driver Service Grpc port", short_name = "driver-service-port")
public final class DriverServicePort implements Name<Integer> {
}
+ */ +package org.apache.reef.bridge.client.grpc.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java new file mode 100644 index 0000000..0da2369 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Java bridge client driver. 
+ */ +package org.apache.reef.bridge.client; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java new file mode 100644 index 0000000..9d16c80 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.parameters; + +import org.apache.reef.bridge.client.DefaultDriverClientStopHandler; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StopTime; + +import java.util.Set; + +/** + * Client driver stop handler. 
+ */ +@NamedParameter(doc ="Java driver client stop handler", + default_class = DefaultDriverClientStopHandler.class) +public final class ClientDriverStopHandler implements Name<Set<EventHandler<StopTime>>> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java new file mode 100644 index 0000000..b0adf1c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Driver client dispatcher thread count. 
+ */ +@NamedParameter(doc = "Number of dispatch threads", default_value = "1") +public class DriverClientDispatchThreadCount implements Name<Integer> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java new file mode 100644 index 0000000..b39fce9 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Java bridge driver client specific parameters. 
+ */ +package org.apache.reef.bridge.client.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java new file mode 100644 index 0000000..dac1200 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.examples; + +import org.apache.reef.runtime.common.files.RuntimePathProvider; + +import javax.inject.Inject; +/** + * Supplies the java binary's path for HDInsight. 
+ */ +public final class WindowsRuntimePathProvider implements RuntimePathProvider { + + @Inject + public WindowsRuntimePathProvider() { + } + + @Override + public String getPath() { + return "java"; + } + + @Override + public String toString() { + return getPath(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java new file mode 100644 index 0000000..020a0eb --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.examples.hello; + +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Driver code for the Hello REEF Application. + */ +@Unit +public final class HelloDriver { + + private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName()); + + private final EvaluatorRequestor requestor; + + /** + * Job driver constructor - instantiated via TANG. + * + * @param requestor evaluator requestor object used to create new evaluator containers. + */ + @Inject + private HelloDriver(final EvaluatorRequestor requestor) { + this.requestor = requestor; + LOG.log(Level.FINE, "Instantiated 'HelloDriver'"); + } + + /** + * Handles the StartTime event: Request a single Evaluator. + */ + public final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + HelloDriver.this.requestor.newRequest() + .setNumber(1) + .setMemory(64) + .setNumberOfCores(1) + .submit(); + LOG.log(Level.INFO, "Requested Evaluator."); + } + } + + /** + * Handles AllocatedEvaluator: Submit the HelloTask. 
+ */ + public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator allocatedEvaluator) { + LOG.log(Level.INFO, "Submitting HelloREEF task to AllocatedEvaluator: {0}", allocatedEvaluator); + final Configuration taskConfiguration = TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask") + .set(TaskConfiguration.TASK, HelloTask.class) + .build(); + allocatedEvaluator.submitTask(taskConfiguration); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java new file mode 100644 index 0000000..928828c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.examples.hello; + +import org.apache.reef.bridge.client.DriverClientConfiguration; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.bridge.service.DriverServiceLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.ThreadLogger; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Client for Hello REEF example. + */ +public final class HelloREEF { + + private static final Logger LOG = Logger.getLogger(HelloREEF.class.getName()); + + /** Configuration of the HelloREEF driver. */ + private static final Configuration DRIVER_CONFIG = + DriverClientConfiguration.CONF + .set(DriverClientConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class) + .set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class) + .build(); + + /** + * Start Hello REEF job with local runtime. + * @param args command line parameters. + * @throws InjectionException configuration error. 
+ */ + public static void main(final String[] args) throws InjectionException, IOException { + + final ClientProtocol.DriverClientConfiguration.Builder builder = + ClientProtocol.DriverClientConfiguration.newBuilder(); + builder.setJobid("HelloREEF"); + builder.setLocalRuntime(ClientProtocol.LocalRuntimeParameters.newBuilder() + .setMaxNumberOfEvaluators(1) + .build()); + builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.START); + builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED); + builder.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class)); + + final LauncherStatus status = + DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG); + + LOG.log(Level.INFO, "REEF job completed: {0}", status); + + ThreadLogger.logThreads(LOG, Level.FINE, "Threads running at the end of HelloREEF:"); + } + + /** Empty private constructor to prohibit instantiation of utility class. */ + private HelloREEF() { } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloTask.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloTask.java new file mode 100644 index 0000000..57a7367 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloTask.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.examples.hello; + +import org.apache.reef.task.Task; + +import javax.inject.Inject; + +/** + * A 'hello REEF' Task. + */ +public final class HelloTask implements Task { + + @Inject + private HelloTask() { + } + + @Override + public byte[] call(final byte[] memento) { + System.out.println("Hello, REEF!"); + return null; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/package-info.java new file mode 100644 index 0000000..320e720 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Java bridge Hello REEF. + */ +package org.apache.reef.bridge.examples.hello; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/package-info.java new file mode 100644 index 0000000..a39bd45 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Java bridge examples. 
+ */ +package org.apache.reef.bridge.examples; http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java new file mode 100644 index 0000000..4f83fdb --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.service; + +/** + * An exception thrown by the driver client. + */ +public final class DriverClientException extends Exception { + + public DriverClientException(final String message) { + super(message); + } +}