Pre query execution hook for custom datasources

2020-09-18 Thread Shubham Chaurasia
Hi,

In our custom datasource implementation, we want to inject some query level
information.

For example -

scala> val df = spark.sql("some query")   // uses custom datasource under
the hood through Session Extensions.

scala> df.count // here we want some kind of pre-execution hook just
before the query starts its execution

Is there a hook or some kind of callback that we can implement to achieve
this?

Or, similar to org.apache.spark.sql.util.QueryExecutionListener, which
provides onSuccess and onFailure callbacks when a query finishes, we want
something like a *beforeStart()* callback.
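
For reference, this is roughly all we have today (a minimal registration
sketch; both callbacks fire only after the action has run, so there is no
place to inject query-level information up front):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.util.QueryExecutionListener;

// Minimal sketch: QueryExecutionListener only gives us post-execution callbacks.
SparkSession spark = SparkSession.builder().getOrCreate();
spark.listenerManager().register(new QueryExecutionListener() {
  @Override
  public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
    // invoked after the action completes successfully
  }

  @Override
  public void onFailure(String funcName, QueryExecution qe, Exception exception) {
    // invoked after the action fails
  }
});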

Any ideas on how to implement this?

Thanks,
Shubham


Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2

2019-09-19 Thread Shubham Chaurasia
Hi,

Consider the following statements:

1)
> scala> val df = spark.read.format("com.shubham.MyDataSource").load
> scala> df.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> |  3| -3|
> |  4| -4|
> +---+---+
> 2)
> scala> val df1 = df.filter("i < 3")
> scala> df1.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> +---+---+
> 3)
> scala> df.join(df1, Seq("i"), "left_outer").show
> +---+---+---+
> |  i|  j|  j|
> +---+---+---+
> |  1| -1| -1|
> |  2| -2| -2|
> |  0|  0|  0|
> +---+---+---+


3) is not producing the right results for left_outer join.

Here is the minimal code.

---

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
  }

  @Override
  public StructType readSchema() {
    return (new StructType())
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // filters that can't be pushed down (empty => the source handles all of them)
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("===MyDataSourceReader.createBatchDataReaderFactories===");
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  // Returns the value of a pushed "attributeName < value" filter, if any.
  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }
}



import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private final int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }

    @Override
    public boolean next() {
      current++;
      // stop at the range end or once the pushed "i < x" bound is reached
      return current < end && current < iLTFilter;
    }

    @Override
    public Row get() {
      return new GenericRow(new Object[]{current, -current});
    }

    @Override
    public void close() {
    }
  }
}



It seems that Spark is somehow applying the filter (i < 3) after the
left_outer join as well, which is why we see incorrect results in 3).
However, I don't see any Filter node after the join in the plan.

== Physical Plan ==
> *(5) Project [i#136, j#137, j#228]
> +- SortMergeJoin [i#136], [i#227], LeftOuter
>    :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(i#136, 200)
>    :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
>    +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
>       +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)
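
One possible explanation (just a guess): the join reuses a single scan of
the same reader instance (note the ReusedExchange), so the pushed (i < 3)
also restricts the left, unfiltered side; and because pushFilters() returns
an empty array, Spark assumes the source evaluates the filter and never adds
a Filter node back. An untested sketch to check this, for the
MyDataSourceReader above, is to hand all the filters back to Spark so the
scan stays unfiltered:

// Untested diagnostic sketch: report every filter as "not pushed" so Spark keeps a
// Filter node in the plan and the (shared) scan returns unfiltered rows.
@Override
public Filter[] pushFilters(Filter[] filters) {
  System.out.println("MyDataSourceReader.pushFilters (not pushing): " + Arrays.toString(filters));
  return filters;   // everything returned here is evaluated by Spark after the scan
}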


Any ideas what might be going wrong?

Thanks,
Shubham


DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2

2019-09-06 Thread Shubham Chaurasia
Hi,

I am using spark v2.3.2. I have an implementation of DSV2. Here is what is
happening:

1) Obtained a dataframe using MyDataSource

scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
> MyDataSource.MyDataSource
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiatedcom.shubham.reader.MyDataSourceReader@2b85edc7
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]
>

2) show() on df1

> scala> df1.show
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> ===MyDataSourceReader.createBatchDataReaderFactories===
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> pushedFilters = []
> ===MyDataSourceReader.createBatchDataReaderFactories===
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+
>

3) val df2 = df1.filter($"c3" > 1)

>
> scala> df2.show
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushedFilters: []
> MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> ===MyDataSourceReader.createBatchDataReaderFactories===
> prunedSchema = StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
> ===MyDataSourceReader.createBatchDataReaderFactories===
> +---+---+---+
> | c1| c2| c3|
> +---+---+---+
> +---+---+---+


4) df1.show() again <=== Since df2 is derived from df1 (and shares the same
instance of MyDataSourceReader), this modifies pushedFilters for df1 as well

> scala> df1.show
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pruneColumns:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema:
> StructType(StructField(c1,IntegerType,true),
> StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
> 
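
For what it's worth, the leakage seems tied to df1 and df2 sharing one
MyDataSourceReader instance. An untested sketch of how to isolate them
(assuming `spark` is the active SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Untested sketch: build each DataFrame from its own load() call so each one gets its
// own MyDataSourceReader instance, and filters pushed for one plan cannot leak into the other.
Dataset<Row> plain    = spark.read().format("com.shubham.MyDataSource").load();
Dataset<Row> filtered = spark.read().format("com.shubham.MyDataSource").load().filter("c3 > 1");

plain.show();     // this reader should keep reporting pushedFilters: []
filtered.show();  // only this reader should see [IsNotNull(c3), GreaterThan(c3,1)]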

Re: Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
FYI, I am already using QueryExecutionListener, which satisfies the
requirements.

But that only works for the DataFrame APIs. If someone does
df.rdd().someAction(), QueryExecutionListener is never invoked. I want
something like QueryExecutionListener that also works in the
df.rdd().someAction() case.
I explored SparkListener#onJobEnd, but then how do I propagate state from
the DataSourceReader to the SparkListener?
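
The best I have come up with so far is a very coarse bridge (a sketch only;
mapping a specific job back to a specific reader is exactly the part I don't
know how to do cleanly): the reader registers a cleanup callback in a static
registry, and a SparkListener drains that registry on job end.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;

// Sketch: a driver-side listener that runs whatever cleanups readers have registered.
// It is coarse-grained: any job end triggers all pending cleanups.
public class ReaderCleanupListener extends SparkListener {

  public static final Queue<Runnable> PENDING = new ConcurrentLinkedQueue<>();

  @Override
  public void onJobEnd(SparkListenerJobEnd jobEnd) {
    Runnable cleanup;
    while ((cleanup = PENDING.poll()) != null) {
      cleanup.run();   // runs on the driver
    }
  }
}

// Wiring (e.g. when the reader is created):
//   spark.sparkContext().addSparkListener(new ReaderCleanupListener());
//   ReaderCleanupListener.PENDING.add(() -> { /* release resources held by this reader */ });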

On Wed, Jun 12, 2019 at 2:22 PM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there any way to receive some event that a DataSourceReader is
> finished?
> I want to do some clean up after all the DataReaders are finished reading
> and hence need some kind of cleanUp() mechanism at DataSourceReader(Driver)
> level.
>
> How to achieve this?
>
> For instance, in DataSourceWriter we can rely on commit() and abort()
> methods to know that all the DataWriters are finished.
>
> Thanks,
> Shubham
>


Clean up method for DataSourceReader

2019-06-12 Thread Shubham Chaurasia
Hi All,

Is there any way to receive an event when a DataSourceReader is finished?
I want to do some clean-up after all the DataReaders have finished reading,
and hence need some kind of cleanUp() mechanism at the DataSourceReader
(driver) level.

How to achieve this?

For instance, in DataSourceWriter we can rely on commit() and abort()
methods to know that all the DataWriters are finished.

Thanks,
Shubham


Re: Static partitioning in partitionBy()

2019-05-08 Thread Shubham Chaurasia
Thanks

On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
wrote:

> You could
>
> df.filter(col("c") === "c1").write.partitionBy("c").save
>
> It could hit some data skew problems, but it might work for you
>
>
>
> --
> *From:* Burak Yavuz 
> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
> *To:* Shubham Chaurasia
> *Cc:* dev; user@spark.apache.org
> *Subject:* Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere", "c = c1"). With other file
> formats, you can write directly into the partition directory
> (tablePath/c=c1), but you lose atomicity.
>
> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
> wrote:
>
>> Hi All,
>>
>> Is there a way I can provide static partitions in partitionBy()?
>>
>> Like:
>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>
>> Above code gives following error as it tries to find column `c=c1` in df.
>>
>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
>> in schema struct;
>>
>> Thanks,
>> Shubham
>>
>
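
Putting the two suggestions together, something like this (a sketch only;
paths and the partition value "c1" are placeholders, and the Delta variant
assumes the delta-core dependency is available):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

public static void writeStaticPartition(Dataset<Row> df) {
  // 1) Filter to the static partition value first, then partitionBy the same column.
  df.filter(col("c").equalTo("c1"))
      .write()
      .partitionBy("c")
      .save("/tmp/my_table");

  // 2) With Delta Lake, replaceWhere overwrites only the matching partition atomically.
  df.filter(col("c").equalTo("c1"))
      .write()
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "c = 'c1'")
      .save("/tmp/my_delta_table");
}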


Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

The above code gives the following error, as it tries to find column `c=c1`
in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
in schema struct;

Thanks,
Shubham


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Writing:
scala> df.write.orc("")

For looking into contents, I used orc-tools-X.Y.Z-uber.jar (
https://orc.apache.org/docs/java-tools.html)

On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan  wrote:

> How did you read/write the timestamp value from/to ORC file?
>
> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <
> shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> Consider the following(spark v2.4.0):
>>
>> Basically I change values of `spark.sql.session.timeZone` and perform an
>> orc write. Here are 3 samples:-
>>
>> 1)
>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>
>> scala> val df = sc.parallelize(Seq("2019-04-23
>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 2)
>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> 3)
>> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>
>> df.show() Output  ORC File Contents
>> -
>> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>
>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>> timezone with respect to which spark is able to convert back to actual time
>> when it reads orc.(and that is equal to df.show())
>>
>> But it's problematic in the sense that it is not adjusting(plus/minus)
>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>> spark would be a problem.
>>
>> Any ideas/suggestions on that?
>>
>> PS: For csv files, it stores exactly what we see as the output of
>> df.show()
>>
>> Thanks,
>> Shubham
>>
>>


DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Shubham Chaurasia
Hi All,

Consider the following(spark v2.4.0):

Basically I change values of `spark.sql.session.timeZone` and perform an
orc write. Here are 3 samples:-

1)
scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

scala> val df = sc.parallelize(Seq("2019-04-23
09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

df.show() Output      ORC File Contents
-------------------   ------------------------------
2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}

2)
scala> spark.conf.set("spark.sql.session.timeZone", "UTC")

df.show() Output      ORC File Contents
-------------------   ------------------------------
2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}

3)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df.show() Output      ORC File Contents
-------------------   ------------------------------
2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}

It can be seen that in all three cases it stores {"ts":"2019-04-23
09:15:04.0"} in the ORC file. I understand that the ORC file also contains
the writer time zone, with respect to which Spark can convert back to the
actual time when it reads the ORC (and that matches the df.show() output).

But it is problematic in the sense that it does not adjust (plus/minus) the
time zone (spark.sql.session.timeZone) offset for {"ts":"2019-04-23
09:15:04.0"} in the ORC file, so loading the data into any system other
than Spark would be a problem.

Any ideas/suggestions on that?

PS: For csv files, it stores exactly what we see as the output of df.show()
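
In the meantime, the only workaround I can think of is to materialize the
session-time-zone rendering explicitly before writing, similar to what
happens for csv (an untested sketch; the extra column name and output path
are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.date_format;

// Untested sketch: date_format renders the timestamp using spark.sql.session.timeZone,
// so the written string matches what df.show() displays.
public static void writeWithWallClockString(Dataset<Row> df) {
  df.withColumn("ts_str", date_format(col("ts"), "yyyy-MM-dd HH:mm:ss.S"))
      .write()
      .orc("/tmp/orc_out");
}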

Thanks,
Shubham


Re: DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-06 Thread Shubham Chaurasia
Thanks Ryan
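
For anyone hitting this later, a minimal sketch of mapping the internal
value back to a calendar date on the writer side (it assumes the date column
is at ordinal 0; java.time is used only for illustration):

import java.time.LocalDate;

import org.apache.spark.sql.catalyst.InternalRow;

// Sketch: Spark hands the date over as days since 1970-01-01, so it can be converted
// back with LocalDate.ofEpochDay before handing it to the sink.
public void write(InternalRow record) {
  int daysSinceEpoch = record.getInt(0);                  // internal representation of the date
  LocalDate date = LocalDate.ofEpochDay(daysSinceEpoch);  // e.g. 0 -> 1970-01-01
  System.out.println("MyDataWriter.write: " + date);
}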

On Tue, Feb 5, 2019 at 10:28 PM Ryan Blue  wrote:

> Shubham,
>
> DataSourceV2 passes Spark's internal representation to your source and
> expects Spark's internal representation back from the source. That's why
> you consume and produce InternalRow: "internal" indicates that Spark
> doesn't need to convert the values.
>
> Spark's internal representation for a date is the ordinal from the unix
> epoch date, 1970-01-01 = 0.
>
> rb
>
> On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia <
> shubh.chaura...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using custom DataSourceV2 implementation (*Spark version 2.3.2*)
>>
>> Here is how I am trying to pass in *date type *from spark shell.
>>
>> scala> val df =
>>> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
>>> col("datetype").cast("date"))
>>> scala> df.write.format("com.shubham.MyDataSource").save
>>
>>
>> Below is the minimal write() method of my DataWriter implementation.
>>
>> @Override
>> public void write(InternalRow record) throws IOException {
>>   ByteArrayOutputStream format = streamingRecordFormatter.format(record);
>>   System.out.println("MyDataWriter.write: " + record.get(0, 
>> DataTypes.DateType));
>>
>> }
>>
>> It prints an integer as output:
>>
>> MyDataWriter.write: 17039
>>
>>
>> Is this a bug?  or I am doing something wrong?
>>
>> Thanks,
>> Shubham
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Shubham Chaurasia
Hi All,

I am using custom DataSourceV2 implementation (*Spark version 2.3.2*)

Here is how I am trying to pass in a *date type* from spark shell.

scala> val df =
> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
> col("datetype").cast("date"))
> scala> df.write.format("com.shubham.MyDataSource").save


Below is the minimal write() method of my DataWriter implementation.

@Override
public void write(InternalRow record) throws IOException {
  ByteArrayOutputStream format = streamingRecordFormatter.format(record);
  System.out.println("MyDataWriter.write: " + record.get(0, DataTypes.DateType));
}

It prints an integer as output:

MyDataWriter.write: 17039


Is this a bug, or am I doing something wrong?

Thanks,
Shubham


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Alright, so it is a big project that uses a SQL store underneath.
I extracted the minimal code into a smaller project, and it still creates
multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │   └── shubham
│   │   │   ├── MyDataSource.java
│   │   │   └── reader
│   │   │   └── MyDataSourceReader.java


MyDataSource.java
-

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}


MyDataSourceReader.java
-

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}



spark-shell output

scala> spark.read.format("com.shubham.MyDataSource").option("query",
"select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema:
com.shubham.reader.MyDataSourceReader@69fa5536 schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader:
Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions:
com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+


Here, two instances of the reader, MyDataSourceReader@69fa5536 and
MyDataSourceReader@3095c449, are being created. Consequently, schema is
null in MyDataSourceReader@3095c449.

Am I not doing it the correct way?
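
The only way I have found to work around it so far (a sketch; it assumes
schema discovery is cheap and idempotent) is to not depend on readSchema()
having been called on the same instance, and derive the schema lazily
instead, reusing the existing schema field of the MyDataSourceReader above:

// Sketch: derive the schema lazily so whichever instance Spark calls can compute it
// on demand, instead of relying on state set on a different instance.
private StructType discoverSchema() {
  if (schema == null) {
    schema = new StructType()
        .add("col1", "int")
        .add("col2", "string");
  }
  return schema;
}

@Override
public StructType readSchema() {
  return discoverSchema();
}

@Override
public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
  StructType s = discoverSchema();   // never null, even on a freshly created instance
  System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + s);
  return new ArrayList<>();
}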

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf 
wrote:

> I am using v2.4.0-RC2
>
>
>
> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
>
>
> When I do:
>
> Val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation, how are you creating the reader?
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 2:02 PM
> *To:* Mendelson, Assaf; user@spark.apache.org
> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>
>
>
> Full Code:
>
>
>
> MyDataSource is the entry point w

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Thanks Assaf, you tried with *tags/v2.4.0-rc2?*

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport,
    ReadSupport, SessionConfigSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourceWriter here..
    return Optional.of(dataSourceWriter);
  }

  @Override
  public String keyPrefix() {
    return "myprefix";
  }
}

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
wrote:

> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Hi All,
>
>
>
> --Spark built with *tags/v2.4.0-rc2*
>
>
>
> Consider following DataSourceReader implementation:
>
>
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     // variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     // more logic..
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     // some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the class variable schema.
>
> 2) Then, when planBatchInputPartitions() is called, it is invoked on a different instance of MyDataSourceReader, and hence I do not get the value of schema inside planBatchInputPartitions().
>
> How can I get the value of schema that was set in readSchema(), in planBatchInputPartitions()?
>
>
>
> Console Logs:
>
>
>
> scala> mysource.executeQuery("select * from movie").show
>
>
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: 
> StructType(StructField(col1,IntegerType,true), 
> StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: 
> InstantiatedMyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff 
> schema: null
>
>
>
> Thanks,
>
> Shubham
>
>
>
>


DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Shubham Chaurasia
Hi All,

--Spark built with *tags/v2.4.0-rc2*

Consider following DataSourceReader implementation:

public class MyDataSourceReader implements DataSourceReader,
    SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated" + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic..
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First, readSchema() is called on MyDataSourceReader@instance1, which
sets the class variable schema.
2) Then, when planBatchInputPartitions() is called, it is invoked on a
different instance of MyDataSourceReader, and hence I do not get the value
of schema inside planBatchInputPartitions().

How can I get the value of schema that was set in readSchema(), in
planBatchInputPartitions()?

Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema:
StructType(StructField(col1,IntegerType,true),
StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader:
InstantiatedMyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions:
MyDataSourceReader@a3cd3ff schema: null


Thanks,
Shubham


Target java version not set when building spark with tags/v2.4.0-rc2

2018-10-07 Thread Shubham Chaurasia
Hi All,

I built spark with *tags/v2.4.0-rc2* using
./build/mvn -DskipTests -Phadoop-2.7  -Dhadoop.version=3.1.0 clean install

Now, from spark-shell, whenever I call any static method residing in an
interface, it shows me an error like:
:28: error: Static methods in interface require -target:jvm-1.8

However, spark-shell is picking up the correct Java version; I verified it
like this:

scala> java.lang.System.getProperty("java.version")
res1: String = 1.8.0_181

This was not the case when I was building with tags/v2.3.2

Have I missed something?

Thanks,
Shubham