OK, I understand.

For 1): at a minimum, you need to implement inferSchema and buildReader.

inferSchema must return the schema of a row, wrapped in an Option (see the Option<StructType> return type in your code). For example, if a row contains one column of type String, the schema is:

StructType(collection.immutable.Seq(StructField("column1", StringType, true)))

buildReader: here you can find an article on how to create a Function1 in Java:
http://www.codecommit.com/blog/java/interop-between-java-and-scala
Basically, it returns a function that takes a file as input and returns the rows as output (an Iterator). By the way, for better readability I would recommend Java 8 lambda functions instead of Function1 etc. This would also look much more similar to Scala, but is fully Java-compliant. You can find an example in Scala here:
https://github.com/ZuInnoTe/spark-hadoopoffice-ds/blob/master/src/main/scala/org/zuinnote/spark/office/excel/DefaultSource.scala
It is a little bit more complex, because for each row it returns an array whose elements are of a complex type (an Excel cell).

For 2): it is in fact similar. You have to create a class that inherits from BaseRelation and implements TableScan. There you need to implement schema and buildScan. Then you simply return ;-) an object of this class from createRelation instead of null. Here is another example in Scala:
https://github.com/ZuInnoTe/spark-hadoopcryptoledger-ds/blob/master/src/main/scala/org/zuinnote/spark/bitcoin/block/BitcoinBlockRelation.scala
Sorry, it is again a little bit more complex, because it returns Bitcoin blocks from the blockchain...

I hope it helps as a start. Let me know if you have more questions.

On Wed, Mar 22, 2017 at 9:35 PM, Jean Georges Perrin wrote:
> Thanks Jörn,
>
> I tried to super-simplify my project so I can focus on the plumbing, and I will add the existing code & library later. So, as of now, the project will not have a lot of meaning, but it will allow me to understand the job.
>
> My call is:
>
>     String filename = "src/test/resources/simple.json";
>     SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();
>     Dataset<Row> df = spark.read().format("x.CharCounterDataSource")
>         .option("char", "a") // count the number of 'a'
>         .load(filename); // local file (line 40 in the stacks below)
>     df.show();
>
> Ideally, this should display something like:
>
>     +--+
>     | a|
>     +--+
>     |45|
>     +--+
>
> Things get trickier when I try to work on x.CharCounterDataSource.
>
> I looked at two ways to do it:
>
> 1) One based on FileFormat:
>
>     public class CharCounterDataSource implements FileFormat {
>
>       @Override
>       public Function1<PartitionedFile, Iterator<InternalRow>> buildReader(SparkSession arg0,
>           StructType arg1, StructType arg2, StructType arg3, Seq<Filter> arg4,
>           Map<String, String> arg5, Configuration arg6) {
>         // TODO Auto-generated method stub
>         return null;
>       }
>
>       @Override
>       public Function1<PartitionedFile, Iterator<InternalRow>> buildReaderWithPartitionValues(
>           SparkSession arg0, StructType arg1, StructType arg2, StructType arg3, Seq<Filter> arg4,
>           Map<String, String> arg5, Configuration arg6) {
>         // TODO Auto-generated method stub
>         return null;
>       }
>
>       @Override
>       public Option<StructType> inferSchema(SparkSession arg0, Map<String, String> arg1,
>           Seq<FileStatus> arg2) {
>         // TODO Auto-generated method stub
>         return null;
>       }
>
>       @Override
>       public boolean isSplitable(SparkSession arg0, Map<String, String> arg1, Path arg2) {
>         // TODO Auto-generated method stub
>         return false;
>       }
>
>       @Override
>       public OutputWriterFactory prepareWrite(SparkSession arg0, Job arg1,
>           Map<String, String> arg2, StructType arg3) {
>         // TODO Auto-generated method stub
>         return null;
>       }
>
>       @Override
>       public boolean supportBatch(SparkSession arg0, StructType arg1) {
>         // TODO Auto-generated method stub
>         return false;
>       }
>     }
>
> I know it is an empty class (generated by Eclipse) and I am not expecting much out of it.
>
> Running it says:
>
>     java.lang.NullPointerException
>       at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
>       at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
>       at x.spark.datasource.counter.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)
>
> Nothing surprising...
>
> 2) One based on RelationProvider:
>
>     public class CharCounterDataSource implements RelationProvider {
>
>       @Override
>       public BaseRelation createRelation(SQLContext arg0, Map<String, String> arg1) {
>         // TODO Auto-generated method stub
>         return null;
>       }
>
>     }
>
> which fails too...
>
>     java.lang.NullPointerException
>       at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
>       at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
>       at x.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)
>
> Don't get me wrong, I understand why it fails, but what I need is "just one hint" to continue building the glue ;-)...
>
> (Un)fortunately, we cannot use Scala...
>
> jg
>
> On Mar 22, 2017, at 4:00 PM, Jörn Franke wrote:
>
> > I think you can develop a Spark data source in Java, but you are right that most people use Scala for the glue, even if they have a Java library (this is what I did for the project I open-sourced). Coming back to your question, it is a little bit difficult to assess the exact issue without the code.
> > You could also first try a very simple Scala data source that works, then translate it to Java and test it there. You could then also post the code here without disclosing anything confidential.
> > Or you could try, directly in Java, a data source that always returns a row with one column containing a String. I fear that in any case you need to import some Scala classes in Java and/or have some wrappers in Scala.
> > If you use FileFormat, you need at least Spark 2.0.
> >
> > On 22 Mar 2017, at 20:27, Jean Georges Perrin wrote:
> >
> > > Hi,
> > >
> > > I am trying to build a custom file data source for Spark, in Java. I have found numerous examples in Scala (including the CSV and XML data sources from Databricks), but I cannot bring Scala into this project. We also already have the parser itself written in Java; I just need to build the "glue" between the parser and Spark.
> > >
> > > This is how I'd like to call it:
> > >
> > >     String filename = "src/test/resources/simple.x";
> > >
> > >     SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();
> > >
> > >     Dataset<Row> df = spark.read().format("x.RandomDataSource")
> > >         .option("metadataTag", "schema") // hint to find schema
> > >         .option("dataTag", "data") // hint to find data
> > >         .load(filename); // local file
> > >
> > > So far, I have tried to implement x.RandomDataSource:
> > >
> > > • Based on FileFormat, which makes the most sense, but I do not have a clue how to implement buildReader()...
> > > • Based on RelationProvider, but same here...
> > >
> > > It seems that in both cases the call is made to the right class, but I run into an NPE because I do not provide much. Any hint or example would be greatly appreciated!
> > >
> > > Thanks
> > >
> > > jg
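Jörn's point 1) boils down to this: buildReader returns a function that takes a file and returns its rows as an Iterator. For JG's character counter, that core logic can be written as a Java 8 lambda without any Spark classes. The sketch below is stdlib-only and rests on stated assumptions: java.util.function.Function stands in for scala.Function1<PartitionedFile, Iterator<InternalRow>>, a java.nio Path stands in for PartitionedFile, and an Object[] stands in for InternalRow. The class and method names are hypothetical, and wrapping the result into the real Spark types is not shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;

public class CharCounterReader {

    // Counts the occurrences of `target` in `text`.
    public static long countChar(String text, char target) {
        return text.chars().filter(c -> c == target).count();
    }

    // Returns a reader in the shape described above: file in, rows out as an Iterator.
    // Object[] is a stand-in for Spark's InternalRow (assumption for this sketch).
    public static Function<Path, Iterator<Object[]>> buildReader(char target) {
        return file -> {
            try {
                String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                Object[] row = { countChar(content, target) }; // one row with one column
                return Collections.singletonList(row).iterator();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("simple", ".json");
        Files.write(file, "banana and apple".getBytes(StandardCharsets.UTF_8));
        Iterator<Object[]> rows = buildReader('a').apply(file);
        System.out.println(rows.next()[0]); // prints 5
        Files.delete(file);
    }
}
```

One caveat on the lambda recommendation: on Spark builds compiled against Scala 2.11 (such as 2.1), scala.Function1 is not a functional interface from Java's point of view, so a Java lambda cannot implement it directly; extending scala.runtime.AbstractFunction1 is the usual adapter there. From Scala 2.12 on, a Java lambda can target scala.Function1.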
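For approach 2), the NPE at LogicalRelation.<init> most likely comes from createRelation returning null. A minimal sketch of the BaseRelation + TableScan route Jörn describes, assuming the Spark 2.x org.apache.spark.sql.sources API, spark-sql on the classpath, and a load(filename) call with a single file: createRelation returns a relation whose schema is one long column named after the "char" option, and whose buildScan counts that character through textFile. The relation class CharCounterRelation, the countChar helper, and the 'a' default are hypothetical.

```java
package x;

import java.util.Collections;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.RelationProvider;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Option;
import scala.collection.immutable.Map;

public class CharCounterDataSource implements RelationProvider {

    // Called by spark.read().format("x.CharCounterDataSource")...load(filename).
    // Returning a real relation instead of null avoids the NPE in LogicalRelation.
    @Override
    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
        // load(filename) hands the single file name to the provider as the "path" option.
        String path = parameters.get("path").get();
        Option<String> charOption = parameters.get("char");
        char target = charOption.isDefined() ? charOption.get().charAt(0) : 'a'; // hypothetical default
        return new CharCounterRelation(sqlContext, path, target);
    }

    // Hypothetical helper: counts the occurrences of `target` in `text`.
    public static long countChar(String text, char target) {
        return text.chars().filter(c -> c == target).count();
    }
}

// Hypothetical relation: one row, one LongType column named after the counted character.
class CharCounterRelation extends BaseRelation implements TableScan {

    private final SQLContext sqlContext;
    private final String path;
    private final char target;

    CharCounterRelation(SQLContext sqlContext, String path, char target) {
        this.sqlContext = sqlContext;
        this.path = path;
        this.target = target;
    }

    @Override
    public SQLContext sqlContext() {
        return sqlContext;
    }

    // The schema of a row, e.g. a single non-nullable column "a" of type long.
    @Override
    public StructType schema() {
        return new StructType(new StructField[] {
            DataTypes.createStructField(String.valueOf(target), DataTypes.LongType, false)
        });
    }

    // Reads the file line by line, sums the per-line counts and returns them as one Row.
    @Override
    public RDD<Row> buildScan() {
        JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
        char c = target; // local copy, so the lambda does not capture `this`
        long count = jsc.textFile(path)
                .map(line -> CharCounterDataSource.countChar(line, c))
                .fold(0L, (a, b) -> a + b);
        return jsc.parallelize(Collections.singletonList(RowFactory.create(count))).rdd();
    }
}
```

With this class in package x, JG's call spark.read().format("x.CharCounterDataSource").option("char", "a").load(filename) should show a single column a holding the count. textFile drops line terminators, which does not matter when counting a character such as 'a'.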
