Re: Custom Spark data source in Java

2017-03-22 Thread Jörn Franke
OK, I understand. For 1), as a minimum you need to implement inferSchema and
buildReader. inferSchema must return the schema of a row. For example, if
it contains one column of type String, it returns:
StructType(collection.immutable.Seq(StructField("column1", StringType,
true)))
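
In Java this could look roughly as follows (a minimal sketch, assuming Spark 2.x;
the class name is made up, and note that inferSchema must wrap the schema in a
scala.Option):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;

public class SchemaExample {
    // One nullable String column, equivalent to the Scala snippet above.
    static Option<StructType> inferredSchema() {
        StructType schema = new StructType(new StructField[] {
            new StructField("column1", DataTypes.StringType, true, Metadata.empty())
        });
        // inferSchema returns Option<StructType>, so wrap the schema.
        return Option.apply(schema);
    }
}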

buildReader: here is an article on how to create a Function1 in Java:
http://www.codecommit.com/blog/java/interop-between-java-and-scala

It basically returns a function that takes a file as input and returns the
rows as output (an Iterator). Btw, for better readability I would recommend
Java 8 lambda functions instead of Function1 etc.; this also looks much more
similar to Scala, but is fully Java compliant.
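
If the lambda route is not available (with Scala 2.11, Function1 is not a
functional interface from Java's point of view), extending
scala.runtime.AbstractFunction1 works on either version. A rough sketch,
assuming Spark 2.x; the class name and the single hard-coded row are made up
for illustration:

import java.io.Serializable;
import java.util.Collections;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction1;

// A Function1<PartitionedFile, Iterator<InternalRow>> implemented in Java.
// Serializable because Spark ships the reader function to the executors.
public class SingleStringRowReader
        extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>>
        implements Serializable {

    @Override
    public Iterator<InternalRow> apply(PartitionedFile file) {
        // Placeholder: one row with one String column. A real reader would
        // open file.filePath() and map each parsed record to an InternalRow,
        // following the requiredSchema that buildReader receives.
        InternalRow row = new GenericInternalRow(
            new Object[] { UTF8String.fromString("hello") });
        return JavaConverters.asScalaIteratorConverter(
            Collections.singletonList(row).iterator()).asScala();
    }
}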

You can find an example in Scala here:
https://github.com/ZuInnoTe/spark-hadoopoffice-ds/blob/master/src/main/scala/org/zuinnote/spark/office/excel/DefaultSource.scala
It is a little bit more complex, because it returns for each row an array
that contains elements of a complex type (Excel cells).

For 2) it is in fact similar. You have to create a class that inherits from
BaseRelation and implements TableScan. There you need to implement schema
and buildScan. Then you simply ;-) return an object of this class.
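
In Java, such a class could be sketched like this (a rough sketch, assuming
Spark 2.x; CharCounterRelation, its schema and the hard-coded row are
placeholders for your own logic):

import java.util.Collections;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CharCounterRelation extends BaseRelation implements TableScan {

    private final SQLContext sqlContext;

    public CharCounterRelation(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    @Override
    public SQLContext sqlContext() {
        return sqlContext;
    }

    @Override
    public StructType schema() {
        // One int column "a", matching the expected output in this thread.
        return new StructType(new StructField[] {
            new StructField("a", DataTypes.IntegerType, false, Metadata.empty())
        });
    }

    @Override
    public RDD<Row> buildScan() {
        // Placeholder: a single row with a hard-coded count. A real
        // implementation would read and parse the file here.
        List<Row> rows = Collections.singletonList(RowFactory.create(45));
        return new JavaSparkContext(sqlContext.sparkContext()).parallelize(rows).rdd();
    }
}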

Here is another example in Scala:
https://github.com/ZuInnoTe/spark-hadoopcryptoledger-ds/blob/master/src/main/scala/org/zuinnote/spark/bitcoin/block/BitcoinBlockRelation.scala

Sorry, it is again a little bit more complex, because it returns Bitcoin
blocks from the blockchain...
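
And to "return an object of this class" from Java: the RelationProvider could
be wired up roughly like this (a sketch; CharCounterRelation is the
hypothetical relation class sketched above):

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;
import scala.collection.immutable.Map;

public class CharCounterDataSource implements RelationProvider {

    @Override
    public BaseRelation createRelation(SQLContext sqlContext,
            Map<String, String> parameters) {
        // Returning null here is what produces the NPE in LogicalRelation
        // seen further down the thread; return a real relation instead.
        return new CharCounterRelation(sqlContext);
    }
}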

I hope it helps as a start. Let me know if you have more questions.

On Wed, Mar 22, 2017 at 9:35 PM, Jean Georges Perrin wrote:

> [...]

Re: Custom Spark data source in Java

2017-03-22 Thread Jean Georges Perrin
Thanks Jörn,

I tried to super simplify my project so I can focus on the plumbing and I will 
add the existing code & library later. So, as of now, the project will not have 
a lot of meaning but will allow me to understand the job.

my call is:

String filename = "src/test/resources/simple.json";
SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();
Dataset<Row> df = spark.read().format("x.CharCounterDataSource")
.option("char", "a") // count the number of 'a'
.load(filename); // local file (line 40 in the stacks below)
df.show();

Ideally, this should display something like:

+--+
| a|
+--+
|45|
+--+

Things get trickier when I try to work on x.CharCounterDataSource:

I looked at 2 ways to do it:

1) one based on FileFormat:

public class CharCounterDataSource implements FileFormat {

@Override
public Function1<PartitionedFile, Iterator<InternalRow>>
buildReader(SparkSession arg0, StructType arg1,
StructType arg2, StructType arg3, Seq<Filter> arg4,
Map<String, String> arg5, Configuration arg6) {
// TODO Auto-generated method stub
return null;
}

@Override
public Function1<PartitionedFile, Iterator<InternalRow>>
buildReaderWithPartitionValues(SparkSession arg0,
StructType arg1, StructType arg2, StructType arg3,
Seq<Filter> arg4, Map<String, String> arg5,
Configuration arg6) {
// TODO Auto-generated method stub
return null;
}

@Override
public Option<StructType> inferSchema(SparkSession arg0,
Map<String, String> arg1, Seq<FileStatus> arg2) {
// TODO Auto-generated method stub
return null;
}

@Override
public boolean isSplitable(SparkSession arg0, Map<String, String> arg1,
Path arg2) {
// TODO Auto-generated method stub
return false;
}

@Override
public OutputWriterFactory prepareWrite(SparkSession arg0, Job arg1,
Map<String, String> arg2, StructType arg3) {
// TODO Auto-generated method stub
return null;
}

@Override
public boolean supportBatch(SparkSession arg0, StructType arg1) {
// TODO Auto-generated method stub
return false;
}
}

I know it is an empty class (generated by Eclipse) and I am not expecting much 
out of it.

Running it says:

java.lang.NullPointerException
at 
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
at 
x.spark.datasource.counter.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)

Nothing surprising...

2) One based on RelationProvider:

public class CharCounterDataSource implements RelationProvider {

@Override
public BaseRelation createRelation(SQLContext arg0, Map<String, String> arg1) {
// TODO Auto-generated method stub
return null;
}

}

which fails too...

java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
at 
org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
at x.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)


Don't get me wrong - I understand it fails - but what I need is "just one hint" 
to continue building the glue ;-)...

(Un)fortunately, we cannot use Scala...

jg

> On Mar 22, 2017, at 4:00 PM, Jörn Franke wrote:
> [...]

Re: Custom Spark data source in Java

2017-03-22 Thread Jörn Franke
I think you can develop a Spark data source in Java, but you are right, most
use Scala for the glue with Spark even if they have a Java library (this is
what I did for the project I open sourced). Coming back to your question, it
is a little bit difficult to assess the exact issue without the code.
You could also try to first have a very simple Scala data source that works and 
then translate it to Java and do the test there. You could then also post the 
code here without disclosing confidential stuff.
Or you try directly in Java a data source that always returns a row with one
column containing a String. I fear that in any case you need to import some
Scala classes in Java and/or have some wrappers in Scala.
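
For example, a RelationProvider's createRelation receives its parameters as a
scala.collection.immutable.Map; reading an option from Java could look like
this (a small sketch; the "char" option name is just the one used earlier in
this thread):

import scala.Option;
import scala.collection.immutable.Map;

public class Options {
    // Map.get returns a scala.Option rather than null; unwrap it explicitly.
    static String charOption(Map<String, String> parameters) {
        Option<String> value = parameters.get("char");
        return value.isEmpty() ? "a" : value.get(); // default to "a" if absent
    }
}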
If you use FileFormat, you need at least Spark 2.0.

> On 22 Mar 2017, at 20:27, Jean Georges Perrin wrote:
> [...]


Custom Spark data source in Java

2017-03-22 Thread Jean Georges Perrin

Hi,

I am trying to build a custom file data source for Spark, in Java. I have found
numerous examples in Scala (including the CSV and XML data sources from
Databricks), but I cannot bring Scala into this project. We also already have
the parser itself written in Java; I just need to build the "glue" between the
parser and Spark.

This is how I'd like to call it:

String filename = "src/test/resources/simple.x";

SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();

Dataset<Row> df = spark.read().format("x.RandomDataSource")
.option("metadataTag", "schema") // hint to find schema
.option("dataTag", "data") // hint to find data
.load(filename); // local file

So far, I tried to implement x.RandomDataSource:

• Based on FileFormat, which makes the most sense, but I do not have a clue on how to implement buildReader()...
• Based on RelationProvider, but same here...

It seems that in both cases, the call is made to the right class, but I get an
NPE because I do not provide much. Any hint or example would be greatly
appreciated!

Thanks

jg