DatasourceV2 reader for binary files

2018-06-27 Thread Lalwani, Jayesh
Is anyone working on porting existing readers to DataSourceV2? Specifically,
has anyone implemented a DataSourceV2 reader for binary files?
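
For anyone attempting it, here is a rough sketch of a whole-file binary reader against the Spark 2.3-era DataSourceV2 interfaces (ReadSupport / DataSourceReader / DataReaderFactory). The class names, the (path, content) schema, and the "path" option are illustrative choices, not an existing implementation:

import java.util.{List => JList}
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}

// Sketch of a whole-file binary reader: one row per file, columns (path, content).
class BinaryFileSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new BinaryFileSourceReader(options.get("path").get()) // "path" option name is illustrative
}

class BinaryFileSourceReader(dir: String) extends DataSourceReader {
  override def readSchema(): StructType =
    StructType(Seq(StructField("path", StringType), StructField("content", BinaryType)))

  // One factory (i.e. one partition) per file; fine for a sketch, too coarse for huge files.
  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    val root = new Path(dir)
    val fs = root.getFileSystem(new Configuration())
    fs.listStatus(root).filter(_.isFile)
      .map(s => new BinaryFileReaderFactory(s.getPath.toString): DataReaderFactory[Row])
      .toList.asJava
  }
}

class BinaryFileReaderFactory(file: String) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private var done = false
    override def next(): Boolean = if (done) false else { done = true; true }
    override def get(): Row = {
      val path = new Path(file)
      val fs = path.getFileSystem(new Configuration())
      val bytes = new Array[Byte](fs.getFileStatus(path).getLen.toInt)
      val in = fs.open(path)
      try in.readFully(bytes) finally in.close()
      Row(file, bytes)  // one row holding the whole file's contents
    }
    override def close(): Unit = ()
  }
}

Assuming the 2.3 interfaces above, spark.read.format(classOf[BinaryFileSource].getName).option("path", "/data/blobs").load() (the directory is a placeholder) should then return a DataFrame of (path, content) rows.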




[SS] Invalid call to dataType on unresolved object

2018-06-04 Thread Lalwani, Jayesh
This is possibly a bug introduced in 2.3.0.

I have this code that I run in spark-shell


spark.readStream.format("socket").option("host", "localhost").option("port", 
).load().toDF("employeeId").

 | withColumn("swipeTime", expr("current_timestamp()")).

 | createTempView("employeeSwipe")



spark.table("employeeSwipe").writeStream.outputMode("append").format("console").start


I run it in 2.2.1 and I get this


scala> spark.readStream.format("socket").option("host", "localhost").option("port", ).load().toDF("employeeId").
     | withColumn("swipeTime", expr("current_timestamp()")).
     | createTempView("employeeSwipe")
18/06/04 18:20:27 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
18/06/04 18:20:28 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.

scala> spark.table("employeeSwipe").writeStream.outputMode("append").format("console").start
res1: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@24489d12

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+
|employeeId|           swipeTime|
+----------+--------------------+
|         1|2018-06-04 18:21:...|
+----------+--------------------+

I run the same in 2.3.0, and I get this


scala> spark.readStream.format("socket").option("host", "localhost").option("port", ).load().toDF("employeeId").
     | withColumn("swipeTime", expr("current_timestamp()")).
     | createTempView("employeeSwipe")
2018-06-04 18:37:12 WARN  TextSocketSourceProvider:66 - The socket source should not be used for production applications! It does not support recovery.
2018-06-04 18:37:15 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

scala>

scala> spark.table("employeeSwipe").writeStream.outputMode("append").format("console").start
res1: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5a503cf0

scala> 2018-06-04 18:37:28 ERROR MicroBatchExecution:91 - Query [id = 10f792b2-89fd-4d98-b2e7-f7531da385e4, runId = 3b131ecf-a622-4b54-9295-ff06e9fc2566] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'swipeTime
   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
   at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
   at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   at scala.collection.immutable.List.foreach(List.scala:381)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
   at scala.collection.immutable.List.map(List.scala:285)
   at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
   at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
   at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
   at ...
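
A possible workaround sketch while this is open, assuming the failure comes from resolving the streaming plan through the temp view: hold on to the streaming DataFrame and start the query on it directly instead of going through spark.table. The port value below is illustrative, since the snippet above omits it:

import org.apache.spark.sql.functions.expr

val employeeSwipe = spark.readStream.format("socket")
  .option("host", "localhost")
  .option("port", "9999")  // illustrative port, not from the original snippet
  .load()
  .toDF("employeeId")
  .withColumn("swipeTime", expr("current_timestamp()"))

// Starting the query on the DataFrame itself sidesteps the temp-view lookup.
employeeSwipe.writeStream.outputMode("append").format("console").start()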

Re: eager execution and debuggability

2018-05-10 Thread Lalwani, Jayesh
If they are struggling to find bugs in their program because of Spark's lazy execution model, they are going to struggle to debug issues when the program runs into problems in production. Learning how to debug Spark is part of learning Spark. It's better that they run into issues in the classroom and spend the time and effort learning how to debug such issues, rather than deploy critical code to production and not know how to resolve them.

I would say that if they are struggling to read and analyze a stack trace, then they are missing a prerequisite. They need to be taught how to look at a stack trace critically before they start on Spark. Learning how to analyze stack traces is part of learning Scala/Java/Python. They need to drop Spark and go back to learning core Scala/Java/Python.



From: Reynold Xin
Date: Tuesday, May 8, 2018 at 6:45 PM
To: Marco Gaido
Cc: Ryan Blue, Koert Kuipers, dev
Subject: Re: eager execution and debuggability

Marco,

There is understanding how Spark works, and there is finding bugs early in 
their own program. One can perfectly understand how Spark works and still find 
it valuable to get feedback asap, and that's why we built eager analysis in the 
first place.

Also I'm afraid you've significantly underestimated the level of technical 
sophistication of users. In many cases they struggle to get anything to work, 
and performance optimization of their programs is secondary to getting things 
working. As John Ousterhout says, "the greatest performance improvement of all 
is when a system goes from not-working to working".

I really like Ryan's approach. It would be great if it were something more turn-key.






On Tue, May 8, 2018 at 2:35 PM Marco Gaido wrote:
I am not sure how useful this is. For students, it is important to understand how Spark works. This can be critical in many decisions they have to take (for instance, whether and what to cache) in order to build performant Spark applications. Adding eager execution can probably help them get something running more easily, but it also lets them use Spark while knowing less about how it works, so they are likely to write worse applications and to have more trouble debugging any kind of problem that may occur later (in production), which in turn affects their experience with the tool.

Moreover, as Ryan also mentioned, there are tools and ways to force execution that help during the debugging phase. So they can achieve the same result without much effort, but with a big difference: they are aware of what is really happening, which may help them later.

Thanks,
Marco

2018-05-08 21:37 GMT+02:00 Ryan Blue:

At Netflix, we use Jupyter notebooks and consoles for interactive sessions. For 
anyone interested, this mode of interaction is really easy to add in Jupyter 
and PySpark. You would just define a different repr_html or repr method for 
Dataset that runs a take(10) or take(100) and formats the result.

That way, the output of a cell or console execution always causes the dataframe 
to run and get displayed for that immediate feedback. But, there is no change 
to Spark’s behavior because the action is run by the REPL, and only when a 
dataframe is a result of an execution in order to display it. Intermediate 
results wouldn’t be run, but that gives users a way to avoid too many 
executions and would still support method chaining in the dataframe API (which 
would be horrible with an aggressive execution model).

There are ways to do this in JVM languages as well if you are using a Scala or Java interpreter (see jvm-repr). This is actually what we do in our Spark-based SQL interpreter to display results.

rb

On Tue, May 8, 2018 at 12:05 PM, Koert Kuipers wrote:
Yeah, we run into this all the time with new hires. They will send emails explaining there is an error in the .write operation, and they are debugging the writing to disk, focusing on that piece of code :)
Unrelated, but another frequent cause of confusion is cascading errors, like the FetchFailedException. They will be debugging the reducer task, not realizing the error happened before that and that the FetchFailedException is not the root cause.


On Tue, May 8, 2018 at 2:52 PM, Reynold Xin wrote:
Similar to the thread yesterday about improving ML/DL integration, I'm sending 

Re: Toward an "API" for spark images used by the Kubernetes back-end

2018-03-22 Thread Lalwani, Jayesh
I would like to add that many people run Spark behind corporate proxies. It’s very common to add an HTTP proxy to extraJavaOptions. Being able to provide custom extraJavaOptions should be supported.
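
For example, something along these lines in spark-defaults.conf (or via --conf on spark-submit); the proxy host and port here are placeholders:

spark.driver.extraJavaOptions    -Dhttp.proxyHost=proxy.mycompany.example -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxy.mycompany.example -Dhttps.proxyPort=8080
spark.executor.extraJavaOptions  -Dhttp.proxyHost=proxy.mycompany.example -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxy.mycompany.example -Dhttps.proxyPort=8080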

Also, Hadoop FS 2.7.3 is pretty limited with respect to S3 buckets. You cannot use temporary AWS tokens. You cannot assume roles. You cannot use KMS-encrypted buckets. All of this comes out of the box on EMR because EMR is built with its own customized Hadoop FS. For standalone installations, it’s pretty common to “customize” your Spark installation using Hadoop 2.8.3 or higher. I don’t know if a Spark container with Hadoop 2.8.3 will be a standard container. If it isn’t, I see a lot of people creating a customized container with Hadoop FS 2.8.3.

From: Rob Vesse 
Date: Thursday, March 22, 2018 at 6:11 AM
To: "dev@spark.apache.org" 
Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end

The difficulty with a custom Spark config is that you need to be careful that 
the Spark config the user provides does not conflict with the auto-generated 
portions of the Spark config necessary to make Spark on K8S work.  So part of 
any “API” definition might need to be what Spark config is considered “managed” 
by the Kubernetes scheduler backend.

For more controlled environments (i.e. security conscious), allowing end users to provide custom images may be a non-starter, so the more we can do at the “API” level without customising the containers the better. A practical example of this is managing Python dependencies: one option we’re considering is having a base image with Anaconda included, then simply projecting a Conda environment spec into the containers (via volume mounts) and having the container recreate that Conda environment on startup. That won’t work for all possible environments, e.g. those that use non-standard Conda channels, but it would provide a lot of capability without customising the images.

Rob

From: Felix Cheung 
Date: Thursday, 22 March 2018 at 06:21
To: Holden Karau, Erik Erlandson
Cc: dev
Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end

I like being able to customize the docker image itself - but I realize this 
thread is more about “API” for the stock image.

Environment is nice. Probably we need a way to set custom spark config (as a 
file??)



From: Holden Karau 
Sent: Wednesday, March 21, 2018 10:44:20 PM
To: Erik Erlandson
Cc: dev
Subject: Re: Toward an "API" for spark images used by the Kubernetes back-end

I’m glad this discussion is happening on dev@ :)

Personally I like customizing with shell env variables when rolling my own image, but definitely documenting the expectations/usage of the variables is needed before we can really call it an API.

On the related question, I suspect two of the more “common” likely customizations are adding additional jars for bootstrapping fetching from a DFS, and also similarly complicated Python dependencies (although given that the Python support isn’t merged yet, it’s hard to say what exactly this would look like).

I could also see some vendors wanting to add some bootstrap/setup scripts to 
fetch keys or other things.

What other ways do folks foresee customizing their Spark docker containers?

On Wed, Mar 21, 2018 at 5:04 PM Erik Erlandson wrote:
During the review of the recent PR to remove use of the init_container from 
kube pods as created by the Kubernetes back-end, the topic of documenting the 
"API" for these container images also came up. What information does the 
back-end provide to these containers? In what form? What assumptions does the 
back-end make about the structure of these containers?  This information is 
important in a scenario where a user wants to create custom images, 
particularly if these are not based on the reference dockerfiles.

A related topic is deciding what such an API should look like.  For example, 
early incarnations were based more purely on environment variables, which could 
have advantages in terms of an API that is easy to describe in a document.  If 
we document the current API, should we annotate it as Experimental?  If not, 
does that effectively freeze the API?

We are interested in community input about possible customization use cases and 
opinions on possible API designs!
Cheers,
Erik



Prometheus Metrics Sink

2018-03-08 Thread Lalwani, Jayesh
What’s the status of this PR: https://github.com/apache/spark/pull/19775? There was some question about whether this should be part of Spark core or not. We would like to have integration between Prometheus and Spark. If this PR is not going to make it, we will have to start building the integration ourselves.
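
For anyone who does end up building it themselves, here is a rough sketch of a custom sink that bridges Spark’s Dropwizard metric registry to a Prometheus pushgateway on report(). It assumes the io.prometheus simpleclient_dropwizard and simpleclient_pushgateway artifacts are on the classpath, and it is declared inside the org.apache.spark.metrics.sink package because Spark’s Sink trait is private[spark]; class and property names are illustrative:

package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.dropwizard.DropwizardExports
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.SecurityManager

// Sketch only: exposes the Dropwizard registry as Prometheus collectors and pushes
// them to a pushgateway when report() is called. Spark instantiates sinks reflectively
// with this 3-arg constructor and wires them up from metrics.properties, e.g.
// *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusPushSink (name illustrative).
class PrometheusPushSink(
    property: Properties,
    registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  private val gatewayAddress = property.getProperty("pushgateway-address", "localhost:9091")
  private val jobName = property.getProperty("job-name", "spark")

  private val collectorRegistry = new CollectorRegistry()
  collectorRegistry.register(new DropwizardExports(registry))

  private val pushGateway = new PushGateway(gatewayAddress)

  override def start(): Unit = ()  // nothing scheduled in this sketch; push only on report()
  override def stop(): Unit = ()
  override def report(): Unit = pushGateway.pushAdd(collectorRegistry, jobName)
}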




DataFrame to DataSet[String]

2018-01-09 Thread Lalwani, Jayesh
SPARK-15463 (https://issues.apache.org/jira/browse/SPARK-15463) was implemented in 2.2.0. It allows you to take a Dataset[String] with raw CSV/JSON and convert it into a DataFrame. Should we have a way to go the other way too, i.e. provide a way to convert a DataFrame to a Dataset[String]?
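
In the meantime, a couple of ways to get a Dataset[String] from a DataFrame today; a sketch assuming an existing DataFrame df and a spark-shell/SparkSession context:

import org.apache.spark.sql.Dataset
import spark.implicits._

// JSON: one JSON document per row, using the built-in toJSON.
val asJson: Dataset[String] = df.toJSON

// CSV-ish: flatten each row yourself; quoting/escaping is up to you in this sketch.
val asCsv: Dataset[String] = df.map(_.mkString(","))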





https://issues.apache.org/jira/browse/SPARK-19609

2017-12-30 Thread Lalwani, Jayesh
Is there any interest in fixing https://issues.apache.org/jira/browse/SPARK-19609? It would help us solve a couple of problems, and it would help a lot of applications that use Structured Streaming.

I am interested in making a contribution, but would need some guidance.




Leveraging S3 select

2017-11-29 Thread Lalwani, Jayesh
AWS announced at re:Invent that they are launching S3 Select. This would allow Spark to push down predicates to S3, rather than read the entire file into memory. Are there any plans to update Spark to use S3 Select?




Is 2.2.1 going to be out soon?

2017-10-16 Thread Lalwani, Jayesh
We have one application that is running into problems because of https://issues.apache.org/jira/browse/SPARK-21696, which is fixed in 2.2.1. We would appreciate an idea of when it’s going to be out.

