BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-27 Thread Zahid Rahman
Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7

As you can see from the code:

STEP 1: I create a static DataFrame object which holds all the
information about the data source (CSV files).

STEP 2: Then I create a variable called staticSchema, assigning it the
schema information from the original static DataFrame.

STEP 3: Then I create another variable, val streamingDataFrame, built from
spark.readStream,
and into the .schema() parameter I pass the object staticSchema,
which is meant to hold all the information about the CSV files, including
what the .load(path) function etc. would provide.

So when I create val streamingDataFrame and pass it
.schema(staticSchema),
the variable streamingDataFrame should already have all of that information.
I should only have to call .option("maxFilesPerTrigger", 1) and not
.format("csv"), .option("header","true") and .load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?
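
For reference, a minimal sketch of what staticSchema actually is: just a
StructType of column names and types (the field names below are illustrative
assumptions about the retail CSV layout, not taken from the files):

import org.apache.spark.sql.types.{StructField, StructType, StringType, DoubleType, IntegerType, TimestampType}

// A schema only describes column names and types; it carries no
// format, header or path information of its own.
val exampleSchema = StructType(Seq(
  StructField("InvoiceNo", StringType),
  StructField("CustomerId", DoubleType),
  StructField("UnitPrice", DoubleType),
  StructField("Quantity", IntegerType),
  StructField("InvoiceDate", TimestampType)))

println(exampleSchema.treeString)   // prints one line per field: name and type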

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

    // create spark session
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate();

    // set spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

    // create a static data frame from the CSV files
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema

    staticDataFrame
      .selectExpr(
        "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // lazy operation, so we will need to call a streaming action to start it
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
      .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // stream action to write to console
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()

  } // main

} // object
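
One note for running this as a standalone application rather than in
spark-shell: main() returns right after start(), so the driver can exit
before any micro-batch runs. A minimal sketch (an assumption about the
intended usage, reusing purchaseByCustomerPerHour from the listing above)
that keeps the query alive:

// Sketch: keep a handle on the streaming query and block until it stops,
// so the driver JVM does not exit before the stream produces output.
val query = purchaseByCustomerPerHour.writeStream
  .format("console")
  .queryName("customer_purchases")
  .outputMode("complete")
  .start()

query.awaitTermination()   // or query.awaitTermination(timeoutMs) in tests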

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: Release Manager's official `branch-3.0` Assessment?

2020-03-27 Thread Jungtaek Lim
Now the end of March is just around the corner. I'm not qualified to say
(and honestly don't know) where we are, but if we were meant to be in
blocker-only mode it doesn't seem to be working: lots of development is still
happening, and priority/urgency doesn't seem to be applied to the order of
reviewing.

How about listing (or linking to an epic, or labelling) the JIRA issues/PRs which
are blockers (either by priority or technically) for the Spark 3.0 release,
and making it clear that we should try to review these blockers first? A GitHub PR
label may help here to filter out other PRs and concentrate on these.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Wed, Mar 25, 2020 at 1:52 PM Xiao Li  wrote:

> Let us try to finish the remaining major blockers in the next few days.
> For example, https://issues.apache.org/jira/browse/SPARK-31085
>
> +1 to cut the RC even if we still have the blockers that will fail the
> RCs.
>
> Cheers,
>
> Xiao
>
>
> On Tue, Mar 24, 2020 at 6:56 PM Dongjoon Hyun 
> wrote:
>
>> +1
>>
>> Thanks,
>> Dongjoon.
>>
>> On Tue, Mar 24, 2020 at 14:49 Reynold Xin  wrote:
>>
>>> I actually think we should start cutting RCs. We can cut RCs even with
>>> blockers.
>>>
>>>
>>> On Tue, Mar 24, 2020 at 12:51 PM, Dongjoon Hyun wrote:
>>>
 Hi, All.

 First of all, always "Community Over Code"!
 I wish you the best health and happiness.

 As we know, we are still in the QA period and we haven't reached the RC
 stage yet. It seems that we need to make the website up-to-date once more.

 https://spark.apache.org/versioning-policy.html

 If possible, it would be really great if we can get the `3.0.0` release
 manager's official `branch-3.0` assessment, because we have only 1 week
 before the end of March.

 Could you, the 3.0.0 release manager, share your thoughts and update the
 website, please?

 Bests
 Dongjoon.

>>>
>>>
>
> --
> 
>


Re: OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
OK *user support. user@ is DONE !!!*

I actually reported a workaround to an existing bug to the experienced
user,
and "the experienced user" was "not aware" of the
setting in log4j.properties, so he learned something new too.
Clearly, neither were you.

Also, it may surprise some people, but there are people who have been
formally trained in software development.
We can tell a self-trained one a mile away.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:

> BCC user, dev, and I encourage others to not reply.
>
> I said _dev@_ is not for user support. user@ is. You heard that
> yesterday, too, and not to cross-post.
> You actually got answers to several questions, despite their tone,
> from experienced developers of the project.
>
> Messages like yours are, I assure you, not useful to _anybody_. If we
> let people talk like this on big community lists, yes _that_ will put
> up barriers.
> So, the answer for you is: you are not using either of these lists
> appropriately right now. If you can keep it civil and on-topic, use
> user@.
> Otherwise we will block you from the lists.
>
>
> Sean
>
> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman  wrote:
> >
> >
> > Sean Owen says the criteria of these two emailing list is not help to
> support some body
> > who is new but for people who have been using the software for a long
> time.
> >
> > He is implying I think that I should only send email when I find bugs so
> that I can help him in his work.
> > A one way street.
> >
> > He is suggesting the more familiar you are with this software the more
> important you are.
> > Some kind of Alpha male type heirachy.
> >
> > He wants to put a barrier in place where Apache foundation wants no
> barriers to free learning and free software.
> >
> > He has not reported any bugs while I have reported so many in such a
> short space of time.
> > He has warned me as well
> >
> > So that Sean Owen does not put a barrier in place for me in my path to
> free learning and free  Apache software
> > I would like somebody to clarify the criteria for me.
> >
> >
> > Backbutton.co.uk
> > ¯\_(ツ)_/¯
> > ♡۶Java♡۶RMI ♡۶
> > Make Use Method {MUM}
> > makeuse.org
>


Re: OFF TOPIC LIST CRITERIA

2020-03-27 Thread Sean Owen
BCC user, dev, and I encourage others to not reply.

I said _dev@_ is not for user support. user@ is. You heard that
yesterday, too, and not to cross-post.
You actually got answers to several questions, despite their tone,
from experienced developers of the project.

Messages like yours are, I assure you, not useful to _anybody_. If we
let people talk like this on big community lists, yes _that_ will put
up barriers.
So, the answer for you is: you are not using either of these lists
appropriately right now. If you can keep it civil and on-topic, use
user@.
Otherwise we will block you from the lists.


Sean

On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman  wrote:
>
>
> Sean Owen says the criteria of these two emailing list is not help to support 
> some body
> who is new but for people who have been using the software for a long time.
>
> He is implying I think that I should only send email when I find bugs so that 
> I can help him in his work.
> A one way street.
>
> He is suggesting the more familiar you are with this software the more 
> important you are.
> Some kind of Alpha male type heirachy.
>
> He wants to put a barrier in place where Apache foundation wants no barriers 
> to free learning and free software.
>
> He has not reported any bugs while I have reported so many in such a short 
> space of time.
> He has warned me as well
>
> So that Sean Owen does not put a barrier in place for me in my path to free 
> learning and free  Apache software
> I would like somebody to clarify the criteria for me.
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
Sean Owen says the criterion of these two mailing lists is not to help
support somebody who is new,
but to serve people who have been using the software for a long time.

He is implying, I think, that I should only send email when I find bugs, so
that I can help him in his work.
A one-way street.

He is suggesting that the more familiar you are with this software, the more
important you are.
Some kind of alpha-male-type hierarchy.

He wants to put a barrier in place where the Apache Foundation wants no
barriers to free learning and free software.

He has not reported any bugs, while I have reported so many in such a short
space of time.
He has warned me as well.

So that Sean Owen does not put a barrier in my path to free
learning and free Apache software,
I would like somebody to clarify the criteria for me.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: spark.readStream.schema(??)

2020-03-27 Thread Zahid Rahman
I found another bug.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sat, 28 Mar 2020 at 02:12, Zahid Rahman  wrote:

>
> I have sorted the error anyway because I am the best there is.
> It is downhill for me from here.
>
> There is nobody using this email list anyway for anything. The list is as
> dead as a dodo,
> probably because of people like you.
>
> *That is exactly what this email list is for.*
> *It is not just for me to test your buggy software and report the bugs
> free of cost,*
>
> *and not get anything in return.*
>
> *In other words, free consultancy for you, because you know the software*
> *after spending years of your life on it, while I am going to master it in weeks.*
>
> Have we eaten something that disagrees with us today.
> Do you have a sore throat ?
> May be a little temperature ?
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Sat, 28 Mar 2020 at 02:03, Sean Owen  wrote:
>
>> (this isn't an email list for user support)
>>
>> On Fri, Mar 27, 2020 at 8:32 PM Zahid Rahman 
>> wrote:
>> >
>> > version: spark-3.0.0-preview2-bin-hadoop2.7
>> >
>> > The syntax checker objects to the following argument which is what I am
>> supposed to enter.
>> >
>> > .schema(staticSchema)
>> >
>> > However when I  provide the  following argument it works but I don't
>> think that is correct.
>> > What is the correct argument for this case ?
>> >
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.functions.{window,column,desc,col}
>> >
>> > object RetailData {
>> >
>> >
>> >   def main(args: Array[String]): Unit = {
>> >
>> > // crete spark session
>> > val spark = SparkSession.builder().master("spark://
>> 192.168.0.38:7077").appName("Retail Data").getOrCreate();
>> > // set spark runtime  configuration
>> > spark.conf.set("spark,sql.shuffle.partitions","5")
>> >
>> > // create a static frame
>> >   val staticDataFrame = spark.read.format("csv")
>> > .option ("header","true")
>> > .option("inferschema","true")
>> > .load("/data/retail-data/by-day/*.csv")
>> >
>> > staticDataFrame.createOrReplaceTempView("retail_data")
>> > val staticFrame = staticDataFrame.schema
>> >
>> > staticDataFrame
>> >   .selectExpr(
>> > "CustomerId","UnitPrice * Quantity as total_cost",
>> "InvoiceDate")
>> >   .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> >   .sum("total_cost")
>> >   .sort(desc("sum(total_cost)"))
>> >   .show(2)
>> >
>> > val streamingDataFrame = spark.readStream
>> >   .schema(staticDataFrame.schema)
>> >   .option("maxFilesPerTrigger", 1)
>> >   .load("/data/retail-data/by-day/*.csv")
>> >
>> >   println(streamingDataFrame.isStreaming)
>> >
>> >   } // main
>> >
>> > } // object
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>


spark.readStream.schema(??)

2020-03-27 Thread Zahid Rahman
version: spark-3.0.0-preview2-bin-hadoop2.7

The syntax checker objects to the following argument, which is what I am
supposed to enter:

.schema(staticSchema)

However, when I provide the following argument it works, but I don't think
that is correct.
What is the correct argument for this case?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {


  def main(args: Array[String]): Unit = {

// create spark session
val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Retail Data")
  .getOrCreate();
// set spark runtime configuration
spark.conf.set("spark.sql.shuffle.partitions", "5")

// create a static frame
  val staticDataFrame = spark.read.format("csv")
.option ("header","true")
.option("inferschema","true")
.load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticFrame = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(2)

val streamingDataFrame = spark.readStream
  .schema(staticDataFrame.schema)
  .option("maxFilesPerTrigger", 1)
  .load("/data/retail-data/by-day/*.csv")

  println(streamingDataFrame.isStreaming)

  } // main

} // object
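
A minimal sketch of one possible reading, assuming the intent was to reuse the
saved schema by name (note the listing above binds it to staticFrame, not
staticSchema); the format and header options are added here as assumptions for
reading CSV input:

// Sketch: bind the schema to the name that .schema(...) is given later.
val staticSchema = staticDataFrame.schema

val streamingDataFrame = spark.readStream
  .schema(staticSchema)                 // same value as staticDataFrame.schema
  .format("csv")                        // assumed: the source is still CSV
  .option("header", "true")             // assumed: the files have a header row
  .option("maxFilesPerTrigger", 1)
  .load("/data/retail-data/by-day/*.csv")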







Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: what a plava !

2020-03-27 Thread Sean Owen
- dev@, which is more for project devs to communicate. Cross-posting
is discouraged too.

The book isn't from the Spark OSS project, so this isn't really the place to
give feedback on it.

I don't quite understand the context of your other questions, but I
would elaborate on them in individual, clear emails instead, to increase
the chance that someone will answer.

On Fri, Mar 27, 2020 at 4:49 PM Zahid Rahman  wrote:
>
>
> I was very impressed with the amount of material available from 
> https://github.com/databricks/Spark-The-Definitive-Guide/
> Over 450+  megabytes.
>
> I have a corrected the scala code  by adding
> .sort(desc("sum(total_cost)")) to the code provided on page 34 (see below).
>
> I have noticed numerous uses of exclamation marks almost over use.
> for example:
> page 23: Let's specify some more transformatrions !
> page 24: you've read your first explain plan !
> page 26: Notice that these plans compile to the exactsame underlying plan !
> page 29: The last step is our action !
> page 34: The best thing about structured  streaming rapidly... with 
> virtually no code
>
> 1. I have never read a science book with such emotion of frustration.
> Is Spark difficult to understand made more complicated  with the 
> proliferation of languages
> scala , Java , python SQL R.
>
> 2. Secondly, Is spark architecture made more complex due to competing 
> technologies ?
>
> I have spark cluster setup with master and slave to load balancing heavy 
> activity like so:
> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
> for load balancing I imagine, conceptually speaking,  although I haven't 
> tried it , I can have as many
> slaves(workers)  on other physical machines  by simply downloading spark zip 
> file
> and running workers from those other physical machine(s) with  
> sbin/start-slave.sh  spark://192.168.0.38:7077.
> My question is under the circumstances do I need to bother with mesos or yarn 
> ?
>
> Collins dictionary
> The exclamation mark is used after exclamations and emphatic expressions.
>
> I can’t believe it!
> Oh, no! Look at this mess!
>
> The exclamation mark loses its effect if it is overused. It is better to use 
> a full stop after a sentence expressing mild excitement or humour.
>
> It was such a beautiful day.
> I felt like a perfect banana.
>
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{window,column,desc,col}
>
> object RetailData {
>
>   def main(args: Array[String]): Unit = {
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
> Data").getOrCreate();
>
> // create a static frame
>   val staticDataFrame = spark.read.format("csv")
> .option ("header","true")
> .option("inferschema","true")
> .load("/data/retail-data/by-day/*.csv")
>
> staticDataFrame.createOrReplaceTempView("retail_data")
> val staticFrame = staticDataFrame.schema
>
> staticDataFrame
>   .selectExpr(
> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>   .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>   .sum("total_cost")
>   .sort(desc("sum(total_cost)"))
>   .show(1)
>
>   } // main
>
> } // object
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



what a plava !

2020-03-27 Thread Zahid Rahman
I was very impressed with the amount of material available from
https://github.com/databricks/Spark-The-Definitive-Guide/
(over 450 *megabytes*).


I have corrected the Scala code by adding
*.sort(desc("sum(total_cost)"))* to the code provided on page 34 (see
below).

I have noticed numerous uses of exclamation marks, almost overuse.
For example:
page 23: Let's specify some more *transformations!*
page 24: you've read your first explain *plan!*
page 26: Notice that these plans compile to the exact same underlying *plan!*
page 29: The last step is our *action!*
page 34: The best thing about structured streaming ... rapidly ...
with *virtually no code*

1. I have never read a science book with such emotion of frustration.
Is Spark difficult to understand, made more complicated by the
proliferation of languages:
Scala, Java, Python, SQL, R?

2. Secondly, is the Spark architecture made more complex due to competing
technologies?

I have a Spark cluster set up with a master and a slave to load-balance heavy
activity, like so:
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077
For load balancing, I imagine, conceptually speaking (although I haven't
tried it), I can have as many
slaves (workers) on other physical machines as I like, simply by downloading
the Spark zip file
and running workers from those other physical machine(s) with
sbin/start-slave.sh spark://192.168.0.38:7077.

*My question is: under the circumstances, do I need to bother with Mesos or
YARN?*

Collins dictionary
The exclamation mark is used after exclamations and emphatic expressions.

   - I can’t believe it!
   - Oh, no! Look at this mess!

The exclamation mark loses its effect if it is overused. It is better to
use a full stop after a sentence expressing mild excitement or humour.

   It was such a beautiful day.
   I felt like a perfect banana.


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

val spark =
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail
Data").getOrCreate();

// create a static frame
  val staticDataFrame = spark.read.format("csv")
.option ("header","true")
.option("inferschema","true")
.load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticFrame = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(1)

  } // main

} // object



Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: Programmatic: parquet file corruption error

2020-03-27 Thread Zahid Rahman
Thanks Wenchen.  SOLVED! KINDA!

I removed all dependencies from the pom.xml in my IDE so I wouldn't be
picking up any libraries from the Maven repository.
I *instead* included the libraries (jars) from the *Spark download* of
*spark-3.0.0-preview2-bin-hadoop2.7*.
This way I am using the *same libraries* that are used when running
*spark-submit scripts*.

I believe I managed to trace the issue.
I copied the log4j.properties.template into IntelliJ's resources
directory in my project,
obviously renaming it to log4j.properties.
So now I am also using the *same log4j.properties* as when running the
*spark-submit script*.

I noticed the values of *log4j.logger.org.apache.parquet=ERROR* and
*log4j.logger.parquet=ERROR*.
It appears that this parquet corruption warning is an *outstanding bug* and
the *workaround* is to quieten the warning.


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the
spark-shell, the
# log level for this class is used to overwrite the root logger's log
level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up
nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
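
For completeness, a minimal sketch (an assumption, offered as an alternative)
of quieting the same loggers programmatically with the log4j 1.x API that
ships with this Spark build, for cases where the properties file is not
picked up from the IDE classpath:

import org.apache.log4j.{Level, Logger}

// Sketch: mirror the two properties-file entries above in code,
// silencing the CorruptStatistics warning at its source loggers.
Logger.getLogger("org.apache.parquet").setLevel(Level.ERROR)
Logger.getLogger("parquet").setLevel(Level.ERROR)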




Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 07:44, Wenchen Fan  wrote:

> Running Spark application with an IDE is not officially supported. It may
> work under some cases but there is no guarantee at all. The official way is
> to run interactive queries with spark-shell or package your application to
> a jar and use spark-submit.
>
> On Thu, Mar 26, 2020 at 4:12 PM Zahid Rahman  wrote:
>
>> Hi,
>>
>> When I run the code for a user defined data type dataset using case class
>> in scala  and run the code in the interactive spark-shell against parquet
>> file. The results are as expected.
>> However, when I then run the same code programmatically in the IntelliJ IDE, Spark
>> gives a file corruption error.
>>
>> Steps I have taken to determine the source of error are :
>> I have tested for file permission and made sure to chmod 777 , just in
>> case.
>> I tried a fresh copy of same parquet file.
>> I ran both programme before and after the fresh copy.
>> I also rebooted then ran programmatically against a fresh parquet file.
>> The corruption error was consistent in all cases.
>> I have copy and pasted the spark-shell , the error message and the code
>> in the IDE and the pom.xml, IntelliJ java  classpath command line.
>>
>> Perhaps the code in the libraries are different than the ones  used by
>> spark-shell from that when run programmatically.
>> I don't believe it is an error on my part.
>>
>> <--
>>
>> 07:28:45 WARN  CorruptStatistics:117 - Ignoring statistics because
>> created_by could not be parsed (see PARQUET-251): parquet-mr (build
>> 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
>> org.apache.parquet.VersionParser$VersionParseException: Could not parse
>> created_by: parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
>> using format:
>> (.*?)\s+version\s*(?:([^(]*?)\s*(?:\(\s*build\s*([^)]*?)\s*\))?)?
>> at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
>> at
>> 

[DISCUSS][K8s] Copy files securely to the pods or containers.

2020-03-27 Thread Prashant Sharma
Hello All,
The issue SPARK-23153 
lets us copy any file to the pod/container by first copying it to a Hadoop-supported
filesystem, e.g. HDFS, S3, COS, etc. This is especially useful if
the files have to be copied to a large number of pods/nodes. However, in
most cases we need the file to be copied only to the driver, and it may not
always be convenient (especially for clusters with a smaller number of nodes or
limited resources) to set up an additional intermediate storage just for
this; the current approach cannot work without an intermediate distributed
storage of some sort.
So, while going through the code of the kubectl cp command,
it appears that we can use the same technique, i.e.
tar cf - /tmp/foo | kubectl exec -i -n <namespace> <pod> -- tar xf - -C /tmp/bar,
to copy files in a more secure way (because the file goes
through the Kubernetes API, which has its own security in place).
This also lets us compress the file while sending.

If there is any interest in this sort of feature, I am ready to open an
issue and work on it. So let us discuss whether this has already been explored
and whether there are any known issues with this approach.

Thank you,
Prashant.


Re: Programmatic: parquet file corruption error

2020-03-27 Thread Wenchen Fan
Running Spark application with an IDE is not officially supported. It may
work under some cases but there is no guarantee at all. The official way is
to run interactive queries with spark-shell or package your application to
a jar and use spark-submit.

On Thu, Mar 26, 2020 at 4:12 PM Zahid Rahman  wrote:

> Hi,
>
> When I run the code for a user defined data type dataset using case class
> in scala  and run the code in the interactive spark-shell against parquet
> file. The results are as expected.
> However, when I then run the same code programmatically in the IntelliJ IDE, Spark
> gives a file corruption error.
>
> Steps I have taken to determine the source of error are :
> I have tested for file permission and made sure to chmod 777 , just in
> case.
> I tried a fresh copy of same parquet file.
> I ran both programme before and after the fresh copy.
> I also rebooted then ran programmatically against a fresh parquet file.
> The corruption error was consistent in all cases.
> I have copy and pasted the spark-shell , the error message and the code in
> the IDE and the pom.xml, IntelliJ java  classpath command line.
>
> Perhaps the code in the libraries are different than the ones  used by
> spark-shell from that when run programmatically.
> I don't believe it is an error on my part.
>
> <--
>
> 07:28:45 WARN  CorruptStatistics:117 - Ignoring statistics because
> created_by could not be parsed (see PARQUET-251): parquet-mr (build
> 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
> org.apache.parquet.VersionParser$VersionParseException: Could not parse
> created_by: parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
> using format:
> (.*?)\s+version\s*(?:([^(]*?)\s*(?:\(\s*build\s*([^)]*?)\s*\))?)?
> at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
> at
> org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:72)
> at
> org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatisticsInternal(ParquetMetadataConverter.java:435)
> at
> org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:454)
> at
> org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:914)
> at
> org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:885)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:532)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
> at
> org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:105)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
> at
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildReaderBase(ParquetPartitionReaderFactory.scala:174)
> at
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.createVectorizedReader(ParquetPartitionReaderFactory.scala:205)
> at
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildColumnarReader(ParquetPartitionReaderFactory.scala:103)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory.$anonfun$createColumnarReader$1(FilePartitionReaderFactory.scala:38)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory$$Lambda$2018/.apply(Unknown
> Source)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.getNextReader(FilePartitionReader.scala:109)
> at
> org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:42)
> at
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
> at

Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 06:12, Zahid Rahman  wrote:

> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:
>
>> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
>> just include Spark dependency in IntelliJ?
>>
>> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman 
>> wrote:
>>
>>> I have configured  in IntelliJ as external jars
>>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>>
>>> not pulling anything from maven.
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>>
>>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>>
 Which Spark/Scala version do you use?

 On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
 wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark 
> Session take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however if change the master url like so, with the ip address then the
> following error is produced by the position of .take(5)
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
> instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I  remove take(5) or change the position of take(5) or insert
> an extra take(5) as illustrated in code then it works. I don't see why the
> position of take(5) should cause such an error or be caused by changing 
> the
> master url
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter 
> and map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME 
> != "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>



Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:

> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
> just include Spark dependency in IntelliJ?
>
> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:
>
>> I have configured  in IntelliJ as external jars
>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>
>> not pulling anything from maven.
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>
>>> Which Spark/Scala version do you use?
>>>
>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>> wrote:
>>>

 with the following sparksession configuration

 val spark = SparkSession.builder().master("local[*]").appName("Spark 
 Session take").getOrCreate();

 this line works

 flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)


 however if change the master url like so, with the ip address then the
 following error is produced by the position of .take(5)

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();


 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
 instance of java.lang.invoke.SerializedLambda to field
 org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
 of org.apache.spark.rdd.MapPartitionsRDD

 BUT if I  remove take(5) or change the position of take(5) or insert an
 extra take(5) as illustrated in code then it works. I don't see why the
 position of take(5) should cause such an error or be caused by changing the
 master url

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
 + 5))
flights.show(5)


 complete code if you wish to replicate it.

 import org.apache.spark.sql.SparkSession

 object sessiontest {

   // define specific  data type class then manipulate it using the filter 
 and map functions
   // this is also known as an Encoder
   case class flight (DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME:String,
  count: BigInt)


   def main(args:Array[String]): Unit ={

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();

 import spark.implicits._
 val flightDf = 
 spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
 val flights = flightDf.as[flight]

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
 fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
flights.show(5)

   } // main
 }





 Backbutton.co.uk
 ¯\_(ツ)_/¯
 ♡۶Java♡۶RMI ♡۶
 Make Use Method {MUM}
 makeuse.org
 

>>>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Wenchen Fan
Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
just include Spark dependency in IntelliJ?

On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:

> I have configured  in IntelliJ as external jars
> spark-3.0.0-preview2-bin-hadoop2.7/jar
>
> not pulling anything from maven.
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>
>> Which Spark/Scala version do you use?
>>
>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>> wrote:
>>
>>>
>>> with the following sparksession configuration
>>>
>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> this line works
>>>
>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>
>>> however if change the master url like so, with the ip address then the
>>> following error is produced by the position of .take(5)
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>>
>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of java.lang.invoke.SerializedLambda to field
>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>
>>> BUT if I  remove take(5) or change the position of take(5) or insert an
>>> extra take(5) as illustrated in code then it works. I don't see why the
>>> position of take(5) should cause such an error or be caused by changing the
>>> master url
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>>> 5))
>>>flights.show(5)
>>>
>>>
>>> complete code if you wish to replicate it.
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> object sessiontest {
>>>
>>>   // define specific  data type class then manipulate it using the filter 
>>> and map functions
>>>   // this is also known as an Encoder
>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>  ORIGIN_COUNTRY_NAME:String,
>>>  count: BigInt)
>>>
>>>
>>>   def main(args:Array[String]): Unit ={
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> import spark.implicits._
>>> val flightDf = 
>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>> val flights = flightDf.as[flight]
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>flights.show(5)
>>>
>>>   } // main
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>
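
A closing note on the java.lang.ClassCastException with SerializedLambda above:
one common cause (an assumption here, not a diagnosis confirmed in this thread)
is that an application launched from an IDE against a standalone master never
ships its own classes to the executors, so lambdas and case classes defined in
it cannot be deserialized there. A minimal sketch of shipping the packaged
application jar along with the session; the jar path is hypothetical:

import org.apache.spark.sql.SparkSession

// Sketch: make the packaged application jar available on the executors so
// that closures and case classes defined in it can be deserialized there.
val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Spark Session take")
  .config("spark.jars", "target/sessiontest-1.0.jar")  // hypothetical path to the built jar
  .getOrCreate()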