[jira] [Commented] (SPARK-27623) Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated

2019-07-04 Thread Paul Wais (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878896#comment-16878896
 ] 

Paul Wais commented on SPARK-27623:
---

I don't think this issue stems from any particular setup; rather, the published 
package itself appears to be broken.

See Stack Overflow:

 * [https://stackoverflow.com/questions/55873023/built-in-spark-avro-unable-to-read-avro-file-from-shell]
 * [https://stackoverflow.com/questions/53715347/spark-reading-avro-file]


This is also broken for me in Spark 2.4.3.  Repro:

$ docker run --rm -it au2018/env:v1.5.1-draft bash

# Add org.apache.spark:spark-avro_2.12:2.4.3 to the Spark config:

$ vim /opt/spark/conf/spark-defaults.conf
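
The exact conf contents aren't shown here; presumably the package was listed 
via the standard spark.jars.packages setting, along these lines:

    # Assumed spark-defaults.conf entry (the resolution output below shows
    # spark-deep-learning and tensorframes were also listed)
    spark.jars.packages org.apache.spark:spark-avro_2.12:2.4.3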

# Run demo
$ python3
Python 3.6.8 (default, Jan 14 2019, 11:02:34)
[GCC 8.0.1 20180414 (experimental) [trunk revision 259383]] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import findspark
>>> findspark.init()
>>> import pyspark
>>> import pyspark.sql
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()

...
databricks#spark-deep-learning added as a dependency
databricks#tensorframes added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency

:: resolving dependencies :: org.apache.spark#spark-submit-parent-4ba8057c-a59d-4005-ae5b-5ac5e3b9d91d;1.0
...
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/2.4.3/spark-avro_2.12-2.4.3.jar ...
[SUCCESSFUL ] org.apache.spark#spark-avro_2.12;2.4.3!spark-avro_2.12.jar (107ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (32ms)
...
>>> df = spark.read.format("avro").load("/opt/spark/examples/src/main/resources/users.avro")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
    at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    ...

[jira] [Comment Edited] (SPARK-10399) Off Heap Memory Access for non-JVM libraries (C++)

2015-09-14 Thread Paul Wais (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744634#comment-14744634
 ] 

Paul Wais edited comment on SPARK-10399 at 9/15/15 1:16 AM:


After investigating this issue a bit further, it might be feasible to expose 
*on-heap* Spark memory (without a copy) to native code through the 
Get/Release*Critical() JNI interface.  Android [1] uses this interface for 
copying on-heap data to devices (e.g. the GPU).  It's important to note that 
the interface is not necessarily zero-copy, and on some JVMs (e.g. Hotspot [2]) 
it blocks GC, which could lead to longer Spark GC pauses.  In any case, this 
feature might help expose the individual elements of an RDD to native code 
without any major changes to Spark (e.g. to the BlockManager).

Nevertheless, native code would ideally not make a JNI call per item (e.g. per 
row) and instead would get access to a segment of rows or an entire partition.  
However, blocking the GC while processing an entire partition would probably 
not work well in practice...

[1] 
https://github.com/android/platform_frameworks_base/search?p=3&q=GetPrimitiveArrayCritical&utf8=%E2%9C%93
[2] 
https://github.com/openjdk-mirror/jdk7u-hotspot/blob/50bdefc3afe944ca74c3093e7448d6b889cd20d1/src/share/vm/prims/jni.cpp#L4235
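
To make the pattern above concrete, here is a minimal sketch (the JNI binding 
name is invented for illustration; this is not Spark code):

    // Sketch of the Get/ReleasePrimitiveArrayCritical pattern described above.
    #include <jni.h>

    extern "C" JNIEXPORT jlong JNICALL
    Java_example_NativeSum_sumBytes(JNIEnv* env, jclass, jbyteArray arr) {
      // Query the length *before* entering the critical region: arbitrary
      // JNI calls are not allowed while the array is pinned.
      const jsize n = env->GetArrayLength(arr);
      jboolean is_copy = JNI_FALSE;
      // May pin the array (blocking GC on some JVMs, e.g. Hotspot) or may
      // fall back to a copy -- hence "not necessarily zero-copy".
      jbyte* data = static_cast<jbyte*>(
          env->GetPrimitiveArrayCritical(arr, &is_copy));
      if (data == nullptr) return -1;  // allocation failure
      jlong sum = 0;
      for (jsize i = 0; i < n; ++i) sum += data[i];
      // JNI_ABORT: release without writing (possibly copied) data back.
      env->ReleasePrimitiveArrayCritical(arr, data, JNI_ABORT);
      return sum;
    }

The longer the loop runs, the longer GC may stay blocked, which is exactly the 
per-partition concern above.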



> Off Heap Memory Access for non-JVM libraries (C++)
> --
>
> Key: SPARK-10399
> URL: https://issues.apache.org/jira/browse/SPARK-10399
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Paul Weiss
>
> *Summary*
> Provide direct off-heap memory access to an external non-JVM program, such 
> as a C++ library, within the running Spark JVM/executor.  As Spark moves to 
> storing all data in off-heap memory, it makes sense to provide access points 
> to that memory for non-JVM programs.
> 
> *Assumptions*
> * Zero copies will be made during the call into the non-JVM library
> * Access into non-JVM libraries will be accomplished via JNI
> * A generic JNI interface will be created so that developers will not need to 
> deal with raw JNI calls
> * C++ will be the initial target non-JVM use case
> * Memory management will remain on the JVM/Spark side
> * The API from C++ will be similar to DataFrames as much as feasible and NOT 
> require expert knowledge of JNI
> * Data organization and layout will support complex (multi-type, nested, 
> etc.) types
> 
> *Design*
> * Initially Spark JVM -> non-JVM will be supported 
> * Creating an embedded JVM with Spark running from a non-JVM program is 
> initially out of scope
> 
> *Technical*
> * GetDirectBufferAddress is the JNI call used to access a byte buffer without 
> a copy
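
For reference, a minimal sketch of that call (the JNI binding name is invented 
for illustration):

    // Sketch of zero-copy access to a direct java.nio.ByteBuffer.
    #include <jni.h>

    extern "C" JNIEXPORT void JNICALL
    Java_example_NativeKernel_process(JNIEnv* env, jclass, jobject buf) {
      // Works only for *direct* buffers (ByteBuffer.allocateDirect);
      // returns NULL / -1 for ordinary on-heap buffers.
      void* addr = env->GetDirectBufferAddress(buf);
      jlong len = env->GetDirectBufferCapacity(buf);
      if (addr == nullptr || len < 0) return;
      // Hand (addr, len) to the C++ library; no copy and no GC pinning.
    }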



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10399) Off Heap Memory Access for non-JVM libraries (C++)

2015-09-06 Thread Paul Wais (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733169#comment-14733169
 ] 

Paul Wais commented on SPARK-10399:
---

Image processing is a great use case.  I've deployed a JNA-based image 
processing Spark app on a cluster of ~200 cores, and one of the pain points was 
memory management.  That solution copied images (via memcpy) since there was 
no time to implement a better solution.  Spark would have the JVM use 
essentially all available memory and would not account for native off-heap 
usage, so the native code would typically trigger an OOM after a while.  Tuning 
to curtail OOMs was hard.  Direct access to off-heap memory would have helped a 
ton here.

A similar use case is large-scale processing of text data (e.g. web pages, 
tweets, blog posts, etc.).  java.lang.String is not very portable (as noted 
below), and direct access to string buffers (especially if they're in a proper 
UTF format) is very desirable.  Direct access to UTF-8 could also benefit 
Python support.

A major advantage of *in-process* native code (as opposed to, say, using 
`RDD.pipe()`) is that exceptions can be propagated, logged, and handled by 
Spark.  This feature alone IMO warrants the software cost of in-process native 
code.  Unfortunately, properly handling JNI-related exceptions and other 
nuances is tricky and a major pain.  I recommend Djinni, which helps a ton here 
(and is used in consumer mobile apps): https://bit.ly/djinnitalk .  Furthermore, 
Djinni recently added a type-marshaling feature that enables zero-copy type 
translation.  (The default type marshaling does deep copying.)

Some related issues:

 * Spark's BlockManager makes use of on-heap byte buffers for e.g. compression.  
On-heap byte arrays are *not* necessarily zero-copy (the JVM is allowed to 
copy data in a JNI `GetPrimitiveArrayCritical()` call; for more information, 
see the discussion at https://github.com/dropbox/djinni/issues/54 ).  A 
complete solution to this JIRA may necessitate some changes to Spark's core 
serializer API.  (In particular, it might be nice to have a code path that 
avoids any temporary on-heap buffers.)

 * While Spark's Unsafe UTF-8 Strings are likely portable, java.lang.String is 
*not* particularly portable to C++: 
https://github.com/dropbox/djinni/blob/master/support-lib/jni/djinni_support.cpp#L431 
.  I've microbenchmarked that code and found it to be major overhead (a sketch 
of the conversion appears after this list).  A solution to this JIRA might need 
some subtle API changes to encourage/help users avoid Java Strings.

 * Shipping and running a native library on a cluster is tricky.  Containers / 
virtualization (e.g. Docker) can help ensure the availability of dependencies, 
but sometimes those technologies aren't available.  One can compile all 
dependencies (i.e. including libc++) into a single dynamic library, but that 
takes some special build set-up.  On-executor dynamic code compilation (e.g. 
through Cling https://root.cern.ch/cling ) would be desirable but is probably 
beyond the scope of this JIRA.  I'm hoping to contribute a change to Djinni 
soon ( 
https://github.com/dropbox/djinni/compare/master...pwais:pwais_linux_build ) 
that will address the common use case where one simply wants to ship and run 
(on Spark) an app jar that contains a native library (and use system 
dependencies).
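
For reference, the java.lang.String -> std::string conversion flagged in the 
second bullet reduces to roughly the following (a sketch, not Djinni's exact 
code); the transcode and copies are where the measured overhead comes from:

    #include <jni.h>
    #include <string>

    // Converting a java.lang.String to std::string forces a transcode (JNI
    // hands back "modified UTF-8") plus at least one copy.
    std::string ToStdString(JNIEnv* env, jstring s) {
      const char* utf = env->GetStringUTFChars(s, nullptr);  // copy #1
      if (utf == nullptr) return std::string();              // OOM
      std::string out(utf);                                  // copy #2
      env->ReleaseStringUTFChars(s, utf);
      return out;
    }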



Are there any followers of this JIRA who have specific API requests?  My take 
on this issue is that there are a few main components:
  * Ensuring the accessibility of UnsafeRow to user code (which would then 
invoke native code).  (It's not clear to me that this is already part of Spark 
1.5; DataFrames simply interop with Row.)
  * Creating a byte buffer 'view' that's similar to UTF8String for buffer row 
attributes.  `UnsafeRow.getBytes()` currently deep-copies (into an on-heap 
array), and we'd want a 'view' of the bytes instead.
  * Defining and implementing core type mappers, e.g. Spark UTF8String <-> 
std::string.  It might be nice for "Spark C++" types to be simple arrays (e.g. 
(pointer, length, nullable deleter)) with adapters to standard types (e.g. 
std::string and std::vector).  The deleter part is important if native code is 
to be allowed to consume (and gain ownership of) data; a full solution needs a 
'move' API component.  (A sketch follows this list.)
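
As a sketch of that buffer type (invented names; not a concrete API proposal):

    #include <cstddef>
    #include <string>

    // Sketch of a "(pointer, length, nullable deleter)" buffer: a borrowed
    // view when deleter is null, an owning buffer otherwise (supporting the
    // 'move' API component).  A real type would be move-only; copy control
    // is omitted for brevity.
    struct SparkBuf {
      const char* data;
      std::size_t len;
      void (*deleter)(const char*);  // null => borrowed view

      ~SparkBuf() { if (deleter) deleter(data); }

      // Adapter to a standard type; note this makes a deep copy.
      std::string ToString() const { return std::string(data, len); }
    };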

With those pieces in place (and especially if any "Spark C++ support code" is 
header-only), it wouldn't be too hard for users to build & package Spark jars 
w/ native libs as they please. As mentioned above, I'd recommend Djinni as a 
facilitator to this project (and as a facilitator to users who want to write & 
deploy native libs).

There are some other misc issues:
 * Is Unsafe memory always aligned? If not, how can we flag this to native code?
 * As mentioned above, can we modify BlockManager to have a path that skips any 
on-heap buffers?
 * If native code *does* need to use substantial memory, how can it communicate 
that need to Spark?