Re: Manually reading parquet files.

2019-03-21 Thread Ryan Blue
You're getting InternalRow instances. They probably have the data you want,
but InternalRow's toString representation doesn't show the values in a
readable form.
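
For example, a minimal sketch of pulling typed values back out (assuming the
TestRow(Int, Int, String) schema from the quoted code below):

import org.apache.spark.sql.catalyst.InternalRow

val row: InternalRow = rows.head           // first row returned by the reader below
val col1 = row.getInt(0)                   // e.g. 1
val col2 = row.getInt(1)                   // e.g. 1
val col3 = row.getUTF8String(2).toString   // e.g. "asdf" (strings come back as UTF8String)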

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew 
wrote:

> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues. This is
> what I’d like to do:
>
>
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
>
> val data = List(
>   TestRow(1, 1, "asdf"),
>   TestRow(2, 1, "asdf"),
>   TestRow(3, 1, "asdf"),
>   TestRow(4, 1, "asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. vector mode is doing something screwy
>   case b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> println(rows)
> // List([0,1,5b,24,66647361])
> // ?? this is wrong I think
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Ryan Blue
Shubham,

DataSourceV2 passes Spark's internal representation to your source and
expects Spark's internal representation back from the source. That's why
you consume and produce InternalRow: "internal" indicates that Spark
doesn't need to convert the values.

Spark's internal representation for a date is the number of days since the
Unix epoch, with 1970-01-01 = day 0.
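
For example, a minimal Scala sketch of converting that value back into a
calendar date, assuming record is the InternalRow your write() method receives:

import java.time.LocalDate

val days = record.getInt(0)                  // a DateType value arrives as an Int: days since 1970-01-01
val date = LocalDate.ofEpochDay(days.toLong) // back to a calendar date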

rb

On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> I am using a custom DataSourceV2 implementation (Spark version 2.3.2).
>
> Here is how I am trying to pass in a date type from the spark shell.
>
> scala> val df = sc.parallelize(Seq("2019-02-05")).toDF("datetype")
>          .withColumn("datetype", col("datetype").cast("date"))
> scala> df.write.format("com.shubham.MyDataSource").save
>
>
> Below is the minimal write() method of my DataWriter implementation.
>
> @Override
> public void write(InternalRow record) throws IOException {
>   ByteArrayOutputStream format = streamingRecordFormatter.format(record);
>   System.out.println("MyDataWriter.write: " + record.get(0, DataTypes.DateType));
> }
>
> It prints an integer as output:
>
> MyDataWriter.write: 17039
>
>
> Is this a bug, or am I doing something wrong?
>
> Thanks,
> Shubham
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-19 Thread Ryan Blue
MyDataSourceReader@3095c449 are being created. Consequently schema is
>> null in MyDataSourceReader@3095c449.
>>
>> Am I not doing it the correct way?
>>
>> Thanks,
>> Shubham
>>
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf 
>> wrote:
>>
>>> I am using v2.4.0-RC2
>>>
>>>
>>>
>>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns
>>> null). How are you calling it?
>>>
>>>
>>>
>>> When I do:
>>>
>>> val df = spark.read.format(mypackage).load().show()
>>>
>>> I am getting a single creation, how are you creating the reader?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Assaf
>>>
>>>
>>>
>>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> Sent: Tuesday, October 9, 2018 2:02 PM
>>> To: Mendelson, Assaf; user@spark.apache.org
>>> Subject: Re: DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> Thanks Assaf, you tried with tags/v2.4.0-rc2?
>>>
>>>
>>>
>>> Full Code:
>>>
>>>
>>>
>>> MyDataSource is the entry point which simply creates Reader and Writer
>>>
>>>
>>>
>>> public class MyDataSource implements DataSourceV2, WriteSupport,
>>> ReadSupport, SessionConfigSupport {
>>>
>>>
>>>
>>>   @Override public DataSourceReader createReader(DataSourceOptions
>>> options) {
>>>
>>> return new MyDataSourceReader(options.asMap());
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public Optional<DataSourceWriter> createWriter(String jobId,
>>> StructType schema,
>>>
>>>   SaveMode mode, DataSourceOptions options) {
>>>
>>> // creates a dataSourcewriter here..
>>>
>>> return Optional.of(dataSourcewriter);
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override public String keyPrefix() {
>>>
>>> return "myprefix";
>>>
>>>   }
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> public class MyDataSourceReader implements DataSourceReader,
>>> SupportsScanColumnarBatch {
>>>
>>>
>>>
>>>   StructType schema = null;
>>>
>>>   Map<String, String> options;
>>>
>>>
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>
>>> System.out.println("MyDataSourceReader.MyDataSourceReader:
>>> Instantiated" + this);
>>>
>>> this.options = options;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>
>>> //variable this.schema is null here since readSchema() was called on
>>> a different instance
>>>
>>> System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>>> this + " schema: " + this.schema);
>>>
>>> //more logic..
>>>
>>> return null;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public StructType readSchema() {
>>>
>>> //some logic to discover schema
>>>
>>> this.schema = (new StructType())
>>>
>>> .add("col1", "int")
>>>
>>> .add("col2", "string");
>>>
>>> System.out.println("MyDataSourceReader.readSchema: " + this + "
>>> schema: " + this.schema);
>>>
>>> return this.schema;
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf 
>>> wrote:
>>>
>>> Could you add a fuller code example? I tried to reproduce it in my
>>> environment and I am getting just one instance of the reader…
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Assaf
>

Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-03 Thread Ryan Blue
Yes, you can usually use a broadcast join to avoid skew problems.
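
For example, a minimal sketch, assuming the small side fits in memory
(largeDf, smallDf, and join_key are placeholder names):

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small table turns the join into a map-side join, so the
// large, skewed side is never shuffled.
val joined = largeDf.join(broadcast(smallDf), Seq("join_key"))

Spark will also pick a broadcast join on its own when the small side is below
spark.sql.autoBroadcastJoinThreshold.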

On Wed, May 2, 2018 at 8:57 PM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> I am performing a join operation. If I convert the reduce-side join to a
> map-side join (so no shuffle will happen), I assume this error shouldn't
> come up. Let me know if this understanding is correct.
>
> On Tue, May 1, 2018 at 9:37 PM, Ryan Blue <rb...@netflix.com> wrote:
>
>> This is usually caused by skew. Sometimes you can work around it by
>> increasing the number of partitions like you tried, but when that doesn’t
>> work you need to change the partitioning that you’re using.
>>
>> If you’re aggregating, try adding an intermediate aggregation. For
>> example, if your query is select sum(x), a from t group by a, then try select
>> sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
>> group by a.
>>
>> rb
>> ​
>>
>> On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar <pralabhku...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I am getting the above error in Spark SQL. I have increased the number of
>>> partitions (to 5000) but am still getting the same error.
>>>
>>> My data is most probably skewed.
>>>
>>>
>>>
>>> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>>> at 
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>>> at 
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-01 Thread Ryan Blue
This is usually caused by skew. Sometimes you can work around it by
increasing the number of partitions like you tried, but when that doesn’t
work you need to change the partitioning that you’re using.

If you’re aggregating, try adding an intermediate aggregation. For example,
if your query is select sum(x), a from t group by a, then try select
sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
group by a.
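
A sketch of that rewrite through spark.sql, using the t, a, b, x names from
the example above (the tmp alias is added for the subquery):

// Partial sums per (a, b) first, then the final sum per a; the extra grouping
// column spreads the skewed key across more tasks in the first stage.
val result = spark.sql("""
  SELECT sum(partial) AS total, a
  FROM (SELECT sum(x) AS partial, a, b FROM t GROUP BY a, b) tmp
  GROUP BY a
""")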

rb
​

On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi
>
> I am getting the above error in Spark SQL. I have increased the number of
> partitions (to 5000) but am still getting the same error.
>
> My data is most probably skewed.
>
>
>
> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Ryan Blue
Chetan,

When you're writing to a partitioned table, you want to use a shuffle to
avoid the situation where each task has to write to every partition. You
can do that either by adding a repartition by your table's partition keys,
or by adding an order by with the partition keys and then columns you
normally use to filter when reading the table. I generally recommend the
second approach because it handles skew and prepares the data for more
efficient reads.
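
A minimal sketch of both approaches, assuming a DataFrame df, a partition
column named date_col, and a commonly filtered column named account_id (all
placeholder names):

import org.apache.spark.sql.functions.col

// Option 1: shuffle by the table's partition key so each task writes to only
// a few partitions.
df.repartition(col("date_col"))
  .write.partitionBy("date_col").saveAsTable("db.events")

// Option 2: a global sort by the partition key plus the usual filter columns;
// this also handles skew and clusters the data for more efficient reads.
df.orderBy(col("date_col"), col("account_id"))
  .write.partitionBy("date_col").saveAsTable("db.events")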

If that doesn't help, then you should look at your memory settings. When
you're getting killed by YARN, you should consider setting
`spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory
that the JVM doesn't account for. That is usually an easier fix than
increasing the memory overhead. Also, when you set executor memory, always
change spark.memory.fraction to ensure the memory you're adding is used
where it is needed. If your memory fraction is the default 60%, then 60% of
the memory will be used for Spark execution, not reserved for whatever is
consuming it and causing the OOM. (If Spark's memory is too low, you'll see
other problems like spilling too much to disk.)
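
A configuration sketch with illustrative values (these normally go on
spark-submit or in spark-defaults, since they must be in place before the
executors start):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.shuffle.io.preferDirectBufs", "false") // less off-heap use the JVM doesn't account for
  .config("spark.memory.fraction", "0.6")               // share of heap for Spark execution/storage;
                                                        // revisit this when adding executor memory
  .getOrCreate()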

rb

On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Can anyone please guide me with the above issue?
>
>
> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <chetan.opensou...@gmail.com
> > wrote:
>
>> Hello Spark Users,
>>
>> I am reading from an HBase table and writing to a Hive managed table where I
>> applied partitioning by a date column. That worked fine, but it generated a
>> large number of files across almost 700 partitions, so I wanted to use
>> repartitioning to reduce file I/O by reducing the number of files inside each
>> partition.
>>
>> But I ended up with the below exception:
>>
>> ExecutorLostFailure (executor 11 exited caused by one of the running
>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>> GB of 14 GB physical memory used. Consider boosting
>> spark.yarn.executor.memoryOverhead.
>>
>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>
>> Do you think the settings below can help me to overcome the above issue:
>>
>> spark.default.parallelism=1000
>> spark.sql.shuffle.partitions=1000
>>
>> Because the default max number of partitions is 1000.
>>
>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Ryan Blue
Looking at the code a bit more, it appears that blacklisting is disabled by
default. To enable it, set spark.blacklist.enabled=true.

The updates in 2.1.0 appear to provide much more fine-grained settings for
this, like the number of tasks that can fail before an executor is
blacklisted for a stage. In that version, you probably want to set
spark.blacklist.task.maxTaskAttemptsPerExecutor. See the settings docs
<http://spark.apache.org/docs/latest/configuration.html> and search for
“blacklist” to see all the options.
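
A configuration sketch for 2.1.0+, with illustrative values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.blacklist.enabled", "true")                      // blacklisting is off by default
  .config("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1") // a retry moves to a different executor
  .getOrCreate()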

rb
​

On Mon, Apr 24, 2017 at 9:41 AM, Ryan Blue <rb...@netflix.com> wrote:

> Chawla,
>
> We hit this issue, too. I worked around it by setting
> spark.scheduler.executorTaskBlacklistTime=5000. The problem for us was that the scheduler
> was using locality to select the executor, even though it had already
> failed there. The executor task blacklist time controls how long the
> scheduler will avoid using an executor for a failed task, which will cause
> it to avoid rescheduling on the executor. The default was 0, so the
> executor was put back into consideration immediately.
>
> In 2.1.0 that setting has changed to spark.blacklist.timeout. I’m not
> sure if that does exactly the same thing. The default for that setting is
> 1h instead of 0. It’s better to have a non-zero default to avoid what
> you’re seeing.
>
> rb
> ​
>
> On Fri, Apr 21, 2017 at 1:32 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
>> I am seeing a strange issue. I had a badly behaving slave that failed the
>> entire job. I have set spark.task.maxFailures to 8 for my job. It seems
>> like all task retries happen on the same slave in case of failure. My
>> expectation was that a task would be retried on a different slave in case of
>> failure, and the chance of all 8 retries happening on the same slave is very low.
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Ryan Blue
Chawla,

We hit this issue, too. I worked around it by setting
spark.scheduler.executorTaskBlacklistTime=5000. The problem for us was that
the scheduler was using locality to select the executor, even though it had
already failed there. The executor task blacklist time controls how long
the scheduler will avoid using an executor for a failed task, which will
cause it to avoid rescheduling on the executor. The default was 0, so the
executor was put back into consideration immediately.

In 2.1.0 that setting has changed to spark.blacklist.timeout. I’m not sure
if that does exactly the same thing. The default for that setting is 1h
instead of 0. It’s better to have a non-zero default to avoid what you’re
seeing.

rb
​

On Fri, Apr 21, 2017 at 1:32 PM, Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

> I am seeing a strange issue. I had a badly behaving slave that failed the
> entire job. I have set spark.task.maxFailures to 8 for my job. It seems
> like all task retries happen on the same slave in case of failure. My
> expectation was that a task would be retried on a different slave in case of
> failure, and the chance of all 8 retries happening on the same slave is very low.
>
>
> Regards
> Sumit Chawla
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Driver hung and happend out of memory while writing to console progress bar

2017-02-10 Thread Ryan Blue
> [Stage 172:==>    (10366 + 93) / 16144]
> ... (console progress bar output repeated many times) ...
> Exception in thread "JobGenerator" java.lang.OutOfMemoryError: Java heap space
>   at com.fasterxml.jackson.core.util.BufferRecycler.calloc(BufferRecycler.java:156)
>   at com.fasterxml.jackson.core.util.BufferRecycler.allocCharBuffer(BufferRecycler.java:124)
>   at com.fasterxml.jackson.core.io.IOContext.allocTokenBuffer(IOContext.java:189)
>   at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:879)
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JobGenerator"
> Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOfRange(Arrays.java:3664)
>   at java.lang.String.<init>(String.java:207)
>   at java.lang.StringBuilder.toString(StringBuilder.java:407)
>   at scala.collection.mutable.StringBuilder.toString(StringBuilder.scala:430)
>   at org.apache.spark.ui.ConsoleProgressBar.show(ConsoleProgressBar.scala:101)
>   at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:71)
>   at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:55)
>   at java.util.TimerThread.mainLoop(Timer.java:555)
>   at java.util.TimerThread.run(Timer.java:505)
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Apache Hive with Spark Configuration

2017-01-03 Thread Ryan Blue
Chetan,

Spark is currently using Hive 1.2.1 to interact with the Metastore. Using
that version for Hive is going to be the most reliable, but the metastore
API doesn't change very often and we've found (from having different
versions as well) that older versions are mostly compatible. Some things
fail occasionally, but we haven't had too many problems running different
versions with the same metastore in practice.
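
If you do need to point Spark at a different metastore client version, a
configuration sketch (the values shown are the built-in defaults, just to show
where the knobs are):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "1.2.1") // metastore client version Spark should use
  .config("spark.sql.hive.metastore.jars", "builtin")  // or a classpath with the matching Hive jars
  .getOrCreate()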

rb

On Wed, Dec 28, 2016 at 4:22 AM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hello Users / Developers,
>
> I am using Hive 2.0.1 with MySql as a Metastore, can you tell me which
> version is more compatible with Spark 2.0.2 ?
>
> THanks
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: Do we need schema for Parquet files with Spark?

2016-03-04 Thread Ryan Blue
Hi Ashok,

The schema for your data comes from the data frame you're using in Spark, and
it is resolved against the Hive table schema if you are writing to one. For
encodings, you don't need to configure them because they are selected for
your data automatically. For example, Parquet will try dictionary-encoding
first and fall back to non-dictionary if it looks like the
dictionary-encoding would take more space.

I recommend writing out a data frame to Parquet and then just taking a look
at the result using parquet-tools, which you can download from maven
central.
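
A minimal sketch, with a placeholder output path; the parquet-tools call is
shown as a comment:

// Write a data frame out, then inspect the per-column metadata that Parquet
// recorded, including which encodings were chosen.
df.write.parquet("/tmp/encoding-check")

// From a shell (parquet-tools is the CLI that ships with parquet-mr):
//   parquet-tools meta <one of the part files under /tmp/encoding-check>
// The ENC field for each column shows whether dictionary encoding was used.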

rb

On Thu, Mar 3, 2016 at 10:50 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Ted,
>
> Thanks for pointing this out. This page has a mailing list for developers but
> not for users yet, it seems. I'm including only the developers mailing list.
>
> Hi Parquet Team,
>
> Could you please clarify the question below? Please let me know if there is
> a separate mailing list for users but not developers.
>
> Regards
> Ashok
>
> On Fri, Mar 4, 2016 at 11:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> > Have you taken a look at https://parquet.apache.org/community/ ?
> >
> > On Thu, Mar 3, 2016 at 7:32 PM, ashokkumar rajendran <
> > ashokkumar.rajend...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I am exploring using Apache Parquet with Spark SQL in our project. I
> >> notice that Apache Parquet uses different encodings for different columns.
> >> The dictionary encoding in Parquet will be one of the good ones for our
> >> performance. I do not see much documentation in Spark or Parquet on how to
> >> configure this. For example, how would Parquet know the dictionary of words
> >> if there is no schema provided by the user? Where/how do I specify my
> >> schema / config for the Parquet format?
> >>
> >> I could not find the Apache Parquet mailing list on the official site. It
> >> would be great if anyone could share it as well.
> >>
> >> Regards
> >> Ashok
> >>
> >
> >
>



-- 
Ryan Blue
Software Engineer
Netflix