[
https://issues.apache.org/jira/browse/PIO-192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714683#comment-16714683
]
ASF GitHub Bot commented on PIO-192:
takezoe closed pull request #494: [PIO-192] Enhance PySpark support
URL: https://github.com/apache/predictionio/pull/494
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/bin/pio-shell b/bin/pio-shell
index cd119cdee..e23041ac8 100755
--- a/bin/pio-shell
+++ b/bin/pio-shell
@@ -65,7 +65,6 @@ then
# Get paths of assembly jars to pass to pyspark
. ${PIO_HOME}/bin/compute-classpath.sh
shift
- export PYTHONSTARTUP=${PIO_HOME}/python/pypio/shell.py
export PYTHONPATH=${PIO_HOME}/python
${SPARK_HOME}/bin/pyspark --jars ${ASSEMBLY_JARS} $@
else
diff --git a/build.sbt b/build.sbt
index 2a6204215..fc1fcf255 100644
--- a/build.sbt
+++ b/build.sbt
@@ -151,19 +151,19 @@ val core = (project in file("core")).
enablePlugins(SbtTwirl).
disablePlugins(sbtassembly.AssemblyPlugin)
-val tools = (project in file("tools")).
+val e2 = (project in file("e2")).
dependsOn(core).
- dependsOn(data).
settings(commonSettings: _*).
- settings(commonTestSettings: _*).
- settings(skip in publish := true).
enablePlugins(GenJavadocPlugin).
- enablePlugins(SbtTwirl)
+ disablePlugins(sbtassembly.AssemblyPlugin)
-val e2 = (project in file("e2")).
+val tools = (project in file("tools")).
+ dependsOn(e2).
settings(commonSettings: _*).
+ settings(commonTestSettings: _*).
+ settings(skip in publish := true).
enablePlugins(GenJavadocPlugin).
- disablePlugins(sbtassembly.AssemblyPlugin)
+ enablePlugins(SbtTwirl)
val dataEs = if (majorVersion(es) == 1) dataElasticsearch1 else
dataElasticsearch
diff --git
a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
index cfc83eb1d..011cd95c9 100644
---
a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
+++
b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala
@@ -55,9 +55,10 @@ object EngineServerPluginContext extends Logging {
EngineServerPlugin.outputSniffer -> mutable.Map())
val pluginParams = mutable.Map[String, JValue]()
val serviceLoader = ServiceLoader.load(classOf[EngineServerPlugin])
-val variantJson = parse(stringFromFile(engineVariant))
-(variantJson \ "plugins").extractOpt[JObject].foreach { pluginDefs =>
- pluginDefs.obj.foreach { pluginParams += _ }
+stringFromFile(engineVariant).foreach { variantJson =>
+ (parse(variantJson) \ "plugins").extractOpt[JObject].foreach {
pluginDefs =>
+pluginDefs.obj.foreach { pluginParams += _ }
+ }
}
serviceLoader foreach { service =>
pluginParams.get(service.pluginName) map { params =>
@@ -77,11 +78,15 @@ object EngineServerPluginContext extends Logging {
log)
}
- private def stringFromFile(filePath: String): String = {
+ private def stringFromFile(filePath: String): Option[String] = {
try {
- val uri = new URI(filePath)
- val fs = FileSystem.get(uri, new Configuration())
- new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar))
+ val fs = FileSystem.get(new Configuration())
+ val path = new Path(new URI(filePath))
+ if (fs.exists(path)) {
+Some(new String(ByteStreams.toByteArray(fs.open(path)).map(_.toChar)))
+ } else {
+None
+ }
} catch {
case e: java.io.IOException =>
error(s"Error reading from file: ${e.getMessage}. Aborting.")
diff --git
a/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
b/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
index cb71f14e4..3aafe67e0 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/JsonExtractor.scala
@@ -32,7 +32,6 @@ import org.json4s.native.JsonMethods.compact
import org.json4s.native.JsonMethods.pretty
import org.json4s.native.JsonMethods.parse
import org.json4s.native.JsonMethods.render
-import org.json4s.reflect.TypeInfo
object JsonExtractor {
@@ -144,7 +143,13 @@ object JsonExtractor {
formats: Formats,
clazz: Class[T]): T = {
-Extraction.extract(parse(json), TypeInfo(clazz,
None))(formats).asInstanceOf[T]
+implicit val f = formats
+implicit val m = if (clazz == classOf[Map[_, _]]) {
+ Manifest.classType(clazz, manifest[String], manifest[Any])
+} else {
+ Manifest.classType(clazz)
+}
+Extraction.extract(parse(json))